Tag Archives: Amazon Athena

Simplify operational data processing in data lakes using AWS Glue and Apache Hudi

Post Syndicated from Ravi Itha original https://aws.amazon.com/blogs/big-data/simplify-operational-data-processing-in-data-lakes-using-aws-glue-and-apache-hudi/

The Analytics specialty practice of AWS Professional Services (AWS ProServe) helps customers across the globe with modern data architecture implementations on the AWS Cloud. A modern data architecture is an evolutionary architecture pattern designed to integrate a data lake, data warehouse, and purpose-built stores with a unified governance model. It focuses on defining standards and patterns to integrate data producers and consumers and move data between data lakes and purpose-built data stores securely and efficiently. Out of the many data producer systems that feed data to a data lake, operational databases are most prevalent, where operational data is stored, transformed, analyzed, and finally used to enhance business operations of an organization. With the emergence of open storage formats such as Apache Hudi and its native support from AWS Glue for Apache Spark, many AWS customers have started adding transactional and incremental data processing capabilities to their data lakes.

AWS has invested in native service integration with Apache Hudi and published technical contents to enable you to use Apache Hudi with AWS Glue (for example, refer to Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 1: Getting Started). In AWS ProServe-led customer engagements, the use cases we work on usually come with technical complexity and scalability requirements. In this post, we discuss a common use case in relation to operational data processing and the solution we built using Apache Hudi and AWS Glue.

Use case overview

AnyCompany Travel and Hospitality wanted to build a data processing framework to seamlessly ingest and process data coming from operational databases (used by reservation and booking systems) in a data lake before applying machine learning (ML) techniques to provide a personalized experience to its users. Due to the sheer volume of direct and indirect sales channels the company has, its booking and promotions data are organized in hundreds of operational databases with thousands of tables. Of those tables, some are larger (such as in terms of record volume) than others, and some are updated more frequently than others. In the data lake, the data to be organized in the following storage zones:

  1. Source-aligned datasets – These have an identical structure to their counterparts at the source
  2. Aggregated datasets – These datasets are created based on one or more source-aligned datasets
  3. Consumer-aligned datasets – These are derived from a combination of source-aligned, aggregated, and reference datasets enriched with relevant business and transformation logics, usually fed as inputs to ML pipelines or any consumer applications

The following are the data ingestion and processing requirements:

  1. Replicate data from operational databases to the data lake, including insert, update, and delete operations.
  2. Keep the source-aligned datasets up to date (typically within the range of 10 minutes to a day) in relation to their counterparts in the operational databases, ensuring analytics pipelines refresh consumer-aligned datasets for downstream ML pipelines in a timely fashion. Moreover, the framework should consume compute resources as optimally as possible per the size of the operational tables.
  3. To minimize DevOps and operational overhead, the company wanted to templatize the source code wherever possible. For example, to create source-aligned datasets in the data lake for 3,000 operational tables, the company didn’t want to deploy 3,000 separate data processing jobs. The smaller the number of jobs and scripts, the better.
  4. The company wanted the ability to continue processing operational data in the secondary Region in the rare event of primary Region failure.

As you can guess, the Apache Hudi framework can solve the first requirement. Therefore, we will put our emphasis on the other requirements. We begin with a Data lake reference architecture followed by an overview of operational data processing framework. By showing you our open-source solution on GitHub, we delve into framework components and walk through their design and implementation aspects. Finally, by testing the framework, we summarize how it meets the aforementioned requirements.

Data lake reference architecture

Let’s begin with a big picture: a data lake solves a variety of analytics and ML use cases dealing with internal and external data producers and consumers. The following diagram represents a generic data lake architecture. To ingest data from operational databases to an Amazon Simple Storage Service (Amazon S3) staging bucket of the data lake, either AWS Database Migration Service (AWS DMS) or any AWS partner solution from AWS Marketplace that has support for change data capture (CDC) can fulfill the requirement. AWS Glue is used to create source-aligned and consumer-aligned datasets and separate AWS Glue jobs to do feature engineering part of ML engineering and operations. Amazon Athena is used for interactive querying and AWS Lake Formation is used for access controls.

Data Lake Reference Architecture

Operational data processing framework

The operational data processing (ODP) framework contains three components: File Manager, File Processor, and Configuration Manager. Each component runs independently to solve a portion of the operational data processing use case. We have open-sourced this framework on GitHub—you can clone the code repo and inspect it while we walk you through the design and implementation of the framework components. The source code is organized in three folders, one for each component, and if you customize and adopt this framework for your use case, we recommend promoting these folders as separate code repositories in your version control system. Consider using the following repository names:

  1. aws-glue-hudi-odp-framework-file-manager
  2. aws-glue-hudi-odp-framework-file-processor
  3. aws-glue-hudi-odp-framework-config-manager

With this modular approach, you can independently deploy the components to your data lake environment by following your preferred CI/CD processes. As illustrated in the preceding diagram, these components are deployed in conjunction with a CDC solution.

Component 1: File Manager

File Manager detects files emitted by a CDC process such as AWS DMS and tracks them in an Amazon DynamoDB table. As shown in the following diagram, it consists of an Amazon EventBridge event rule, an Amazon Simple Queue Service (Amazon SQS) queue, an AWS Lambda function, and a DynamoDB table. The EventBridge rule uses Amazon S3 Event Notifications to detect the arrival of CDC files in the S3 bucket. The event rule forwards the object event notifications to the SQS queue as messages. The File Manager Lambda function consumes those messages, parses the metadata, and inserts the metadata to the DynamoDB table odpf_file_tracker. These records will then be processed by File Processor, which we discuss in the next section.

ODPF Component: File Manager

Component 2: File Processor

File Processor is the workhorse of the ODP framework. It processes files from the S3 staging bucket, creates source-aligned datasets in the raw S3 bucket, and adds or updates metadata for the datasets (AWS Glue tables) in the AWS Glue Data Catalog.

We use the following terminology when discussing File Processor:

  1. Refresh cadence – This represents the data ingestion frequency (for example, 10 minutes). It usually goes with AWS Glue worker type (one of G.1X, G.2X, G.4X, G.8X, G.025X, and so on) and batch size.
  2. Table configuration – This includes the Hudi configuration (primary key, partition key, pre-combined key, and table type (Copy on Write or Merge on Read)), table data storage mode (historical or current snapshot), S3 bucket used to store source-aligned datasets, AWS Glue database name, AWS Glue table name, and refresh cadence.
  3. Batch size – This numeric value is used to split tables into smaller batches and process their respective CDC files in parallel. For example, a configuration of 50 tables with a 10-minute refresh cadence and a batch size of 5 results in a total of 10 AWS Glue job runs, each processing CDC files for 5 tables.
  4. Table data storage mode – There are two options:
    • Historical – This table in the data lake stores historical updates to records (always append).
    • Current snapshot – This table in the data lake stores latest versioned records (upserts) with the ability to use Hudi time travel for historical updates.
  5. File processing state machine – It processes CDC files that belong to tables that share a common refresh cadence.
  6. EventBridge rule association with the file processing state machine – We use a dedicated EventBridge rule for each refresh cadence with the file processing state machine as target.
  7. File processing AWS Glue job – This is a configuration-driven AWS Glue extract, transform, and load (ETL) job that processes CDC files for one or more tables.

File Processor is implemented as a state machine using AWS Step Functions. Let’s use an example to understand this. The following diagram illustrates running File Processor state machine with a configuration that includes 18 operational tables, a refresh cadence of 10 minutes, a batch size of 5, and an AWS Glue worker type of G.1X.

ODP framework component: File Processor

The workflow includes the following steps:

  1. The EventBridge rule triggers the File Processor state machine every 10 minutes.
  2. Being the first state in the state machine, the Batch Manager Lambda function reads configurations from DynamoDB tables.
  3. The Lambda function creates four batches: three of them will be mapped to five operational tables each, and the fourth one is mapped to three operational tables. Then it feeds the batches to the Step Functions Map state.
  4. For each item in the Map state, the File Processor Trigger Lambda function will be invoked, which in turn runs the File Processor AWS Glue job.
  5. Each AWS Glue job performs the following actions:
    • Checks the status of an operational table and acquires a lock when it is not processed by any other job. The odpf_file_processing_tracker DynamoDB table is used for this purpose. When a lock is acquired, it inserts a record in the DynamoDB table with the status updating_table for the first time; otherwise, it updates the record.
    • Processes the CDC files for the given operational table from the S3 staging bucket and creates a source-aligned dataset in the S3 raw bucket. It also updates technical metadata in the AWS Glue Data Catalog.
    • Updates the status of the operational table to completed in the odpf_file_processing_tracker table. In case of processing errors, it updates the status to refresh_error and logs the stack trace.
    • It also inserts this record into the odpf_file_processing_tracker_history DynamoDB table along with additional details such as insert, update, and delete row counts.
    • Moves the records that belong to successfully processed CDC files from odpf_file_tracker to the odpf_file_tracker_history table with file_ingestion_status set to raw_file_processed.
    • Moves to the next operational table in the given batch.
    • Note: a failure to process CDC files for one of the operational tables of a given batch does not impact the processing of other operational tables.

Component 3: Configuration Manager

Configuration Manager is used to insert configuration details to the odpf_batch_config and odpf_raw_table_config tables. To keep this post concise, we provide two architecture patterns in the code repo and leave the implementation details to you.

Solution overview

Let’s test the ODP framework by replicating data from 18 operational tables to a data lake and creating source-aligned datasets with 10-minute refresh cadence. We use Amazon Relational Database Service (Amazon RDS) for MySQL to set up an operational database with 18 tables, upload the New York City Taxi – Yellow Trip Data dataset, set up AWS DMS to replicate data to Amazon S3, process the files using the framework, and finally validate the data using Amazon Athena.

Create S3 buckets

For instructions on creating an S3 bucket, refer to Creating a bucket. For this post, we create the following buckets:

  1. odpf-demo-staging-EXAMPLE-BUCKET – You will use this to migrate operational data using AWS DMS
  2. odpf-demo-raw-EXAMPLE-BUCKET – You will use this to store source-aligned datasets
  3. odpf-demo-code-artifacts-EXAMPLE-BUCKET – You will use this to store code artifacts

Deploy File Manager and File Processor

Deploy File Manager and File Processor by following instructions from this README and this README, respectively.

Set up Amazon RDS for MySQL

Complete the following steps to set up Amazon RDS for MySQL as the operational data source:

  1. Provision Amazon RDS for MySQL. For instructions, refer to Create and Connect to a MySQL Database with Amazon RDS.
  2. Connect to the database instance using MySQL Workbench or DBeaver.
  3. Create a database (schema) by running the SQL command CREATE DATABASE taxi_trips;.
  4. Create 18 tables by running the SQL commands in the ops_table_sample_ddl.sql script.

Populate data to the operational data source

Complete the following steps to populate data to the operational data source:

  1. To download the New York City Taxi – Yellow Trip Data dataset for January 2021 (Parquet file), navigate to NYC TLC Trip Record Data, expand 2021, and choose Yellow Taxi Trip records. A file called yellow_tripdata_2021-01.parquet will be downloaded to your computer.
  2. On the Amazon S3 console, open the bucket odpf-demo-staging-EXAMPLE-BUCKET and create a folder called nyc_yellow_trip_data.
  3. Upload the yellow_tripdata_2021-01.parquet file to the folder.
  4. Navigate to the bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET and create a folder called glue_scripts.
  5. Download the file load_nyc_taxi_data_to_rds_mysql.py from the GitHub repo and upload it to the folder.
  6. Create an AWS Identity and Access Management (IAM) policy called load_nyc_taxi_data_to_rds_mysql_s3_policy. For instructions, refer to Creating policies using the JSON editor. Use the odpf_setup_test_data_glue_job_s3_policy.json policy definition.
  7. Create an IAM role called load_nyc_taxi_data_to_rds_mysql_glue_role. Attach the policy created in the previous step.
  8. On the AWS Glue console, create a connection for Amazon RDS for MySQL. For instructions, refer to Adding a JDBC connection using your own JDBC drivers and Setting up a VPC to connect to Amazon RDS data stores over JDBC for AWS Glue. Name the connection as odpf_demo_rds_connection.
  9. In the navigation pane of the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  10. Choose the file load_nyc_taxi_data_to_rds_mysql.py and choose Create.
  11. Complete the following steps to create your job:
    • Provide a name for the job, such as load_nyc_taxi_data_to_rds_mysql.
    • For IAM role, choose load_nyc_taxi_data_to_rds_mysql_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Advanced properties, Connections, select the connection you created earlier.
    • Under Job parameters, add the following parameters:
      • input_sample_data_path = s3://odpf-demo-staging-EXAMPLE-BUCKET/nyc_yellow_trip_data/yellow_tripdata_2021-01.parquet
      • schema_name = taxi_trips
      • table_name = table_1
      • rds_connection_name = odpf_demo_rds_connection
    • Choose Save.
  12. On the Actions menu, run the job.
  13. Go back to your MySQL Workbench or DBeaver and validate the record count by running the SQL command select count(1) row_count from taxi_trips.table_1. You will get an output of 1369769.
  14. Populate the remaining 17 tables by running the SQL commands from the populate_17_ops_tables_rds_mysql.sql script.
  15. Get the row count from the 18 tables by running the SQL commands from the ops_data_validation_query_rds_mysql.sql script. The following screenshot shows the output.
    Record volumes (for 18 Tables) in Operational Database

Configure DynamoDB tables

Complete the following steps to configure the DynamoDB tables:

  1. Download file load_ops_table_configs_to_ddb.py from the GitHub repo and upload it to the folder glue_scripts in the S3 bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET.
  2. Create an IAM policy called load_ops_table_configs_to_ddb_ddb_policy. Use the odpf_setup_test_data_glue_job_ddb_policy.json policy definition.
  3. Create an IAM role called load_ops_table_configs_to_ddb_glue_role. Attach the policy created in the previous step.
  4. On the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  5. Choose the file load_ops_table_configs_to_ddb.py and choose Create.
  6. Complete the following steps to create a job:
    • Provide a name, such as load_ops_table_configs_to_ddb.
    • For IAM role, choose load_ops_table_configs_to_ddb_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Job parameters, add the following parameters
      • batch_config_ddb_table_name = odpf_batch_config
      • raw_table_config_ddb_table_name = odpf_demo_taxi_trips_raw
      • aws_region = e.g., us-west-1
    • Choose Save.
  7. On the Actions menu, run the job.
  8. On the DynamoDB console, get the item count from the tables. You will find 1 item in the odpf_batch_config table and 18 items in the odpf_demo_taxi_trips_raw table.

Set up a database in AWS Glue

Complete the following steps to create a database:

  1. On the AWS Glue console, under Data catalog in the navigation pane, choose Databases.
  2. Create a database called odpf_demo_taxi_trips_raw.

Set up AWS DMS for CDC

Complete the following steps to set up AWS DMS for CDC:

  1. Create an AWS DMS replication instance. For Instance class, choose dms.t3.medium.
  2. Create a source endpoint for Amazon RDS for MySQL.
  3. Create target endpoint for Amazon S3. To configure the S3 endpoint settings, use the JSON definition from dms_s3_endpoint_setting.json.
  4. Create an AWS DMS task.
    • Use the source and target endpoints created in the previous steps.
    • To create AWS DMS task mapping rules, use the JSON definition from dms_task_mapping_rules.json.
    • Under Migration task startup configuration, select Automatically on create.
  5. When the AWS DMS task starts running, you will see a task summary similar to the following screenshot.
    DMS Task Summary
  6. In the Table statistics section, you will see an output similar to the following screenshot. Here, the Full load rows and Total rows columns are important metrics whose counts should match with the record volumes of the 18 tables in the operational data source.
    DMS Task Statistics
  7. As a result of successful full load completion, you will find Parquet files in the S3 staging bucket—one Parquet file per table in a dedicated folder, similar to the following screenshot. Similarly, you will find 17 such folders in the bucket.
    DMS Output in S3 Staging Bucket for Table 1

File Manager output

The File Manager Lambda function consumes messages from the SQS queue, extracts metadata for the CDC files, and inserts one item per file to the odpf_file_tracker DynamoDB table. When you check the items, you will find 18 items with file_ingestion_status set to raw_file_landed, as shown in the following screenshot.

CDC Files in File Tracker DynamoDB Table

File Processor output

  1. On the subsequent tenth minute (since the activation of the EventBridge rule), the event rule triggers the File Processor state machine. On the Step Functions console, you will notice that the state machine is invoked, as shown in the following screenshot.
    File Processor State Machine Run Summary
  2. As shown in the following screenshot, the Batch Generator Lambda function creates four batches and constructs a Map state for parallel running of the File Processor Trigger Lambda function.
    File Processor State Machine Run Details
  3. Then, the File Processor Trigger Lambda function runs the File Processor Glue Job, as shown in the following screenshot.
    File Processor Glue Job Parallel Runs
  4. Then, you will notice that the File Processor Glue Job runs create source-aligned datasets in Hudi format in the S3 raw bucket. For Table 1, you will see an output similar to the following screenshot. There will be 17 such folders in the S3 raw bucket.
    Data in S3 raw bucket
  5. Finally, in AWS Glue Data Catalog, you will notice 18 tables created in the odpf_demo_taxi_trips_raw database, similar to the following screenshot.
    Tables in Glue Database

Data validation

Complete the following steps to validate the data:

  1. On the Amazon Athena console, open the query editor, and select a workgroup or create a new workgroup.
  2. Choose AwsDataCatalog for Data source and odpf_demo_taxi_trips_raw for Database.
  3. Run the raw_data_validation_query_athena.sql SQL query. You will get an output similar to the following screenshot.
    Raw Data Validation via Amazon Athena

Validation summary: The counts in Amazon Athena match with the counts of the operational tables and it proves that the ODP framework has processed all the files and records successfully. This concludes the demo. To test additional scenarios, refer to Extended Testing in the code repo.


Let’s review how the ODP framework addressed the aforementioned requirements.

  1. As discussed earlier in this post, by logically grouping tables by refresh cadence and associating them to EventBridge rules, we ensured that the source-aligned tables are refreshed by the File Processor AWS Glue jobs. With the AWS Glue worker type configuration setting, we selected the appropriate compute resources while running the AWS Glue jobs (the instances of the AWS Glue job).
  2. By applying table-specific configurations (from odpf_batch_config and odpf_raw_table_config) dynamically, we were able to use one AWS Glue job to process CDC files for 18 tables.
  3. You can use this framework to support a variety of data migration use cases that require quicker data migration from on-premises storage systems to data lakes or analytics platforms on AWS. You can reuse File Manager as is and customize File Processor to work with other storage frameworks such as Apache Iceberg, Delta Lake, and purpose-built data stores such as Amazon Aurora and Amazon Redshift.
  4. To understand how the ODP framework met the company’s disaster recovery (DR) design criterion, we first need to understand the DR architecture strategy at a high level. The DR architecture strategy has the following aspects:
    • One AWS account and two AWS Regions are used for primary and secondary environments.
    • The data lake infrastructure in the secondary Region is kept in sync with the one in the primary Region.
    • Data is stored in S3 buckets, metadata data is stored in the AWS Glue Data Catalog, and access controls in Lake Formation are replicated from the primary to secondary Region.
    • The data lake source and target systems have their respective DR environments.
    • CI/CD tooling (version control, CI server, and so on) are to be made highly available.
    • The DevOps team needs to be able to deploy CI/CD pipelines of analytics frameworks (such as this ODP framework) to either the primary or secondary Region.
    • As you can imagine, disaster recovery on AWS is a vast subject, so we keep our discussion to the last design aspect.

By designing the ODP framework with three components and externalizing operational table configurations to DynamoDB global tables, the company was able to deploy the framework components to the secondary Region (in the rare event of a single-Region failure) and continue to process CDC files from the point it last processed in the primary Region. Because the CDC file tracking and processing audit data is replicated to the DynamoDB replica tables in the secondary Region, the File Manager microservice and File Processor can seamlessly run.

Clean up

When you’re finished testing this framework, you can delete the provisioned AWS resources to avoid any further charges.


In this post, we took a real-world operational data processing use case and presented you the framework we developed at AWS ProServe. We hope this post and the operational data processing framework using AWS Glue and Apache Hudi will expedite your journey in integrating operational databases into your modern data platforms built on AWS.

About the authors

Ravi-IthaRavi Itha is a Principal Consultant at AWS Professional Services with specialization in data and analytics and generalist background in application development. Ravi helps customers with enterprise data strategy initiatives across insurance, airlines, pharmaceutical, and financial services industries. In his 6-year tenure at Amazon, Ravi has helped the AWS builder community by publishing approximately 15 open-source solutions (accessible via GitHub handle), four blogs, and reference architectures. Outside of work, he is passionate about reading India Knowledge Systems and practicing Yoga Asanas.

srinivas-kandiSrinivas Kandi is a Data Architect at AWS Professional Services. He leads customer engagements related to data lakes, analytics, and data warehouse modernizations. He enjoys reading history and civilizations.

Securely process near-real-time data from Amazon MSK Serverless using an AWS Glue streaming ETL job with IAM authentication

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/securely-process-near-real-time-data-from-amazon-msk-serverless-using-an-aws-glue-streaming-etl-job-with-iam-authentication/

Streaming data has become an indispensable resource for organizations worldwide because it offers real-time insights that are crucial for data analytics. The escalating velocity and magnitude of collected data has created a demand for real-time analytics. This data originates from diverse sources, including social media, sensors, logs, and clickstreams, among others. With streaming data, organizations gain a competitive edge by promptly responding to real-time events and making well-informed decisions.

In streaming applications, a prevalent approach involves ingesting data through Apache Kafka and processing it with Apache Spark Structured Streaming. However, managing, integrating, and authenticating the processing framework (Apache Spark Structured Streaming) with the ingesting framework (Kafka) poses significant challenges, necessitating a managed and serverless framework. For example, integrating and authenticating a client like Spark streaming with Kafka brokers and zookeepers using a manual TLS method requires certificate and keystore management, which is not an easy task and requires a good knowledge of TLS setup.

To address these issues effectively, we propose using Amazon Managed Streaming for Apache Kafka (Amazon MSK), a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data. In this post, we use Amazon MSK Serverless, a cluster type for Amazon MSK that makes it possible for you to run Apache Kafka without having to manage and scale cluster capacity. To further enhance security and streamline authentication and authorization processes, MSK Serverless enables you to handle both authentication and authorization using AWS Identity and Access Management (IAM) in your cluster. This integration eliminates the need for separate mechanisms for authentication and authorization, simplifying and strengthening data protection. For example, when a client tries to write to your cluster, MSK Serverless uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster.

To process data effectively, we use AWS Glue, a serverless data integration service that uses the Spark Structured Streaming framework and enables near-real-time data processing. An AWS Glue streaming job can handle large volumes of incoming data from MSK Serverless with IAM authentication. This powerful combination ensures that data is processed securely and swiftly.

The post demonstrates how to build an end-to-end implementation to process data from MSK Serverless using an AWS Glue streaming extract, transform, and load (ETL) job with IAM authentication to connect MSK Serverless from the AWS Glue job and query the data using Amazon Athena.

Solution overview

The following diagram illustrates the architecture that you implement in this post.

The workflow consists of the following steps:

  1. Create an MSK Serverless cluster with IAM authentication and an EC2 Kafka client as the producer to ingest sample data into a Kafka topic. For this post, we use the kafka-console-producer.sh Kafka console producer client.
  2. Set up an AWS Glue streaming ETL job to process the incoming data. This job extracts data from the Kafka topic, loads it into Amazon Simple Storage Service (Amazon S3), and creates a table in the AWS Glue Data Catalog. By continuously consuming data from the Kafka topic, the ETL job ensures it remains synchronized with the latest streaming data. Moreover, the job incorporates the checkpointing functionality, which tracks the processed records, enabling it to resume processing seamlessly from the point of interruption in the event of a job run failure.
  3. Following the data processing, the streaming job stores data in Amazon S3 and generates a Data Catalog table. This table acts as a metadata layer for the data. To interact with the data stored in Amazon S3, you can use Athena, a serverless and interactive query service. Athena enables the run of SQL-like queries on the data, facilitating seamless exploration and analysis.

For this post, we create the solution resources in the us-east-1 Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configure resources with AWS CloudFormation

In this post, you use the following two CloudFormation templates. The advantage of using two different templates is that you can decouple the resource creation of ingestion and processing part according to your use case and if you have requirements to create specific process resources only.

  • vpc-mskserverless-client.yaml – This template sets up data the ingestion service resources such as a VPC, MSK Serverless cluster, and S3 bucket
  • gluejob-setup.yaml – This template sets up the data processing resources such as the AWS Glue table, database, connection, and streaming job

Create data ingestion resources

The vpc-mskserverless-client.yaml stack creates a VPC, private and public subnets, security groups, S3 VPC Endpoint, MSK Serverless cluster, EC2 instance with Kafka client, and S3 bucket. To create the solution resources for data ingestion, complete the following steps:

  1. Launch the stack vpc-mskserverless-client using the CloudFormation template:
  2. Provide the parameter values as listed in the following table.
Parameters Description Sample Value
EnvironmentName Environment name that is prefixed to resource names .
PrivateSubnet1CIDR IP range (CIDR notation) for the private subnet in the first Availability Zone .
PrivateSubnet2CIDR IP range (CIDR notation) for the private subnet in the second Availability Zone .
PublicSubnet1CIDR IP range (CIDR notation) for the public subnet in the first Availability Zone .
PublicSubnet2CIDR IP range (CIDR notation) for the public subnet in the second Availability Zone .
VpcCIDR IP range (CIDR notation) for this VPC .
InstanceType Instance type for the EC2 instance t2.micro
LatestAmiId AMI used for the EC2 instance /aws/service/ami-amazon-linux- latest/amzn2-ami-hvm-x86_64-gp2
  1. When the stack creation is complete, retrieve the EC2 instance PublicDNS from the vpc-mskserverless-client stack’s Outputs tab.

The stack creation process can take around 15 minutes to complete.

  1. On the Amazon EC2 console, access the EC2 instance that you created using the CloudFormation template.
  2. Choose the EC2 instance whose InstanceId is shown on the stack’s Outputs tab.

Next, you log in to the EC2 instance using Session Manager, a capability of AWS Systems Manager.

  1. On the Amazon EC2 console, select the instanceid and on the Session Manager tab, choose Connect.

After you log in to the EC2 instance, you create a Kafka topic in the MSK Serverless cluster from the EC2 instance.

  1. In the following export command, provide the MSKBootstrapServers value from the vpc-mskserverless- client stack output for your endpoint:
    $ sudo su – ec2-user
    $ BS=<your-msk-serverless-endpoint (e.g.) boot-xxxxxx.yy.kafka-serverless.us-east-1.a>

  2. Run the following command on the EC2 instance to create a topic called msk-serverless-blog. The Kafka client is already installed in the ec2-user home directory (/home/ec2-user).
    $ /home/ec2-user/kafka_2.12-2.8.1/bin/kafka-topics.sh \
    --bootstrap-server $BS \
    --command-config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
    --create –topic msk-serverless-blog \
    --partitions 1
    Created topic msk-serverless-blog

After you confirm the topic creation, you can push the data to the MSK Serverless.

  1. Run the following command on the EC2 instance to create a console producer to produce records to the Kafka topic. (For source data, we use nycflights.csv downloaded at the ec2-user home directory /home/ec2-user.)
$ /home/ec2-user/kafka_2.12-2.8.1/bin/kafka-console-producer.sh \
--broker-list $BS \
--producer.config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
--topic msk-serverless-blog < nycflights.csv

Next, you set up the data processing service resources, specifically AWS Glue components like the database, table, and streaming job to process the data.

Create data processing resources

The gluejob-setup.yaml CloudFormation template creates a database, table, AWS Glue connection, and AWS Glue streaming job. Retrieve the values for VpcId, GluePrivateSubnet, GlueconnectionSubnetAZ, SecurityGroup, S3BucketForOutput, and S3BucketForGlueScript from the vpc-mskserverless-client stack’s Outputs tab to use in this template. Complete the following steps:

  1. Launch the stack gluejob-setup:

  1. Provide parameter values as listed in the following table.
Parameters Description Sample value
EnvironmentName Environment name that is prefixed to resource names. Gluejob-setup
VpcId ID of the VPC for security group. Use the VPC ID created with the first stack. Refer to the first stack’s output.
GluePrivateSubnet Private subnet used for creating the AWS Glue connection. Refer to the first stack’s output.
SecurityGroupForGlueConnection Security group used by the AWS Glue connection. Refer to the first stack’s output.
GlueconnectionSubnetAZ Availability Zone for the first private subnet used for the AWS Glue connection. .
GlueDataBaseName Name of the AWS Glue Data Catalog database. glue_kafka_blog_db
GlueTableName Name of the AWS Glue Data Catalog table. blog_kafka_tbl
S3BucketNameForScript Bucket Name for Glue ETL script. Use the S3 bucket name from the previous stack. For example, aws-gluescript-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
GlueWorkerType Worker type for AWS Glue job. For example, G.1X. G.1X
NumberOfWorkers Number of workers in the AWS Glue job. 3
S3BucketNameForOutput Bucket name for writing data from the AWS Glue job. aws-glueoutput-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
TopicName MSK topic name that needs to be processed. msk-serverless-blog
MSKBootstrapServers Kafka bootstrap server. boot-30vvr5lg.c1.kafka-serverless.us- east-1.amazonaws.com:9098

The stack creation process can take around 1–2 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

In the gluejob-setup stack, we created a Kafka type AWS Glue connection, which consists of broker information like the MSK bootstrap server, topic name, and VPC in which the MSK Serverless cluster is created. Most importantly, it specifies the IAM authentication option, which helps AWS Glue authenticate and authorize using IAM authentication while consuming the data from the MSK topic. For further clarity, you can examine the AWS Glue connection and the associated AWS Glue table generated through AWS CloudFormation.

After successfully creating the CloudFormation stack, you can now proceed with processing data using the AWS Glue streaming job with IAM authentication.

Run the AWS Glue streaming job

To process the data from the MSK topic using the AWS Glue streaming job that you set up in the previous section, complete the following steps:

  1. On the CloudFormation console, choose the stack gluejob-setup.
  2. On the Outputs tab, retrieve the name of the AWS Glue streaming job from the GlueJobName row. In the following screenshot, the name is GlueStreamingJob-glue-streaming-job.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Search for the AWS Glue streaming job named GlueStreamingJob-glue-streaming-job.
  3. Choose the job name to open its details page.
  4. Choose Run to start the job.
  5. On the Runs tab, confirm if the job ran without failure.

  1. Retrieve the OutputBucketName from the gluejob-setup template outputs.
  2. On the Amazon S3 console, navigate to the S3 bucket to verify the data.

  1. On the AWS Glue console, choose the AWS Glue streaming job you ran, then choose Stop job run.

Because this is a streaming job, it will continue to run indefinitely until manually stopped. After you verify the data is present in the S3 output bucket, you can stop the job to save cost.

Validate the data in Athena

After the AWS Glue streaming job has successfully created the table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database and table that the AWS Glue streaming job created.
  4. To validate the data, run the following query to find the flight number, origin, and destination that covered the highest distance in a year:
SELECT distinct(flight),distance,origin,dest,year from "glue_kafka_blog_db"."output" where distance= (select MAX(distance) from "glue_kafka_blog_db"."output")

The following screenshot shows the output of our example query.

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack gluejob-setup.
  2. Delete the CloudFormation stack vpc-mskserverless-client.


In this post, we demonstrated a use case for building a serverless ETL pipeline for streaming with IAM authentication, which allows you to focus on the outcomes of your analytics. You can also modify the AWS Glue streaming ETL code in this post with transformations and mappings to ensure that only valid data gets loaded to Amazon S3. This solution enables you to harness the prowess of AWS Glue streaming, seamlessly integrated with MSK Serverless through the IAM authentication method. It’s time to act and revolutionize your streaming processes.


This section provides more information about how to create the AWS Glue connection on the AWS Glue console, which helps establish the connection to the MSK Serverless cluster and allow the AWS Glue streaming job to authenticate and authorize using IAM authentication while consuming the data from the MSK topic.

  1. On the AWS Glue console, in the navigation pane, under Data catalog, choose Connections.
  2. Choose Create connection.
  3. For Connection name, enter a unique name for your connection.
  4. For Connection type, choose Kafka.
  5. For Connection access, select Amazon managed streaming for Apache Kafka (MSK).
  6. For Kafka bootstrap server URLs, enter a comma-separated list of bootstrap server URLs. Include the port number. For example, boot-xxxxxxxx.c2.kafka-serverless.us-east- 1.amazonaws.com:9098.

  1. For Authentication, choose IAM Authentication.
  2. Select Require SSL connection.
  3. For VPC, choose the VPC that contains your data source.
  4. For Subnet, choose the private subnet within your VPC.
  5. For Security groups, choose a security group to allow access to the data store in your VPC subnet.

Security groups are associated to the ENI attached to your subnet. You must choose at least one security group with a self-referencing inbound rule for all TCP ports.

  1. Choose Save changes.

After you create the AWS Glue connection, you can use the AWS Glue streaming job to consume data from the MSK topic using IAM authentication.

About the authors

Shubham Purwar is a Cloud Engineer (ETL) at AWS Bengaluru specialized in AWS Glue and Amazon Athena. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS with a specialization in AWS Glue. He is dedicated to assisting customers in resolving issues related to their ETL workloads and creating scalable data processing and analytics pipelines on AWS.

Extracting key insights from Amazon S3 access logs with AWS Glue for Ray

Post Syndicated from Cristiane de Melo original https://aws.amazon.com/blogs/big-data/extracting-key-insights-from-amazon-s3-access-logs-with-aws-glue-for-ray/

Customers of all sizes and industries use Amazon Simple Storage Service (Amazon S3) to store data globally for a variety of use cases. Customers want to know how their data is being accessed, when it is being accessed, and who is accessing it. With exponential growth in data volume, centralized monitoring becomes challenging. It is also crucial to audit granular data access for security and compliance needs.

This blog post presents an architecture solution that allows customers to extract key insights from Amazon S3 access logs at scale. We will partition and format the server access logs with Amazon Web Services (AWS) Glue, a serverless data integration service, to generate a catalog for access logs and create dashboards for insights.

Amazon S3 access logs

Amazon S3 access logs monitor and log Amazon S3 API requests made to your buckets. These logs can track activity, such as data access patterns, lifecycle and management activity, and security events. For example, server access logs could answer a financial organization’s question about how many requests are made and who is making what type of requests. Amazon S3 access logs provide object-level visibility and incur no additional cost besides storage of logs. They store attributes such as object size, total time, turn-around time, and HTTP referer for log records. For more details on the server access log file format, delivery, and schema, see Logging requests using server access logging and Amazon S3 server access log format.

Key considerations when using Amazon S3 access logs:

  1. Amazon S3 delivers server access log records on a best-effort basis. Amazon S3 does not guarantee the completeness and timeliness of them, although delivery of most log records is within a few hours of the recorded time.
  2. A log file delivered at a specific time can contain records written at any point before that time. A log file may not capture all log records for requests made up to that point.
  3. Amazon S3 access logs provide small unpartitioned files stored as space-separated, newline-delimited records. They can be queried using Amazon Athena, but this solution poses high latency and increased query cost for customers generating logs in petabyte scale. Top 10 Performance Tuning Tips for Amazon Athena include converting the data to a columnar format like Apache Parquet and partitioning the data in Amazon S3.
  4. Amazon S3 listing can become a bottleneck even if you use a prefix, particularly with billions of objects. Amazon S3 uses the following object key format for log files:

TargetPrefix is optional and makes it simpler for you to locate the log objects. We use the YYYY-mm-DD-HH format to generate a manifest of logs matching a specific prefix.

Architecture overview

The following diagram illustrates the solution architecture. The solution uses AWS Serverless Analytics services such as AWS Glue to optimize data layout by partitioning and formatting the server access logs to be consumed by other services. We catalog the partitioned server access logs from multiple Regions. Using Amazon Athena and Amazon QuickSight, we query and create dashboards for insights.

Architecture Diagram

As a first step, enable server access logging on S3 buckets. Amazon S3 recommends delivering logs to a separate bucket to avoid an infinite loop of logs. Both the user data and logs buckets must be in the same AWS Region and owned by the same account.

AWS Glue for Ray, a data integration engine option on AWS Glue, is now generally available. It combines AWS Glue’s serverless data integration with Ray (ray.io), a popular new open-source compute framework that helps you scale Python workloads. The Glue for Ray job will partition and store the logs in parquet format. The Ray script also contains checkpointing logic to avoid re-listing, duplicate processing, and missing logs. The job stores the partitioned logs in a separate bucket for simplicity and scalability.

The AWS Glue Data Catalog is a metastore of the location, schema, and runtime metrics of your data. AWS Glue Data Catalog stores information as metadata tables, where each table specifies a single data store. The AWS Glue crawler writes metadata to the Data Catalog by classifying the data to determine the format, schema, and associated properties of the data. Running the crawler on a schedule updates AWS Glue Data Catalog with new partitions and metadata.

Amazon Athena provides a simplified, flexible way to analyze petabytes of data where it lives. We can query partitioned logs directly in Amazon S3 using standard SQL. Athena uses AWS Glue Data Catalog metadata like databases, tables, partitions, and columns under the hood. AWS Glue Data Catalog is a cross-Region metadata store that helps Athena query logs across multiple Regions and provide consolidated results.

Amazon QuickSight enables organizations to build visualizations, perform case-by-case analysis, and quickly get business insights from their data anytime, on any device. You can use other business intelligence (BI) tools that integrate with Athena to build dashboards and share or publish them to provide timely insights.

Technical architecture implementation

This section explains how to process Amazon S3 access logs and visualize Amazon S3 metrics with QuickSight.

Before you begin

There are a few prerequisites before you get started:

  1. Create an IAM role to use with AWS Glue. For more information, see Create an IAM Role for AWS Glue in the AWS Glue documentation.
  2. Ensure that you have access to Athena from your account.
  3. Enable access logging on an S3 bucket. For more information, see How to Enable Server Access Logging in the Amazon S3 documentation.

Run AWS Glue for Ray job

The following screenshots guide you through creating a Ray job on Glue console. Create an ETL job with Ray engine with the sample Ray script provided. In the Job details tab, select an IAM role.

Create AWS Glue job

AWS Glue job details

Pass required arguments and any optional arguments with `--{arg}` in the job parameters.

AWS Glue job parameters

Save and run the job. In the Runs tab, you can select the current execution and view the logs using the Log group name and Id (Job Run Id). You can also graph job run metrics from the CloudWatch metrics console.

CloudWatch metrics console

Alternatively, you can select a frequency to schedule the job run.

AWS Glue job run schedule

Note: Schedule frequency depends on your data latency requirement.

On a successful run, the Ray job writes partitioned log files to the output Amazon S3 location. Now we run an AWS Glue crawler to catalog the partitioned files.

Create an AWS Glue crawler with the partitioned logs bucket as the data source and schedule it to capture the new partitions. Alternatively, you can configure the crawler to run based on Amazon S3 events. Using Amazon S3 events improves the re-crawl time to identify the changes between two crawls by listing all the files from a partition instead of listing the full S3 bucket.

AWS Glue Crawler

You can view the AWS Glue Data Catalog table via the Athena console and run queries using standard SQL. The Athena console displays the Run time and Data scanned metrics. In the following screenshots below, you will see how partitioning improves performance by reducing the amount of data scanned.

There are significant wins when we partition and format server access logs as parquet. Compared to the unpartitioned raw logs, the Athena queries 1/scanned 99.9 percent less data, and 2/ran 92 percent faster. This is evident from the following Athena SQL queries, which are similar but on unpartitioned and partitioned server access logs respectively.

SELECT “operation”, “requestdatetime”
FROM “s3_access_logs_db”.”unpartitioned_sal”
GROUP BY “requestdatetime”, “operation”

Amazon Athena query

Note: You can create a table schema on raw server access logs by following the directions at How do I analyze my Amazon S3 server access logs using Athena?

SELECT “operation”, “requestdate”, “requesthour” 
FROM “s3_access_logs_db”.”partitioned_sal” 
GROUP BY “requestdate”, “requesthour”, “operation”

Amazon Athena query

You can run queries on Athena or build dashboards with a BI tool that integrates with Athena. We built the following sample dashboard in Amazon QuickSight to provide insights from the Amazon S3 access logs. For additional information, see Visualize with QuickSight using Athena.

Amazon QuickSight dashboard

Clean up

Delete all the resources to avoid any unintended costs.

  1. Disable the access log on the source bucket.
  2. Disable the scheduled AWS Glue job run.
  3. Delete the AWS Glue Data Catalog tables and QuickSight dashboards.

Why we considered AWS Glue for Ray

AWS Glue for Ray offers scalable Python-native distributed compute framework combined with AWS Glue’s serverless data integration. The primary reason for using the Ray engine in this solution is its flexibility with task distribution. With the Amazon S3 access logs, the largest challenge in processing them at scale is the object count rather than the data volume. This is because they are stored in a single, flat prefix that can contain hundreds of millions of objects for larger customers. In this unusual edge case, the Amazon S3 listing in Spark takes most of the job’s runtime. The object count is also large enough that most Spark drivers will run out of memory during listing.

In our test bed with 470 GB (1,544,692 objects) of access logs, large Spark drivers using AWS Glue’s G.8X worker type (32 vCPU, 128 GB memory, and 512 GB disk) ran out of memory. Using Ray tasks to distribute Amazon S3 listing dramatically reduced the time to list the objects. It also kept the list in Ray’s distributed object store preventing out-of-memory failures when scaling. The distributed lister combined with Ray data and map_batches to apply a pandas function against each block of data resulted in a highly parallel and performant execution across all stages of the process. With Ray engine, we successfully processed the logs in ~9 minutes. Using Ray reduces the server access logs processing cost, adding to the reduced Athena query cost.

Ray job run details:

Ray job logs

Ray job run details

Please feel free to download the script and test this solution in your development environment. You can add additional transformations in Ray to better prepare your data for analysis.


In this blog post, we detailed a solution to visualize and monitor Amazon S3 access logs at scale using Athena and QuickSight. It highlights a way to scale the solution by partitioning and formatting the logs using AWS Glue for Ray. To learn how to work with Ray jobs in AWS Glue, see Working with Ray jobs in AWS Glue. To learn how to accelerate your Athena queries, see Reusing query results.

About the Authors

Cristiane de Melo is a Solutions Architect Manager at AWS based in Bay Area, CA. She brings 25+ years of experience driving technical pre-sales engagements and is responsible for delivering results to customers. Cris is passionate about working with customers, solving technical and business challenges, thriving on building and establishing long-term, strategic relationships with customers and partners.

Archana Inapudi is a Senior Solutions Architect at AWS supporting Strategic Customers. She has over a decade of experience helping customers design and build data analytics, and database solutions. She is passionate about using technology to provide value to customers and achieve business outcomes.

Nikita Sur is a Solutions Architect at AWS supporting a Strategic Customer. She is curious to learn new technologies to solve customer problems. She has a Master’s degree in Information Systems – Big Data Analytics and her passion is databases and analytics.

Zach Mitchell is a Sr. Big Data Architect. He works within the product team to enhance understanding between product engineers and their customers while guiding customers through their journey to develop their enterprise data architecture on AWS.

Implement a serverless CDC process with Apache Iceberg using Amazon DynamoDB and Amazon Athena

Post Syndicated from Vijay Velpula original https://aws.amazon.com/blogs/big-data/implement-a-serverless-cdc-process-with-apache-iceberg-using-amazon-dynamodb-and-amazon-athena/

Apache Iceberg is an open table format for very large analytic datasets. Iceberg manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. The Iceberg specification allows seamless table evolution such as schema and partition evolution, and its design is optimized for usage on Amazon Simple Storage Service (Amazon S3). Iceberg also helps guarantee data correctness under concurrent write scenarios.

Most businesses store their critical data in a data lake, where you can bring data from various sources to a centralized storage. Change Data Capture (CDC) in the context of a data lake refers to the process of capturing and propagating changes made to source data. Source systems often lack the capability to publish data that is modified or changed. This requires data pipelines to consume full load datasets every day, increasing the data processing duration and also the storage cost. If the source is tabular format, then there are mechanisms to identify the data changes easily. However, the complexity increases if the data is in semi-structured format and propagating changes made to source data into the data lake in near-real-time.

This post presents a solution to handle incoming semi-structured datasets from source systems and effectively determine changed records and load them into Iceberg tables. With this approach, we will not only use Athena to query data source files in Amazon S3, but also achieve ACID compliance.

Solution overview

We demonstrate this solution with an end-to-end serverless CDC process. We use a sample JSON file as input to Amazon DynamoDB. We identify changed records by utilizing Amazon DynamoDB Streams and AWS Lambda to update the data lake with changed records. We then utilize an Iceberg table to demonstrate CDC functionality for a sample employee dataset. This data represents employee details such as name, address, date joined, and other fields.

The architecture is implemented as follows:

  1. Source systems ingest a semi-structured (JSON) dataset into a DynamoDB table.
  2. The DynamoDB table stores the semi-structured dataset, and these tables have DynamoDB Streams enabled. DynamoDB Streams helps identify if the incoming data is new, modified, or deleted based on the keys defined and delivers the ordered messages to a Lambda function.
  3. For every stream, the Lambda function parses the stream and builds the dynamic DML SQL statements.
  4. The constructed DML SQL statements are run on the corresponding Iceberg tables to reflect the changes.

The following diagram illustrates this workflow.


Before you get started, make sure you have the following prerequisites:

Deploy the solution

For this solution, we provide a CloudFormation template that sets up the services included in the architecture, to enable repeatable deployments.

Note : – Deploying the CloudFormation stack in your account incurs AWS usage charges.

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack.
  2. Enter a stack name.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

After the CloudFormation stack deployment is complete, navigate to AWS CloudFormation console to note the following resources on the Outputs tab:

  • Data lake S3 bucketiceberg-cdc-xxxxx-us-east-1-xxxxx
  • AthenaWorkGroupNameAthenaWorkgroup-xxxxxx
  • DataGeneratorLambdaFunctionUserRecordsFunction-xxxxxx
  • DynamoDBTableNameusers_xxxxxx
  • LambdaDMLFunctionIcebergUpsertFunction-xxxxxx
  • AthenaIcebergTableNameusers_xxxxxx

Generate sample employee data and load into the DynamoDB table using Lambda

To test the solution, trigger the UserRecordsFunction-XXXXX function by creating a test event which loads sample data into DynamoDB table.

  1. On the Lambda console, open the Lambda function with the name UserRecordsFunction-XXXXX.
  2. On the Code tab, choose Test, then Configure test event.
  3. Configure a test event with the default hello-world template event JSON.
  4. Provide an event name without any changes to the template and save the test event.
  5. On the Test tab, choose Test to trigger the SampleEvent test event. This will invoke the data generator Lambda function to load data into the users_xxxxxx DynamoDB table. When the test event is complete, you should notice a success notification as shown in the following screenshot.
  6. On the DynamoDB console, navigate to the users_XXXXXX table and choose Explore table items to verify the data loaded into the table.

The data loads performed on the DynamoDB table will be cascaded to the Athena table with the help of the IcebergUpsertFunction-xxxxx Lambda function deployed by CloudFormation template.

In the following sections, we simulate and validate various scenarios to demonstrate Iceberg capabilities, including DML operations, time travel, and optimizations.

Simulate the scenarios and validate CDC functionality in Athena

After the first run of the data generator Lambda function, navigate to the Athena query editor, choose the AthenaWorkgroup-XXXXX workgroup, and preview the user_XXXXXX Iceberg table to query the records.

With the data inserted into the DynamoDB table, all the data change activities such as inserts, updates, and deletes are captured in DynamoDB Streams. DynamoDB Streams triggers IcebergUpsertFunction-xxxxx Lambda function which processes the events in the order they are received. IcebergUpsertFunction-xxxxx function, performs the following steps:

  • Receives the stream event
  • Parses the stream event based on the  DynamdoDB eventType (insert, update, or delete) and eventually generates an Athena DML SQL statement
  • Runs the SQL statement in Athena

Let’s deep dive in to the IcebergUpsertFunction-XXXX function code and how it handles various scenarios.

IcebergUpsertFunction-xxxxx function code

As indicated in the following Lambda function code block, the DynamoDB Streams event received by the function, categorizes events based on eventType—INSERT, MODIFY, or DELETE. Any other event raises InvalidEventException. MODIFY is considered an UPDATE event.

All the DML operations are run on the user_XXXXXX table in Athena. We fetch the metadata of the users_xxxxxx table from Athena. The following are a few important considerations regarding how the Lambda function handles Iceberg table metadata changes:

  • In this approach, target metadata takes precedence during DML operations.
  • Any columns that are missing in the target will be excluded in the DML command.
  • It’s imperative that the source and target metadata match. Incase new columns and attributes are added to source table than the current solution is configured to skip the new columns and attributes.
  • This solution can be enhanced further to cascade source system metadata changes to the target table in Athena.

The following is the Lambda function code:

def iceberg_upsert(event, database, tablename):
    response ={}
    logger.info(f'Started iceberg_upsert executing.')
    logger.info(f'Started parsing received event.')
    # Determine type of event
    # call for athena function 
    except Exception as e:
        logger.error(f"Athena Metadata does not have column information. Please check table {tablename} and database {database} ")
    else: # else block for try/except
        if eventName == "INSERT":
            response=run_query(sqlstmt, database_name, athena_workgroup, output_location,wait_time)
        elif eventName == "MODIFY":
            response=run_query(sqlstmt, database_name, athena_workgroup, output_location,wait_time)
        elif eventName == "REMOVE":
            response=run_query(sqlstmt, database_name, athena_workgroup, output_location,wait_time)
            raise InvalidEventTypeException
    except InvalidEventTypeException:
        logger.warning(f'Event type should be INSERT/MODIFY/REMOVE. Received event type is : {eventName}.')
        logger.warning(f'Skipping applying grant/revoke permissions.')
    except Exception as e:
        logger.error("iceberg_upsert function failed with error")
    else : # else block for try/except
        return response

The following code uses the Athena Boto3 client to fetch the table metadata:

def retrieve_athena_table_metadata(databaseName, tableName, catalogName=None):
    if catalogName is None:
        catalogName='AWSDATACATALOG' # default value 
    except Exception as e:
        logger.error("Athena Table Metadata retrieval function Failed.Please check exception", e)
    else: # else block for try except
        return athenaTblMd

Insert operations

Now let’s see how insert operations are handled with the sample data generated in the DynamoDB table.

  1. On the DynamoDB console, navigate to the users_XXXXX table.
  2. Choose Create item.
  3. Enter a sample record with the following code:
      "emp_no": {
         "N": "11"
      "country": {
         "S": "USA"
      "dateOfBirth": {
         "S": "1991-10-23"
      "first_name": {
         "S": "Tom"
      "isContractAthlete": {
         "BOOL": false
      "job": {
         "S": "Sr Manager"
      "last_name": {
         "S": "Carter"
      "phone_number": {
         "S": "+1-226-333-789"
      "sex": {
         "S": "male"
      "ssn": {
         "S": "434-98-2345"

  4. Choose Create item to insert the new record into the DynamoDB table.

After the item is created in the DynamoDB table, a stream event is generated in DynamoDB Streams, which triggers the Lambda function. The function processes the event and generates an equivalent INSERT SQL statement to run on the Athena table. The following screenshot shows the INSERT SQL that was generated by the Lambda function on the Athena console in the Recent queries section.

The IcebergUpsertFunction-xxxxx Lambda code has modularized functions for each eventType. The following code highlights the function, which processes insert eventType streams:

def insert_stmt(insert_event_resp,AthenTblMd,database,tablename):
    Tblvalues={ k.lower():v for k,v in Tablevalues.items()} # converting key names to lowercase to prevent case-sensitive mismatches
    for item in val_list:
        if item.get('data') is not None:
            if item['Type'] != 'string':
                val_for_col.append(f"CAST ({(item['data'])} AS {item['Type']})" )
    colnames_with_doublequotes=",".join([f'"{i}"' for i in col_nm])
    values_formatted=",".join([f"{i}" if i.startswith('CAST') else f"'{i}'" for i in val_for_col] )
    return f"insert into {database}.{tablename} ({colnames_with_doublequotes}) values ({values_formatted})"

This function parses the create item stream event and constructs an INSERT SQL statement in the following format:

INSERT into <tablename> values (val1, val2....)

The function returns a string, which is an ANSI SQL compliant statement that can be run directly in Athena.

Update operations

For our update operation, let’s identify the current state of a record in the Athena table. We see emp_no=5 and its column values in Athena and compare them to the DynamoDB table. If there are no changes, the records should be the same, as shown in the following screenshots.

Let’s initiate an edit item operation in the DynamoDB table. We modify the following values:

  • IsContractAthlete – True
  • Phone_number – 123-456-789

After the item is edited in the DynamoDB table, a MODIFY stream event is generated in DynamoDB Streams, which triggers the Lambda function. The function processes the event and generates the equivalent UPDATE SQL statement to run on the Athena table.

MODIFY DynamoDB Streams events have two components: the old image and the new image. Here we parse only the new image data section to construct an UPDATE ANSI SQL statement and run it on the Athena tables.

The following update_stmt code block parses the modify item stream event and constructs the corresponding UPDATE SQL statement with new image data. The code block performs the following steps:

  • Finds the key columns for the WHERE clause
  • Finds columns for the SET clause
  • Ensures key columns are not part of the SET command

The function returns a string that is a SQL ANSI compliant statement that can be run directly in Athena. For example:

UPDATE <TABLENAME> SET col = value where key = value

See the following code:

def update_stmt(update_event_resp,AthenTblMd,database,tablename):
    Tblvalues={ k.lower():v for k,v in Tablevalues.items()} # converting key names to lowercase to prevent case-sensitive mismatches
    # removing primary keys from the stream dictionary so that SET command for Update can be constructed.
    for col_pkey in primary_key_col_names.keys():
    for position,item in enumerate(AthenTblMd):
        if forUpdate.get(item.get('Name')) is not None:
    # For set clause
    for item in new_upd_AthenaTblMd:
        if item.get('data') is not None:
            if item['Type'] != 'string':
                set_nm.append(f"{item['Name']} = CAST ('{(item['data'])}' AS {item['Type']})")
                set_nm.append(f" {item['Name']} = '{item['data']}' ")
    set_cmd=f" set {','.join(set_nm)}"
    # for where clause
    for key, val in primary_key_col_names.items():
        where_nm.append(f" {key} = {list(val.values())[0]}")
    where_cmd=f" where {' and '.join(where_nm)}"
    return (f" UPDATE {database}.{tablename} {set_cmd}  {where_cmd}")

In the Athena table, we can see the columns IsContractAthlete and Phone_number have been updated to the recent values. The other column values remain the same because they weren’t modified.

Delete operations

For delete operations, let’s identify the current state of a record in Athena table. We choose emp_no=6 for this activity.

  1. On the DynamoDB console, navigate to the user table.
  2. Select the record for emp_no=6.
  3. On the Actions menu, choose Delete items.

After the delete item operation is performed on the DynamoDB table, it generates a DELETE eventType in the DynamoDB stream, which triggers the Iceberg-Upsert Lambda function.

The DELETE function removes the data based on key columns in the stream. The following function parses the stream to identify key columns of the deleted item. We construct a DELETE DML SQL statement with a WHERE clause of emp_no=6:

DELETE &lt;TABLENAME&gt; WHERE key = value

See the following code:

def del_stmt(del_event_resp,database,tablename):
    for key, val in primary_key_col_names.items():
        del_where_nm.append(f" {key} = {list(val.values())[0]}")
    del_where_cmd=f" where {' and '.join(del_where_nm)}"
    return f" DELETE FROM {database}.{tablename} {del_where_cmd} "   

The function returns a string, which is an ANSI SQL compliant statement that can be run directly in Athena. The following screenshot shows the DELETE statement that was run in Athena.

As you can see from the following screenshot, emp_no=6 record no longer exists in the Iceberg table when queried with Athena.

Time travel

Time travel queries in Athena query Amazon S3 for historical data from a consistent snapshot as of a specified date and time. Iceberg tables provide the capability of time travel. Each Iceberg table maintains a versioned manifest of the S3 objects that it contains. Previous versions of the manifest can be used for time travel and version travel queries. Version travel queries in Athena query Amazon S3 for historical data as of a specified snapshot ID. Iceberg format tracks every change that happened to the table in the tablename$iceberg_history table. When you query them, it will show timestamps when the changes occurred in the table.

Let’s find the timestamp when a DELETE statement was applied to the Athena table. In our query, it corresponds to the time 2023-04-18 21:34:13.970. With this timestamp, let’s query the main table to see if the emp_no=6 exists in it.

As shown in the following screenshot, the query result shows that the deleted record exists, and this can be used to reinsert data if required.

Optimize Iceberg tables

Every insert and update operation on an Iceberg table creates a separate data and metadata file. If there are multiple such update and insert operations, it might lead to multiple small fragmented files. Having these small files can cause an unnecessary number of metadata and less efficient queries. Utilize Athena OPTIMIZE command to compact these small files.


The OPTIMIZE table REWRITE DATA compaction action rewrites data files into a more optimized layout based on their size and number of associated delete files.

The following query shows the number of data files that exist before the compaction process:

SELECT * FROM "users_73591300$iceberg_files"

The following query performs compaction on the Iceberg table:

OPTIMIZE "users_73591300$iceberg_files" REWRITE DATA USING BIN_PACK

We can observe that the compaction process merged multiple data files into a larger file.


The VACUUM statement on Iceberg tables removes data files that are no longer relevant, which reduces metadata size and storage consumption. VACUUM removes unwanted files older than the amount of time that is specified by the vacuum_max_snapshot_age_seconds table property (default 432000), as shown in the following code:

ALTER TABLE users_73591300 SET TBLPROPERTIES ('vacuum_max_snapshot_age_seconds'='259200')

The following query performs a vacuum operation on the Iceberg table:

VACUUM users_73591300

Clean up

When you have finished experimenting with this solution, clean up your resources to prevent AWS charges from being incurred:

  1. Empty the S3 buckets.
  2. Delete the stack from the AWS CloudFormation console.


In this post, we introduced a serverless CDC solution for semi-structured data using DynamoDB Streams and processing them in Iceberg tables. We demonstrated how to ingest semi-structured data in DynamoDB, identify changed data using DynamoDB Streams, and process them in Iceberg tables. We can expand the solution to build SCD type-2 functionality in data lakes to track historical data changes. This solution is appropriate for low frequency of updates, but for high frequency and larger volumes of data, we can aggregate the changes in a separate intermediate table using DynamoDB Streams and Amazon Kinesis Data Firehose, and then run periodic MERGE operations into the main Iceberg table.

We hope this post provided insights on how to process semi-structured data in a data lake when sources systems lack CDC capability.

About the authors

Vijay Velpula is a Data Lake Architect with AWS Professional Services. He helps customers building  modern data platforms through implementing Big Data & Analytics solutions. Outside of work, he enjoys spending time with family, traveling, hiking and biking.

Karthikeyan Ramachandran is a Data Architect with AWS Professional Services. He specializes in MPP systems helping Customers build and maintain Data warehouse environments. Outside of work, he likes to binge-watch tv shows and loves playing cricket and volleyball.

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing Tabla.

Use Amazon Athena to query data stored in Google Cloud Platform

Post Syndicated from Jonathan Wong original https://aws.amazon.com/blogs/big-data/use-amazon-athena-to-query-data-stored-in-google-cloud-platform/

As customers accelerate their migrations to the cloud and transform their businesses, some find themselves in situations where they have to manage data analytics in a multi-cloud environment, such as acquiring a company that runs on a different cloud provider. Customers who use multi-cloud environments often face challenges in data access and compatibility that can create blockades and slow down productivity.

When managing multi-cloud environments, customers must look for services that address these gaps through features providing interoperability across clouds. With the release of the Amazon Athena data source connector for Google Cloud Storage (GCS), you can run queries within AWS to query data in Google Cloud Storage, which can be stored in relational, non-relational, object, and custom data sources, whether that be Parquet or comma-separated value (CSV) format. Athena provides the connectivity and query interface and can easily be plugged into other AWS services for downstream use cases such as interactive analysis and visualizations. Some examples include AWS data analytics services such as AWS Glue for data integration, Amazon QuickSight for business intelligence (BI), as well as third-party software and services from AWS Marketplace.

This post demonstrates how to use Athena to run queries on Parquet or CSV files in a GCS bucket.

Solution overview

The following diagram illustrates the solution architecture.

The Athena Google Cloud Storage connector uses both AWS and Google Cloud Platform (GCP), so we will be referencing both cloud providers in the architecture diagram.

We use the following AWS services in this solution:

  • Amazon Athena – A serverless interactive analytics service. We use Athena to run queries on data stored on Google Cloud Storage.
  • AWS Lambda – A serverless compute service that is event driven and manages the underlying resources for you. We deploy a Lambda function data source connector to connect AWS with Google Cloud Provider.
  • AWS Secrets Manager – A secrets management service that helps protect access to your applications and services. We reference the secret in Secrets Manager in the Lambda function so we can run a query on AWS and it can access the data stored on Google Cloud Provider.
  • AWS Glue – A serverless data analytics service for data discovery, preparation, and integration. We create an AWS Glue database and table to point to the correct bucket and files within Google Cloud Storage.
  • Amazon Simple Storage Service (Amazon S3) – An object storage service that stores data as objects within buckets. We create an S3 bucket to store data that exceeds the Lambda function’s response size limits.

The Google Cloud Platform portion of the architecture contains a few services as well:

  • Google Cloud Storage – A managed service for storing unstructured data. We use Google Cloud Storage to store data within a bucket that will be used in a query from Athena, and we upload a CSV file directly to the GCS bucket.
  • Google Cloud Identity and Access Management (IAM) – The central source to control and manage visibility for cloud resources. We use Google Cloud IAM to create a service account and generate a key that will allow AWS to access GCP. We create a key with the service account, which is uploaded to Secrets Manager.


For this post, we create a VPC and security group that will be used in conjunction with the GCP connector. For complete steps, refer to Creating a VPC for a data source connector. The first step is to create the VPC using Amazon Virtual Private Cloud (Amazon VPC), as shown in the following screenshot.

Then we create a security group for the VPC, as shown in the following screenshot.

For more information about the prerequisites, refer to Amazon Athena Google Cloud Storage connector. Additionally, there are tables that highlight the specific data types that can be used such as CSV and Parquet files. There are also required permissions to run the solution.

Google Cloud Platform configuration

To begin, you must have either CSV or Parquet files stored within a GCS bucket. To create the bucket, refer to Create buckets. Make sure to note the bucket name—it will be referenced in a later step. After you create the bucket, upload your objects to the bucket. For instructions, refer to Upload objects from a filesystem.

The CSV data used in this example came from Mockaroo, which generated random test data as shown in the following screenshot. In this example, we use a CSV file, but you can also use Parquet files.

Additionally, you must create a service account to generate a key pair within Google Cloud IAM, which will be uploaded to Secrets Manager. For full instructions, refer to Create service accounts.

After you create the service account, you can create a key. For instructions, refer to Create and delete service account keys.

AWS configuration

Now that you have a GCS bucket with a CSV file and a generated JSON key file from Google Cloud Platform, you can proceed with the rest of the steps on AWS.

  1. On the Secrets Manager console, choose Secrets in the navigation pane.
  2. Choose Store a new secret and specify Other type of secret.
  3. Provide the GCP generated key file content.

The next step is to deploy the Athena Google Cloud Storage connector. For more information, refer to Using the Athena console.

  1. On the Athena console, add a new data source.
  2. Select Google Cloud Storage.

  1. For Data source name, enter a name.
  2. For Lambda function, choose Create Lambda function to be redirected to the Lambda console.

  1. In the Application settings section, enter the information for Application name, SpillBucket, GCSSecretName, and LambdaFunctionName.

  1. You also have to create an S3 bucket to reference the S3 spill bucket parameter in order to store data that exceeds the Lambda function’s response size limits. For more information, refer to Create your first S3 bucket.

After you provide the Lambda function’s application settings, you’re redirected to the Review and create page.

  1. Confirm that these are the correct fields and choose Create data source.

Now that the data source connector has been created, you can connect Athena to the data source.

  1. On the Athena console, navigate to the data source.
  2. Under Data source details, choose the link for the Lambda function.

You can reference the Lambda function to connect to the data source. As an optional step and for validation, the variables that were put into the Lambda function can be found within the Lambda function’s environment variables on the Configuration tab.

  1. Because the built-in GCS connector schema inference capability is limited, it’s recommended to create an AWS Glue database and table for your metadata. For instructions, refer to Setting up databases and tables in AWS Glue.

The following screenshot shows our database details.

The following screenshot shows our table details.

Query the data

Now you can run queries on Athena that will access the data stored on Google Cloud Storage.

  1. On the Athena console, choose the correct data source, database, and table within the query editor.
  2. RunSELECT * FROM [AWS Glue Database name].[AWS Glue Table name]in the query editor.

As shown in the following screenshot, the results will be from the bucket on Google Cloud Storage.

The data that is stored on Google Cloud Platform can be accessed through AWS and used for many use cases, such as performing business intelligence, machine learning, or data science. Doing so can help unblock developers and data scientists so they can efficiently provide results and save time.

Clean up

Complete the following steps to clean up your resources:

  1. Delete the provisioned bucket in Google Cloud Storage.
  2. Delete the service account under IAM & Admin.
  3. Delete the secret GCP credentials in Secrets Manager.
  4. Delete the S3 spill bucket.
  5. Delete the Athena connector Lambda function.
  6. Delete the AWS Glue database and table.


If you receive a ROLLBACK_COMPLETE state and “can not be updated error” when creating the data source in Lambda, go to AWS CloudFormation, delete the CloudFormation stack, and try recreating it.

If the AWS Glue table doesn’t appear in the Athena query editor, verify that the data source and database values are correctly selected in the Data pane on the Athena query editor console.


In this post, we saw how you can minimize the time and effort required to access data on Google Cloud Platform and use it efficiently on AWS. Using the data connector helps organizations become multi-cloud agnostic and helps accelerate business growth. Additionally, you can build out BI applications with the discoveries, relationships, and insights found when analyzing the data, which can further your organization’s data analysis process.

About the Author

Jonathan Wong is a Solutions Architect at AWS assisting with initiatives within Strategic Accounts. He is passionate about solving customer challenges and has been exploring emerging technologies to accelerate innovation.

Monitor data pipelines in a serverless data lake

Post Syndicated from Virendhar Sivaraman original https://aws.amazon.com/blogs/big-data/monitor-data-pipelines-in-a-serverless-data-lake/

AWS serverless services, including but not limited to AWS Lambda, AWS Glue, AWS Fargate, Amazon EventBridge, Amazon Athena, Amazon Simple Notification Service (Amazon SNS), Amazon Simple Queue Service (Amazon SQS), and Amazon Simple Storage Service (Amazon S3), have become the building blocks for any serverless data lake, providing key mechanisms to ingest and transform data without fixed provisioning and the persistent need to patch the underlying servers. The combination of a data lake in a serverless paradigm brings significant cost and performance benefits. The advent of rapid adoption of serverless data lake architectures—with ever-growing datasets that need to be ingested from a variety of sources, followed by complex data transformation and machine learning (ML) pipelines—can present a challenge. Similarly, in a serverless paradigm, application logs in Amazon CloudWatch are sourced from a variety of participating services, and traversing the lineage across logs can also present challenges. To successfully manage a serverless data lake, you require mechanisms to perform the following actions:

  • Reinforce data accuracy with every data ingestion
  • Holistically measure and analyze ETL (extract, transform, and load) performance at the individual processing component level
  • Proactively capture log messages and notify failures as they occur in near-real time

In this post, we will walk you through a solution to efficiently track and analyze ETL jobs in a serverless data lake environment. By monitoring application logs, you can gain insights into job execution, troubleshoot issues promptly to ensure the overall health and reliability of data pipelines.

Overview of solution

The serverless monitoring solution focuses on achieving the following goals:

  • Capture state changes across all steps and tasks in the data lake
  • Measure service reliability across a data lake
  • Quickly notify operations of failures as they happen

To illustrate the solution, we create a serverless data lake with a monitoring solution. For simplicity, we create a serverless data lake with the following components:

  • Storage layer – Amazon S3 is the natural choice, in this case with the following buckets:
    • Landing – Where raw data is stored
    • Processed – Where transformed data is stored
  • Ingestion layer – For this post, we use Lambda and AWS Glue for data ingestion, with the following resources:
    • Lambda functions – Two Lambda functions that run to simulate a success state and failure state, respectively
    • AWS Glue crawlers – Two AWS Glue crawlers that run to simulate a success state and failure state, respectively
    • AWS Glue jobs – Two AWS Glue jobs that run to simulate a success state and failure state, respectively
  • Reporting layer – An Athena database to persist the tables created via the AWS Glue crawlers and AWS Glue jobs
  • Alerting layer – Slack is used to notify stakeholders

The serverless monitoring solution is devised to be loosely coupled as plug-and-play components that complement an existing data lake. The Lambda-based ETL tasks state changes are tracked using AWS Lambda Destinations. We have used an SNS topic for routing both success and failure states for the Lambda-based tasks. In the case of AWS Glue-based tasks, we have configured EventBridge rules to capture state changes. These event changes are also routed to the same SNS topic. For demonstration purposes, this post only provides state monitoring for Lambda and AWS Glue, but you can extend the solution to other AWS services.

The following figure illustrates the architecture of the solution.

The architecture contains the following components:

  • EventBridge rules – EventBridge rules that capture the state change for the ETL tasks—in this case AWS Glue tasks. This can be extended to other supported services as the data lake grows.
  • SNS topic – An SNS topic that serves to catch all state events from the data lake.
  • Lambda function – The Lambda function is the subscriber to the SNS topic. It’s responsible for analyzing the state of the task run to do the following:
    • Persist the status of the task run.
    • Notify any failures to a Slack channel.
  • Athena database – The database where the monitoring metrics are persisted for analysis.

Deploy the solution

The source code to implement this solution uses AWS Cloud Development Kit (AWS CDK) and is available on the GitHub repo monitor-serverless-datalake. This AWS CDK stack provisions required network components and the following:

  • Three S3 buckets (the bucket names are prefixed with the AWS account name and Regions, for example, the landing bucket is <aws-account-number>-<aws-region>-landing):
    • Landing
    • Processed
    • Monitor
  • Three Lambda functions:
    • datalake-monitoring-lambda
    • lambda-success
    • lambda-fail
  • Two AWS Glue crawlers:
    • glue-crawler-success
    • glue-crawler-fail
  • Two AWS Glue jobs:
    • glue-job-success
    • glue-job-fail
  • An SNS topic named datalake-monitor-sns
  • Three EventBridge rules:
    • glue-monitor-rule
    • event-rule-lambda-fail
    • event-rule-lambda-success
  • An AWS Secrets Manager secret named datalake-monitoring
  • Athena artifacts:
    • monitor database
    • monitor-table table

You can also follow the instructions in the GitHub repo to deploy the serverless monitoring solution. It takes about 10 minutes to deploy this solution.

Connect to a Slack channel

We still need a Slack channel to which the alerts are delivered. Complete the following steps:

  1. Set up a workflow automation to route messages to the Slack channel using webhooks.
  2. Note the webhook URL.

The following screenshot shows the field names to use.

The following is a sample message for the preceding template.

  1. On the Secrets Manager console, navigate to the datalake-monitoring secret.
  2. Add the webhook URL to the slack_webhook secret.

Load sample data

The next step is to load some sample data. Copy the sample data files to the landing bucket using the following command:

aws s3 cp --recursive s3://awsglue-datasets/examples/us-legislators s3://<AWS_ACCCOUNT>-<AWS_REGION>-landing/legislators

In the next sections, we show how Lambda functions, AWS Glue crawlers, and AWS Glue jobs work for data ingestion.

Test the Lambda functions

On the EventBridge console, enable the rules that trigger the lambda-success and lambda-fail functions every 5 minutes:

  • event-rule-lambda-fail
  • event-rule-lambda-success

After a few minutes, the failure events are relayed to the Slack channel. The following screenshot shows an example message.

Disable the rules after testing to avoid repeated messages.

Test the AWS Glue crawlers

On the AWS Glue console, navigate to the Crawlers page. Here you can start the following crawlers:

  • glue-crawler-success
  • glue-crawler-fail

In a minute, the glue-crawler-fail crawler’s status changes to Failed, which triggers a notification in Slack in near-real time.

Test the AWS Glue jobs

On the AWS Glue console, navigate to the Jobs page, where you can start the following jobs:

  • glue-job-success
  • glue-job-fail

In a few minutes, the glue-job-fail job status changes to Failed, which triggers a notification in Slack in near-real time.

Analyze the monitoring data

The monitoring metrics are persisted in Amazon S3 for analysis and can be used of historical analysis.

On the Athena console, navigate to the monitor database and run the following query to find the service that failed the most often:

SELECT service_type, count(*) as "fail_count"
FROM "monitor"."monitor"
WHERE event_type = 'failed'
group by service_type
order by fail_count desc;

Over time with rich observability data – time series based monitoring data analysis will yield interesting findings.

Clean up

The overall cost of the solution is less than one dollar but to avoid future costs, make sure to clean up the resources created as part of this post.


The post provided an overview of a serverless data lake monitoring solution that you can configure and deploy to integrate with enterprise serverless data lakes in just a few hours. With this solution, you can monitor a serverless data lake, send alerts in near-real time, and analyze performance metrics for all ETL tasks operating in the data lake. The design was intentionally kept simple to demonstrate the idea; you can further extend this solution with Athena and Amazon QuickSight to generate custom visuals and reporting. Check out the GitHub repo for a sample solution and further customize it for your monitoring needs.

About the Authors

Virendhar (Viru) Sivaraman is a strategic Senior Big Data & Analytics Architect with Amazon Web Services. He is passionate about building scalable big data and analytics solutions in the cloud. Besides work, he enjoys spending time with family, hiking & mountain biking.

Vivek Shrivastava is a Principal Data Architect, Data Lake in AWS Professional Services. He is a Bigdata enthusiast and holds 14 AWS Certifications. He is passionate about helping customers build scalable and high-performance data analytics solutions in the cloud. In his spare time, he loves reading and finds areas for home automation.

Create an Apache Hudi-based near-real-time transactional data lake using AWS DMS, Amazon Kinesis, AWS Glue streaming ETL, and data visualization using Amazon QuickSight

Post Syndicated from Raj Ramasubbu original https://aws.amazon.com/blogs/big-data/create-an-apache-hudi-based-near-real-time-transactional-data-lake-using-aws-dms-amazon-kinesis-aws-glue-streaming-etl-and-data-visualization-using-amazon-quicksight/

With the rapid growth of technology, more and more data volume is coming in many different formats—structured, semi-structured, and unstructured. Data analytics on operational data at near-real time is becoming a common need. Due to the exponential growth of data volume, it has become common practice to replace read replicas with data lakes to have better scalability and performance. In most real-world use cases, it’s important to replicate the data from the relational database source to the target in real time. Change data capture (CDC) is one of the most common design patterns to capture the changes made in the source database and reflect them to other data stores.

We recently announced support for streaming extract, transform, and load (ETL) jobs in AWS Glue version 4.0, a new version of AWS Glue that accelerates data integration workloads in AWS. AWS Glue streaming ETL jobs continuously consume data from streaming sources, clean and transform the data in-flight, and make it available for analysis in seconds. AWS also offers a broad selection of services to support your needs. A database replication service such as AWS Database Migration Service (AWS DMS) can replicate the data from your source systems to Amazon Simple Storage Service (Amazon S3), which commonly hosts the storage layer of the data lake. Although it’s straightforward to apply updates on a relational database management system (RDBMS) that backs an online source application, it’s difficult to apply this CDC process on your data lakes. Apache Hudi, an open-source data management framework used to simplify incremental data processing and data pipeline development, is a good option to solve this problem.

This post demonstrates how to apply CDC changes from Amazon Relational Database Service (Amazon RDS) or other relational databases to an S3 data lake, with flexibility to denormalize, transform, and enrich the data in near-real time.

Solution overview

We use an AWS DMS task to capture near-real-time changes in the source RDS instance, and use Amazon Kinesis Data Streams as a destination of the AWS DMS task CDC replication. An AWS Glue streaming job reads and enriches changed records from Kinesis Data Streams and performs an upsert into the S3 data lake in Apache Hudi format. Then we can query the data with Amazon Athena visualize it in Amazon QuickSight. AWS Glue natively supports continuous write operations for streaming data to Apache Hudi-based tables.

The following diagram illustrates the architecture used for this post, which is deployed through an AWS CloudFormation template.


Before you get started, make sure you have the following prerequisites:

Source data overview

To illustrate our use case, we assume a data analyst persona who is interested in analyzing near-real-time data for sport events using the table ticket_activity. An example of this table is shown in the following screenshot.

Apache Hudi connector for AWS Glue

For this post, we use AWS Glue 4.0, which already has native support for the Hudi framework. Hudi, an open-source data lake framework, simplifies incremental data processing in data lakes built on Amazon S3. It enables capabilities including time travel queries, ACID (Atomicity, Consistency, Isolation, Durability) transactions, streaming ingestion, CDC, upserts, and deletes.

Set up resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup. You can review and customize it to suit your needs.

The CloudFormation template generates the following resources:

  • An RDS database instance (source).
  • An AWS DMS replication instance, used to replicate the data from the source table to Kinesis Data Streams.
  • A Kinesis data stream.
  • Four AWS Glue Python shell jobs:
    • rds-ingest-rds-setup-<CloudFormation Stack name> – creates one source table called ticket_activity on Amazon RDS.
    • rds-ingest-data-initial-<CloudFormation Stack name> – Sample data is automatically generated at random by the Faker library and loaded to the ticket_activity table.
    • rds-ingest-data-incremental-<CloudFormation Stack name> – Ingests new ticket activity data into the source table ticket_activity continuously. This job simulates customer activity.
    • rds-upsert-data-<CloudFormation Stack name> – Upserts specific records in the source table ticket_activity. This job simulates administrator activity.
  • AWS Identity and Access Management (IAM) users and policies.
  • An Amazon VPC, a public subnet, two private subnets, internet gateway, NAT gateway, and route tables.
    • We use private subnets for the RDS database instance and AWS DMS replication instance.
    • We use the NAT gateway to have reachability to pypi.org to use the MySQL connector for Python from the AWS Glue Python shell jobs. It also provides reachability to Kinesis Data Streams and an Amazon S3 API endpoint

To set up these resources, you must have the following prerequisites:

The following diagram illustrates the architecture of our provisioned resources.

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack
  3. Choose Next.
  4. For S3BucketName, enter the name of your new S3 bucket.
  5. For VPCCIDR, enter a CIDR IP address range that doesn’t conflict with your existing networks.
  6. For PublicSubnetCIDR, enter the CIDR IP address range within the CIDR you gave for VPCCIDR.
  7. For PrivateSubnetACIDR and PrivateSubnetBCIDR, enter the CIDR IP address range within the CIDR you gave for VPCCIDR.
  8. For SubnetAzA and SubnetAzB, choose the subnets you want to use.
  9. For DatabaseUserName, enter your database user name.
  10. For DatabaseUserPassword, enter your database user password.
  11. Choose Next.
  12. On the next page, choose Next.
  13. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  14. Choose Create stack.

Stack creation can take about 20 minutes.

Set up an initial source table

The AWS Glue job rds-ingest-rds-setup-<CloudFormation stack name> creates a source table called event on the RDS database instance. To set up the initial source table in Amazon RDS, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose rds-ingest-rds-setup-<CloudFormation stack name> to open the job.
  3. Choose Run.
  4. Navigate to the Runs tab and wait for Run status to show as SUCCEEDED.

This job will only create the one table, ticket_activity, in the MySQL instance (DDL). See the following code:

CREATE TABLE ticket_activity (
sport_type VARCHAR(256) NOT NULL,
location VARCHAR(256) NOT NULL,
seat_level VARCHAR(256) NOT NULL,
seat_location VARCHAR(256) NOT NULL,
ticket_price INT NOT NULL,
customer_name VARCHAR(256) NOT NULL,
email_address VARCHAR(256) NOT NULL,
updated_at DATETIME NOT NULL )

Ingest new records

In this section, we detail the steps to ingest new records. Implement following steps to star the execution of the jobs.

Start data ingestion to Kinesis Data Streams using AWS DMS

To start data ingestion from Amazon RDS to Kinesis Data Streams, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Select the task rds-to-kinesis-<CloudFormation stack name>.
  3. On the Actions menu, choose Restart/Resume.
  4. Wait for the status to show as Load complete and Replication ongoing.

The AWS DMS replication task ingests data from Amazon RDS to Kinesis Data Streams continuously.

Start data ingestion to Amazon S3

Next, to start data ingestion from Kinesis Data Streams to Amazon S3, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose streaming-cdc-kinesis2hudi-<CloudFormation stack name> to open the job.
  3. Choose Run.

Do not stop this job; you can check the run status on the Runs tab and wait for it to show as Running.

Start the data load to the source table on Amazon RDS

To start data ingestion to the source table on Amazon RDS, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose rds-ingest-data-initial-<CloudFormation stack name> to open the job.
  3. Choose Run.
  4. Navigate to the Runs tab and wait for Run status to show as SUCCEEDED.

Validate the ingested data

After about 2 minutes from starting the job, the data should be ingested into the Amazon S3. To validate the ingested data in the Athena, complete the following steps:

  1. On the Athena console, complete the following steps if you’re running an Athena query for the first time:
    • On the Settings tab, choose Manage.
    • Specify the stage directory and the S3 path where Athena saves the query results.
    • Choose Save.

  1. On the Editor tab, run the following query against the table to check the data:
SELECT * FROM "database_<account_number>_hudi_cdc_demo"."ticket_activity" limit 10;

Note that AWS Cloud Formation will create the database with the account number as database_<your-account-number>_hudi_cdc_demo.

Update existing records

Before you update the existing records, note down the ticketactivity_id value of a record from the ticket_activity table. Run the following SQL using Athena. For this post, we use ticketactivity_id = 46 as an example:

SELECT * FROM "database_<account_number>_hudi_cdc_demo"."ticket_activity" limit 10;

To simulate a real-time use case, update the data in the source table ticket_activity on the RDS database instance to see that the updated records are replicated to Amazon S3. Complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose rds-ingest-data-incremental-<CloudFormation stack name> to open the job.
  3. Choose Run.
  4. Choose the Runs tab and wait for Run status to show as SUCCEEDED.

To upsert the records in the source table, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose the job rds-upsert-data-<CloudFormation stack name>.
  3. On the Job details tab, under Advanced properties, for Job parameters, update the following parameters:
    • For Key, enter --ticketactivity_id.
    • For Value, replace 1 with one of the ticket IDs you noted above (for this post, 46).

  1. Choose Save.
  2. Choose Run and wait for the Run status to show as SUCCEEDED.

This AWS Glue Python shell job simulates a customer activity to buy a ticket. It updates a record in the source table ticket_activity on the RDS database instance using the ticket ID passed in the job argument --ticketactivity_id. It will update ticket_price=500 and updated_at with the current timestamp.

To validate the ingested data in Amazon s3, run the same query from Athena and check the ticket_activity value you noted earlier to observe the ticket_price and updated_at fields:

SELECT * FROM "database_<account_number>_hudi_cdc_demo"."ticket_activity" where ticketactivity_id = 46 ;

Visualize the data in QuickSight

After you have the output file generated by the AWS Glue streaming job in the S3 bucket, you can use QuickSight to visualize the Hudi data files. QuickSight is a scalable, serverless, embeddable, ML-powered business intelligence (BI) service built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include ML-powered insights. QuickSight dashboards can be accessed from any device and seamlessly embedded into your applications, portals, and websites.

Build a QuickSight dashboard

To build a QuickSight dashboard, complete the following steps:

  1. Open the QuickSight console.

You’re presented with the QuickSight welcome page. If you haven’t signed up for QuickSight, you may have to complete the signup wizard. For more information, refer to Signing up for an Amazon QuickSight subscription.

After you have signed up, QuickSight presents a “Welcome wizard.” You can view the short tutorial, or you can close it.

  1. On the QuickSight console, choose your user name and choose Manage QuickSight.
  2. Choose Security & permissions, then choose Manage.
  3. Select Amazon S3 and select the buckets that you created earlier with AWS CloudFormation.
  4. Select Amazon Athena.
  5. Choose Save.
  6. If you changed your Region during the first step of this process, change it back to the Region that you used earlier during the AWS Glue jobs.

Create a dataset

Now that you have QuickSight up and running, you can create your dataset. Complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena.
  4. For Data source name, enter a name (for example, hudi-blog).
  5. Choose Validate.
  6. After the validation is successful, choose Create data source.
  7. For Database, choose database_<your-account-number>_hudi_cdc_demo.
  8. For Tables, select ticket_activity.
  9. Choose Select.
  10. Choose Visualize.
  11. Choose hour and then ticket_activity_id to get the count of ticket_activity_id by hour.

Clean up

To clean up your resources, complete the following steps:

  1. Stop the AWS DMS replication task rds-to-kinesis-<CloudFormation stack name>.
  2. Navigate to the RDS database and choose Modify.
  3. Deselect Enable deletion protection, then choose Continue.
  4. Stop the AWS Glue streaming job streaming-cdc-kinesis2redshift-<CloudFormation stack name>.
  5. Delete the CloudFormation stack.
  6. On the QuickSight dashboard, choose your user name, then choose Manage QuickSight.
  7. Choose Account settings, then choose Delete account.
  8. Choose Delete account to confirm.
  9. Enter confirm and choose Delete account.


In this post, we demonstrated how you can stream data—not only new records, but also updated records from relational databases—to Amazon S3 using an AWS Glue streaming job to create an Apache Hudi-based near-real-time transactional data lake. With this approach, you can easily achieve upsert use cases on Amazon S3. We also showcased how to visualize the Apache Hudi table using QuickSight and Athena. As a next step, refer to the Apache Hudi performance tuning guide for a high-volume dataset. To learn more about authoring dashboards in QuickSight, check out the QuickSight Author Workshop.

About the Authors

Raj Ramasubbu is a Sr. Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 18 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.

Sundeep Kumar is a Sr. Data Architect, Data Lake at AWS, helping customers build data lake and analytics platform and solutions. When not building and designing data lakes, Sundeep enjoys listening music and playing guitar.

Estimating Scope 1 Carbon Footprint with Amazon Athena

Post Syndicated from Thomas Burns original https://aws.amazon.com/blogs/big-data/estimating-scope-1-carbon-footprint-with-amazon-athena/

Today, more than 400 organizations have signed The Climate Pledge, a commitment to reach net-zero carbon by 2040. Some of the drivers that lead to setting explicit climate goals include customer demand, current and anticipated government relations, employee demand, investor demand, and sustainability as a competitive advantage. AWS customers are increasingly interested in ways to drive sustainability actions. In this blog, we will walk through how we can apply existing enterprise data to better understand and estimate Scope 1 carbon footprint using Amazon Simple Storage Service (S3) and Amazon Athena, a serverless interactive analytics service that makes it easy to analyze data using standard SQL.

The Greenhouse Gas Protocol

The Greenhouse Gas Protocol (GHGP) provides standards for measuring and managing global warming impacts from an organization’s operations and value chain.

The greenhouse gases covered by the GHGP are the seven gases required by the UNFCCC/Kyoto Protocol (which is often called the “Kyoto Basket”). These gases are carbon dioxide (CO2), methane (CH4), nitrous oxide (N2O), the so-called F-gases (hydrofluorocarbons and perfluorocarbons), sulfur hexafluoride (SF6) nitrogen trifluoride (NF3). Each greenhouse gas is characterized by its global warming potential (GWP), which is determined by the gas’s greenhouse effect and its lifetime in the atmosphere. Since carbon dioxide (CO2) accounts for about 76 percent of total man-made greenhouse gas emissions, the global warming potential of greenhouse gases are measured relative to CO2, and are thus expressed as CO2-equivalent (CO2e).

The GHGP divides an organization’s emissions into three primary scopes:

  • Scope 1 – Direct greenhouse gas emissions (for example from burning fossil fuels)
  • Scope 2 – Indirect emissions from purchased energy (typically electricity)
  • Scope 3 – Indirect emissions from the value chain, including suppliers and customers

How do we estimate greenhouse gas emissions?

There are different methods to estimating GHG emissions that includes the Continuous Emissions Monitoring System (CEMS) Method, the Spend-Based Method, and the Consumption-Based Method.

Direct Measurement – CEMS Method

An organization can estimate its carbon footprint from stationary combustion sources by performing a direct measurement of carbon emissions using the CEMS method. This method requires continuously measuring the pollutants emitted in exhaust gases from each emissions source using equipment such as gas analyzers, gas samplers, gas conditioning equipment (to remove particulate matter, water vapor and other contaminants), plumbing, actuated valves, Programmable Logic Controllers (PLCs) and other controlling software and hardware. Although this approach may yield useful results, CEMS requires specific sensing equipment for each greenhouse gas to be measured, requires supporting hardware and software, and is typically more suitable for Environment Health and Safety applications of centralized emission sources. More information on CEMS is available here.

Spend-Based Method

Because the financial accounting function is mature and often already audited, many organizations choose to use financial controls as a foundation for their carbon footprint accounting. The Economic Input-Output Life Cycle Assessment (EIO LCA) method is a spend-based method that combines expenditure data with monetary-based emission factors to estimate the emissions produced. The emission factors are published by the U.S. Environment Protection Agency (EPA) and other peer-reviewed academic and government sources. With this method, you can multiply the amount of money spent on a business activity by the emission factor to produce the estimated carbon footprint of the activity.

For example, you can convert the amount your company spends on truck transport to estimated kilograms (KG) of carbon dioxide equivalent (CO₂e) emitted as shown below.

Estimated Carbon Footprint = Amount of money spent on truck transport * Emission Factor [1]

Although these computations are very easy to make from general ledgers or other financial records, they are most valuable for initial estimates or for reporting minor sources of greenhouse gases. As the only user-provided input is the amount spent on an activity, EIO LCA methods aren’t useful for modeling improved efficiency. This is because the only way to reduce EIO-calculated emissions is to reduce spending. Therefore, as a company continues to improve its carbon footprint efficiency, other methods of estimating carbon footprint are often more desirable.

Consumption-Based Method

From either Enterprise Resource Planning (ERP) systems or electronic copies of fuel bills, it’s straightforward to determine the amount of fuel an organization procures during a reporting period. Fuel-based emission factors are available from a variety of sources such as the US Environmental Protection Agency and commercially-licensed databases. Multiplying the amount of fuel procured by the emission factor yields an estimate of the CO2e emitted through combustion. This method is often used for estimating the carbon footprint of stationary emissions (for instance backup generators for data centers or fossil fuel ovens for industrial processes).

If for a particular month an enterprise consumed a known amount of motor gasoline for stationary combustion, the Scope 1 CO2e footprint of the stationary gasoline combustion can be estimated in the following manner:

Estimated Carbon Footprint = Amount of Fuel Consumed * Stationary Combustion Emission Factor[2]

Organizations may estimate their carbon emissions by using existing data found in fuel and electricity bills, ERP data, and relevant emissions factors, which are then consolidated in to a data lake. Using existing analytics tools such as Amazon Athena and Amazon QuickSight an organization can gain insight into its estimated carbon footprint.

The data architecture diagram below shows an example of how you could use AWS services to calculate and visualize an organization’s estimated carbon footprint.

Analytics Architecture

Customers have the flexibility to choose the services in each stage of the data pipeline based on their use case. For example, in the data ingestion phase, depending on the existing data requirements, there are many options to ingest data into the data lake such as using the AWS Command Line Interface (CLI), AWS DataSync, or AWS Database Migration Service.

Example of calculating a Scope 1 stationary emissions footprint with AWS services

Let’s assume you burned 100 standard cubic feet (scf) of natural gas in an oven. Using the US EPA emission factors for stationary emissions we can estimate the carbon footprint associated with the burning. In this case the emission factor is 0.05449555 Kg CO2e /scf.[3]

Amazon S3 is ideal for building a data lake on AWS to store disparate data sources in a single repository, due to its virtually unlimited scalability and high durability. Athena, a serverless interactive query service, allows the analysis of data directly from Amazon S3 using standard SQL without having to load the data into Athena or run complex extract, transform, and load (ETL) processes. Amazon QuickSight supports creating visualizations of different data sources, including Amazon S3 and Athena, and the flexibility to use custom SQL to extract a subset of the data. QuickSight dashboards can provide you with insights (such as your company’s estimated carbon footprint) quickly, and also provide the ability to generate standardized reports for your business and sustainability users.

In this example, the sample data is stored in a file system and uploaded to Amazon S3 using the AWS Command Line Interface (CLI) as shown in the following architecture diagram. AWS recommends creating AWS resources and managing CLI access in accordance with the Best Practices for Security, Identity, & Compliance guidance.

The AWS CLI command below demonstrates how to upload the sample data folders into the S3 target location.

aws s3 cp /path/to/local/file s3://bucket-name/path/to/destination

The snapshot of the S3 console shows two newly added folders that contains the files.

S3 Bucket Overview of Files

To create new table schemas, we start by running the following script for the gas utilization table in the Athena query editor using Hive DDL. The script defines the data format, column details, table properties, and the location of the data in S3.

CREATE EXTERNAL TABLE `gasutilization`(
`fuel_id` int,
`month` string,
`year` int,
`usage_therms` float,
`usage_scf` float,
`g-nr1_schedule_charge` float,
`accountfee` float,
`gas_ppps` float,
`netcharge` float,
`taxpercentage` float,
`totalcharge` float)
's3://<bucketname>/Scope 1 Sample Data/gasutilization'

Athena Hive DDLThe script below shows another example of using Hive DDL to generate the table schema for the gas emission factor data.

CREATE EXTERNAL TABLE `gas_emission_factor`(
`fuel_id` int,
`gas_name` string,
`emission_factor` float)
's3://<bucketname>/Scope 1 Sample Data/gas_emission_factor'

After creating the table schema in Athena, we run the below query against the gas utilization table that includes details of gas bills to show the gas utilization and the associated charges, such as gas public purpose program surcharge (PPPS) and total charges after taxes for the year of 2020:

SELECT * FROM "gasutilization" where year = 2020;

Athena gas utilization overview by month

We are also able to analyze the emission factor data showing the different fuel types and their corresponding CO2e emission as shown in the screenshot.

athena co2e emission factor

With the emission factor and the gas utilization data, we can run the following query below to get an estimated Scope 1 carbon footprint alongside other details. In this query, we joined the gas utilization table and the gas emission factor table on fuel id and multiplied the gas usage in standard cubic foot (scf) by the emission factor to get the estimated CO2e impact. We also selected the month, year, total charge, and gas usage measured in therms and scf, as these are often attributes that are of interest for customers.

SELECT "gasutilization"."usage_scf" * "gas_emission_factor"."emission_factor" 
AS "estimated_CO2e_impact", 
FROM "gasutilization" 
JOIN "gas_emission_factor" 
on "gasutilization"."fuel_id"="gas_emission_factor"."fuel_id";

athena join

Lastly, Amazon QuickSight allows visualization of different data sources, including Amazon S3 and Athena, and the flexibility to use custom SQL to get a subset of the data. The following is an example of a QuickSight dashboard showing the gas utilization, gas charges, and estimated carbon footprint across different years.

QuickSight sample dashboard

We have just estimated the Scope 1 carbon footprint for one source of stationary combustion. If we were to do the same process for all sources of stationary and mobile emissions (with different emissions factors) and add the results together, we could roll up an accurate estimate of our Scope 1 carbon emissions for the entire business by only utilizing native AWS services and our own data. A similar process will yield an estimate of Scope 2 emissions, with grid carbon intensity in the place of Scope 1 emission factors.


This blog discusses how organizations can use existing data in disparate sources to build a data architecture to gain better visibility into Scope 1 greenhouse gas emissions. With Athena, S3, and QuickSight, organizations can now estimate their stationary emissions carbon footprint in a repeatable way by applying the consumption-based method to convert fuel utilization into an estimated carbon footprint.

Other approaches available on AWS include Carbon Accounting on AWS, Sustainability Insights Framework, Carbon Data Lake on AWS, and general guidance detailed at the AWS Carbon Accounting Page.

If you are interested in information on estimating your organization’s carbon footprint with AWS, please reach out to your AWS account team and check out AWS Sustainability Solutions.


  1. An example from page four of Amazon’s Carbon Methodology document illustrates this concept.
    Amount spent on truck transport: $100,000
    EPA Emission Factor: 1.556 KG CO2e /dollar of truck transport
    Estimated CO₂e emission: $100,000 * 1.556 KG CO₂e/dollar of truck transport = 155,600 KG of CO2e
  2. For example,
    Gasoline consumed: 1,000 US Gallons
    EPA Emission Factor: 8.81 Kg of CO2e /gallon of gasoline combusted
    Estimated CO2e emission = 1,000 US Gallons * 8.81 Kg of CO2e per gallon of gasoline consumed= 8,810 Kg of CO2e.
    EPA Emissions Factor for stationary emissions of motor gasoline is 8.78 kg CO2 plus .38 grams of CH4, plus .08 g of N2O.
    Combining these emission factors using 100-year global warming potential for each gas (CH4:25 and N2O:298) gives us Combined Emission Factor = 8.78 kg + 25*.00038 kg + 298 *.00008 kg = 8.81 kg of CO2e per gallon.
  3. The Emission factor per scf is 0.05444 kg of CO2 plus 0.00103 g of CH4 plus 0.0001 g of N2O. To get this in terms of CO2e we need to multiply the emission factor of the other two gases by their global warming potentials (GWP). The 100-year GWP for CH4  and N2O are 25 and 298 respectively. Emission factors and GWPs come from the US EPA website.

About the Authors

Thomas Burns
, SCR, CISSP is a Principal Sustainability Strategist and Principal Solutions Architect at Amazon Web Services. Thomas supports manufacturing and industrial customers world-wide. Thomas’s focus is using the cloud to help companies reduce their environmental impact both inside and outside of IT.

Aileen Zheng is a Solutions Architect supporting US Federal Civilian Sciences customers at Amazon Web Services (AWS). She partners with customers to provide technical guidance on enterprise cloud adoption and strategy and helps with building well-architected solutions. She is also very passionate about data analytics and machine learning. In her free time, you’ll find Aileen doing pilates, taking her dog Mumu out for a hike, or hunting down another good spot for food! You’ll also see her contributing to projects to support diversity and women in technology.

Extend your data mesh with Amazon Athena and federated views

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/extend-your-data-mesh-with-amazon-athena-and-federated-views/

Amazon Athena is a serverless, interactive analytics service built on the Trino, PrestoDB, and Apache Spark open-source frameworks. You can use Athena to run SQL queries on petabytes of data stored on Amazon Simple Storage Service (Amazon S3) in widely used formats such as Parquet and open-table formats like Apache Iceberg, Apache Hudi, and Delta Lake. However, Athena also allows you to query data stored in 30 different data sources—in addition to Amazon S3—including relational, non-relational, and object stores running on premises or in other cloud environments.

In Athena, we refer to queries on non-Amazon S3 data sources as federated queries. These queries run on the underlying database, which means you can analyze the data without learning a new query language and without the need for separate extract, transform, and load (ETL) scripts to extract, duplicate, and prepare data for analysis.

Recently, Athena added support for creating and querying views on federated data sources to bring greater flexibility and ease of use to use cases such as interactive analysis and business intelligence reporting. Athena also updated its data connectors with optimizations that improve performance and reduce cost when querying federated data sources. The updated connectors use dynamic filtering and an expanded set of predicate pushdown optimizations to perform more operations in the underlying data source rather than in Athena. As a result, you get faster queries with less data scanned, especially on tables with millions to billions of rows of data.

In this post, we show how to create and query views on federated data sources in a data mesh architecture featuring data producers and consumers.

The term data mesh refers to a data architecture with decentralized data ownership. A data mesh enables domain-oriented teams with the data they need, emphasizes self-service, and promotes the notion of purpose-built data products. In a data mesh, data producers expose datasets to the organization and data consumers subscribe to and consume the data products created by producers. By distributing data ownership to cross-functional teams, a data mesh can foster a culture of collaboration, invention, and agility around data.

Let’s dive into the solution.

Solution overview

For this post, imagine a hypothetical ecommerce company that uses multiple data sources, each playing a different role:

  • In an S3 data lake, ecommerce records are stored in a table named Lineitems
  • Amazon ElastiCache for Redis stores Nations and ActiveOrders data, ensuring ultra-fast reads of operational data by downstream ecommerce systems
  • On Amazon Relational Database Service (Amazon RDS), MySQL is used to store data like email addresses and shipping addresses in the Orders, Customer, and Suppliers tables
  • For flexibility and low-latency reads and writes, an Amazon DynamoDB table holds Part and Partsupp data

We want to query these data sources in a data mesh design. In the following sections, we set up Athena data source connectors for MySQL, DynamoDB, and Redis, and then run queries that perform complex joins across these data sources. The following diagram depicts our data architecture.

Architecture diagram

As you proceed with this solution, note that you will create AWS resources in your account. We have provided you with an AWS CloudFormation template that defines and configures the required resources, including the sample MySQL database, S3 tables, Redis store, and DynamoDB table. The template also creates the AWS Glue database and tables, S3 bucket, Amazon S3 VPC endpoint, AWS Glue VPC endpoint, and other AWS Identity and Access Management (IAM) resources that are used in the solution.

The template is designed to demonstrate how to use federated views in Athena, and is not intended for production use without modification. Additionally, the template uses the us-east-1 Region and will not work in other Regions without modification. The template creates resources that incur costs while they are in use. Follow the cleanup steps at the end of this post to delete the resources and avoid unnecessary charges.


Before you launch the CloudFormation stack, ensure you have the following prerequisites:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret key to configure the AWS Command Line Interface (AWS CLI), and permissions to create an IAM role, IAM policies, and stacks in AWS CloudFormation

Create resources with AWS CloudFormation

To get started, complete the following steps:

  1. Choose Launch Stack: Cloudformation Launch Stack
  2. Select I acknowledge that this template may create IAM resources.

The CloudFormation stack takes approximately 20–30 minutes to complete. You can monitor its progress on the AWS CloudFormation console. When status reads CREATE_COMPLETE, your AWS account will have the resources necessary to implement this solution.

Deploy connectors and connect to data sources

With our resources provisioned, we can begin to connect the dots in our data mesh. Let’s start by connecting the data sources created by the CloudFormation stack with Athena.

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Data sources, select MySQL, then choose Next.
  4. For Data source name, enter a name, such as mysql. The Athena connector for MySQL is an AWS Lambda function that was created for you by the CloudFormation template.
  5. For Connection details, choose Select or enter a Lambda function.
  6. Choose mysql, then choose Next.
  7. Review the information and choose Create data source.
  8. Return to the Data sources page and choose mysql.
  9. On the connector details page, choose the link under Lambda function to access the Lambda console and inspect the function associated with this connector.
    mysql Data Soruce details
  10. Return to the Athena query editor.
  11. For Data source, choose mysql.
  12. For Database, choose the sales database.
  13. For Tables, you should see a listing of MySQL tables that are ready for you to query.
  14. Repeat these steps to set up the connectors for DynamoDB and Redis.

After all four data sources are configured, we can see the data sources on the Data source drop-down menu. All other databases and tables, like the lineitem table, which is stored on Amazon S3, are defined in the AWS Glue Data Catalog and can be accessed by choosing AwsDataCatalog as the data source.

This image shows AwsDataCatalog is being selected as Data Source

Analyze data with Athena

With our data sources configured, we are ready to start running queries and using federated views in a data mesh architecture. Let’s start by trying to find out how much profit was made on a given line of parts, broken out by supplier nation and year.

For such a query, we need to calculate, for each nation and year, the profit for parts ordered in each year that were filled by a supplier in each nation. Profit is defined as the sum of [(l_extendedprice*(1-l_discount)) - (ps_supplycost * l_quantity)] for all line items describing parts in the specified line.

Answering this question requires querying all four data sources—MySQL, DynamoDB, Redis, and Amazon S3—and is accomplished with the following SQL:

    n_name nation,
	year(CAST(o_orderdate AS date)) as o_year,
	((l_extendedprice * (1 - l_discount)) - (CAST(ps_supplycost AS double) * l_quantity)) as amount
    ((s_suppkey = l_suppkey)
    AND (ps_suppkey = l_suppkey)
	AND (ps_partkey = l_partkey)
	AND (p_partkey = l_partkey)
	AND (o_orderkey = l_orderkey)
	AND (s_nationkey = CAST(Regexp_extract(_key_, '.*-(.*)', 1) AS int)))

Running this query on the Athena console produces the following result.

Result of above query

This query is fairly complex: it involves multiple joins and requires special knowledge of the correct way to calculate profit metrics that other end-users may not possess.

To simplify the analysis experience for those users, we can hide this complexity behind a view. For more information on using views with federated data sources, see Querying federated views.

Use the following query to create the view in the data_lake database under the AwsDataCatalog data source:

CREATE OR REPLACE VIEW "data_lake"."federated_view" AS
    n_name nation,
	year(CAST(o_orderdate AS date)) as o_year,
	((l_extendedprice * (1 - l_discount)) - (CAST(ps_supplycost AS double) * l_quantity)) as amount
    ((s_suppkey = l_suppkey)
    AND (ps_suppkey = l_suppkey)
	AND (ps_partkey = l_partkey)
	AND (p_partkey = l_partkey)
	AND (o_orderkey = l_orderkey)
	AND (s_nationkey = CAST(Regexp_extract(_key_, '.*-(.*)', 1) AS int)))

Next, run a simple select query to validate the view was created successfully: SELECT * FROM federated_view limit 10

The result should be similar to our previous query.

With our view in place, we can perform new analyses to answer questions that would be challenging without the view due to the complex query syntax that would be required. For example, we can find the total profit by nation:

SELECT nation, sum(amount) AS total
from federated_view
GROUP BY nation 

Your results should resemble the following screenshot.

Result of above query

As you now see, the federated view makes it simpler for end-users to run queries on this data. Users are free to query a view of the data, defined by a knowledgeable data producer, rather than having to first acquire expertise in each underlying data source. Because Athena federated queries are processed where the data is stored, with this approach, we avoid duplicating data from the source system, saving valuable time and cost.

Use federated views in a multi-user model

So far, we have satisfied one of the principles of a data mesh: we created a data product (federated view) that is decoupled from its originating source and is available for on-demand analysis by consumers.

Next, we take our data mesh a step further by using federated views in a multi-user model. To keep it simple, assume we have one producer account, the account we used to create our four data sources and federated view, and one consumer account. Using the producer account, we give the consumer account permission to query the federated view from the consumer account.

The following figure depicts this setup and our simplified data mesh architecture.

Multi-user model setup

Follow these steps to share the connectors and AWS Glue Data Catalog resources from the producer, which includes our federated view, with the consumer account:

  1. Share the data sources mysql, redis, dynamo, and data_lake with the consumer account. For instructions, refer to Sharing a data source in Account A with Account B. Note that Account A represents the producer and Account B represents the consumer. Make sure you use the same data source names from earlier when sharing data. This is necessary for the federated view to work in a cross-account model.
  2. Next, share the producer account’s AWS Glue Data Catalog with the consumer account by following the steps in Cross-account access to AWS Glue data catalogs. For the data source name, use shared_federated_catalog.
  3. Switch to the consumer account, navigate to the Athena console, and verify that you see federated_view listed under Views in the shared_federated_catalog Data Catalog and data_lake database.
  4. Next, run a sample query on the shared view to see the query results.

Result of sample query

Clean up

To clean up the resources created for this post, complete the following steps:

  1. On the Amazon S3 console, empty the bucket athena-federation-workshop-<account-id>.
  2. If you’re using the AWS CLI, delete the objects in the athena-federation-workshop-<account-id> bucket with the following code. Make sure you run this command on the correct bucket.
    aws s3 rm s3://athena-federation-workshop-<account-id> --recursive
  3. On the AWS CloudFormation console or the AWS CLI, delete the stack athena-federated-view-blog.


In this post, we demonstrated the functionality of Athena federated views. We created a view spanning four different federated data sources and ran queries against it. We also saw how federated views could be extended to a multi-user data mesh and ran queries from a consumer account.

To take advantage of federated views, ensure you are using Athena engine version 3 and upgrade your data source connectors to the latest version available. For information on how to upgrade a connector, see Updating a data source connector.

About the Authors

Saurabh Bhutyani is a Principal Big Data Specialist Solutions Architect at AWS. He is passionate about new technologies. He joined AWS in 2019 and works with customers to provide architectural guidance for running scalable analytics solutions and data mesh architectures using AWS analytics services like Amazon EMR, Amazon Athena, AWS Glue, AWS Lake Formation, and Amazon DataZone.

Pathik Shah is a Sr. Big Data Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Orca Security’s journey to a petabyte-scale data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/orca-securitys-journey-to-a-petabyte-scale-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Eliad Gat and Oded Lifshiz from Orca Security.

With data becoming the driving force behind many industries today, having a modern data architecture is pivotal for organizations to be successful. One key component that plays a central role in modern data architectures is the data lake, which allows organizations to store and analyze large amounts of data in a cost-effective manner and run advanced analytics and machine learning (ML) at scale.

Orca Security is an industry-leading Cloud Security Platform that identifies, prioritizes, and remediates security risks and compliance issues across your AWS Cloud estate. Orca connects to your environment in minutes with patented SideScanning technology to provide complete coverage across vulnerabilities, malware, misconfigurations, lateral movement risk, weak and leaked passwords, overly permissive identities, and more.

The Orca Platform is powered by a state-of-the-art anomaly detection system that uses cutting-edge ML algorithms and big data capabilities to detect potential security threats and alert customers in real time, ensuring maximum security for their cloud environment. At the core of Orca’s anomaly detection system is its transactional data lake, which enables the company’s data scientists, analysts, data engineers, and ML specialists to extract valuable insights from vast amounts of data and deliver innovative cloud security solutions to its customers.

In this post, we describe Orca’s journey building a transactional data lake using Amazon Simple Storage Service (Amazon S3), Apache Iceberg, and AWS Analytics. We explore why Orca chose to build a transactional data lake and examine the key considerations that guided the selection of Apache Iceberg as the preferred table format.

In addition, we describe the Orca Platform architecture and the technologies used. Lastly, we discuss the challenges encountered throughout the project, present the solutions used to address them, and share valuable lessons learned.

Why did Orca build a data lake?

Prior to the creation of the data lake, Orca’s data was distributed among various data silos, each owned by a different team with its own data pipelines and technology stack. This setup led to several issues, including scaling difficulties as the data size grew, maintaining data quality, ensuring consistent and reliable data access, high costs associated with storage and processing, and difficulties supporting streaming use cases. Moreover, running advanced analytics and ML on disparate data sources proved challenging. To overcome these issues, Orca decided to build a data lake.

A data lake is a centralized data repository that enables organizations to store and manage large volumes of structured and unstructured data, eliminating data silos and facilitating advanced analytics and ML on the entire data. By decoupling storage and compute, data lakes promote cost-effective storage and processing of big data.

Why did Orca choose Apache Iceberg?

Orca considered several table formats that have evolved in recent years to support its transactional data lake. Amongst the options, Apache Iceberg stood out as the ideal choice because it met all of Orca’s requirements.

First, Orca sought a transactional table format that ensures data consistency and fault tolerance. Apache Iceberg’s transactional and ACID guarantees, which allow concurrent read and write operations while ensuring data consistency and simplified fault handling, fulfill this requirement. Furthermore, Apache Iceberg’s support for time travel and rollback capabilities makes it highly suitable for addressing data quality issues by reverting to a previous state in a consistent manner.

Second, a key requirement was to adopt an open table format that integrates with various processing engines. This was to avoid vendor lock-in and allow teams to choose the processing engine that best suits their needs. Apache Iceberg’s engine-agnostic and open design meets this requirement by supporting all popular processing engines, including Apache Spark, Amazon Athena, Apache Flink, Trino, Presto, and more.

In addition, given the substantial data volumes handled by the system, an efficient table format was required that can support querying petabytes of data very fast. Apache Iceberg’s architecture addresses this need by efficiently filtering and reducing scanned data, resulting in accelerated query times.

An additional requirement was to allow seamless schema changes without impacting end-users. Apache Iceberg’s range of features, including schema evolution, hidden partitions, and partition evolution, addresses this requirement.

Lastly, it was important for Orca to choose a table format that is widely adopted. Apache Iceberg’s growing and active community aligned with the requirement for a popular and community-backed table format.

Solution overview

Orca’s data lake is based on open-source technologies that seamlessly integrate with Apache Iceberg. The system ingests data from various sources such as cloud resources, cloud activity logs, and API access logs, and processes billions of messages, resulting in terabytes of data daily. This data is sent to Apache Kafka, which is hosted on Amazon Managed Streaming for Apache Kafka (Amazon MSK). It is then processed using Apache Spark Structured Streaming running on Amazon EMR and stored in the data lake. Amazon EMR streamlines the process of loading all required Iceberg packages and dependencies, ensuring that the data is stored in Apache Iceberg format and ready for consumption as quickly as possible.

The data lake is built on top of Amazon S3 using Apache Iceberg table format with Apache Parquet as the underlying file format. In addition, the AWS Glue Data Catalog enables data discovery, and AWS Identity and Access Management (IAM) enforces secure access controls for the lake and its operations.

The data lake serves as the foundation for a variety of capabilities that are supported by different engines.

Data pipelines built on Apache Spark and Athena SQL analyze and process the data stored in the data lake. These data pipelines generate valuable insights and curated data that are stored in Apache Iceberg tables for downstream usage. This data is then used by various applications for streaming analytics, business intelligence, and reporting.

Amazon SageMaker is used to build, train, and deploy a range of ML models. Specifically, the system uses Amazon SageMaker Processing jobs to process the data stored in the data lake, employing the AWS SDK for Pandas (previously known as AWS Wrangler) for various data transformation operations, including cleaning, normalization, and feature engineering. This ensures that the data is suitable for training purposes. Additionally, SageMaker training jobs are employed for training the models. After the models are trained, they are deployed and used to identify anomalies and alert customers in real time to potential security threats. The following diagram illustrates the solution architecture.

Orca security Data Lake Architecture

Challenges and lessons learned

Orca faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing EMR streaming ingestion for high throughput
  • Taming the small files problem for fast reads
  • Maximizing performance with Athena version 3
  • Maintaining Apache Iceberg tables
  • Managing data retention
  • Monitoring the data lake infrastructure and operations
  • Mitigating data quality issues

In this section, we describe each of these challenges and the solutions implemented to address them.

Determining optimal table partitioning

Determining optimal partitioning for each table is very important in order to optimize query performance and minimize the impact on teams querying the tables when partitioning changes. Apache Iceberg’s hidden partitions combined with partition transformations proved to be valuable in achieving this goal because it allowed for transparent changes to partitioning without impacting end-users. Additionally, partition evolution enables experimentation with various partitioning strategies to optimize cost and performance without requiring a rewrite of the table’s data every time.

For example, with these features, Orca was able to easily change several of its table partitioning from DAY to HOUR with no impact on user queries. Without this native Iceberg capability, they would have needed to coordinate the new schema with all the teams that query the tables and rewrite the entire data, which would have been a costly, time-consuming, and error-prone process.

Optimizing EMR streaming ingestion for high throughput

As mentioned previously, the system ingests billions of messages daily, resulting in terabytes of data processed and stored each day. Therefore, optimizing the EMR clusters for this type of load while maintaining high throughput and low costs has been an ongoing challenge. Orca addressed this in several ways.

First, Orca chose to use instance fleets with its EMR clusters because they allow optimized resource allocation by combining different instance types and sizes. Instance fleets improve resilience by allowing multiple Availability Zones to be configured. As a result, the cluster will launch in an Availability Zone with all the required instance types, preventing capacity limitations. Additionally, instance fleets can use both Amazon Elastic Compute Cloud (Amazon EC2) On-Demand and Spot instances, resulting in cost savings.

The process of sizing the cluster for high throughput and lower costs involved adjusting the number of core and task nodes, selecting suitable instance types, and fine-tuning CPU and memory configurations. Ultimately, Orca was able to find an optimal configuration consisting of on-demand core nodes and spot task nodes of varying sizes, which provided high throughput but also ensured compliance with SLAs.

Orca also found that using different Kafka Spark Structured Streaming properties, such as minOffsetsPerTrigger, maxOffsetsPerTrigger, and minPartitions, provided higher throughput and better control of the load. Using minPartitions, which enables better parallelism and distribution across a larger number of tasks, was particularly useful for consuming high lags quickly.

Lastly, when dealing with a high data ingestion rate, Amazon S3 may throttle the requests and return 503 errors. To address this scenario, Iceberg offers a table property called write.object-storage.enabled, which incorporates a hash prefix into the stored S3 object path. This approach effectively mitigates throttling problems.

Taming the small files problem for fast reads

A common challenge often encountered when ingesting streaming data into the data lake is the creation of many small files. This can have a negative impact on read performance when querying the data with Athena or Apache Spark. Having a high number of files leads to longer query planning and runtimes due to the need to process and read each file, resulting in overhead for file system operations and network communication. Additionally, this can result in higher costs due to the large number of S3 PUT and GET requests required.

To address this challenge, Apache Spark Structured Streaming provides the trigger mechanism, which can be used to tune the rate at which data is committed to Apache Iceberg tables. The commit rate has a direct impact on the number of files being produced. For instance, a higher commit rate, corresponding to a shorter time interval, results in lots of data files being produced.

In certain cases, launching the Spark cluster on an hourly basis and configuring the trigger to AvailableNow facilitated the processing of larger data batches and reduced the number of small files created. Although this approach led to cost savings, it did involve a trade-off of reduced data freshness. However, this trade-off was deemed acceptable for specific use cases.

In addition, to address preexisting small files within the data lake, Apache Iceberg offers a data files compaction operation that combines these smaller files into larger ones. Running this operation on a schedule is highly recommended to optimize the number and size of the files. Compaction also proves valuable in handling late-arriving data and enables the integration of this data into consolidated files.

Maximizing performance with Athena version 3

Orca was an early adopter of Athena version 3, Amazon’s implementation of the Trino query engine, which provides extensive support for Apache Iceberg. Whenever possible, Orca preferred using Athena over Apache Spark for data processing. This preference was driven by the simplicity and serverless architecture of Athena, which led to reduced costs and easier usage, unlike Spark, which typically required provisioning and managing a dedicated cluster at higher costs.

In addition, Orca used Athena as part of its model training and as the primary engine for ad hoc exploratory queries conducted by data scientists, business analysts, and engineers. However, for maintaining Iceberg tables and updating table properties, Apache Spark remained the more scalable and feature-rich option.

Maintaining Apache Iceberg tables

Ensuring optimal query performance and minimizing storage overhead became a significant challenge as the data lake grew to a petabyte scale. To address this challenge, Apache Iceberg offers several maintenance procedures, such as the following:

  • Data files compaction – This operation, as mentioned earlier, involves combining smaller files into larger ones and reorganizing the data within them. This operation not only reduces the number of files but also enables data sorting based on different columns or clustering similar data using z-ordering. Using Apache Iceberg’s compaction results in significant performance improvements, especially for large tables, making a noticeable difference in query performance between compacted and uncompacted data.
  • Expiring old snapshots – This operation provides a way to remove outdated snapshots and their associated data files, enabling Orca to maintain low storage costs.

Running these maintenance procedures efficiently and cost-effectively using Apache Spark, particularly the compaction operation, which operates on terabytes of data daily, requires careful consideration. This entails appropriately sizing the Spark cluster running on EMR and adjusting various settings such as CPU and memory.

In addition, using Apache Iceberg’s metadata tables proved to be very helpful in identifying issues related to the physical layout of Iceberg’s tables, which can directly impact query performance. Metadata tables offer insights into the physical data storage layout of the tables and offer the convenience of querying them with Athena version 3. By accessing the metadata tables, crucial information about tables’ data files, manifests, history, partitions, snapshots, and more can be obtained, which aids in understanding and optimizing the table’s data layout.

For instance, the following queries can uncover valuable information about the underlying data:

  • The number of files and their average size per partition:
    >SELECT partition, file_count, (total_size / file_count) AS avg_file_size FROM "db"."table$partitions"

  • The number of data files pointed to by each manifest:
    SELECT path, added_data_files_count + existing_data_files_count AS number_of_data_files FROM "db"."table$manifests"

  • Information about the data files:
    SELECT file_path, file_size_in_bytes FROM "db"."table$files"

  • Information related to data completeness:
    SELECT record_count, partition FROM "db"."table$partitions"

Managing data retention

Effective management of data retention in a petabyte-scale data lake is crucial to ensure low storage costs as well as to comply with GDPR. However, implementing such a process can be challenging when dealing with Iceberg data stored in S3 buckets, because deleting files based on simple S3 lifecycle policies could potentially cause table corruption. This is because Iceberg’s data files are referenced in manifest files, so any changes to data files must also be reflected in the manifests.

To address this challenge, certain considerations must be taken into account while handling data retention properly. Apache Iceberg provides two modes for handling deletes, namely copy-on-write (CoW), and merge-on-read (MoR). In CoW mode, Iceberg rewrites data files at the time of deletion and creates new data files, whereas in MoR mode, instead of rewriting the data files, a delete file is written that lists the position of deleted records in files. These files are then reconciled with the remaining data during read time.

In favor of faster read times, CoW mode is preferable and when used in conjunction with the expiring old snapshots operation, it allows for the hard deletion of data files that have exceeded the set retention period.

In addition, by storing the data sorted based on the field that will be utilized for deletion (for example, organizationID), it’s possible to reduce the number of files that require rewriting. This optimization significantly enhances the efficiency of the deletion process, resulting in improved deletion times.

Monitoring the data lake infrastructure and operations

Managing a data lake infrastructure is challenging due to the various components it encompasses, including those responsible for data ingestion, storage, processing, and querying.

Effective monitoring of all these components involves tracking resource utilization, data ingestion rates, query runtimes, and various other performance-related metrics, and is essential for maintaining optimal performance and detecting issues as soon as possible.

Monitoring Amazon EMR was crucial because it played a vital role in the system for data ingestion, processing, and maintenance. Orca monitored the cluster status and resource usage of Amazon EMR by utilizing the available metrics through Amazon CloudWatch. Furthermore, it used JMX Exporter and Prometheus to scrape specific Apache Spark metrics and create custom metrics to further improve the pipelines’ observability.

Another challenge emerged when attempting to further monitor the ingestion progress through Kafka lag. Although Kafka lag tracking is the standard method for monitoring ingestion progress, it posed a challenge because Spark Structured Streaming manages its offsets internally and doesn’t commit them back to Kafka. To overcome this, Orca utilized the progress of the Spark Structured Streaming Query Listener (StreamingQueryListener) to monitor the processed offsets, which were then committed to a dedicated Kafka consumer group for lag monitoring.

In addition, to ensure optimal query performance and identify potential performance issues, it was essential to monitor Athena queries. Orca addressed this by using key metrics from Athena and the AWS SDK for Pandas, specifically TotalExecutionTime and ProcessedBytes. These metrics helped identify any degradation in query performance and keep track of costs, which were based on the size of the data scanned.

Mitigating data quality issues

Apache Iceberg’s capabilities and overall architecture played a key role in mitigating data quality challenges.

One of the ways Apache Iceberg addresses these challenges is through its schema evolution capability, which enables users to modify or add columns to a table’s schema without rewriting the entire data. This feature prevents data quality issues that may arise due to schema changes, because the table’s schema is managed as part of the manifest files, ensuring safe changes.

Furthermore, Apache Iceberg’s time travel feature provides the ability to review a table’s history and roll back to a previous snapshot. This functionality has proven to be extremely useful in identifying potential data quality issues and swiftly resolving them by reverting to a previous state with known data integrity.

These robust capabilities ensure that data within the data lake remains accurate, consistent, and reliable.


Data lakes are an essential part of a modern data architecture, and now it’s easier than ever to create a robust, transactional, cost-effective, and high-performant data lake by using Apache Iceberg, Amazon S3, and AWS Analytics services such as Amazon EMR and Athena.

Since building the data lake, Orca has observed significant improvements. The data lake infrastructure has allowed Orca’s platform to have seamless scalability while reducing the cost of running its data pipelines by over 50% utilizing Amazon EMR. Additionally, query costs were reduced by more than 50% using the efficient querying capabilities of Apache Iceberg and Athena version 3.

Most importantly, the data lake has made a profound impact on Orca’s platform and continues to play a key role in its success, supporting new use cases such as change data capture (CDC) and others, and enabling the development of cutting-edge cloud security solutions.

If Orca’s journey has sparked your interest and you are considering implementing a similar solution in your organization, here are some strategic steps to consider:

  • Start by thoroughly understanding your organization’s data needs and how this solution can address them.
  • Reach out to experts, who can provide you with guidance based on their own experiences. Consider engaging in seminars, workshops, or online forums that discuss these technologies. The following resources are recommended for getting started:
  • An important part of this journey would be to implement a proof of concept. This hands-on experience will provide valuable insights into the complexities of a transactional data lake.

Embarking on a journey to a transactional data lake using Amazon S3, Apache Iceberg, and AWS Analytics can vastly improve your organization’s data infrastructure, enabling advanced analytics and machine learning, and unlocking insights that drive innovation.

About the Authors

Eliad Gat is a Big Data & AI/ML Architect at Orca Security. He has over 15 years of experience designing and building large-scale cloud-native distributed systems, specializing in big data, analytics, AI, and machine learning.

Oded Lifshiz is a Principal Software Engineer at Orca Security. He enjoys combining his passion for delivering innovative, data-driven solutions with his expertise in designing and building large-scale machine learning pipelines.

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan also leads the Apache Iceberg Israel community.

Carlos Rodrigues is a Big Data Specialist Solutions Architect at Amazon Web Services. He helps customers worldwide build transactional data lakes on AWS using open table formats like Apache Hudi and Apache Iceberg.

Sofia Zilberman is a Sr. Analytics Specialist Solutions Architect at Amazon Web Services. She has a track record of 15 years of creating large-scale, distributed processing systems. She remains passionate about big data technologies and architecture trends, and is constantly on the lookout for functional and technological innovations.

Build and share a business capability model with Amazon QuickSight

Post Syndicated from Abdul Qadir original https://aws.amazon.com/blogs/big-data/build-and-share-a-business-capability-model-with-amazon-quicksight/

The technology landscape has been evolving rapidly, with waves of change impacting IT from every angle. It is causing a ripple effect across IT organizations and shifting the way IT delivers applications and services.

The change factors impacting IT organizations include:

  • The shift from a traditional application model to a services-based application model (SaaS, PaaS)
  • The shift from a traditional infrastructure and hardware costing model to cloud-based containers (private and public clouds) with metered usage for resources (IaaS)
  • The shift from the lengthy traditional development and delivery cycles to continuous development and integration (DevOps)
  • The shift in application architecture from N-Tier to loosely coupled services

The portfolio of services delivering business capabilities are the new assets of IT organizations that need to be cataloged in a repository. The system must follow a well-defined business taxonomy that enhances discovery, analysis, and reuse by potential consumers, and avoids building redundant services. The traditional portfolio management tools within the organization need to be augmented with additional components that can manage the complexity of the services ecosystem.

This post provides a simple and quick way of building an extendable analytical system using Amazon QuickSight to better manage lines of business (LOBs) with a detailed list of business capabilities and APIs, deep analytical insights, and desired graphical visualizations from different dimensions. In addition, this tool enhances the discovery and reuse of existing business capabilities, avoids duplication of services, and shortens time-to-market.

Use case overview

Bob is a Senior Enterprise Architect. He recently joined a Tier 1 bank. His first assignment is to assess the bank’s capabilities to offer new financial products to its high-value retail clients. The only document given to Bob was PowerPoint slides and the names of the head of each department to get more information. The PowerPoint presentation provided high-level information, but it didn’t give an insight into how capable each department is to provide the required data through APIs for the new products. To collect that information, Bob gets in touch with the head of each department, who in turn refer him to their development leads, who in turn give him a bunch of technical documents that explain how APIs are being used.


Business analysts are familiar with business terminology and taxonomy, and often depend on the technology team to explain the technical assets associated with business capabilities. The business capabilities are the assets of the IT organization that need to be cataloged in a repository. The catalog must follow a well-defined business taxonomy that enhances discovery and reuse by consumers, and avoids building redundant services.

The better organized the catalog is, the higher the potential for reuse and the return on investment for the services transformation strategy. The catalog needs to be organized using some business functions taxonomy with a detailed list of capabilities and sub-capabilities. The following diagram illustrates an example of services information and interdependencies.

Example of services information and interdependencies

Defining and capturing a business capability model

If an enterprise doesn’t have a system to capture the business capability model, consider defining and finding a way to capture the model for better insight and visibility, and then map it with digital assets like APIs. The model should be able to showcase to LOBs their categories and capabilities. The following table includes some sample LOBs and their associations for a business that sells the services.





Manage Applicant Experience

Manage Application Activities

Process Application


Pursue Automated Leads

Sale Service

Engage Customer

Provide Needs Assessment Tools

Provide Service Information

After the map is defined and captured, each business capability can be mapped to APIs that are implemented for it. Each business capability then has visibility into all the associated digital assets and mapped metadata of the services, such as consumers of the API.

To capture the model, you can define a simple table to capture the information, and then you can perform further analysis on it with an analytical tool such as QuickSight.

In the following sample data model, each business LOB has several business categories and capabilities, and each capability can be mapped to multiple APIs. Also note that there’s not always a 1:1 mapping between a business capability, an API, and a service.

  • Business LOB – Recruitment, Sale Service
  • Business category – Process Application, Engage Customer
  • Business capabilities – Complete an Application, Follow-Ups
  • Digital assets – Recruitment API, Sale Service API

There are sets of other standard information that you can include in a data model, such as API consumers.

The following example shows a table structure to capture this information.

LOB table structure

The following figure visualizes the business capabilities and associated APIs.

Visualization of business capabilities and associated APIs

The remainder of the post highlights the key components to build the full solution end to end. The UI captures the business capabilities and associated APIs, and publishes the service information through a DevOps process. The solution also includes storage and a reporting tool that complement the applications portfolio management capability in place and expand its capabilities with the services portfolio.

Aligning APIs to a business capability model

To align APIs to a business capability model, you can follow these steps:

  1. Understand the business capabilities – Identify the key business capabilities of your organization and understand how they support the overall business strategy.
  2. Map the APIs to the capabilities – Review the existing APIs and map them to the corresponding business capabilities. This will help identify any gaps in the capabilities that can be addressed through new or updated APIs.
  3. Prioritize the APIs – Prioritize the development of new or updated APIs based on their importance to the business capabilities. This will ensure that the most critical capabilities are supported by the APIs.
  4. Implement governance – Implement a governance process to ensure that the APIs are aligned with the business capabilities and are used correctly. This can include setting standards for how the APIs are designed, developed, and deployed.
  5. Monitor and measure – Monitor the usage and performance of the APIs to measure their impact on the business capabilities. Use this information to make decisions about changes to the APIs over time.
  6. Regularly review and update – Review and update the mapping of the APIs to the business capabilities on a regular basis to ensure they remain aligned with the organization’s goals and objectives.

Maintenance and evolution of a business capability model

Building a business capability model is not a one-time exercise. It keeps evolving with business requirements and usage. Data management best practices should be followed as per your company’s guidelines to have consistent data end to end.

Solution overview

In this section, we introduce the ability to capture the business capabilities and associated APIs and make them available using the QuickSight business intelligence (BI) tool, and highlight its features.

The following approach provides the ability to manage business capability models and enable them to link business capabilities with enterprise digital assets, including services, APIs, and IT systems. This solution enables IT and business teams to further drill down into the model to see what has been implemented. These details provide value to architects and analysts to assess which services can be combined to provide new offerings and shorten time-to-market, enable reusability by consumers, and avoid building redundant services.

The following key components are required:

Organizations can use their existing UI framework (if available) to capture the information, or they can use one of the open-source services available in the market. Depending on the selection and capability of the open-source product, a user interface can be generated and customized.

Let’s look at each service in our solution in more detail:

  • Amplify – Amplify is a set of tools and services that can be used together or on their own, to help front-end web and mobile developers build scalable full stack applications, powered by AWS. With Amplify, you can configure app backends and connect your app in minutes, deploy static web apps in a few clicks, and easily manage app content outside the AWS Management Console. Amplify supports popular web frameworks including JavaScript, React, Angular, Vue, and Next.js, and mobile platforms including Android, iOS, React Native, Ionic, and Flutter. Get to market faster with AWS Amplify.
  • AppSync – AWS AppSync simplifies application development by creating a universal API for securely accessing, modifying, and combining data from multiple sources. AWS AppSync is a managed service that uses GraphQL so that applications can easily get only the data they need.
  • Athena – Athena is an interactive query service that makes it easy to analyze data directly in Amazon Simple Storage Service (Amazon S3) using standard SQL. In this solution, we use Athena as a data source for QuickSight.
  • Amazon Cognito – Amazon Cognito delivers frictionless customer identity and access management (CIAM) with a cost-effective and customizable platform. It easily connects the web application to the backend resources and web services.
  • DynamoDB – DynamoDB is a fully managed, serverless, key-value NoSQL database designed to run high-performance applications at any scale. DynamoDB offers built-in security, continuous backups, automated multi-Region replication, in-memory caching, and data import and export tools.
  • QuickSight – QuickSight is a serverless, cloud-based BI and reporting service that brings data insights to your teams and end-users through machine learning (ML)-powered dashboards and data visualizations, which can be accessed via QuickSight or embedded in apps and portals that your users access.

The following diagram illustrates the solution architecture.

Business capabilities insights solution architecture

In the following sections, we walk through the implementation and end-to-end integration steps.

Build a serverless web application with Amplify

The open-source Amplify provides a CLI, libraries, UI components and Amplify hosting to build full stack iOS, Android, Flutter, Web, and React Native apps. For instructions on building a serverless web application, refer to the following tutorial. For this post, we created the following GraphQL schema with amplify add api:

type BusinessCapability @model {
  company_id: ID!
  company_name: String!
  company_desc: String!
  lob_name: String!
  categoray: String!
  capability: String!
  digital_asset_type: String!
  digital_asset_name: String!
  digital_asset_info: String!

After we use Amplify to deploy the API in the cloud, a corresponding AppSync API and a DynamoDB table are created automatically.

You can use the Amplify UI library to generate a business capability intake form and bind the fields to your front-end code.

Amplify studio generated form

You can add authentication to your application using Amazon Cognito by running amplify add auth.

With that, you are now hosting a serverless web application for your business capabilities securely and at scale.

Set up Athena and the Athena DynamoDB data connector

The DynamoDB table generated by Amplify stores all the business capabilities. You can set up Athena and the Athena DynamoDB data connector so that you can query your tables with SQL. For more information, refer to Amazon Athena DynamoDB connector.

Enable QuickSight

Enable QuickSight in your AWS account and create the datasets. The source dataset is the Athena database and table that you created earlier. To connect, you need to allow access to query Athena and Amazon S3 via the admin user interface in QuickSight. Refer to accessing AWS resources for access requirements.

Sample reports

When all the components are up and running, you can design analyses and generate reports. For more information about gathering insights from the captured data, refer to Tutorial: Create an Amazon QuickSight analysis. You can export reports in PDF, and share analyses and reports with other users. The following screenshots are reports that reflects the relationship among LOBs, business capabilities, and APIs.

The first screenshot visualizes the capabilities and associated APIs. This enables the user to identify a set of APIs, and use the same API in new similar business functions.

Business Capability Visualization 1

The following screenshot visualizes LOBs, category, and capabilities. This enables the user to easily gain insights on these relationships.

Business Capabilities Visualization 2

Best practices

The following are some best practices for business capability modeling:

  • Define clear and measurable capabilities – Each capability should be defined in a way that is clear and measurable, so that it can be tracked and improved over time.
  • Involve key stakeholders – Involve key stakeholders in the modeling process to ensure that the capabilities accurately reflect the needs of the organization.
  • Use a consistent framework – Use a consistent framework to ensure that capabilities are defined and organized in a way that makes sense for the organization.
  • Regularly review and update – Review and update the capabilities regularly to ensure they remain relevant and aligned with the organization’s goals and objectives.
  • Use visual representations – Use visual representations, like diagrams or models, to help stakeholders understand and communicate the capabilities.
  • Implement a governance process – Implement a governance process to ensure that the capabilities are being used correctly and to make decisions about changes to the capabilities over time.


In this post, you learned how to build a system to manage a business capability model, and discover and visualize the results in QuickSight.

We hope that companies can use this solution to manage their enterprise capability model and enable users to explore business functions available for them to use within the organization. Business users and technical architects can now easily discover business capabilities and APIs, helping accelerate the creation and orchestration of new features. With the QuickSight web interface, you can filter through thousands of business capabilities, analyze the data for your business needs, and understand the technical requirements and how to combine existing technical capabilities into a new business capability.

Furthermore, you can use your data source to gain further insights from your data by setting up ML Insights in QuickSight and create graphical representations of your data using QuickSight visuals.

To learn more about how you can create, schedule, and share reports and data exports, see Amazon QuickSight Paginated Reports.

About the authors

Abdul Qadir is an AWS Solutions Architect based in New Jersey. He works with independent software vendors in the Northeast and provides customer guidance to build well-architected solutions on the AWS cloud platform.

Sharon Li is a solutions architect at AWS, based in the Boston, MA area. She works with enterprise customers, helping them solve difficult problems and build on AWS. Outside of work, she likes to spend time with her family and explore local restaurants.

How AWS helped Altron Group accelerate their vision for optimized customer engagement

Post Syndicated from Jason Yung original https://aws.amazon.com/blogs/big-data/how-aws-helped-altron-group-accelerate-their-vision-for-optimized-customer-engagement/

This is a guest post co-authored by Jacques Steyn, Senior Manager Professional Services at Altron Group.

Altron is a pioneer of providing data-driven solutions for their customers by combining technical expertise with in-depth customer understanding to provide highly differentiated technology solutions. Alongside their partner AWS, they participated in AWS Data-Driven Everything (D2E) workshops and a bespoke AWS Immersion Day workshop that catered to their needs to improve their engagement with their customers.

This post discusses the journey that took Altron from their initial goals, to technical implementation, to the business value created from understanding their customers and their unique opportunities better. They were able to think big but start small with a working solution involving rich business intelligence (BI) and insights provided to their key business areas.

Data-Driven Everything engagement

Altron has provided information technology services since 1965 across South Africa, the Middle East, and Australia. Although the group saw strong results at 2022-year end, the region continues to experience challenging operating conditions with global supply chains disrupted, electronic component shortages, and scarcity of IT talent.

To reflect the needs of their customers spread across different geographies and industries, Altron has organized their operating model across individual Operating Companies (OpCos). These are run autonomously with different sales teams, creating siloed operations and engagement with customers and making it difficult to have a holistic and unified sales motion.

To succeed further, their vision of data requires it to be accessible and actionable to all, with key roles and responsibilities defined by those who produce and consume data, as shown in the following figure. This allows for transparency, speed to action, and collaboration across the group while enabling the platform team to evangelize the use of data:

Altron engaged with AWS to seek advice on their data strategy and cloud modernization to bring their vision to fruition. The D2E program was selected to help identify the best way to think big but start small by working collaboratively to ideate on the opportunities to build data as a product, particularly focused on federating customer profile data in an agile and scalable approach.

Amazon mechanisms such as Working Backwards were employed to devise the most delightful and meaningful solution and put customers at the heart of the experience. The workshop helped devise the think big solution that starting with the Systems Integration (SI) OpCo as the first flywheel turn would be the best way to start small and prototype the initial data foundation collaboratively with AWS Solutions Architects.

Preparing for an AWS Immersion Day workshop

The typical preparation of an AWS Immersion Day involves identifying examples of common use case patterns and utilizing demonstration data. To maximize its success, the Immersion Day was stretched across multiple days as a hands-on workshop to enable Altron to bring their own data, build a robust data pipeline, and scale their long-term architecture. In addition, AWS and Altron identified and resolved any external dependencies, such as network connectivity to data sources and targets, where Altron was able to put the connectivity to the sources in place.

Identifying key use cases

After a number of preparation meetings to discuss business and technical aspects of the use case, AWS and Altron identified two uses cases to resolve their two business challenges:

  • Business intelligence for business-to-business accounts – Altron wanted to focus on their business-to-business (B2B) accounts and customer data. In particular, they wanted to enable their account managers, sales executives, and analysts to use actual data and facts to get a 360 view of their accounts.
    • Goals – Grow revenue, increase the conversion ratio of opportunities, reduce the average sales cycle, improve the customer renewal rate.
    • Target – Dashboards to be refreshed on a daily basis that would provide insights on sales, gross profit, sales pipelines, and customers.
  • Data quality for account and customer data – Altron wanted to enable data quality and data governance best practices.
    • Goals – Lay the foundation for a data platform that can be used in the future by internal and external stakeholders.

Conducting the Immersion Day workshop

Altron set aside 4 days for their Immersion Day, during which time AWS had assigned a dedicated Solutions Architect to work alongside them to build the following prototype architecture:

This solution includes the following components:

  1. AWS Glue is a serverless data integration service that makes it simple to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning, and application development. The Altron team created an AWS Glue crawler and configured it to run against Azure SQL to discover its tables. The AWS Glue crawler populates the table definition with its schema in AWS Glue Data Catalog.
  2. This step consists of two components:
    1. A set of AWS Glue PySpark jobs reads the source tables from Azure SQL and outputs the resulting data in the Amazon Simple Storage Service “Raw Zone”. Basic formatting and readability of the data is standardized here. The jobs run on a scheduled basis, according to the upstream data availability (which currently is daily).
    2. Users are able to manually upload reference files (CSV and Excel format) via the Amazon Web Services console directly to the Amazon S3 buckets. Depending on the frequency of upload, the Altron team will consider automated mechanisms and remove manual upload.
  3. The reporting zone is based on a set of Amazon Athena views, which are consumed for BI purposes. The Altron team used Athena to explore the source tables and create the views in SQL language. Depending on the needs, the Altron team will materialize these views or create corresponding AWS Glue jobs.
  4. Athena exposes the content of the reporting zone for consumption.
  5. The content of the reporting zone is ingested via SPICE in Amazon QuickSight. BI users create dashboards and reports in QuickSight. Business users can access QuickSight dashboards from their mobile, thanks to the QuickSight native application, configured to use single sign-on (SSO).
  6. An AWS Step Functions state machine orchestrates the run of the AWS Glue jobs. The Altron team will expand the state machine to include automated refresh of QuickSight SPICE datasets.
  7. To verify the data quality of the sources through statistically-relevant metrics, AWS Glue Data Quality runs data quality tasks on relevant AWS Glue tables. This can be run manually or scheduled via Amazon Eventbridge (Optional).

Generating business outcomes

In 4 days, the Altron SI team left the Immersion Day workshop with the following:

  • A data pipeline ingesting data from 21 sources (SQL tables and files) and combining them into three mastered and harmonized views that are cataloged for Altron’s B2B accounts.
  • A set of QuickSight dashboards to be consumed via browser and mobile.
  • Foundations for a data lake with data governance controls and data quality checks. The datasets used for the workshop originate from different systems; by integrating the datasets during the workshop implementation, the Altron team can have a comprehensive overview of their customers.

Altron’s sales teams are now able to quickly refresh dashboards encompassing previously disparate datasets that are now centralized to get insights about sales pipelines and forecasts on their desktop or mobile. The technical teams are equally adept at adjusting to business needs by autonomously onboarding new data sources and further enriching the user experience and trust in the data.


In this post, we walked you through the journey the Altron team took with AWS. The outcomes to identify the opportunities that were most pressing to Altron, applying a working backward approach and coming up with a best-fit architecture, led to the subsequent AWS Immersion Day to implement a working prototype that helped them become more data-driven.

With their new focus on AWS skills and mechanisms, increasing trust in their internal data, and understanding the importance of driving change in data literacy and mindset, Altron is better set up for success to best serve their customers in the region.

To find out more about how Altron and AWS can help work backward on your data strategy and employ the agile methodologies discussed in this post, check out Data Management. To learn more about how can help you turn your ideas into solutions, visit the D2E website and the series of AWS Immersion Days that you can choose from. For more hands-on bespoke options, contact your AWS Account Manager, who can provide more details.

Special thanks to everyone at Altron Group who helped contribute to the success of the D2E and Build Lab workshops:

  • The Analysts (Liesl Kok, Carmen Kotze)
  • Data Engineers (Banele Ngemntu, James Owen, Andrew Corry, Thembelani Mdlankomo)
  • QuickSight BI Developers (Ricardo De Gavino Dias, Simei Antoniades)
  • Cloud Administrator (Shamiel Galant)

About the authors

Jacques Steyn runs the Altron Data Analytics Professional Services. He has been leading the building of data warehouses and analytic solutions for the past 20 years. In his free time, he spends time with his family, whether it be on the golf , walking in the mountains, or camping in South Africa, Botswana, and Namibia.

Jason Yung is a Principal Analytics Specialist with Amazon Web Services. Working with Senior Executives across the Europe and Asia-Pacific Regions, he helps customers become data-driven by understanding their use cases and articulating business value through Amazon mechanisms. In his free time, he spends time looking after a very active 1-year-old daughter, alongside juggling geeky activities with respectable hobbies such as cooking sub-par food.

Michele Lamarca is a Senior Solutions Architect with Amazon Web Services. He helps architect and run Solutions Accelerators in Europe to enable customers to become hands-on with AWS services and build prototypes quickly to release the value of data in the organization. In his free time, he reads books and tries (hopelessly) to improve his jazz piano skills.

Hamza is a Specialist Solutions Architect with Amazon Web Services. He runs Solutions Accelerators in EMEA regions to help customers accelerate their journey to move from an idea into a solution in production. In his free time, he spends time with his family, meets with friends, swims in the municipal swimming pool, and learns new skills.

Reduce archive cost with serverless data archiving

Post Syndicated from Rostislav Markov original https://aws.amazon.com/blogs/architecture/reduce-archive-cost-with-serverless-data-archiving/

For regulatory reasons, decommissioning core business systems in financial services and insurance (FSI) markets requires data to remain accessible years after the application is retired. Traditionally, FSI companies either outsourced data archiving to third-party service providers, which maintained application replicas, or purchased vendor software to query and visualize archival data.

In this blog post, we present a more cost-efficient option with serverless data archiving on Amazon Web Services (AWS). In our experience, you can build your own cloud-native solution on Amazon Simple Storage Service (Amazon S3) at one-fifth of the price of third-party alternatives. If you are retiring legacy core business systems, consider serverless data archiving for cost-savings while keeping regulatory compliance.

Serverless data archiving and retrieval

Modern archiving solutions follow the principles of modern applications:

  • Serverless-first development, to reduce management overhead.
  • Cloud-native, to leverage native capabilities of AWS services, such as backup or disaster recovery, to avoid custom build.
  • Consumption-based pricing, since data archival is consumed irregularly.
  • Speed of delivery, as both implementation and archive operations need to be performed quickly to fulfill regulatory compliance.
  • Flexible data retention policies can be enforced in an automated manner.

AWS Storage and Analytics services offer the necessary building blocks for a modern serverless archiving and retrieval solution.

Data archiving can be implemented on top of Amazon S3) and AWS Glue.

  1. Amazon S3 storage tiers enable different data retention policies and retrieval service level agreements (SLAs). You can migrate data to Amazon S3 using AWS Database Migration Service; otherwise, consider another data transfer service, such as AWS DataSync or AWS Snowball.
  2. AWS Glue crawlers automatically infer both database and table schemas from your data in Amazon S3 and store the associated metadata in the AWS Glue Data Catalog.
  3. Amazon CloudWatch monitors the execution of AWS Glue crawlers and notifies of failures.

Figure 1 provides an overview of the solution.

Serverless data archiving and retrieval

Figure 1. Serverless data archiving and retrieval

Once the archival data is catalogued, Amazon Athena can be used for serverless data query operations using standard SQL.

  1. Amazon API Gateway receives the data retrieval requests and eases integration with other systems via REST, HTTPS, or WebSocket.
  2. AWS Lambda reads parametrization data/templates from Amazon S3 in order to construct the SQL queries. Alternatively, query templates can be stored as key-value entries in a NoSQL store, such as Amazon DynamoDB.
  3. Lambda functions trigger Athena with the constructed SQL query.
  4. Athena uses the AWS Glue Data Catalog to retrieve table metadata for the Amazon S3 (archival) data and to return the SQL query results.

How we built serverless data archiving

An early build-or-buy assessment compared vendor products with a custom-built solution using Amazon S3, AWS Glue, and a user frontend for data retrieval and visualization.

The total cost of ownership over a 10-year period for one insurance core system (Policy Admin System) was $0.25M to build and run the custom solution on AWS compared with >$1.1M for third-party alternatives. The implementation cost advantage of the custom-built solution was due to development efficiencies using AWS services. The lower run cost resulted from a decreased frequency of archival usage and paying only for what you use.

The data archiving solution was implemented with AWS services (Figure 2):

  1. Amazon S3 is used to persist archival data in Parquet format (optimized for analytics and compressed to reduce storage space) that is loaded from the legacy insurance core system. The archival data source was AS400/DB2 and moved with Informatica Cloud to Amazon S3.
  2. AWS Glue crawlers infer the database schema from objects in Amazon S3 and create tables in AWS Glue for the decommissioned application data.
  3. Lambda functions (Python) remove data records based on retention policies configured for each domain, such as customers, policies, claims, and receipts. A daily job (Control-M) initiates the retention process.
Exemplary implementation of serverless data archiving and retrieval for insurance core system

Figure 2. Exemplary implementation of serverless data archiving and retrieval for insurance core system

Retrieval operations are formulated and executed via Python functions in Lambda. The following AWS resources implement the retrieval logic:

  1. Athena is used to run SQL queries over the AWS Glue tables for the decommissioned application.
  2. Lambda functions (Python) build and execute queries for data retrieval. The functions render HMTL snippets using Jinja templating engine and Athena query results, returning the selected template filled with the requested archive data. Using Jinja as templating engine improved the speed of delivery and reduced the heavy lifting of frontend and backend changes when modeling retrieval operations by ~30% due to the decoupling between application layers. As a result, engineers only need to build an Athena query with the linked Jinja template.
  3. Amazon S3 stores templating configuration and queries (JSON files) used for query parametrization.
  4. Amazon API Gateway serves as single point of entry for API calls.

The user frontend for data retrieval and visualization is implemented as web application using React JavaScript library (with static content on Amazon S3) and Amazon CloudFront used for web content delivery.

The archiving solution enabled 80 use cases with 60 queries and reduced storage from three terabytes on source to only 35 gigabytes on Amazon S3. The success of the implementation depended on the following key factors:

  • Appropriate sponsorship from business across all areas (claims, actuarial, compliance, etc.)
  • Definition of SLAs for responding to courts, regulators, etc.
  • Minimum viable and mandatory approach
  • Prototype visualizations early on (fail fast)


Traditionally, FSI companies relied on vendor products for data archiving. In this post, we explored how to build a scalable solution on Amazon S3 and discussed key implementation considerations. We have demonstrated that AWS services enable FSI companies to build a serverless archiving solution while reaching and keeping regulatory compliance at a lower cost.

Learn more about some of the AWS services covered in this blog:

Three ways to accelerate incident response in the cloud: insights from re:Inforce 2023

Post Syndicated from Anne Grahn original https://aws.amazon.com/blogs/security/three-ways-to-accelerate-incident-response-in-the-cloud-insights-from-reinforce-2023/

AWS re:Inforce took place in Anaheim, California, on June 13–14, 2023. AWS customers, partners, and industry peers participated in hundreds of technical and non-technical security-focused sessions across six tracks, an Expo featuring AWS experts and AWS Security Competency Partners, and keynote and leadership sessions.

The threat detection and incident response track showcased how AWS customers can get the visibility they need to help improve their security posture, identify issues before they impact business, and investigate and respond quickly to security incidents across their environment.

With dozens of service and feature announcements—and innumerable best practices shared by AWS experts, customers, and partners—distilling highlights is a challenge. From an incident response perspective, three key themes emerged.

Proactively detect, contextualize, and visualize security events

When it comes to effectively responding to security events, rapid detection is key. Among the launches announced during the keynote was the expansion of Amazon Detective finding groups to include Amazon Inspector findings in addition to Amazon GuardDuty findings.

Detective, GuardDuty, and Inspector are part of a broad set of fully managed AWS security services that help you identify potential security risks, so that you can respond quickly and confidently.

Using machine learning, Detective finding groups can help you conduct faster investigations, identify the root cause of events, and map to the MITRE ATT&CK framework to quickly run security issues to ground. The finding group visualization panel shown in the following figure displays findings and entities involved in a finding group. This interactive visualization can help you analyze, understand, and triage the impact of finding groups.

Figure 1: Detective finding groups visualization panel

Figure 1: Detective finding groups visualization panel

With the expanded threat and vulnerability findings announced at re:Inforce, you can prioritize where to focus your time by answering questions such as “was this EC2 instance compromised because of a software vulnerability?” or “did this GuardDuty finding occur because of unintended network exposure?”

In the session Streamline security analysis with Amazon Detective, AWS Principal Product Manager Rich Vorwaller, AWS Senior Security Engineer Rima Tanash, and AWS Program Manager Jordan Kramer demonstrated how to use graph analysis techniques and machine learning in Detective to identify related findings and resources, and investigate them together to accelerate incident analysis.

In addition to Detective, you can also use Amazon Security Lake to contextualize and visualize security events. Security Lake became generally available on May 30, 2023, and several re:Inforce sessions focused on how you can use this new service to assist with investigations and incident response.

As detailed in the following figure, Security Lake automatically centralizes security data from AWS environments, SaaS providers, on-premises environments, and cloud sources into a purpose-built data lake stored in your account. Security Lake makes it simpler to analyze security data, gain a more comprehensive understanding of security across an entire organization, and improve the protection of workloads, applications, and data. Security Lake automates the collection and management of security data from multiple accounts and AWS Regions, so you can use your preferred analytics tools while retaining complete control and ownership over your security data. Security Lake has adopted the Open Cybersecurity Schema Framework (OCSF), an open standard. With OCSF support, the service normalizes and combines security data from AWS and a broad range of enterprise security data sources.

Figure 2: How Security Lake works

Figure 2: How Security Lake works

To date, 57 AWS security partners have announced integrations with Security Lake, and we now have more than 70 third-party sources, 16 analytics subscribers, and 13 service partners.

In Gaining insights from Amazon Security Lake, AWS Principal Solutions Architect Mark Keating and AWS Security Engineering Manager Keith Gilbert detailed how to get the most out of Security Lake. Addressing questions such as, “How do I get access to the data?” and “What tools can I use?,” they demonstrated how analytics services and security information and event management (SIEM) solutions can connect to and use data stored within Security Lake to investigate security events and identify trends across an organization. They emphasized how bringing together logs in multiple formats and normalizing them into a single format empowers security teams to gain valuable context from security data, and more effectively respond to events. Data can be queried with Amazon Athena, or pulled by Amazon OpenSearch Service or your SIEM system directly from Security Lake.

Build your security data lake with Amazon Security Lake featured AWS Product Manager Jonathan Garzon, AWS Product Solutions Architect Ross Warren, and Global CISO of Interpublic Group (IPG) Troy Wilkinson demonstrating how Security Lake helps address common challenges associated with analyzing enterprise security data, and detailing how IPG is using the service. Wilkinson noted that IPG’s objective is to bring security data together in one place, improve searches, and gain insights from their data that they haven’t been able to before.

“With Security Lake, we found that it was super simple to bring data in. Not just the third-party data and Amazon data, but also our on-premises data from custom apps that we built.” — Troy Wilkinson, global CISO, Interpublic Group

Use automation and machine learning to reduce mean time to response

Incident response automation can help free security analysts from repetitive tasks, so they can spend their time identifying and addressing high-priority security issues.

In How LLA reduces incident response time with AWS Systems Manager, telecommunications provider Liberty Latin America (LLA) detailed how they implemented a security framework to detect security issues and automate incident response in more than 180 AWS accounts accessed by internal stakeholders and third-party partners by using AWS Systems Manager Incident Manager, AWS Organizations, Amazon GuardDuty, and AWS Security Hub.

LLA operates in over 20 countries across Latin America and the Caribbean. After completing multiple acquisitions, LLA needed a centralized security operations team to handle incidents and notify the teams responsible for each AWS account. They used GuardDuty, Security Hub, and Systems Manager Incident Manager to automate and streamline detection and response, and they configured the services to initiate alerts whenever there was an issue requiring attention.

Speaking alongside AWS Principal Solutions Architect Jesus Federico and AWS Principal Product Manager Sarah Holberg, LLA Senior Manager of Cloud Services Joaquin Cameselle noted that when GuardDuty identifies a critical issue, it generates a new finding in Security Hub. This finding is then forwarded to Systems Manager Incident Manager through an Amazon EventBridge rule. This configuration helps ensure the involvement of the appropriate individuals associated with each account.

“We have deployed a security framework in Liberty Latin America to identify security issues and streamline incident response across over 180 AWS accounts. The framework that leverages AWS Systems Manager Incident Manager, Amazon GuardDuty, and AWS Security Hub enabled us to detect and respond to incidents with greater efficiency. As a result, we have reduced our reaction time by 90%, ensuring prompt engagement of the appropriate teams for each AWS account and facilitating visibility of issues for the central security team.” — Joaquin Cameselle, senior manager, cloud services, Liberty Latin America

How Citibank (Citi) advanced their containment capabilities through automation outlined how the National Institute of Standards and Technology (NIST) Incident Response framework is applied to AWS services, and highlighted Citi’s implementation of a highly scalable cloud incident response framework designed to support the 28 AWS services in their cloud environment.

After describing the four phases of the incident response process — preparation and prevention; detection and analysis; containment, eradication, and recovery; and post-incident activity—AWS ProServe Global Financial Services Senior Engagement Manager Harikumar Subramonion noted that, to fully benefit from the cloud, you need to embrace automation. Automation benefits the third phase of the incident response process by speeding up containment, and reducing mean time to response.

Citibank Head of Cloud Security Operations Elvis Velez and Vice President of Cloud Security Damien Burks described how Citi built the Cloud Containment Automation Framework (CCAF) from the ground up by using AWS Step Functions and AWS Lambda, enabling them to respond to events 24/7 without human error, and reduce the time it takes to contain resources from 4 hours to 15 minutes. Velez described how Citi uses adversary emulation exercises that use the MITRE ATT&CK Cloud Matrix to simulate realistic attacks on AWS environments, and continuously validate their ability to effectively contain incidents.

Innovate and do more with less

Security operations teams are often understaffed, making it difficult to keep up with alerts. According to data from CyberSeek, there are currently 69 workers available for every 100 cybersecurity job openings.

Effectively evaluating security and compliance posture is critical, despite resource constraints. In Centralizing security at scale with Security Hub and Intuit’s experience, AWS Senior Solutions Architect Craig Simon, AWS Senior Security Hub Product Manager Dora Karali, and Intuit Principal Software Engineer Matt Gravlin discussed how to ease security management with Security Hub. Fortune 500 financial software provider Intuit has approximately 2,000 AWS accounts, 10 million AWS resources, and receives 20 million findings a day from AWS services through Security Hub. Gravlin detailed Intuit’s Automated Compliance Platform (ACP), which combines Security Hub and AWS Config with an internal compliance solution to help Intuit reduce audit timelines, effectively manage remediation, and make compliance more consistent.

“By using Security Hub, we leveraged AWS expertise with their regulatory controls and best practice controls. It helped us keep up to date as new controls are released on a regular basis. We like Security Hub’s aggregation features that consolidate findings from other AWS services and third-party providers. I personally call it the super aggregator. A key component is the Security Hub to Amazon EventBridge integration. This allowed us to stream millions of findings on a daily basis to be inserted into our ACP database.” — Matt Gravlin, principal software engineer, Intuit

At AWS re:Inforce, we launched a new Security Hub capability for automating actions to update findings. You can now use rules to automatically update various fields in findings that match defined criteria. This allows you to automatically suppress findings, update the severity of findings according to organizational policies, change the workflow status of findings, and add notes. With automation rules, Security Hub provides you a simplified way to build automations directly from the Security Hub console and API. This reduces repetitive work for cloud security and DevOps engineers and can reduce mean time to response.

In Continuous innovation in AWS detection and response services, AWS Worldwide Security Specialist Senior Manager Himanshu Verma and GuardDuty Senior Manager Ryan Holland highlighted new features that can help you gain actionable insights that you can use to enhance your overall security posture. After mapping AWS security capabilities to the core functions of the NIST Cybersecurity Framework, Verma and Holland provided an overview of AWS threat detection and response services that included a technical demonstration.

Bolstering incident response with AWS Wickr enterprise integrations highlighted how incident responders can collaborate securely during a security event, even on a compromised network. AWS Senior Security Specialist Solutions Architect Wes Wood demonstrated an innovative approach to incident response communications by detailing how you can integrate the end-to-end encrypted collaboration service AWS Wickr Enterprise with GuardDuty and AWS WAF. Using Wickr Bots, you can build integrated workflows that incorporate GuardDuty and third-party findings into a more secure, out-of-band communication channel for dedicated teams.

Evolve your incident response maturity

AWS re:Inforce featured many more highlights on incident response, including How to run security incident response in your Amazon EKS environment and Investigating incidents with Amazon Security Lake and Jupyter notebooks code talks, as well as the announcement of our Cyber Insurance Partners program. Content presented throughout the conference made one thing clear: AWS is working harder than ever to help you gain the insights that you need to strengthen your organization’s security posture, and accelerate incident response in the cloud.

To watch AWS re:Inforce sessions on demand, see the AWS re:Inforce playlists on YouTube.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Anne Grahn

Anne Grahn

Anne is a Senior Worldwide Security GTM Specialist at AWS based in Chicago. She has more than a decade of experience in the security industry, and focuses on effectively communicating cybersecurity risk. She maintains a Certified Information Systems Security Professional (CISSP) certification.


Himanshu Verma

Himanshu is a Worldwide Specialist for AWS Security Services. In this role, he leads the go-to-market creation and execution for AWS Security Services, field enablement, and strategic customer advisement. Prior to AWS, he held several leadership roles in Product Management, engineering and development, working on various identity, information security, and data protection technologies. He obsesses brainstorming disruptive ideas, venturing outdoors, photography, and trying various “hole in the wall” food and drinking establishments around the globe.

Jesus Federico

Jesus Federico

Jesus is a Principal Solutions Architect for AWS in the telecommunications vertical, working to provide guidance and technical assistance to communication service providers on their cloud journey. He supports CSPs in designing and implementing secure, resilient, scalable, and high-performance applications in the cloud.

Centralize near-real-time governance through alerts on Amazon Redshift data warehouses for sensitive queries

Post Syndicated from Rajdip Chaudhuri original https://aws.amazon.com/blogs/big-data/centralize-near-real-time-governance-through-alerts-on-amazon-redshift-data-warehouses-for-sensitive-queries/

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud that delivers powerful and secure insights on all your data with the best price-performance. With Amazon Redshift, you can analyze your data to derive holistic insights about your business and your customers. In many organizations, one or multiple Amazon Redshift data warehouses run daily for data and analytics purposes. Therefore, over time, multiple Data Definition Language (DDL) or Data Control Language (DCL) queries, such as CREATE, ALTER, DROP, GRANT, or REVOKE SQL queries, are run on the Amazon Redshift data warehouse, which are sensitive in nature because they could lead to dropping tables or deleting data, causing disruptions or outages. Tracking such user queries as part of the centralized governance of the data warehouse helps stakeholders understand potential risks and take prompt action to mitigate them following the operational excellence pillar of the AWS Data Analytics Lens. Therefore, for a robust governance mechanism, it’s crucial to alert or notify the database and security administrators on the kind of sensitive queries that are run on the data warehouse, so that prompt remediation actions can be taken if needed.

To address this, in this post we show you how you can automate near-real-time notifications over a Slack channel when certain queries are run on the data warehouse. We also create a simple governance dashboard using a combination of Amazon DynamoDB, Amazon Athena, and Amazon QuickSight.

Solution overview

An Amazon Redshift data warehouse logs information about connections and user activities taking place in databases, which helps monitor the database for security and troubleshooting purposes. These logs can be stored in Amazon Simple Storage Service (Amazon S3) buckets or Amazon CloudWatch. Amazon Redshift logs information in the following log files, and this solution is based on using an Amazon Redshift audit log to CloudWatch as a destination:

  • Connection log – Logs authentication attempts, connections, and disconnections
  • User log – Logs information about changes to database user definitions
  • User activity log – Logs each query before it’s run on the database

The following diagram illustrates the solution architecture.

Solution Architecture

The solution workflow consists of the following steps:

  1. Audit logging is enabled in each Amazon Redshift data warehouse to capture the user activity log in CloudWatch.
  2. Subscription filters on CloudWatch capture the required DDL and DCL commands by providing filter criteria.
  3. The subscription filter triggers an AWS Lambda function for pattern matching.
  4. The Lambda function processes the event data and sends the notification over a Slack channel using a webhook.
  5. The Lambda function stores the data in a DynamoDB table over which a simple dashboard is built using Athena and QuickSight.


Before starting the implementation, make sure the following requirements are met:

  • You have an AWS account.
  • The AWS Region used for this post is us-east-1. However, this solution is relevant in any other Region where the necessary AWS services are available.
  • Permissions to create Slack a workspace.

Create and configure an Amazon Redshift cluster

To set up your cluster, complete the following steps:

  1. Create a provisioned Amazon Redshift data warehouse.

For this post, we use three Amazon Redshift data warehouses: demo-cluster-ou1, demo-cluster-ou2, and demo-cluster-ou3. In this post, all the Amazon Redshift data warehouses are provisioned clusters. However, the same solution applies for Amazon Redshift Serverless.

  1. To enable audit logging with CloudWatch as the log delivery destination, open an Amazon Redshift cluster and go to the Properties tab.
  2. On the Edit menu, choose Edit audit logging.

Redshift edit audit logging

  1. Select Turn on under Configure audit logging.
  2. Select CloudWatch for Log export type.
  3. Select all three options for User log, Connection log, and User activity log.
  4. Choose Save changes.

  1. Create a parameter group for the clusters with enable_user_activity_logging set as true for each of the clusters.
  2. Modify the cluster to attach the new parameter group to the Amazon Redshift cluster.

For this post, we create three custom parameter groups: custom-param-grp-1, custom-param-grp-2, and custom-param-grp-3 for three clusters.

Note, if you enable only the audit logging feature, but not the associated parameter, the database audit logs log information for only the connection log and user log, but not for the user activity log.

  1. On the CloudWatch console, choose Log groups under Logs in the navigation pane.
  2. Search for /aws/redshift/cluster/demo.

This will show all the log groups created for the Amazon Redshift clusters.

Create a DynamoDB audit table

To create your audit table, complete the following steps:

  1. On the DynamoDB console, choose Tables in the navigation pane.
  2. Choose Create table.
  3. For Table name, enter demo_redshift_audit_logs.
  4. For Partition key, enter partKey with the data type as String.

  1. Keep the table settings as default.
  2. Choose Create table.

Create Slack resources

Slack Incoming Webhooks expect a JSON request with a message string corresponding to a "text" key. They also support message customization, such as adding a user name and icon, or overriding the webhook’s default channel. For more information, see Sending messages using Incoming Webhooks on the Slack website.

The following resources are created for this post:

  • A Slack workspace named demo_rc
  • A channel named #blog-demo in the newly created Slack workspace
  • A new Slack app in the Slack workspace named demo_redshift_ntfn (using the From Scratch option)
  • Note down the Incoming Webhook URL, which will be used in this post for sending the notifications

Create an IAM role and policy

In this section, we create an AWS Identity and Access Management (IAM) policy that will be attached to an IAM role. The role is then used to grant a Lambda function access to a DynamoDB table. The policy also includes permissions to allow the Lambda function to write log files to Amazon CloudWatch Logs.

  1. On the IAM console, choose Policies in navigation pane.
  2. Choose Create policy.
  3. In the Create policy section, choose the JSON tab and enter the following IAM policy. Make sure you replace your AWS account ID in the policy (replace XXXXXXXX with your AWS account ID).
    "Version": "2012-10-17",
    "Statement": [
            "Sid": "ReadWriteTable",
            "Effect": "Allow",
            "Action": [
            "Resource": [
            "Sid": "WriteLogStreamsAndGroups",
            "Effect": "Allow",
            "Action": [
            "Resource": "arn:aws:logs:*:XXXXXXXX:log-group:*"
  1. Choose Next: Tags, then choose Next: Review.
  2. Provide the policy name demo_post_policy and choose Create policy.

To apply demo_post_policy to a Lambda function, you first have to attach the policy to an IAM role.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. Select AWS service and then select Lambda.
  4. Choose Next.

  1. On the Add permissions page, search for demo_post_policy.
  2. Select demo_post_policy from the list of returned search results, then choose Next.

  1. On the Review page, enter demo_post_role for the role and an appropriate description, then choose Create role.

Create a Lambda function

We create a Lambda function with Python 3.9. In the following code, replace the slack_hook parameter with the Slack webhook you copied earlier:

import gzip
import base64
import json
import boto3
import uuid
import re
import urllib3

http = urllib3.PoolManager()
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table("demo_redshift_audit_logs")
slack_hook = "https://hooks.slack.com/services/xxxxxxx"

def exe_wrapper(data):
    cluster_name = (data['logStream'])
    for event in data['logEvents']:
        message = event['message']
        reg = re.match(r"'(?P<ts>\d{4}-\d\d-\d\dT\d\d:\d\d:\d\dZ).*?\bdb=(?P<db>\S*).*?\buser=(?P<user>\S*).*?LOG:\s+(?P<query>.*?);?$", message)
        if reg is not None:
            filter = reg.groupdict()
            ts = filter['ts']
            db = filter['db']
            user = filter['user']
            query = filter['query']
            query_type = ' '.join((query.split(" "))[0 : 2]).upper()
            object = query.split(" ")[2]
def put_dynamodb(timestamp,cluster,database,user,sql,query_type,object,event):
    table.put_item(Item = {
        'partKey': str(uuid.uuid4()),
        'redshiftCluster': cluster,
        'sqlTimestamp' : timestamp,
        'databaseName' : database,
        'userName': user,
        'sqlQuery': sql,
        'queryType' : query_type,
        'objectName': object,
        'rawData': event
def slack_api(cluster,database,user,sql,query_type,object):
    payload = {
	'channel': '#blog-demo',
	'username': 'demo_redshift_ntfn',
	'blocks': [{
			'type': 'section',
			'text': {
				'type': 'mrkdwn',
				'text': 'Detected *{}* command\n *Affected Object*: `{}`'.format(query_type, object)
			'type': 'divider'
			'type': 'section',
			'fields': [{
					'type': 'mrkdwn',
					'text': ':desktop_computer: *Cluster Name:*\n{}'.format(cluster)
					'type': 'mrkdwn',
					'text': ':label: *Query Type:*\n{}'.format(query_type)
					'type': 'mrkdwn',
					'text': ':card_index_dividers: *Database Name:*\n{}'.format(database)
					'type': 'mrkdwn',
					'text': ':technologist: *User Name:*\n{}'.format(user)
			'type': 'section',
			'text': {
				'type': 'mrkdwn',
				'text': ':page_facing_up: *SQL Query*\n ```{}```'.format(sql)
    encoded_msg = json.dumps(payload).encode('utf-8')
    resp = http.request('POST',slack_hook, body=encoded_msg)

def lambda_handler(event, context):
    print(f'Logging Event: {event}')
    print(f"Awslog: {event['awslogs']}")
    encoded_zipped_data = event['awslogs']['data']
    print(f'data: {encoded_zipped_data}')
    print(f'type: {type(encoded_zipped_data)}')
    zipped_data = base64.b64decode(encoded_zipped_data)
    data = json.loads(gzip.decompress(zipped_data))

Create your function with the following steps:

  1. On the Lambda console, choose Create function.
  2. Select Author from scratch and for Function name, enter demo_function.
  3. For Runtime, choose Python 3.9.
  4. For Execution role, select Use an existing role and choose demo_post_role as the IAM role.
  5. Choose Create function.

  1. On the Code tab, enter the preceding Lambda function and replace the Slack webhook URL.
  2. Choose Deploy.

Create a CloudWatch subscription filter

We need to create the CloudWatch subscription filter on the useractivitylog log group created by the Amazon Redshift clusters.

  1. On the CloudWatch console, navigate to the log group /aws/redshift/cluster/demo-cluster-ou1/useractivitylog.
  2. On the Subscription filters tab, on the Create menu, choose Create Lambda subscription filter.

  1. Choose demo_function as the Lambda function.
  2. For Log format, choose Other.
  3. Provide the subscription filter pattern as ?create ?alter ?drop ?grant ?revoke.
  4. Provide the filter name as Sensitive Queries demo-cluster-ou1.
  5. Test the filter by selecting the actual log stream. If it has any queries with a match pattern, then you can see some results. For testing, use the following pattern and choose Test pattern.
'2023-04-02T04:18:43Z UTC [ db=dev user=awsuser pid=100 userid=100 xid=100 ]' LOG: alter table my_table alter column string type varchar(16);
'2023-04-02T04:06:08Z UTC [ db=dev user=awsuser pid=100 userid=100 xid=200 ]' LOG: create user rs_user with password '***';

  1. Choose Start streaming.

  1. Repeat the same steps for /aws/redshift/cluster/demo-cluster-ou2/useractivitylog and /aws/redshift/cluster/demo-cluster-ou3/useractivitylog by giving unique subscription filter names.
  2. Complete the preceding steps to create a second subscription filter for each of the Amazon Redshift data warehouses with the filter pattern ?CREATE ?ALTER ?DROP ?GRANT ?REVOKE, ensuring uppercase SQL commands are also captured through this solution.

Test the solution

In this section, we test the solution in the three Amazon Redshift clusters that we created in the previous steps and check for the notifications of the commands on the Slack channel as per the CloudWatch subscription filters as well as data getting ingested in the DynamoDB table. We use the following commands to test the solution; however, this is not restricted to these commands only. You can check with other DDL commands as per the filter criteria in your Amazon Redshift cluster.

create schema sales;
create schema marketing;
create table dev.public.demo_test_table_1  (id int, string varchar(10));
create table dev.public.demo_test_table_2  (empid int, empname varchar(100));
alter table dev.public.category alter column catdesc type varchar(65);
drop table dev.public.demo_test_table_1;
drop table dev.public.demo_test_table_2;

In the Slack channel, details of the notifications look like the following screenshot.

To get the results in DynamoDB, complete the following steps:

  1. On the DynamoDB console, choose Explore items under Tables in the navigation pane.
  2. In the Tables pane, select demo_redshift_audit_logs.
  3. Select Scan and Run to get the results in the table.

Athena federation over the DynamoDB table

The Athena DynamoDB connector enables Athena to communicate with DynamoDB so that you can query your tables with SQL. As part of the prerequisites for this, deploy the connector to your AWS account using the Athena console or the AWS Serverless Application Repository. For more details, refer to Deploying a data source connector or Using the AWS Serverless Application Repository to deploy a data source connector. For this post, we use the Athena console.

  1. On the Athena console, under Administration in the navigation pane, choose Data sources.
  2. Choose Create data source.

  1. Select the data source as Amazon DynamoDB, then choose Next.

  1. For Data source name, enter dynamo_db.
  2. For Lambda function, choose Create Lambda function to open a new window with the Lambda console.

  1. Under Application settings, enter the following information:
    • For Application name, enter AthenaDynamoDBConnector.
    • For SpillBucket, enter the name of an S3 bucket.
    • For AthenaCatalogName, enter dynamo.
    • For DisableSpillEncryption, enter false.
    • For LambdaMemory, enter 3008.
    • For LambdaTimeout, enter 900.
    • For SpillPrefix, enter athena-spill-dynamo.

  1. Select I acknowledge that this app creates custom IAM roles and choose Deploy.
  2. Wait for the function to deploy, then return to the Athena window and choose the refresh icon next to Lambda function.
  3. Select the newly deployed Lambda function and choose Next.

  1. Review the information and choose Create data source.
  2. Navigate back to the query editor, then choose dynamo_db for Data source and default for Database.
  3. Run the following query in the editor to check the sample data:
SELECT partkey,
FROM dynamo_db.default.demo_redshift_audit_logs limit 10;

Visualize the data in QuickSight

In this section, we create a simple governance dashboard in QuickSight using Athena in direct query mode to query the record set, which is persistently stored in a DynamoDB table.

  1. Sign up for QuickSight on the QuickSight console.
  2. Select Amazon Athena as a resource.
  3. Choose Lambda and select the Lambda function created for DynamoDB federation.

  1. Create a new dataset in QuickSight with Athena as the source.
  2. Provide the name of the data source name as demo_blog.
  3. Choose dynamo_db for Catalog, default for Database, and demo_redshift_audit_logs for Table.
  4. Choose Edit/Preview data.

  1. Choose String in the sqlTimestamp column and choose Date.

  1. In the dialog box that appears, enter the data format yyyy-MM-dd'T'HH:mm:ssZZ.
  2. Choose Validate and Update.

  2. Choose Interactive sheet and choose CREATE.

This will take you to the visualization page to create the analysis on QuickSight.

  1. Create a governance dashboard with the appropriate visualization type.

Refer to the Amazon QuickSight learning videos in QuickSight community for basic to advanced level of authoring. The following screenshot is a sample visualization created on this data.

Clean up

Clean up your resources with the following steps:

  1. Delete all the Amazon Redshift clusters.
  2. Delete the Lambda function.
  3. Delete the CloudWatch log groups for Amazon Redshift and Lambda.
  4. Delete the Athena data source for DynamoDB.
  5. Delete the DynamoDB table.


Amazon Redshift is a powerful, fully managed data warehouse that can offer significantly increased performance and lower cost in the cloud. In this post, we discussed a pattern to implement a governance mechanism to identify and notify sensitive DDL/DCL queries on an Amazon Redshift data warehouse, and created a quick dashboard to enable the DBA and security team to take timely and prompt action as required. Additionally, you can extend this solution to include DDL commands used for Amazon Redshift data sharing across clusters.

Operational excellence is a critical part of the overall data governance on creating a modern data architecture, as it’s a great enabler to drive our customers’ business. Ideally, any data governance implementation is a combination of people, process, and technology that organizations use to ensure the quality and security of their data throughout its lifecycle. Use these instructions to set up your automated notification mechanism as sensitive queries are detected as well as create a quick dashboard on QuickSight to track the activities over time.

About the Authors

Rajdip Chaudhuri is a Senior Solutions Architect with Amazon Web Services specializing in data and analytics. He enjoys working with AWS customers and partners on data and analytics requirements. In his spare time, he enjoys soccer and movies.

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

Harmonize data using AWS Glue and AWS Lake Formation FindMatches ML to build a customer 360 view

Post Syndicated from Nishchai JM original https://aws.amazon.com/blogs/big-data/harmonize-data-using-aws-glue-and-aws-lake-formation-findmatches-ml-to-build-a-customer-360-view/

In today’s digital world, data is generated by a large number of disparate sources and growing at an exponential rate. Companies are faced with the daunting task of ingesting all this data, cleansing it, and using it to provide outstanding customer experience.

Typically, companies ingest data from multiple sources into their data lake to derive valuable insights from the data. These sources are often related but use different naming conventions, which will prolong cleansing, slowing down the data processing and analytics cycle. This problem particularly impacts companies trying to build accurate, unified customer 360 profiles. There are customer records in this data that are semantic duplicates, that is, they represent the same user entity, but have different labels or values. It’s commonly referred to as a data harmonization or deduplication problem. The underlying schemas were implemented independently and don’t adhere to common keys that can be used for joins to deduplicate records using deterministic techniques. This has led to so-called fuzzy deduplication techniques to address the problem. These techniques utilize various machine learning (ML) based approaches.

In this post, we look at how we can use AWS Glue and the AWS Lake Formation ML transform FindMatches to harmonize (deduplicate) customer data coming from different sources to get a complete customer profile to be able to provide better customer experience. We use Amazon Neptune to visualize the customer data before and after the merge and harmonization.

Overview of solution

In this post, we go through the various steps to apply ML-based fuzzy matching to harmonize customer data across two different datasets for auto and property insurance. These datasets are synthetically generated and represent a common problem for entity records stored in multiple, disparate data sources with their own lineage that appear similar and semantically represent the same entity but don’t have matching keys (or keys that work consistently) for deterministic, rule-based matching. The following diagram shows our solution architecture.

We use an AWS Glue job to transform the auto insurance and property insurance customer source data to create a merged dataset containing fields that are common to both datasets (identifiers) that a human expert (data steward) would use to determine semantic matches. The merged dataset is then used to deduplicate customer records using an AWS Glue ML transform to create a harmonized dataset. We use Neptune to visualize the customer data before and after the merge and harmonization to see how the transform FindMacthes can bring all related customer data together to get a complete customer 360 view.

To demonstrate the solution, we use two separate data sources: one for property insurance customers and another for auto insurance customers, as illustrated in the following diagram.

The data is stored in an Amazon Simple Storage Service (Amazon S3) bucket, labeled as Raw Property and Auto Insurance data in the following architecture diagram. The diagram also describes detailed steps to process the raw insurance data into harmonized insurance data to avoid duplicates and build logical relations with related property and auto insurance data for the same customer.

The workflow includes the following steps:

  1. Catalog the raw property and auto insurance data, using an AWS Glue crawler, as tables in the AWS Glue Data Catalog.
  2. Transform raw insurance data into CSV format acceptable to Neptune Bulk Loader, using an AWS Glue extract, transform, and load (ETL) job.
  3. When the data is in CSV format, use an Amazon SageMaker Jupyter notebook to run a PySpark script to load the raw data into Neptune and visualize it in a Jupyter notebook.
  4. Run an AWS Glue ETL job to merge the raw property and auto insurance data into one dataset and catalog the merged dataset. This dataset will have duplicates and no relations are built between the auto and property insurance data.
  5. Create and train an AWS Glue ML transform to harmonize the merged data to remove duplicates and build relations between the related data.
  6. Run the AWS Glue ML transform job. The job also catalogs the harmonized data in the Data Catalog and transforms the harmonized insurance data into CSV format acceptable to Neptune Bulk Loader.
  7. When the data is in CSV format, use a Jupyter notebook to run a PySpark script to load the harmonized data into Neptune and visualize it in a Jupyter notebook.


To follow along with this walkthrough, you must have an AWS account. Your account should have permission to provision and run an AWS CloudFormation script to deploy the AWS services mentioned in the architecture diagram of the solution.

Provision required resources using AWS CloudFormation:

To launch the CloudFormation stack that configures the required resources for this solution in your AWS account, complete the following steps:

  1. Log in to your AWS account and choose Launch Stack:

  1. Follow the prompts on the AWS CloudFormation console to create the stack.
  2. When the launch is complete, navigate to the Outputs tab of the launched stack and note all the key-value pairs of the resources provisioned by the stack.

Verify the raw data and script files S3 bucket

On the CloudFormation stack’s Outputs tab, choose the value for S3BucketName. The S3 bucket name should be cloud360-s3bucketstack-xxxxxxxxxxxxxxxxxxxxxxxx and should contain folders similar to the following screenshot.

The following are some important folders:

  • auto_property_inputs – Contains raw auto and property data
  • merged_auto_property – Contains the merged data for auto and property insurance
  • output – Contains the delimited files (separate subdirectories)

Catalog the raw data

To help walk through the solution, the CloudFormation stack created and ran an AWS Glue crawler to catalog the property and auto insurance data. To learn more about creating and running AWS Glue crawlers, refer to Working with crawlers on the AWS Glue console. You should see the following tables created by the crawler in the c360_workshop_db AWS Glue database:

  • source_auto_address – Contains address data of customers with auto insurance
  • source_auto_customer – Contains auto insurance details of customers
  • source_auto_vehicles – Contains vehicle details of customers
  • source_property_addresses – Contains address data of customers with property insurance
  • source_property_customers – Contains property insurance details of customers

You can review the data using Amazon Athena. For more information about using Athena to query an AWS Glue table, refer to Running SQL queries using Amazon Athena. For example, you can run the following SQL query:

SELECT * FROM "c360_workshop_db"."source_auto_address" limit 10;

Convert the raw data into CSV files for Neptune

The CloudFormation stack created and ran the AWS Glue ETL job prep_neptune_data to convert the raw data into CSV format acceptable to Neptune Bulk Loader. To learn more about building an AWS Glue ETL job using AWS Glue Studio and to review the job created for this solution, refer to Creating ETL jobs with AWS Glue Studio.

Verify the completion of job run by navigating to the Runs tab and checking the status of most recent run.

Verify the CSV files created by the AWS Glue job in the S3 bucket under the output folder.

Load and visualize the raw data in Neptune

This section uses SageMaker Jupyter notebooks to load, query, explore, and visualize the raw property and auto insurance data in Neptune. Jupyter notebooks are web-based interactive platforms. We use Python scripts to analyze the data in a Jupyter notebook. A Jupyter notebook with the required Python scripts has already been provisioned by the CloudFormation stack.

  1. Start Jupyter Notebook.
  2. Choose the Neptune folder on the Files tab.

  1. Under the Customer360 folder, open the notebook explore_raw_insurance_data.ipynb.

  1. Run Steps 1–5 in the notebook to analyze and visualize the raw insurance data.

The rest of the instructions are inside the notebook itself. The following is a summary of the tasks for each step in the notebook:

  • Step 1: Retrieve Config – Run this cell to run the commands to connect to Neptune for Bulk Loader.
  • Step 2: Load Source Auto Data – Load the auto insurance data into Neptune as vertices and edges.
  • Step 3: Load Source Property Data – Load the property insurance data into Neptune as vertices and edges.
  • Step 4: UI Configuration – This block sets up the UI config and provides UI hints.
  • Step 5: Explore entire graph – The first block builds and displays a graph for all customers with more than four coverages of auto or property insurance policies. The second block displays the graph for four different records for a customer with the name James.

These are all records for the same customer, but because they’re not linked in any way, they appear as different customer records. The AWS Glue FindMatches ML transform job will identify these records as customer James, and the records provide complete visibility on all policies owned by James. The Neptune graph looks like the following example. The vertex covers represents the coverage of auto or property insurance by the owner (James in this case) and the vertex locatedAt represents the address of the property or vehicle that is covered by the owner’s insurance.

Merge the raw data and crawl the merged dataset

The CloudFormation stack created and ran the AWS Glue ETL job merge_auto_property to merge the raw property and auto insurance data into one dataset and catalog the resultant dataset in the Data Catalog. The AWS Glue ETL job does the following transforms on the raw data and merges the transformed data into one dataset:

  • Changes the following fields on the source table source_auto_customer:
    1. Changes policyid to id and data type to string
    2. Changes fname to first_name
    3. Changes lname to last_name
    4. Changes work to company
    5. Changes dob to date_of_birth
    6. Changes phone to home_phone
    7. Drops the fields birthdate, priority, policysince, and createddate
  • Changes the following fields on the source_property_customers:
    1. Changes customer_id to id and data type to string
    2. Changes social to ssn
    3. Drops the fields job, email, industry, city, state, zipcode, netnew, sales_rounded, sales_decimal, priority, and industry2
  • After converting the unique ID field in each table to string type and renaming it to id, the AWS Glue job appends the suffix -auto to all id fields in the source_auto_customer table and the suffix -property to all id fields in the source_propery_customer table before copying all the data from both tables into the merged_auto_property table.

Verify the new table created by the job in the Data Catalog and review the merged dataset using Athena using below Athena SQL query:

SELECT * FROM "c360_workshop_db"."merged_auto_property" limit 10

For more information about how to review the data in the merged_auto_property table, refer to Running SQL queries using Amazon Athena.

Create, teach, and tune the Lake Formation ML transform

The merged AWS Glue job created a Data Catalog called merged_auto_property. Preview the table in Athena Query Editor and download the dataset as a CSV from the Athena console. You can open the CSV file for quick comparison of duplicates.

The rows with IDs 11376-property and 11377-property are mostly same except for the last two digits of their SSN, but these are mostly human errors. The fuzzy matches are easy to spot by a human expert or data steward with domain knowledge of how this data was generated, cleansed, and processed in the various source systems. Although a human expert can identify those duplicates on a small dataset, it becomes tedious when dealing with thousands of records. The AWS Glue ML transform builds on this intuition and provides an easy-to-use ML-based algorithm to automatically apply this approach to large datasets efficiently.

Create the FindMatches ML transform

  1. On the AWS Glue console, expand Data Integration and ETL in the navigation pane.
  2. Under Data classification tools, choose Record Matching.

This will open the ML transforms page.

  1. Choose Create transform.
  2. For Name, enter c360-ml-transform.
  3. For Existing IAM role, choose GlueServiceRoleLab.
  4. For Worker type, choose G.2X (Recommended).
  5. For Number of workers, enter 10.
  6. For Glue version, choose as Spark 2.4 (Glue Version 2.0).
  7. Keep the other values as default and choose Next.

  1. For Database, choose c360_workshop_db.
  2. For Table, choose merged_auto_property.
  3. For Primary key, select id.
  4. Choose Next.

  1. In the Choose tuning options section, you can tune performance and cost metrics available for the ML transform. We stay with the default trade-offs for a balanced approach.

We have specified these values to achieve balanced results. If needed, you can adjust these values later by selecting the transform and using the Tune menu.

  1. Review the values and choose Create ML transform.

The ML transform is now created with the status Needs training.

Teach the transform to identify the duplicates

In this step, we teach the transform by providing labeled examples of matching and non-matching records. You can create your labeling set yourself or allow AWS Glue to generate the labeling set based on heuristics. AWS Glue extracts records from your source data and suggests potential matching records. The file will contain approximately 100 data samples for you to work with.

  1. On the AWS Glue console, navigate to the ML transforms page.
  2. Select the transform c360-ml-transform and choose Train model.

  1. Select I have labels and choose Browse S3 to upload labels from Amazon S3.

Two labeled files have been created for this example. We upload these files to teach the ML transform.

  1. Navigate to the folder label in your S3 bucket, select the labeled file (Label-1-iteration.csv), and choose Choose. And Click “Upload labeling file from S3”.
  2. A green banner appears for successful uploads.
  3. Upload another label file (Label-2-iteration.csv) and select Append to my existing labels.
  4. Wait for the successful upload, then choose Next.

  1. Review the details in the Estimate quality metrics section and choose Close.

Verify that the ML transform status is Ready for use. Note that the label count is 200 because we successfully uploaded two labeled files to teach the transform. Now we can use it in an AWS Glue ETL job for fuzzy matching of the full dataset.

Before proceeding to the next steps, note the transform ID (tfm-xxxxxxx) for the created ML transform.

Harmonize the data, catalog the harmonized data, and convert the data into CSV files for Neptune

In this step, we run an AWS Glue ML transform job to find matches in the merged data. The job also catalogs the harmonized dataset in the Data Catalog and converts the merged [A1] dataset into CSV files for Neptune to show the relations in the matched records.

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose the job perform_ml_dedup.

  1. On the job details page, expand Additional properties.
  2. Under Job parameters, enter the transform ID you saved earlier and save the settings.

    1. Choose Run and monitor the job status for completion.

  1. Run the following query in Athena to review the data in the new table ml_matched_auto_property, created and cataloged by the AWS Glue job, and observe the results:
SELECT * FROM c360_workshop_db.ml_matched_auto_property WHERE first_name like 'Jam%' and last_name like 'Sanchez%';

The job has added a new column called match_id. If multiple records follow the match criteria, then all matching records have the same match_id.

Match IDs play a crucial role in data harmonization using Lake Formation FindMatches. Each row is assigned a unique integer match ID based on matching criteria such as first_name, last_name, SSN, or date_of_birth, as defined in the uploaded label file. For instance, match ID 25769803941 is assigned to all records that meet the match criteria, such as row 1, 2, 4, and 5 which share the same last_name, SSN, and date_of_birth. Consequently, the properties with ID 19801-property, 29801-auto, 19800-property, and 29800-auto all share the same match ID. It’s important to take note of the match ID because it will be utilized for Neptune Gremlin queries.

The output of the AWS Glue job also has created two files, master_vertex.csv and master_edge.csv, in the S3 bucket output/master_data. We use these files to load data into the Neptune database to find the relationship among different entities.

Load and visualize the harmonized data in Neptune

This section uses Jupyter notebooks to load, query, explore, and visualize the ML matched auto and property insurance data in Neptune. Complete the following steps:

  1. Start Jupyter Notebook.
  2. Choose the Neptune folder on the Files tab.
  3. Under the Customer360 folder, choose the notebook. explore_harmonized_insurance_data.ipynb.
  4. Run Steps 1–5 in the notebook to analyze and visualize the raw insurance data.

The rest of the instructions are inside the notebook itself. The following is a summary of the tasks for each step in the notebook:

  • Step 1. Retrieve Config – Run this cell to run the commands to connect to Neptune for Bulk Loader.
  • Step 2. Load Harmonized Customer Data – Load the final vertex and edge files into Neptune.
  • Step 3. Initialize Neptune node traversals – This block sets up the UI config and provides UI hints.
  • Step 4. Exploring Customer 360 graph – Replace the Match_id 25769803941 copied from the previous step into g.V('REPLACE_ME')( If its not replaced already ) and run the cell.

This displays the graph for four different records for a customer with first_name, and James and JamE are is now connected with the SameAs vertex. The Neptune graph helps connect different entities with match criteria; the AWS Glue FindMatches ML transform job has identified these records as customer James and the records show the Match_id is the same for them. The following diagram shows an example of the Neptune graph. The vertex covers represents the coverage of auto or property insurance by the owner (James in this case) and the vertex locatedAt represents the address of the property or vehicle that is covered by the owner’s insurance.

Clean up

To avoid incurring additional charges to your account, on the AWS CloudFormation console, select the stack that you provisioned as part of this post and delete it.


In this post, we showed how to use the AWS Lake Formation FindMatch transform for fuzzy matching data on a data lake to link records if there are no join keys and group records with similar match IDs. You can use Amazon Neptune to establish the relationship between records and visualize the connect graph for deriving insights.

We encourage you to explore our range of services and see how they can help you achieve your goals. For more data and analytics blog posts, check out AWS Blogs.

About the Authors

Nishchai JM is an Analytics Specialist Solutions Architect at Amazon Web services. He specializes in building Big-data applications and help customer to modernize their applications on Cloud. He thinks Data is new oil and spends most of his time in deriving insights out of the Data.

Varad Ram is Senior Solutions Architect in Amazon Web Services. He likes to help customers adopt to cloud technologies and is particularly interested in artificial intelligence. He believes deep learning will power future technology growth. In his spare time, he like to be outdoor with his daughter and son.

Narendra Gupta is a Specialist Solutions Architect at AWS, helping customers on their cloud journey with a focus on AWS analytics services. Outside of work, Narendra enjoys learning new technologies, watching movies, and visiting new places

Arun A K is a Big Data Solutions Architect with AWS. He works with customers to provide architectural guidance for running analytics solutions on the cloud. In his free time, Arun loves to enjoy quality time with his family

Enable business users to analyze large datasets in your data lake with Amazon QuickSight

Post Syndicated from Eliad Maimon original https://aws.amazon.com/blogs/big-data/enable-business-users-to-analyze-large-datasets-in-your-data-lake-with-amazon-quicksight/

This blog post is co-written with Ori Nakar from Imperva.

Imperva Cloud WAF protects hundreds of thousands of websites and blocks billions of security events every day. Events and many other security data types are stored in Imperva’s Threat Research Multi-Region data lake.

Imperva harnesses data to improve their business outcomes. To enable this transformation to a data-driven organization, Imperva brings together data from structured, semi-structured, and unstructured sources into a data lake. As part of their solution, they are using Amazon QuickSight to unlock insights from their data.

Imperva’s data lake is based on Amazon Simple Storage Service (Amazon S3), where data is continually loaded. Imperva’s data lake has a few dozen different datasets, in the scale of petabytes. Each day, TBs of new data is added to the data lake, which is then transformed, aggregated, partitioned, and compressed.

In this post, we explain how Imperva’s solution enables users across the organization to explore, visualize, and analyze data using Amazon Redshift Serverless, Amazon Athena, and QuickSight.

Challenges and needs

A modern data strategy gives you a comprehensive plan to manage, access, analyze, and act on data. AWS provides the most complete set of services for the entire end-to-end data journey for all workloads, all types of data, and all desired business outcomes. In turn, this makes AWS the best place to unlock value from your data and turn it into insight.

Redshift Serverless is a serverless option of Amazon Redshift that allows you to run and scale analytics without having to provision and manage data warehouse clusters. Redshift Serverless automatically provisions and intelligently scales data warehouse capacity to deliver high performance for all your analytics. You just need to load and query your data, and you only pay for the compute used for the duration of the workloads on a per-second basis. Redshift Serverless is ideal when it’s difficult to predict compute needs such as variable workloads, periodic workloads with idle time, and steady-state workloads with spikes.

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, straightforward to use, and makes it simple for anyone with SQL skills to quickly analyze large-scale datasets in multiple Regions.

QuickSight is a cloud-native business intelligence (BI) service that you can use to visually analyze data and share interactive dashboards with all users in the organization. QuickSight is fully managed and serverless, requires no client downloads for dashboard creation, and has a pay-per-session pricing model that allows you to pay for dashboard consumption. Imperva uses QuickSight to enable users with no technical expertise, from different teams such as marketing, product, sales, and others, to extract insight from the data without the help of data or research teams.

QuickSight offers SPICE, an in-memory, cloud-native data store that allows end-users to interactively explore data. SPICE provides consistently fast query performance and automatically scales for high concurrency. With SPICE, you save time and cost because you don’t need to retrieve data from the data source (whether a database or data warehouse) every time you change an analysis or update a visual, and you remove the load of concurrent access or analytical complexity off the underlying data source with the data.

In order for QuickSight to consume data from the data lake, some of the data undergoes additional transformations, filters, joins, and aggregations. Imperva cleans their data by filtering incomplete records, reducing the number of records by aggregations, and applying internal logic to curate millions of security incidents out of hundreds of millions of records.

Imperva had the following requirements for their solution:

  • High performance with low query latency to enable interactive dashboards
  • Continuously update and append data to queryable sources from the data lake
  • Data freshness of up to 1 day
  • Low cost
  • Engineering efficiency

The challenge faced by Imperva and many other companies is how to create a big data extract, transform, and load (ETL) pipeline solution that fits these requirements.

In this post, we review two approaches Imperva implemented to address their challenges and meet their requirements. The solutions can be easily implemented while maintaining engineering efficiency, especially with the introduction of Redshift Serverless.

Imperva’s solutions

Imperva needed to have the data lake’s data available through QuickSight continuously. The following solutions were chosen to connect the data lake to QuickSight:

  • QuickSight caching layer, SPICE – Use Athena to query the data into a QuickSight SPICE dataset
  • Redshift Serverless – Copy the data to Redshift Serverless and use it as a data source

Our recommendation is to use a solution based on the use case. Each solution has its own advantages and challenges, which we discuss as part of this post.

The high-level flow is the following:

  • Data is continuously updated from the data lake into either Redshift Serverless or the QuickSight caching layer, SPICE
  • An internal user can create an analysis and publish it as a dashboard for other internal or external users

The following architecture diagram shows the high-level flow.

High-level flow

In the following sections, we discuss the details about the flow and the different solutions, including a comparison between them, which can help you choose the right solution for you.

Solution 1: Query with Athena and import to SPICE

QuickSight provides inherent capabilities to upload data using Athena into SPICE, which is a straightforward approach that meets Imperva’s requirements regarding simple data management. For example, it suits stable data flows without frequent exceptions, which may result in SPICE full refresh.

You can use Athena to load data into a QuickSight SPICE dataset, and then use the SPICE incremental upload option to load new data to the dataset. A QuickSight dataset will be connected to a table or a view accessible by Athena. A time column (like day or hour) is used for incremental updates. The following table summarizes the options and details.

Option Description Pros/Cons
Existing table Use the built-in option by QuickSight. Not flexible—the table is imported as is in the data lake.
Dedicated view

A view will let you better control the data in your dataset. It allows joining data, aggregation, or choosing a filter like the date you want to start importing data from.

Note that QuickSight allows building a dataset based on custom SQL, but this option doesn’t allow incremental updates.

Large Athena resource consumption on a full refresh.
Dedicated ETL

Create a dedicated ETL process, which is similar to a view, but unlike the view, it allows reuse of the results in case of a full refresh.

In case your ETL or view contains grouping or other complex operations, you know that these operations will be done only by the ETL process, according to the schedule you define.

Most flexible, but requires ETL development and implementation and additional Amazon S3 storage.

The following architecture diagram details the options for loading data by Athena into SPICE.

Architecture diagram details the options for loading data by Athena into SPICE

The following code provides a SQL example for a view creation. We assume the existence of two tables, customers and events, with one join column called customer_id. The view is used to do the following:

  • Aggregate the data from daily to weekly, and reduce the number of rows
  • Control the start date of the dataset (in this case, 30 weeks back)
  • Join the data to add more columns (customer_type) and filter it
CREATE VIEW my_dataset AS
SELECT DATE_ADD('day', -DAY_OF_WEEK(day) + 1, day) AS first_day_of_week,
       customer_type, event_type, COUNT(events) AS total_events
FROM my_events INNER JOIN my_customers USING (customer_id)
WHERE customer_type NOT IN ('Reseller')
GROUP BY 1, 2, 3

Solution 2: Load data into Redshift Serverless

Redshift Serverless provides full visibility to the data, which can be viewed or edited at any time. For example, if there is a delay in adding data to the data lake or the data isn’t properly added, with Redshift Serverless, you can edit data using SQL statements or retry data loading. Redshift Serverless is a scalable solution that doesn’t have a dataset size limitation.

Redshift Serverless is used as a serving layer for the datasets that are to be used in QuickSight. The pricing model for Redshift Serverless is based on storage utilization and the run of queries; idle compute resources have no associated cost. Setting up a cluster is simple and doesn’t require you to choose node types or amount of storage. You simply load the data to tables you create and start working.

To create a new dataset, you need to create an Amazon Redshift table and run the following process every time data is added:

  1. Transform the data using an ETL process (optional):
    • Read data from the tables.
    • Transform to the QuickSight dataset schema.
    • Write the data to an S3 bucket and load it to Amazon Redshift.
  2. Delete old data if it exists to avoid duplicate data.
  3. Load the data using the COPY command.

The following architecture diagram details the options to load data into Redshift Serverless with or without an ETL process.

Architecture diagram details the options to load data into Redshift Serverless with or without an ETL process

The Amazon Redshift COPY command is simple and fast. For example, to copy daily partition Parquet data, use the following code:

COPY my_table
FROM 's3://my_bucket/my_table/day=2022-01-01'
IAM_ROLE 'my_role' 

Use the following COPY command to load the output file of the ETL process. Values will be truncated according to Amazon Redshift column size. The column truncation is important because, unlike in the data lake, in Amazon Redshift, the column size must be set. This option prevents COPY failures:

COPY my_table
FROM 's3://my_bucket/my_table/day=2022-01-01'
IAM_ROLE 'my_role' 

The Amazon Redshift COPY operation provides many benefits and options. It supports multiple formats as well as column mapping, escaping, and more. It also allows more control over data format, object size, and options to tune the COPY operation for improved performance. Unlike data in the data lake, Amazon Redshift has column length specifications. We use TRUNCATECOLUMNS to truncates the data in columns to the appropriate number of characters so that it fits the column specification.

Using this method provides full control over the data. In case of a problem, we can repair parts of the table by deleting old data and loading the data again. It’s also possible to use the QuickSight dataset JOIN option, which is not available in SPICE when using incremental update.

Additional benefit of this approach is that the data is available for other clients and services looking to use the same data, such as SQL clients or notebooks servers such as Apache Zeppelin.


QuickSight allows Imperva to expose business data to various departments within an organization. In the post, we explored approaches for importing data from a data lake to QuickSight, whether continuously or incrementally.

However, it’s important to note that there is no one-size-fits-all solution; the optimal approach will depend on the specific use case. Both options—continuous and incremental updates—are scalable and flexible, with no significant cost differences observed for our dataset and access patterns.

Imperva found incremental refresh to be very useful and uses it for simple data management. For more complex datasets, Imperva has benefitted from the greater scalability and flexibility provided by Redshift Serverless.

In cases where a higher degree of control over the datasets was required, Imperva chose Redshift Serverless so that data issues could be addressed promptly by deleting, updating, or inserting new records as necessary.

With the integration of dashboards, individuals can now access data that was previously inaccessible to them. Moreover, QuickSight has played a crucial role in streamlining our data distribution processes, enabling data accessibility across all departments within the organization.

To learn more, visit Amazon QuickSight.

About the Authors

Eliad Maimon is a Senior Startups Solutions Architect at AWS in Tel-Aviv with over 20 years of experience in architecting, building, and maintaining software products. He creates architectural best practices and collaborates with customers to leverage cloud and innovation, transforming businesses and disrupting markets. Eliad is specializing in machine learning on AWS, with a focus in areas such as generative AI, MLOps, and Amazon SageMaker.

Ori Nakar is a principal cyber-security researcher, a data engineer, and a data scientist at Imperva Threat Research group. Ori has many years of experience as a software engineer and engineering manager, focused on cloud technologies and big data infrastructure.

Accelerate data science feature engineering on transactional data lakes using Amazon Athena with Apache Iceberg

Post Syndicated from Vivek Gautam original https://aws.amazon.com/blogs/big-data/accelerate-data-science-feature-engineering-on-transactional-data-lakes-using-amazon-athena-with-apache-iceberg/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) and data sources residing in AWS, on-premises, or other cloud systems using SQL or Python. Athena is built on open-source Trino and Presto engines, and Apache Spark frameworks, with no provisioning or configuration effort required. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Apache Iceberg is an open table format for very large analytic datasets. It manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue Data Catalog for their metastore.

Feature engineering is a process of identifying and transforming raw data (images, text files, videos, and so on), backfilling missing data, and adding one or more meaningful data elements to provide context so a machine learning (ML) model can learn from it. Data labeling is required for various use cases, including forecasting, computer vision, natural language processing, and speech recognition.

Combined with the capabilities of Athena, Apache Iceberg delivers a simplified workflow for data scientists to create new data features without needing to copy or recreate the entire dataset. You can create features using standard SQL on Athena without using any other service for feature engineering. Data scientists can reduce the time spent preparing and copying datasets, and instead focus on data feature engineering, experimentation, and analyzing data at scale.

In this post, we review the benefits of using Athena with the Apache Iceberg open table format and how it simplifies common feature engineering tasks for data scientists. We demonstrate how Athena can convert an existing table in Apache Iceberg format, then add columns, delete columns, and modify the data in the table without recreating or copying the dataset, and use these capabilities to create new features on Apache Iceberg tables.

Solution overview

Data scientists are generally accustomed to working with large datasets. Datasets are usually stored in either JSON, CSV, ORC, or Apache Parquet format, or similar read-optimized formats for fast read performance. Data scientists often create new data features, and backfill such data features with aggregated and ancillary data. Historically, this task was accomplished by creating a view on top of the table with the underlying data in Apache Parquet format, where such columns and data were added at runtime or by creating a new table with additional columns. Although this workflow is well-suited for many use cases, it’s inefficient for large datasets, because data would need to be generated at runtime or datasets would need to be copied and transformed.

Athena has introduced ACID (Atomicity, Consistency, Isolation, Durability) transaction capabilities that add INSERT, UPDATE, DELETE, MERGE, and time travel operations built on Apache Iceberg tables. These capabilities enable data scientists to create new data features and drop existing data features on existing datasets without worrying about copying or transforming the dataset or abstracting it with a view. Data scientists can focus on feature engineering work and avoid copying and transforming the datasets.

The Athena Iceberg UPDATE operation writes Apache Iceberg position delete files and newly updated rows as data files in the same transaction. You can make record corrections via a single UPDATE statement.

With the release of Athena engine version 3, the capabilities for Apache Iceberg tables are enhanced with the support for operations such as CREATE TABLE AS SELECT (CTAS) and MERGE commands that streamline the lifecycle management of your Iceberg data. CTAS makes it fast and efficient to create tables from other formats such as Apache Paquet, and MERGE INTO conditional updates, deletes, or inserts rows into an Iceberg table. A single statement can combine update, delete, and insert actions.


Set up an Athena workgroup with Athena engine version 3 to use CTAS and MERGE commands with an Apache Iceberg table. To upgrade your existing Athena engine to version 3 in your Athena workgroup, follow the instructions in Upgrade to Athena engine version 3 to increase query performance and access more analytics features or refer to Changing the engine version in the Athena console.


For demonstration, we use an Apache Parquet table that contains several million records of randomly distributed fictitious sales data from the last several years stored in an S3 bucket. Download the dataset, unzip it to your local computer, and upload it to your S3 bucket. In this post, we uploaded our dataset to s3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/.

The following table shows the layout for the table customer_orders.

Column Name Data Type Description
orderkey string Order number for the order
custkey string Customer identification number
orderstatus string Status of the order
totalprice string Total price of the order
orderdate string Date of the order
orderpriority string Priority of the order
clerk string Name of the clerk who processed the order
shippriority string Priority on the shipping
name string Customer name
address string Customer address
nationkey string Customer nation key
phone string Customer phone number
acctbal string Customer account balance
mktsegment string Customer market segment

Perform feature engineering

As a data scientist, we want to perform feature engineering on the customer orders data by adding calculated one year total purchases and one year average purchases for each customer in the existing dataset. For demonstration purposes, we created the customer_orders table in the sampledb database using Athena as shown in the following DDL command. (You can use any of your existing datasets and follow the steps mentioned in this post.) The customer_orders dataset was generated and stored in the S3 bucket location s3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/ in Parquet format. This table is not an Apache Iceberg table.

CREATE EXTERNAL TABLE sampledb.customer_orders(
  `orderkey` string, 
  `custkey` string, 
  `orderstatus` string, 
  `totalprice` string, 
  `orderdate` string, 
  `orderpriority` string, 
  `clerk` string, 
  `shippriority` string, 
  `name` string, 
  `address` string, 
  `nationkey` string, 
  `phone` string, 
  `acctbal` string, 
  `mktsegment` string)

Validate the data in the table by running a query:

from sampledb.customer_orders 
limit 10;

We want to add new features to this table to get a deeper understanding of customer sales, which can result in faster model training and more valuable insights. To add new features to the dataset, convert the customer_orders Athena table to Apache Iceberg table on Athena. Issue a CTAS query statement to create a new table with Apache Iceberg format from the customer_orders table. While doing so, a new feature is added to get the total purchase amount in the past year (max year of the dataset) by each customer.

In the following CTAS query, a new column named one_year_sales_aggregate with the default value as 0.0 of data type double is added and table_type is set to ICEBERG:

CREATE TABLE  sampledb.customers_orders_aggregate
WITH (table_type = 'ICEBERG',
   format = 'PARQUET', 
   location = 's3://sample-iceberg-datasets-xxxxxxxxxxxx/sampledb/customer_orders_aggregate', 
   is_external = false
0.0 as one_year_sales_aggregate
from sampledb.customer_orders;

Issue the following query to verify the data in the Apache Iceberg table with the new column one_year_sales_aggregate values as 0.0:

SELECT custkey, totalprice, one_year_sales_aggregate 
from sampledb.customers_orders_aggregate 
limit 10;

We want to populate the values for the new feature one_year_sales_aggregate in the dataset to get the total purchase amount for each customer based on their purchases in the past year (max year of the dataset). Issue a MERGE query statement to the Apache Iceberg table using Athena to populate values for the one_year_sales_aggregate feature:

MERGE INTO sampledb.customers_orders_aggregate coa USING 
    (select custkey, 
            date_format(CAST(orderdate as date), '%Y ') as    orderdate, 
            sum(CAST(totalprice as double)) as one_year_sales_aggregate
    FROM sampledb.customers_orders_aggregate o
    where date_format(CAST(o.orderdate as date), '%Y ') = (select date_format(max(CAST(orderdate as date)), '%Y ') from sampledb.customers_orders_aggregate)
    group by custkey, date_format(CAST(orderdate as date), '%Y ')) sales_one_year_agg
    ON (coa.custkey = sales_one_year_agg.custkey)
        THEN UPDATE SET one_year_sales_aggregate = sales_one_year_agg.one_year_sales_aggregate;

Issue the following query to validate the updated value for total spend by each customer in the past year:

SELECT custkey, totalprice, one_year_sales_aggregate
from sampledb.customers_orders_aggregate limit 10;

We decide to add another feature onto an existing Apache Iceberg table to compute and store the average purchase amount in the past year by each customer. Issue an ALTER query statement to add a new column to an existing table for feature one_year_sales_average:

ALTER TABLE sampledb.customers_orders_aggregate
ADD COLUMNS (one_year_sales_average double);

Before populating the values to this new feature, you can set the default value for the feature one_year_sales_average to 0.0. Using the same Apache Iceberg table on Athena, issue an UPDATE query statement to populate the value for the new feature as 0.0:

UPDATE sampledb.customers_orders_aggregate
SET one_year_sales_average = 0.0;

Issue the following query to verify the updated value for average spend by each customer in the past year is set to 0.0:

SELECT custkey, orderdate, totalprice, one_year_sales_aggregate, one_year_sales_average 
from sampledb.customers_orders_aggregate 
limit 10;

Now we want to populate the values for the new feature one_year_sales_average in the dataset to get the average purchase amount for each customer based on their purchases in the past year (max year of the dataset). Issue a MERGE query statement to the existing Apache Iceberg table on Athena using the Athena engine to populate values for the feature one_year_sales_average:

MERGE INTO sampledb.customers_orders_aggregate coa USING 
    (select custkey, 
            date_format(CAST(orderdate as date), '%Y') as orderdate, 
            avg(CAST(totalprice as double)) as one_year_sales_average
    FROM sampledb.customers_orders_aggregate o
    where date_format(CAST(o.orderdate as date), '%Y') = (select date_format(max(CAST(orderdate as date)), '%Y') from sampledb.customers_orders_aggregate)
    group by custkey, date_format(CAST(orderdate as date), '%Y')) sales_one_year_avg
    ON (coa.custkey = sales_one_year_avg.custkey)
        THEN UPDATE SET one_year_sales_average = sales_one_year_avg.one_year_sales_average;

Issue the following query to verify the updated values for average spend by each customer:

SELECT custkey, orderdate, totalprice, one_year_sales_aggregate, one_year_sales_average 
from sampledb.customers_orders_aggregate 
limit 10;

Once additional data features have been added to the dataset, data scientists generally proceed to train ML models and make inferences using Amazon Sagemaker or equivalent toolset.


In this post, we demonstrated how to perform feature engineering using Athena with Apache Iceberg. We also demonstrated using the CTAS query to create an Apache Iceberg table on Athena from an existing dataset in Apache Parquet format, adding new features in an existing Apache Iceberg table on Athena using the ALTER query, and using UPDATE and MERGE query statements to update the feature values of existing columns.

We encourage you to use CTAS queries to create tables quickly and efficiently, and use the MERGE query statement to synchronize tables in one step to simplify data preparations and update tasks when transforming the features using Athena with Apache Iceberg. If you have comments or feedback, please leave them in the comments section.

About the Authors

Vivek Gautam is a Data Architect with specialization in data lakes at AWS Professional Services. He works with enterprise customers building data products, analytics platforms, and solutions on AWS. When not building and designing modern data platforms, Vivek is a food enthusiast who also likes to explore new travel destinations and go on hikes.

Mikhail Vaynshteyn is a Solutions Architect with Amazon Web Services. Mikhail works with healthcare and life sciences customers to build solutions that help improve patients’ outcomes. Mikhail specializes in data analytics services.

Naresh Gautam is a Data Analytics and AI/ML leader at AWS with 20 years of experience, who enjoys helping customers architect highly available, high-performance, and cost-effective data analytics and AI/ML solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.

Harsha Tadiparthi is a specialist Principal Solutions Architect, Analytics at AWS. He enjoys solving complex customer problems in databases and analytics and delivering successful outcomes. Outside of work, he loves to spend time with his family, watch movies, and travel whenever possible.

How Cargotec uses metadata replication to enable cross-account data sharing

Post Syndicated from Sumesh M R original https://aws.amazon.com/blogs/big-data/how-cargotec-uses-metadata-replication-to-enable-cross-account-data-sharing/

This is a guest blog post co-written with Sumesh M R from Cargotec and Tero Karttunen from Knowit Finland.

Cargotec (Nasdaq Helsinki: CGCBV) is a Finnish company that specializes in cargo handling solutions and services. They are headquartered in Helsinki, Finland, and operates globally in over 100 countries. With its leading cargo handling solutions and services, they are pioneers in their field. Through their unique position in ports, at sea, and on roads, they optimize global cargo flows and create sustainable customer value.

Cargotec captures terabytes of IoT telemetry data from their machinery operated by numerous customers across the globe. This data needs to be ingested into a data lake, transformed, and made available for analytics, machine learning (ML), and visualization. For this, Cargotec built an Amazon Simple Storage Service (Amazon S3) data lake and cataloged the data assets in AWS Glue Data Catalog. They chose AWS Glue as their preferred data integration tool due to its serverless nature, low maintenance, ability to control compute resources in advance, and scale when needed.

In this blog, we discuss the technical challenges faced by Cargotec in replicating their AWS Glue metadata across AWS accounts, and how they navigated these challenges successfully to enable cross-account data sharing.  By sharing their story, we hope to inspire readers facing similar challenges and provide insights into how our services can be customized to meet your specific needs.


Like many customers, Cargotec’s data lake is distributed across multiple AWS accounts that are owned by different teams. Cargotec wanted to find a solution to share datasets across accounts and use Amazon Athena to query them. To share the datasets, they needed a way to share access to the data and access to catalog metadata in the form of tables and views. Cargotec’s use cases also required them to create views that span tables and views across catalogs. Cargotec’s implementation covers three discrete AWS accounts, 25 databases, 150 tables, and 10 views.

Solution overview

Cargotec required a single catalog per account that contained metadata from their other AWS accounts. The solution that best fit their needs was to replicate metadata using an in-house version of a publicly available utility called Metastore Migration utility. Cargotec extended the utility by changing the overall orchestration layer by adding an Amazon SQS notification and an AWS Lambda. The approach was to programmatically copy and make available each catalog entity (databases, tables, and views) to all consumer accounts. This makes the tables or views local to the account where the query is being run, while the data still remains in its source S3 bucket.

Cargotec’s solution architecture

The following diagram summarizes the architecture and overall flow of events in Cargotec’s design.

Solution Architecture

Catalog entries from a source account are programmatically replicated to multiple target accounts using the following series of steps.

  1. An AWS Glue job (metadata exporter) runs daily on the source account. It reads the table and partition information from the source AWS Glue Data Catalog. Since the target account is used for analytical purposes and does not require real-time schema changes, the metadata exporter runs only once a day. Cargotec uses partition projection, which ensures that the new partitions are available in real-time.
  2. The job then writes the metadata to an S3 bucket in the same account. Please note that the solution doesn’t involve movement of the data across accounts. The target accounts read data from the source account S3 buckets. For guidance on setting up the right permissions, please see the Amazon Athena User Guide.
  3. After the metadata export has been completed, the AWS Glue job pushes a notification to an Amazon Simple Notification Service (Amazon SNS) topic. This message contains the S3 path to the latest metadata export. The SNS notification is Cargotec’s customization to the existing open-source utility.
  4. Every target account runs an AWS Lambda function that is notified when the source account SNS topic receives a push. In short, there are multiple subscriber Lambda functions (one per target account) for the source account SNS topics that get triggered when an export job is completed.
  5. Once triggered, the Lambda function then initiates an AWS Glue job (metadata importer) on the respective target account. The job receives as input the source account’s S3 path to the metadata that has been recently exported.
  6. Based on the path provided, the metadata importer reads the exported metadata from the source S3 bucket.
  7. The metadata importer now uses this information to create or update the corresponding catalog information in the target account.

All along the way, any errors are published to a separate SNS topic for logging and monitoring purposes. With this approach, Cargotec was able to create and consume views that span tables and views from multiple catalogs spread across different AWS accounts.


The core of the catalog replication utility is two AWS Glue scripts:

  • Metadata exporter – An AWS Glue job that reads the source data catalog and creates an export of the databases, tables, and partitions in an S3 bucket in the source account.
  • Metadata importer – An AWS Glue job that reads the export that was created by the metadata exporter and applies the metadata to target databases. This code is triggered by a Lambda function once files are written to S3. The job runs in the target account.

Metadata exporter

This section provides details on the AWS Glue job that exports the AWS Glue Data Catalog into an S3 location. The source code for the application is hosted the AWS Glue GitHub. Though this may need to be customized to suit your needs, we will go over the core components of the code in this blog.

Metadata exporter inputs

The application takes a few job input parameters as described below:

  • --mode key accepts either to-s3 or to-jdbc. The latter is used when the code is moving the metadata directly into a JDBC Hive Metastore. In the case of Cargotec, since we are moving the metadata to files on S3, the value for --mode will remain to-s3.
  • --output-path accepts an S3 location to which the exported metadata should be written. The code creates subdirectories corresponding to databases, tables, and partitions.
  • --database-names accepts a semicolon-separated list of databases on the source catalog that need to be replicated to the target

Reading the catalog

The metadata about the database, tables, and partitions are read from the AWS Glue catalog.

dyf = glue_context.create_dynamic_frame.from_options(
            connection_options = {
                            'catalog.name': ‘datacatalog’,
                            'catalog.database': database,
                            'catalog.region': region

The above code snippet reads the metadata into an AWS Glue DynamicFrame. The frame is then converted to a Spark DataFrame. It is filtered into individual DataFrames based on it being either part of a database, table, or partition. A schema is attached to the data frame using one of the below:

        StructField('items', ArrayType(
        StructField('type', StringType(), False)
        StructField('database', StringType(), False),
        StructField('type', StringType(), False),
        StructField('items', ArrayType(DATACATALOG_TABLE_ITEM_SCHEMA, False), True)
        StructField('database', StringType(), False),
        StructField('table', StringType(), False),
        StructField('items', ArrayType(DATACATALOG_PARTITION_ITEM_SCHEMA, False), True),
        StructField('type', StringType(), False)

For details on the individual item schema, refer to the schema definition on GitHub.

Persisting the metadata

After converting to a DataFrame with schema, it is persisted to the S3 location marked by the output-path parameter

databases.write.format('json').mode('overwrite').save(output_path + 'databases')
tables.write.format('json').mode('overwrite').save(output_path + 'tables')
partitions.write.format('json').mode('overwrite').save(output_path + 'partitions')

Exploring the output

Navigate to the S3 bucket that contains the output location, and you should be able to see the output metadata in format. An example export for a table would look like the following code snippet.

    "database": "default",
    "type": "table",
    "item": {
        "createTime": "1651241372000",
        "lastAccessTime": "0",
        "owner": "spark",
        "retention": 0,
        "name": "an_example_table",
        "tableType": "EXTERNAL_TABLE",
        "parameters": {
            "totalSize": "2734148",
            "EXTERNAL": "TRUE",
            "last_commit_time_sync": "20220429140907",
            "spark.sql.sources.schema.part.0": "{redacted_schema}",
            "numFiles": "1",
            "transient_lastDdlTime": "1651241371",
            "spark.sql.sources.schema.numParts": "1",
            "spark.sql.sources.provider": "hudi"
        "partitionKeys": [],
        "storageDescriptor": {
            "inputFormat": "org.apache.hudi.hadoop.HoodieParquetInputFormat",
            "compressed": false,
            "storedAsSubDirectories": false,
            "location": "s3://redacted_bucket_name/table/an_example_table",
            "numberOfBuckets": -1,
            "outputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "bucketColumns": [],
            "columns": [{
                    "name": "_hoodie_commit_time",
                    "type": "string"
                    "name": "_hoodie_commit_seqno",
                    "type": "string"
            "parameters": {},
            "serdeInfo": {
                "serializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
                "parameters": {
                    "hoodie.query.as.ro.table": "false",
                    "path": "s3://redacted_bucket_name/table/an_example_table",
                    "serialization.format": "1"
            "skewedInfo": {
                "skewedColumnNames": [],
                "skewedColumnValueLocationMaps": {},
                "skewedColumnValues": []
            "sortColumns": []

Once the export job is complete, the output S3 path will be pushed to an SNS topic. A Lambda function at the target account processes this message and invokes the import AWS Glue job by passing the S3 import location.

Metadata importer

The import job runs on the target account. The code for the job is available on GitHub. As with the exporter, you may need to customize it to suit your specific requirements, but the code as-is should work for most scenarios.

Metadata importer inputs

The inputs to the application are provided as job parameters. Below is a list of parameters that are used for the import process:

  • --mode key accepts either from-s3 or from-jdbc. The latter is used when migration is from a JDBC source to the AWS Glue Data Catalog. At Cargotec, the metadata is already written to Amazon S3, and hence the value for this key is always set to from-s3.
  • --region key accepts a valid AWS Region for the AWS Glue Catalog. The target Region is specified using this key.
  • --database-input-path key accepts the path to the file containing the database metadata. This is the output of the previous import job.
  • --table-input-path key accepts the path to the file containing the table metadata. This is the output of the previous import job.
  • --partition-input-path key accepts the path to the file containing the partition metadata. This is the output of the previous import job.

Reading the metadata

The metadata, as previously discussed, are files on Amazon S3. They are read into individual spark data frames with their respective schema information

databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)

Loading the catalog

Once the spark data frames are read, they are converted to AWS Glue DynamicFrame and then loaded to the catalog, as shown in the following snippet.

               'catalog.name': datacatalog_name, 
               'catalog.region': region
                'catalog.name': datacatalog_name, 
                'catalog.region': region
                 'catalog.name': datacatalog_name, 
                 'catalog.region': region

Once the job concludes, you can query the target AWS Glue catalog to ensure the tables from the source have been synced with the destination. To keep things simple and easy to manage, instead of implementing a mechanism to identify tables that change over time, Cargotec updates the catalog information of all databases or tables that are configured in the export job.


Though the setup works effectively for Cargotec’s current business requirements, there are a few drawbacks to this approach, which are highlighted below:

  1. The solution involves code. Customizations were made to the existing open-source utility to be able to publish an SNS notification once an export is complete and a Lambda function to trigger the import process.
  2. The export process on the source account is a scheduled job. Hence there is no real-time sync between the source and target accounts. This was not a requirement for Cargotec’s business process.
  3. For tables that don’t use Athena partition projection, query results may be outdated until the new partitions are added to the metastore through MSCK REPAIR TABLE, ALTER TABLE ADD PARTITION, AWS Glue crawler, and so on.
  4. The current approach requires syncing all the tables across the source and target. If the requirement is to capture only the ones that changed instead of a scheduled daily export, the design needs to change and could benefit from the Amazon EventBridge integration with AWS Glue. An example implementation of using AWS Glue APIs to identify changes is shown in Identify source schema changes using AWS Glue.


In this blog post, we have explored a solution for cross-account sharing of data and tables that makes it possible for Cargotec to create views that combine data from multiple AWS accounts. We’re excited to share Cargotec’s success and believe the post has provided you with valuable insights and inspiration for your own projects.

We encourage you to explore our range of services and see how they can help you achieve your goals. Lastly, for more data and analytics blogs, feel free to bookmark the AWS Blogs.

About the Authors

Sumesh M R is a Full Stack Machine Learning Architect at Cargotec. He has several years of software engineering and ML background. Sumesh is an expert in Sagemaker and other AWS ML/Analytics services. He is passionate about data science and loves to explore the latest ML libraries and techniques. Before joining Cargotec, he worked as a Solution Architect at TCS. In his spare time, he loves to play cricket and badminton.

 Tero Karttunen is a Senior Cloud Architect at Knowit Finland. He advises clients on architecting and adopting Data Architectures that best serve their Data Analytics and Machine Learning needs. He has helped Cargotec in their data journey for more than two years. Outside of work, he enjoys running, winter sports, and role-playing games.

Arun A K is a Big Data Specialist Solutions Architect at AWS.  He works with customers to provide architectural guidance for running analytics solutions on AWS Glue, AWS Lake Formation, Amazon Athena, and Amazon EMR. In his free time, he likes to spend time with his friends and family.

Introducing Athena Provisioned Capacity

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/introducing-athena-provisioned-capacity/

Today we launch the ability to provision capacity to run your Amazon Athena queries.

Athena is a query service that makes it simple to analyze data in Amazon Simple Storage Service (Amazon S3) data lakes and 30 different data sources, including on-premises data sources or other cloud systems, using standard SQL queries. Athena is serverless, so there is no infrastructure to manage, and–until today–you pay only for the queries that you run. Starting today, you can get dedicated capacity for your queries and use new workload management features to prioritize, control, and scale your most important queries, paying only for the capacity you provision.

At AWS, 90 percent of the new services and features are driven by your direct feedback. Many of you Athena customers told us that, when running a large volume of queries, you sometimes experience queuing, which might slow down some applications or business processes. To work around this, you typically create a query prioritization mechanism to prioritize mission-critical queries over less critical, interactive, or exploratory queries. This prioritization mechanism helps to get the highest priority queries run first, at the price of building and maintaining code or business processes outside of Athena itself. You also told us it is difficult to forecast your Athena costs. Athena charges by the volume of data scanned, which is often difficult to predict as it depends on the size of your data set, the construction of the user queries, and the storage format for the data.

We heard this feedback, and today, we introduce the capability to provision dedicated query processing capacity at scale. With provisioned capacity, you provision a dedicated set of compute resources to run your queries. This always-on capacity can serve your business-critical queries with near-zero latency and no queuing. It gives you control over workload performance characteristics such as cost, concurrency, and query prioritization. Similar to provisioned capacity for other AWS services, you pay only for the capacity provisioned, not for the actual usage. With provisioned capacity, your Athena bills are predictable, and you do not have to limit user queries to stay within your monthly budget. I’ll share more about the billing model down below.

Behind the scenes, Athena maintains a large pool of compute in each AWS Region that it operates in. You can think of this as one large pool of compute, divided logically across customers. When you reserve capacity in Athena, the capacity is held for your exclusive use. You can choose which queries run on the capacity you provisioned and which run on Athena’s multi-tenant, on-demand capacity. Multiple queries can share the capacity you provisioned. You may add additional capacity units at any time, based on your evolving business requirements. You may also adjust the provisioned capacity down after a minimum period of time of 8 hours.

The unit of capacity is a Data Processing Unit (DPU). A single DPU is equivalent to four vCPU and 16 Gb RAM. The minimum capacity you may provision is 24 DPU for 8 hours. This new provisioned capacity for Athena is ideal for those of you running any volume of queries, but the sweet spot to start using provisioned capacity is when you spend $100 or more per month on Athena.

The number of DPUs you need depends on your goals and analysis patterns. For example, if you need queries to start immediately and without queuing, you should provision enough DPUs to meet your peak concurrent query demand. Provisioning fewer DPUs than your peak demand is allowed, but may result in queuing. When this occurs, queries are held in a queue and executed when capacity is available. If your goal is to run queries within a fixed budget, you can use the AWS Pricing Calculator to determine the number of DPUs that meets your budget. Lastly, remember that data size, storage format, and query construction influence the number of DPU a query requires. You can increase query performance by compressing, partitioning, and converting your data into columnar formats. Athena’s documentation provides you with guidelines to determine how much capacity you might require to run multiple queries at the same time.

How Does It Work?
Getting started is a three-step process. I navigate to the Athena page in the AWS Management Console and select Capacity Reservations on the left-side navigation menu.
(The console you see on this demo is based on the new Cloudscape open-source design system, yours might still see the traditional design on your AWS account.)

Athena Capacity Reservation landing page in the console

I select the Create capacity reservation button at the top right of the page.

On the Create capacity reservation page, I enter a Capacity reservation name and the number of DPUs I want to provision.

Athena Capacity Reservation - Create Reservation

I select Review to review my choices, and I select Create capacity reservation to create my reservation. After a brief period of time, the capacity reservation status becomes ✅ Active.

Athena Capacity Reservation - Status

The third and last step is to create a workgroup and assign the workgroup to the provisioned capacity. A workgroup is an Athena mechanism allowing you to separate users, teams, applications, or workloads to set limits on the amount of data each query or the entire workgroup can process and to track costs.

Queries belonging to the assigned workgroup will run on the capacity you provisioned. Capacity may be shared with multiple workgroups as long as they all use the same Athena engine version. This concept, depicted in the diagram below, is surfaced through a capacity allocation policy, which defines how capacity is assigned over workgroups. This gives you the flexibility to run queries with more or less capacity, depending on your business needs.

Athena Capacity Reservation - shared workgroups

To create a workgroup, I navigate to the Workgroups section of the Athena page. Then, I select Create workgroup.

Athena Capacity Reservation - Create Workgroup

I make sure the analytics engine selected in the reservation matches the one in the workgroup.

Athena Capacity Reservation - select analytic engineThen, I go back to the capacity reservation I just created, and I select Add workgroups to add the workgroup I just created.

Athena Capacity Reservation - Add workgroup

That’s it! Now that the configuration is ready, I can run my queries. Existing queries will run on the provisioned capacity unmodified. I make sure to select the workgroup I just created when I run queries. I choose a workgroup on the top right side of the query editor, or use the --work-group argument on the AWS command line, such as:

aws athena start-query-execution --work-group AWSNewsBlog

Athena Capacity Reservation - Select workgroup

Availability and Pricing
As I explained in the introduction, we charge for the number of DPUs you provisioned and the duration. The minimum duration is 8 hours, and after that, billing is per minute. You can release the provisioned capacity at any time. Cancellations within the minimum duration period are billed for the full term, and capacity is deallocated as soon as all currently running queries are terminated.

Queries run from a workgroup assigned to a provisioned capacity are not billed for the amount of data scanned. You effectively pay a flat rate depending on the provisioned capacity, not the usage. If you have excess capacity, you can reduce the number of DPUs you provisioned or add workgroups to consume the excess capacity.

As usual, the Athena pricing page has all the details.

Athena provisioned capacity is available today in US East (Ohio, N. Virginia), US West (Oregon), Asia Pacific (Singapore, Sydney, Tokyo), and Europe (Ireland, Stockholm) AWS Regions.

Go and provision your Athena capacity today!

— seb