
Simplify operational data processing in data lakes using AWS Glue and Apache Hudi

Post Syndicated from Ravi Itha original https://aws.amazon.com/blogs/big-data/simplify-operational-data-processing-in-data-lakes-using-aws-glue-and-apache-hudi/

The Analytics specialty practice of AWS Professional Services (AWS ProServe) helps customers across the globe with modern data architecture implementations on the AWS Cloud. A modern data architecture is an evolutionary architecture pattern designed to integrate a data lake, data warehouse, and purpose-built stores with a unified governance model. It focuses on defining standards and patterns to integrate data producers and consumers and move data between data lakes and purpose-built data stores securely and efficiently. Out of the many data producer systems that feed data to a data lake, operational databases are most prevalent, where operational data is stored, transformed, analyzed, and finally used to enhance business operations of an organization. With the emergence of open storage formats such as Apache Hudi and its native support from AWS Glue for Apache Spark, many AWS customers have started adding transactional and incremental data processing capabilities to their data lakes.

AWS has invested in native service integration with Apache Hudi and published technical content to enable you to use Apache Hudi with AWS Glue (for example, refer to Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 1: Getting Started). In AWS ProServe-led customer engagements, the use cases we work on usually come with technical complexity and scalability requirements. In this post, we discuss a common use case in relation to operational data processing and the solution we built using Apache Hudi and AWS Glue.

Use case overview

AnyCompany Travel and Hospitality wanted to build a data processing framework to seamlessly ingest and process data coming from operational databases (used by reservation and booking systems) in a data lake before applying machine learning (ML) techniques to provide a personalized experience to its users. Due to the sheer volume of direct and indirect sales channels the company has, its booking and promotions data are organized in hundreds of operational databases with thousands of tables. Of those tables, some are larger than others (for example, in terms of record volume), and some are updated more frequently than others. In the data lake, the data is organized in the following storage zones:

  1. Source-aligned datasets – These have an identical structure to their counterparts at the source
  2. Aggregated datasets – These datasets are created based on one or more source-aligned datasets
  3. Consumer-aligned datasets – These are derived from a combination of source-aligned, aggregated, and reference datasets enriched with relevant business and transformation logic, and are usually fed as inputs to ML pipelines or other consumer applications

The following are the data ingestion and processing requirements:

  1. Replicate data from operational databases to the data lake, including insert, update, and delete operations.
  2. Keep the source-aligned datasets up to date (typically within the range of 10 minutes to a day) in relation to their counterparts in the operational databases, ensuring analytics pipelines refresh consumer-aligned datasets for downstream ML pipelines in a timely fashion. Moreover, the framework should consume compute resources as optimally as possible per the size of the operational tables.
  3. To minimize DevOps and operational overhead, the company wanted to templatize the source code wherever possible. For example, to create source-aligned datasets in the data lake for 3,000 operational tables, the company didn’t want to deploy 3,000 separate data processing jobs. The smaller the number of jobs and scripts, the better.
  4. The company wanted the ability to continue processing operational data in the secondary Region in the rare event of primary Region failure.

As you can guess, Apache Hudi can address the first requirement, so we put our emphasis on the remaining requirements. We begin with a data lake reference architecture, followed by an overview of the operational data processing framework. Using our open-source solution on GitHub, we then delve into the framework components and walk through their design and implementation. Finally, we test the framework and summarize how it meets the aforementioned requirements.

Data lake reference architecture

Let’s begin with a big picture: a data lake solves a variety of analytics and ML use cases dealing with internal and external data producers and consumers. The following diagram represents a generic data lake architecture. To ingest data from operational databases to an Amazon Simple Storage Service (Amazon S3) staging bucket of the data lake, either AWS Database Migration Service (AWS DMS) or any AWS partner solution from AWS Marketplace that supports change data capture (CDC) can fulfill the requirement. AWS Glue is used to create source-aligned and consumer-aligned datasets, and separate AWS Glue jobs handle the feature engineering part of ML engineering and operations. Amazon Athena is used for interactive querying, and AWS Lake Formation is used for access controls.

Data Lake Reference Architecture

Operational data processing framework

The operational data processing (ODP) framework contains three components: File Manager, File Processor, and Configuration Manager. Each component runs independently to solve a portion of the operational data processing use case. We have open-sourced this framework on GitHub—you can clone the code repo and inspect it while we walk you through the design and implementation of the framework components. The source code is organized in three folders, one for each component, and if you customize and adopt this framework for your use case, we recommend promoting these folders as separate code repositories in your version control system. Consider using the following repository names:

  1. aws-glue-hudi-odp-framework-file-manager
  2. aws-glue-hudi-odp-framework-file-processor
  3. aws-glue-hudi-odp-framework-config-manager

With this modular approach, you can independently deploy the components to your data lake environment by following your preferred CI/CD processes. As illustrated in the preceding diagram, these components are deployed in conjunction with a CDC solution.

Component 1: File Manager

File Manager detects files emitted by a CDC process such as AWS DMS and tracks them in an Amazon DynamoDB table. As shown in the following diagram, it consists of an Amazon EventBridge event rule, an Amazon Simple Queue Service (Amazon SQS) queue, an AWS Lambda function, and a DynamoDB table. The EventBridge rule uses Amazon S3 Event Notifications to detect the arrival of CDC files in the S3 bucket. The event rule forwards the object event notifications to the SQS queue as messages. The File Manager Lambda function consumes those messages, parses the metadata, and inserts the metadata into the DynamoDB table odpf_file_tracker. These records are then processed by File Processor, which we discuss in the next section.
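
To illustrate the mechanics, the following is a minimal sketch of what such a handler could look like. It assumes the S3 object notification arrives via EventBridge in the SQS message body, and the item attribute names shown here are illustrative; the implementation in the GitHub repo is the authoritative reference.

import json
from datetime import datetime, timezone

import boto3

file_tracker = boto3.resource("dynamodb").Table("odpf_file_tracker")

def lambda_handler(event, context):
    # Each SQS record wraps the EventBridge "Object Created" event for one CDC file
    for record in event["Records"]:
        detail = json.loads(record["body"])["detail"]
        bucket = detail["bucket"]["name"]
        key = detail["object"]["key"]
        # Illustrative item layout; the framework derives the source table from the DMS folder path
        file_tracker.put_item(Item={
            "source_table_name": key.split("/")[-2],
            "file_id": key,
            "s3_bucket": bucket,
            "file_ingestion_status": "raw_file_landed",
            "file_ingestion_date_time": datetime.now(timezone.utc).isoformat(),
        })
    return {"processed_messages": len(event["Records"])}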

ODPF Component: File Manager

Component 2: File Processor

File Processor is the workhorse of the ODP framework. It processes files from the S3 staging bucket, creates source-aligned datasets in the raw S3 bucket, and adds or updates metadata for the datasets (AWS Glue tables) in the AWS Glue Data Catalog.

We use the following terminology when discussing File Processor:

  1. Refresh cadence – This represents the data ingestion frequency (for example, 10 minutes). It is typically paired with an AWS Glue worker type (such as G.1X, G.2X, G.4X, G.8X, or G.025X) and a batch size.
  2. Table configuration – This includes the Hudi configuration (primary key, partition key, pre-combine key, and table type (Copy on Write or Merge on Read)), table data storage mode (historical or current snapshot), S3 bucket used to store source-aligned datasets, AWS Glue database name, AWS Glue table name, and refresh cadence (see the illustrative configuration after this list).
  3. Batch size – This numeric value is used to split tables into smaller batches and process their respective CDC files in parallel. For example, a configuration of 50 tables with a 10-minute refresh cadence and a batch size of 5 results in a total of 10 AWS Glue job runs, each processing CDC files for 5 tables.
  4. Table data storage mode – There are two options:
    • Historical – This table in the data lake stores historical updates to records (always append).
    • Current snapshot – This table in the data lake stores the latest versioned records (upserts) with the ability to use Hudi time travel for historical updates.
  5. File processing state machine – It processes CDC files that belong to tables that share a common refresh cadence.
  6. EventBridge rule association with the file processing state machine – We use a dedicated EventBridge rule for each refresh cadence with the file processing state machine as target.
  7. File processing AWS Glue job – This is a configuration-driven AWS Glue extract, transform, and load (ETL) job that processes CDC files for one or more tables.
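
To make the table configuration and batch size concepts concrete, here is a hedged illustration of what a configuration entry and the batching logic could look like. The attribute and column names are examples only; refer to the odpf_raw_table_config and odpf_batch_config definitions in the GitHub repo for the exact schema.

# Illustrative table configuration entry (attribute names are examples, not the exact schema)
table_config = {
    "source_table_name": "table_1",
    "glue_database_name": "odpf_demo_taxi_trips_raw",
    "glue_table_name": "table_1",
    "raw_bucket_name": "odpf-demo-raw-EXAMPLE-BUCKET",
    "hudi_primary_key": "trip_id",                   # hypothetical column
    "hudi_precombine_key": "cdc_timestamp",          # hypothetical column
    "hudi_table_type": "COPY_ON_WRITE",              # or MERGE_ON_READ
    "table_data_storage_mode": "current_snapshot",   # or historical
    "refresh_cadence_minutes": 10,
}

def split_into_batches(tables, batch_size):
    """Split the configured tables into batches that are processed by parallel Glue job runs."""
    return [tables[i:i + batch_size] for i in range(0, len(tables), batch_size)]

# 18 tables with a batch size of 5 -> 4 batches (5, 5, 5, and 3), matching the example that follows
batches = split_into_batches([f"table_{n}" for n in range(1, 19)], 5)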

File Processor is implemented as a state machine using AWS Step Functions. Let’s use an example to understand this. The following diagram illustrates a run of the File Processor state machine with a configuration that includes 18 operational tables, a refresh cadence of 10 minutes, a batch size of 5, and an AWS Glue worker type of G.1X.

ODP framework component: File Processor

The workflow includes the following steps:

  1. The EventBridge rule triggers the File Processor state machine every 10 minutes.
  2. As the first state in the state machine, the Batch Manager Lambda function reads configurations from the DynamoDB tables.
  3. The Lambda function creates four batches: three of them will be mapped to five operational tables each, and the fourth one is mapped to three operational tables. Then it feeds the batches to the Step Functions Map state.
  4. For each item in the Map state, the File Processor Trigger Lambda function will be invoked, which in turn runs the File Processor AWS Glue job.
  5. Each AWS Glue job performs the following actions:
    • Checks the status of an operational table and acquires a lock when it is not processed by any other job. The odpf_file_processing_tracker DynamoDB table is used for this purpose. When a lock is acquired, it inserts a record in the DynamoDB table with the status updating_table for the first time; otherwise, it updates the record.
    • Processes the CDC files for the given operational table from the S3 staging bucket and creates a source-aligned dataset in the S3 raw bucket. It also updates technical metadata in the AWS Glue Data Catalog (a simplified sketch of the lock and the Hudi write appears after these steps).
    • Updates the status of the operational table to completed in the odpf_file_processing_tracker table. In case of processing errors, it updates the status to refresh_error and logs the stack trace.
    • It also inserts this record into the odpf_file_processing_tracker_history DynamoDB table along with additional details such as insert, update, and delete row counts.
    • Moves the records that belong to successfully processed CDC files from odpf_file_tracker to the odpf_file_tracker_history table with file_ingestion_status set to raw_file_processed.
    • Moves to the next operational table in the given batch.
    • Note: a failure to process CDC files for one of the operational tables of a given batch does not impact the processing of other operational tables.
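
The following sketch shows, in simplified form, how the lock acquisition and the Hudi upsert described in step 5 could be expressed in the File Processor AWS Glue (PySpark) job. The condition expression, Hudi option values, column names, and S3 paths are illustrative assumptions; the actual job in the GitHub repo drives them from the DynamoDB configuration tables.

import boto3
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
tracker = boto3.resource("dynamodb").Table("odpf_file_processing_tracker")

def acquire_lock(table_name):
    """Claim the table only if no other Glue job run is currently processing it."""
    try:
        tracker.put_item(
            Item={"source_table_name": table_name, "status": "updating_table"},
            ConditionExpression="attribute_not_exists(source_table_name) OR #s <> :busy",
            ExpressionAttributeNames={"#s": "status"},
            ExpressionAttributeValues={":busy": "updating_table"},
        )
        return True
    except tracker.meta.client.exceptions.ConditionalCheckFailedException:
        return False  # another job run holds the lock; skip this table for now

# Illustrative Hudi upsert of the CDC files for one table (current snapshot mode)
hudi_options = {
    "hoodie.table.name": "table_1",
    "hoodie.datasource.write.recordkey.field": "trip_id",        # hypothetical primary key
    "hoodie.datasource.write.precombine.field": "cdc_timestamp", # hypothetical pre-combine key
    "hoodie.datasource.write.partitionpath.field": "trip_date",  # hypothetical partition key
    "hoodie.datasource.write.operation": "upsert",
    # Data Catalog sync is enabled here; on AWS Glue, additional hive_sync settings are typically required
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "odpf_demo_taxi_trips_raw",
    "hoodie.datasource.hive_sync.table": "table_1",
}

if acquire_lock("table_1"):
    cdc_df = spark.read.parquet("s3://odpf-demo-staging-EXAMPLE-BUCKET/taxi_trips/table_1/")
    (cdc_df.write.format("hudi")
        .options(**hudi_options)
        .mode("append")
        .save("s3://odpf-demo-raw-EXAMPLE-BUCKET/taxi_trips/table_1/"))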

Component 3: Configuration Manager

Configuration Manager is used to insert configuration details to the odpf_batch_config and odpf_raw_table_config tables. To keep this post concise, we provide two architecture patterns in the code repo and leave the implementation details to you.

Solution overview

Let’s test the ODP framework by replicating data from 18 operational tables to a data lake and creating source-aligned datasets with a 10-minute refresh cadence. We use Amazon Relational Database Service (Amazon RDS) for MySQL to set up an operational database with 18 tables, upload the New York City Taxi – Yellow Trip Data dataset, set up AWS DMS to replicate data to Amazon S3, process the files using the framework, and finally validate the data using Amazon Athena.

Create S3 buckets

For instructions on creating an S3 bucket, refer to Creating a bucket. For this post, we create the following buckets:

  1. odpf-demo-staging-EXAMPLE-BUCKET – You will use this to migrate operational data using AWS DMS
  2. odpf-demo-raw-EXAMPLE-BUCKET – You will use this to store source-aligned datasets
  3. odpf-demo-code-artifacts-EXAMPLE-BUCKET – You will use this to store code artifacts

Deploy File Manager and File Processor

Deploy File Manager and File Processor by following the instructions in the File Manager README and the File Processor README in the GitHub repo, respectively.

Set up Amazon RDS for MySQL

Complete the following steps to set up Amazon RDS for MySQL as the operational data source:

  1. Provision Amazon RDS for MySQL. For instructions, refer to Create and Connect to a MySQL Database with Amazon RDS.
  2. Connect to the database instance using MySQL Workbench or DBeaver.
  3. Create a database (schema) by running the SQL command CREATE DATABASE taxi_trips;.
  4. Create 18 tables by running the SQL commands in the ops_table_sample_ddl.sql script.

Populate data to the operational data source

Complete the following steps to populate data to the operational data source:

  1. To download the New York City Taxi – Yellow Trip Data dataset for January 2021 (Parquet file), navigate to NYC TLC Trip Record Data, expand 2021, and choose Yellow Taxi Trip records. A file called yellow_tripdata_2021-01.parquet will be downloaded to your computer.
  2. On the Amazon S3 console, open the bucket odpf-demo-staging-EXAMPLE-BUCKET and create a folder called nyc_yellow_trip_data.
  3. Upload the yellow_tripdata_2021-01.parquet file to the folder.
  4. Navigate to the bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET and create a folder called glue_scripts.
  5. Download the file load_nyc_taxi_data_to_rds_mysql.py from the GitHub repo and upload it to the folder.
  6. Create an AWS Identity and Access Management (IAM) policy called load_nyc_taxi_data_to_rds_mysql_s3_policy. For instructions, refer to Creating policies using the JSON editor. Use the odpf_setup_test_data_glue_job_s3_policy.json policy definition.
  7. Create an IAM role called load_nyc_taxi_data_to_rds_mysql_glue_role. Attach the policy created in the previous step.
  8. On the AWS Glue console, create a connection for Amazon RDS for MySQL. For instructions, refer to Adding a JDBC connection using your own JDBC drivers and Setting up a VPC to connect to Amazon RDS data stores over JDBC for AWS Glue. Name the connection as odpf_demo_rds_connection.
  9. In the navigation pane of the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  10. Choose the file load_nyc_taxi_data_to_rds_mysql.py and choose Create.
  11. Complete the following steps to create your job:
    • Provide a name for the job, such as load_nyc_taxi_data_to_rds_mysql.
    • For IAM role, choose load_nyc_taxi_data_to_rds_mysql_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Advanced properties, Connections, select the connection you created earlier.
    • Under Job parameters, add the following parameters:
      • input_sample_data_path = s3://odpf-demo-staging-EXAMPLE-BUCKET/nyc_yellow_trip_data/yellow_tripdata_2021-01.parquet
      • schema_name = taxi_trips
      • table_name = table_1
      • rds_connection_name = odpf_demo_rds_connection
    • Choose Save.
  12. On the Actions menu, run the job.
  13. Go back to your MySQL Workbench or DBeaver and validate the record count by running the SQL command select count(1) row_count from taxi_trips.table_1. You will get an output of 1369769.
  14. Populate the remaining 17 tables by running the SQL commands from the populate_17_ops_tables_rds_mysql.sql script.
  15. Get the row count from the 18 tables by running the SQL commands from the ops_data_validation_query_rds_mysql.sql script. The following screenshot shows the output.
    Record volumes (for 18 Tables) in Operational Database

Configure DynamoDB tables

Complete the following steps to configure the DynamoDB tables:

  1. Download file load_ops_table_configs_to_ddb.py from the GitHub repo and upload it to the folder glue_scripts in the S3 bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET.
  2. Create an IAM policy called load_ops_table_configs_to_ddb_ddb_policy. Use the odpf_setup_test_data_glue_job_ddb_policy.json policy definition.
  3. Create an IAM role called load_ops_table_configs_to_ddb_glue_role. Attach the policy created in the previous step.
  4. On the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  5. Choose the file load_ops_table_configs_to_ddb.py and choose Create.
  6. Complete the following steps to create a job:
    • Provide a name, such as load_ops_table_configs_to_ddb.
    • For IAM role, choose load_ops_table_configs_to_ddb_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Job parameters, add the following parameters:
      • batch_config_ddb_table_name = odpf_batch_config
      • raw_table_config_ddb_table_name = odpf_demo_taxi_trips_raw
      • aws_region = your Region (for example, us-west-1)
    • Choose Save.
  7. On the Actions menu, run the job.
  8. On the DynamoDB console, get the item count from the tables. You will find 1 item in the odpf_batch_config table and 18 items in the odpf_demo_taxi_trips_raw table.

Set up a database in AWS Glue

Complete the following steps to create a database:

  1. On the AWS Glue console, under Data catalog in the navigation pane, choose Databases.
  2. Create a database called odpf_demo_taxi_trips_raw.

Set up AWS DMS for CDC

Complete the following steps to set up AWS DMS for CDC:

  1. Create an AWS DMS replication instance. For Instance class, choose dms.t3.medium.
  2. Create a source endpoint for Amazon RDS for MySQL.
  3. Create a target endpoint for Amazon S3. To configure the S3 endpoint settings, use the JSON definition from dms_s3_endpoint_setting.json.
  4. Create an AWS DMS task.
    • Use the source and target endpoints created in the previous steps.
    • To create AWS DMS task mapping rules, use the JSON definition from dms_task_mapping_rules.json.
    • Under Migration task startup configuration, select Automatically on create.
  5. When the AWS DMS task starts running, you will see a task summary similar to the following screenshot.
    DMS Task Summary
  6. In the Table statistics section, you will see an output similar to the following screenshot. Here, the Full load rows and Total rows columns are important metrics whose counts should match with the record volumes of the 18 tables in the operational data source.
    DMS Task Statistics
  7. As a result of successful full load completion, you will find Parquet files in the S3 staging bucket—one Parquet file per table in a dedicated folder, similar to the following screenshot. Similarly, you will find 17 such folders in the bucket.
    DMS Output in S3 Staging Bucket for Table 1

File Manager output

The File Manager Lambda function consumes messages from the SQS queue, extracts metadata for the CDC files, and inserts one item per file to the odpf_file_tracker DynamoDB table. When you check the items, you will find 18 items with file_ingestion_status set to raw_file_landed, as shown in the following screenshot.

CDC Files in File Tracker DynamoDB Table

File Processor output

  1. At the next 10-minute mark (after the activation of the EventBridge rule), the event rule triggers the File Processor state machine. On the Step Functions console, you will notice that the state machine is invoked, as shown in the following screenshot.
    File Processor State Machine Run Summary
  2. As shown in the following screenshot, the Batch Generator Lambda function creates four batches and constructs a Map state for parallel running of the File Processor Trigger Lambda function.
    File Processor State Machine Run Details
  3. Then, the File Processor Trigger Lambda function runs the File Processor Glue Job, as shown in the following screenshot.
    File Processor Glue Job Parallel Runs
  4. Then, you will notice that the File Processor Glue Job runs create source-aligned datasets in Hudi format in the S3 raw bucket. For Table 1, you will see an output similar to the following screenshot. There will be 17 such folders in the S3 raw bucket.
    Data in S3 raw bucket
  5. Finally, in AWS Glue Data Catalog, you will notice 18 tables created in the odpf_demo_taxi_trips_raw database, similar to the following screenshot.
    Tables in Glue Database

Data validation

Complete the following steps to validate the data:

  1. On the Amazon Athena console, open the query editor, and select a workgroup or create a new workgroup.
  2. Choose AwsDataCatalog for Data source and odpf_demo_taxi_trips_raw for Database.
  3. Run the raw_data_validation_query_athena.sql SQL query. You will get an output similar to the following screenshot.
    Raw Data Validation via Amazon Athena

Validation summary: The counts in Amazon Athena match the counts of the operational tables, which proves that the ODP framework has processed all the files and records successfully. This concludes the demo. To test additional scenarios, refer to Extended Testing in the code repo.

Outcomes

Let’s review how the ODP framework addressed the aforementioned requirements.

  1. As discussed earlier in this post, by logically grouping tables by refresh cadence and associating them with EventBridge rules, we ensured that the source-aligned tables are refreshed by the File Processor AWS Glue jobs at the required cadence. With the AWS Glue worker type configuration setting, we selected appropriate compute resources for each run of the AWS Glue job.
  2. By applying table-specific configurations (from odpf_batch_config and odpf_raw_table_config) dynamically, we were able to use one AWS Glue job to process CDC files for 18 tables.
  3. You can use this framework to support a variety of data migration use cases that require quicker data migration from on-premises storage systems to data lakes or analytics platforms on AWS. You can reuse File Manager as is and customize File Processor to work with other storage frameworks such as Apache Iceberg, Delta Lake, and purpose-built data stores such as Amazon Aurora and Amazon Redshift.
  4. To understand how the ODP framework met the company’s disaster recovery (DR) design criterion, we first need to understand the DR architecture strategy at a high level. The DR architecture strategy has the following aspects:
    • One AWS account and two AWS Regions are used for primary and secondary environments.
    • The data lake infrastructure in the secondary Region is kept in sync with the one in the primary Region.
    • Data is stored in S3 buckets, metadata is stored in the AWS Glue Data Catalog, and access controls in Lake Formation are replicated from the primary to the secondary Region.
    • The data lake source and target systems have their respective DR environments.
    • CI/CD tooling (version control, CI server, and so on) is to be made highly available.
    • The DevOps team needs to be able to deploy CI/CD pipelines of analytics frameworks (such as this ODP framework) to either the primary or secondary Region.
    • As you can imagine, disaster recovery on AWS is a vast subject, so we keep our discussion to the last design aspect.

By designing the ODP framework with three components and externalizing operational table configurations to DynamoDB global tables, the company was able to deploy the framework components to the secondary Region (in the rare event of a single-Region failure) and continue to process CDC files from the point it last processed in the primary Region. Because the CDC file tracking and processing audit data is replicated to the DynamoDB replica tables in the secondary Region, the File Manager microservice and File Processor can seamlessly run.
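
As a minimal illustration of that externalization, the following sketch shows one way the framework's DynamoDB tables could be enabled as global tables by adding a replica in the secondary Region. The Region names are placeholders, and the snippet assumes the tables meet the prerequisites for global tables (for example, DynamoDB Streams enabled); treat it as an example rather than part of the framework deployment.

import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")  # primary Region (placeholder)

# Add a replica in the secondary Region so that configuration, file tracking,
# and processing audit data stay in sync across Regions
for table_name in ["odpf_batch_config", "odpf_raw_table_config",
                   "odpf_file_tracker", "odpf_file_processing_tracker"]:
    dynamodb.update_table(
        TableName=table_name,
        ReplicaUpdates=[{"Create": {"RegionName": "us-west-2"}}],  # secondary Region (placeholder)
    )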

Clean up

When you’re finished testing this framework, you can delete the provisioned AWS resources to avoid any further charges.

Conclusion

In this post, we took a real-world operational data processing use case and presented the framework we developed at AWS ProServe. We hope this post and the operational data processing framework using AWS Glue and Apache Hudi will expedite your journey in integrating operational databases into your modern data platforms built on AWS.


About the authors

Ravi Itha is a Principal Consultant at AWS Professional Services with specialization in data and analytics and a generalist background in application development. Ravi helps customers with enterprise data strategy initiatives across insurance, airlines, pharmaceutical, and financial services industries. In his 6-year tenure at Amazon, Ravi has helped the AWS builder community by publishing approximately 15 open-source solutions (accessible via his GitHub handle), four blogs, and reference architectures. Outside of work, he is passionate about reading India Knowledge Systems and practicing Yoga Asanas.

Srinivas Kandi is a Data Architect at AWS Professional Services. He leads customer engagements related to data lakes, analytics, and data warehouse modernizations. He enjoys reading history and civilizations.

Derive operational insights from application logs using Automated Data Analytics on AWS

Post Syndicated from Aparajithan Vaidyanathan original https://aws.amazon.com/blogs/big-data/derive-operational-insights-from-application-logs-using-automated-data-analytics-on-aws/

Automated Data Analytics (ADA) on AWS is an AWS solution that enables you to derive meaningful insights from data in a matter of minutes through a simple and intuitive user interface. ADA offers an AWS-native data analytics platform that is ready to use out of the box by data analysts for a variety of use cases. With ADA, teams can ingest, transform, govern, and query diverse datasets from a range of data sources without requiring specialist technical skills. ADA provides a set of pre-built connectors to ingest data from a wide range of sources including Amazon Simple Storage Service (Amazon S3), Amazon Kinesis Data Streams, Amazon CloudWatch, Amazon CloudTrail, and Amazon DynamoDB as well as many others.

ADA provides a foundational platform that can be used by data analysts in a diverse set of use cases including IT, finance, marketing, sales, and security. ADA’s out-of-the-box CloudWatch data connector allows data ingestion from CloudWatch logs in the same AWS account in which ADA has been deployed, or from a different AWS account.

In this post, we demonstrate how an application developer or application tester is able to use ADA to derive operational insights of applications running in AWS. We also demonstrate how you can use the ADA solution to connect to different data sources in AWS. We first deploy the ADA solution into an AWS account and set it up by creating data products using data connectors. We then use the ADA Query Workbench to join the separate datasets and query the correlated data, using familiar Structured Query Language (SQL), to gain insights. We also demonstrate how ADA can be integrated with business intelligence (BI) tools such as Tableau to visualize the data and to build reports.

Solution overview

In this section, we present the solution architecture for the demo and explain the workflow. For the purposes of demonstration, the bespoke application is simulated using an AWS Lambda function that emits logs in Apache Log Format at a preset interval using Amazon EventBridge. This standard format can be produced by many different web servers and be read by many log analysis programs. The application (Lambda function) logs are sent to a CloudWatch log group. The historical application logs are stored in an S3 bucket for reference and for querying purposes. A lookup table with a list of HTTP status codes along with the descriptions is stored in a DynamoDB table. These three serve as sources from which data is ingested into ADA for correlation, query, and analysis. We deploy the ADA solution into an AWS account and set up ADA. We then create the data products within ADA for the CloudWatch log group, S3 bucket, and DynamoDB. As the data products are configured, ADA provisions data pipelines to ingest the data from the sources. With the ADA Query Workbench, you can query the ingested data using plain SQL for application troubleshooting or issue diagnosis.
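
A minimal sketch of such a log-generating function is shown below. The endpoint list, status codes, and field layout are placeholders chosen to line up with the queries later in this post; the actual generator shipped with the sample application (see the GitHub repo in the prerequisites) differs in its details.

import json
import random
from datetime import datetime, timezone

ENDPOINTS = ["/v1/server", "/v1/server/admin", "/v1/users"]   # placeholder endpoints
STATUS_CODES = [200, 201, 400, 403, 404, 500, 503]

def lambda_handler(event, context):
    # Emit a few synthetic Apache-format entries per invocation; anything printed to
    # stdout lands in the function's CloudWatch log group as a log event.
    for _ in range(10):
        line = '%s - - [%s] "%s %s HTTP/1.1" %s %s' % (
            "10.0.0.%d" % random.randint(1, 254),
            datetime.now(timezone.utc).strftime("%d/%b/%Y:%H:%M:%S +0000"),
            random.choice(["GET", "POST", "PUT"]),
            random.choice(ENDPOINTS),
            random.choice(STATUS_CODES),
            random.randint(200, 5000),
        )
        # An Apache Log Format line wrapped in JSON, matching how the logs are stored
        print(json.dumps({"message": line}))
    return {"emitted": 10}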

The following diagram provides an overview of the architecture and workflow of using ADA to gain insights into application logs.

The workflow includes the following steps:

  1. A Lambda function is scheduled to be triggered at 2-minute intervals using EventBridge.
  2. The Lambda function emits logs that are stored at a specified CloudWatch log group under /aws/lambda/CdkStack-AdaLogGenLambdaFunction. The application logs are generated using the Apache Log Format schema but stored in the CloudWatch log group in JSON format.
  3. The data products for CloudWatch, Amazon S3, and DynamoDB are created in ADA. The CloudWatch data product connects to the CloudWatch log group where the application (Lambda function) logs are stored. The Amazon S3 connector connects to an S3 bucket folder where the historical logs are stored. The DynamoDB connector connects to a DynamoDB table where the status codes that are referred by the application and historical logs are stored.
  4. For each of the data products, ADA deploys the data pipeline infrastructure to ingest data from the sources. When the data ingestion is complete, you can write queries using SQL via the ADA Query Workbench.
  5. You can log in to the ADA portal and compose SQL queries from the Query Workbench to gain insights into the application logs. You can optionally save the query and share the query with other ADA users in the same domain. The ADA query feature is powered by Amazon Athena, which is a serverless, interactive analytics service that provides a simplified, flexible way to analyze petabytes of data.
  6. Tableau is configured to access the ADA data products via ADA egress endpoints. You then create a dashboard with two charts. The first chart is a heat map that shows the prevalence of HTTP error codes correlated with the application API endpoints. The second chart is a bar chart that shows the top 10 application APIs with a total count of HTTP error codes from the historical data.

Prerequisites

For this post, you need to complete the following prerequisites:

  1. Install the AWS Command Line Interface (AWS CLI), AWS Cloud Development Kit (AWS CDK) prerequisites, TypeScript-specific prerequisites, and git.
  2. Deploy the ADA solution in your AWS account in the us-east-1 Region.
    1. Provide an admin email while launching the ADA AWS CloudFormation stack. This is needed for ADA to send the root user password. An admin phone number is required to receive a one-time password message if multi-factor authentication (MFA) is enabled. For this demo, MFA is not enabled.
  3. Build and deploy the sample application (available on the GitHub repo) solution so that the following resources can be provisioned in your account in the us-east-1 Region:
    1. A Lambda function that simulates the logging application and an EventBridge rule that invokes the application function at 2-minute intervals.
    2. An S3 bucket with the relevant bucket policies and a CSV file that contains the historical application logs.
    3. A DynamoDB table with the lookup data.
    4. Relevant AWS Identity and Access Management (IAM) roles and permissions required for the services.
  4. Optionally, install Tableau Desktop, a third-party BI provider. For this post, we use Tableau Desktop version 2021.2. There is a cost involved in using a licensed version of the Tableau Desktop application. For additional details, refer to the Tableau licensing information.

Deploy and set up ADA

After ADA is deployed successfully, you can log in using the admin email provided during the installation. You then create a domain named CW_Domain. A domain is a user-defined collection of data products. For example, a domain might be a team or a project. Domains provide a structured way for users to organize their data products and manage access permissions.

  1. On the ADA console, choose Domains in the navigation pane.
  2. Choose Create domain.
  3. Enter a name (CW_Domain) and description, then choose Submit.

Set up the sample application infrastructure using AWS CDK

The AWS CDK solution that deploys the demo application is hosted on GitHub. The steps to clone the repo and to set up the AWS CDK project are detailed in this section. Before you run these commands, be sure to configure your AWS credentials. Create a folder, open the terminal, and navigate to the folder where the AWS CDK solution needs to be installed. Run the following code:

gh repo clone aws-samples/operational-insights-with-automated-data-analytics-on-aws
cd operational-insights-with-automated-data-analytics-on-aws
npm install
npm run build
cdk synth
cdk deploy

These steps perform the following actions:

  • Install the library dependencies
  • Build the project
  • Generate a valid CloudFormation template
  • Deploy the stack using AWS CloudFormation in your AWS account

The deployment takes about 1–2 minutes and creates the DynamoDB lookup table, the Lambda function, and the S3 bucket containing the historical log files, listing their identifiers as stack outputs. Copy these values to a text editing application, such as Notepad.

Create ADA data products

We create three different data products for this demo, one for each data source that you’ll be querying to gain operational insights. A data product is a dataset (a collection of data such as a table or a CSV file) that has been successfully imported into ADA and that can be queried.

Create a CloudWatch data product

First, we create a data product for the application logs by setting up ADA to ingest the CloudWatch log group for the sample application (Lambda function). Use the CdkStack.LambdaFunction output to get the Lambda function ARN and locate the corresponding CloudWatch log group ARN on the CloudWatch console.

Then complete the following steps:

  1. On the ADA console, navigate to the ADA domain and create a CloudWatch data product.
  2. For Name, enter a name.
  3. For Source type, choose Amazon CloudWatch.
  4. Disable Automatic PII.

ADA has a feature that automatically detects personally identifiable information (PII) data during import that is enabled by default. For this demo, we disable this option for the data product because the discovery of PII data is not in the scope of this demo.

  1. Choose Next.
  2. Search for and choose the CloudWatch log group ARN copied from the previous step.
  3. Copy the log group ARN.
  4. On the data product page, enter the log group ARN.
  5. For CloudWatch Query, enter a query that you want ADA to get from the log group.

In this demo, we query the @message field because we’re interested in getting the application logs from the log group.

  1. Select how the data updates are triggered after initial import.

ADA can be configured to ingest the data from the source at flexible intervals (every 15 minutes or longer) or on demand. For the demo, we set the data updates to run hourly.

  1. Choose Next.

Next, ADA will connect to the log group and query the schema. Because the logs are in Apache Log Format, we transform the logs into separate fields so that we can run queries on the specific log fields. ADA provides four default transformations and supports custom transformation through a Python script. In this demo, we run a custom Python script to transform the JSON message field into Apache Log Format fields.
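
The exact script is available in the repo (apache-log-extractor-transform.py); the following hedged sketch only illustrates the core idea of such a transform, which is to parse the Apache-style line carried in the message field into named columns. The regular expression and function shape are assumptions, not ADA's actual transform contract.

import re

# Common Log Format: host ident authuser [timestamp] "method endpoint protocol" status size
APACHE_LOG_PATTERN = re.compile(
    r'(?P<host>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<http_request>\S+) (?P<endpoint>\S+) \S+" '
    r'(?P<status_code>\d{3}) (?P<request_size>\d+)'
)

def extract_fields(message):
    """Parse one Apache-format log line into the columns used by the queries in this post."""
    match = APACHE_LOG_PATTERN.match(message)
    return match.groupdict() if match else {}

# Example:
# extract_fields('10.0.0.7 - - [12/Jan/2023:10:15:32 +0000] "GET /v1/server/admin HTTP/1.1" 500 2326')
# -> {'host': '10.0.0.7', 'timestamp': '12/Jan/2023:10:15:32 +0000', 'http_request': 'GET',
#     'endpoint': '/v1/server/admin', 'status_code': '500', 'request_size': '2326'}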

  1. Choose Transform schema.
  2. Choose Create new transform.
  3. Upload the apache-log-extractor-transform.py script from the /asset/transform_logs/ folder.
  4. Choose Submit.

ADA will transform the CloudWatch logs using the script and present the processed schema.

  1. Choose Next.
  2. In the last step, review the steps and choose Submit.

ADA will start the data processing, create the data pipelines, and prepare the CloudWatch log groups to be queried from the Query Workbench. This process will take a few minutes to complete and will be shown on the ADA console under Data Products.

Create an Amazon S3 data product

We repeat the steps to add the historical logs from the Amazon S3 data source and look up reference data from the DynamoDB table. For these two data sources, we don’t create custom transforms because the data formats are in CSV (for historical logs) and key attributes (for reference lookup data).

  1. On the ADA console, create a new data product.
  2. Enter a name (hist_logs) and choose Amazon S3.
  3. Copy the Amazon S3 URI (the text after arn:aws:s3:::) from the CdkStack.S3 output variable and navigate to the Amazon S3 console.
  4. In the search box, enter the copied text, open the S3 bucket, select the /logs folder, and choose Copy S3 URI.

The historical logs are stored in this path.

  1. Navigate back to the ADA console and enter the copied S3 URI for S3 location.
  2. For Update Trigger, select On Demand because the historical logs are updated at an unspecified frequency.
  3. For Update Policy, select Append to append newly imported data to the existing data.
  4. Choose Next.

ADA processes the schema for the files in the selected folder path. Because the logs are in CSV format, ADA is able to read the column names without requiring additional transformations. However, the columns status_code and request_size are inferred as long type by ADA. We want to keep the column data types consistent among the data products so that we can join the data tables and query the data. The column status_code will be used to create joins across the data tables.

  1. Choose Transform schema to change the data types of the two columns to string data type.

Note the highlighted column names in the Schema preview pane prior to applying the data type transformations.

  1. In the Transform plan pane, under Built-in transforms, choose Apply Mapping.

This option allows you to change the data type from one type to another.

  1. In the Apply Mapping section, deselect Drop other fields.

If this option is not disabled, only the transformed columns will be preserved and all other columns will be dropped. Because we want to retain all the columns, we disable this option.

  1. Under Field Mappings, for Old name and New name, enter status_code and for New type, enter string.
  2. Choose Add Item.
  3. For Old name and New name, enter request_size and for New data type, enter string.
  4. Choose Submit.

ADA will apply the mapping transformation on the Amazon S3 data source. Note the column types in the Schema preview pane.

  1. Choose View sample to preview the data with the transformation applied.

ADA will display the PII data acknowledgement to ensure that either only authorized users can view the data or that the dataset doesn’t contain any PII data.

  1. Choose Agree to continue to view the sample data.

Note that the schema is identical to the CloudWatch log group schema because both the current application and historical application logs are in Apache Log Format.

  1. In the final step, review the configuration and choose Submit.

ADA starts processing the data from the Amazon S3 source, creates the backend infrastructure, and prepares the data product. This process takes a few minutes depending upon the size of the data.

Create a DynamoDB data product

Lastly, we create a DynamoDB data product. Complete the following steps:

  1. On the ADA console, create a new data product.
  2. Enter a name (lookup) and choose Amazon DynamoDB.
  3. Enter the Cdk.DynamoDBTable output variable for DynamoDB Table ARN.

This table contains key attributes that will be used as a lookup table in this demo. For the lookup data, we are using the HTTP codes and long and short descriptions of the codes. You can also use PostgreSQL, MySQL, or a CSV file source as an alternative.

  1. For Update Trigger, select On-Demand.

The updates will be on demand because the lookup data is mostly for reference purposes while querying, and any changes to it can be brought into ADA using on-demand triggers.

  1. Choose Next.

ADA reads the schema from the underlying DynamoDB schema and presents the column name and type for optional transformation. We will proceed with the default schema selection because the column types are consistent with the types from the CloudWatch log group and Amazon S3 CSV data source. Having data types that are consistent across the data sources allows us to write queries to fetch records by joining the tables using the column fields. For example, the column key in the DynamoDB schema corresponds to the status_code in the Amazon S3 and CloudWatch data products. We can write queries that can join the three tables using the column name key. An example is shown in the next section.

  1. Choose Continue with current schema.
  2. Review the configuration and choose Submit.

ADA will process the data from the DynamoDB table data source and prepare the data product. Depending upon the size of the data, this process takes a few minutes.

Now all three data products have been processed by ADA and are available for you to query.

Use the Query Workbench to query the data

ADA allows you to run queries against the data products while abstracting the data source and making it accessible using SQL (Structured Query Language). You can write queries and join the tables just as you would query against tables in a relational database. We demonstrate ADA’s querying capability via two user scenarios. In both the scenarios, we join an application log dataset to the error codes lookup table. In the first use case, we query the current application logs to identify the top 10 most accessed application endpoints along with the corresponding HTTP status codes:

--Query the top 10 Application endpoints along with the corresponding HTTP request type and HTTP status code.

SELECT logs.endpoint AS Application_EndPoint, logs.http_request AS REQUEST, count(logs.endpoint) as Endpoint_Count, ref.key as HTTP_Status_Code, ref.short as Description
FROM cw_domain.cloud_watch_application_logs logs
INNER JOIN cw_domain.lookup ref ON logs.status_code = ref.key
where logs.status_code LIKE '4%%' OR logs.status_code LIKE '5%%' -- = '/v1/server'
GROUP BY logs.endpoint, logs.http_request, ref.key, ref.short
ORDER BY Endpoint_Count DESC
LIMIT 10

In the second example, we query the historical logs table to get the top 10 application endpoints with the most errors to understand the endpoint call pattern:

-- Query Historical Logs to get the top 10 Application Endpoints with most number of errors along with an explanation of the error code.

SELECT endpoint as Application_EndPoint, count(status_code) as Error_Count, ref.long as Description FROM cw_domain.hist_logs hist
INNER JOIN cw_domain.lookup ref ON hist.status_code = ref.key
WHERE hist.status_code LIKE '4%%' OR hist.status_code LIKE '5%%'
GROUP BY endpoint, status_code, ref.long
ORDER BY Error_Count desc
LIMIT 10

In addition to querying, you can optionally save the query and share the saved query with other users in the same domain. The shared queries are accessible directly from the Query Workbench. The query results can also be exported to CSV format.

Visualize ADA data products in Tableau

ADA offers the ability to connect to third-party BI tools to visualize data and create reports from the ADA data products. In this demo, we use ADA’s native integration with Tableau to visualize the data from the three data products we configured earlier. Using Tableau’s Athena connector and following the steps in Tableau configuration, you can configure ADA as a data source in Tableau. After a successful connection has been established between Tableau and ADA, Tableau will populate the three data products under the Tableau catalog cw_domain.

We then establish a relationship across the three databases using the HTTP status code as the joining column, as shown in the following screenshot. Tableau allows us to work in online and offline mode with the data sources. In online mode, Tableau will connect to ADA and query the data products live. In offline mode, we can use the Extract option to extract the data from ADA and import the data in to Tableau. In this demo, we import the data in to Tableau to make the querying more responsive. We then save the Tableau workbook. We can inspect the data from the data sources by choosing the database and Update Now.

With the data source configurations in place in Tableau, we can create custom reports, charts, and visualizations on the ADA data products. Let’s consider two use cases for visualizations.

As shown in the following figure, we visualized the frequency of the HTTP errors by application endpoints using Tableau’s built-in heat map chart. We filtered out the HTTP status codes to only include error codes in the 4xx and 5xx range.

We also created a bar chart to depict the application endpoints from the historical logs ordered by the count of HTTP error codes. In this chart, we can see that the /v1/server/admin endpoint has generated the most HTTP error status codes.

Clean up

Cleaning up the sample application infrastructure is a two-step process. First, to remove the infrastructure provisioned for the purposes of this demo, run the following command in the terminal:

cdk destroy

For the following question, enter y and AWS CDK will delete the resources deployed for the demo:

Are you sure you want to delete: CdkStack (y/n)? y

Alternatively, you can remove the resources via the AWS CloudFormation console by navigating to the CdkStack stack and choosing Delete.

The second step is to uninstall ADA. For instructions, refer to Uninstall the solution.

Conclusion

In this post, we demonstrated how to use the ADA solution to derive insights from application logs stored across two different data sources. We demonstrated how to install ADA on an AWS account and deploy the demo components using AWS CDK. We created data products in ADA and configured the data products with the respective data sources using ADA’s built-in data connectors. We demonstrated how to query the data products using standard SQL queries and generate insights on the log data. We also connected the Tableau Desktop client, a third-party BI product, to ADA and demonstrated how to build visualizations against the data products.

ADA automates the process of ingesting, transforming, governing, and querying diverse datasets and simplifies the lifecycle management of data. ADA’s pre-built connectors allow you to ingest data from diverse data sources. Software teams with basic knowledge of AWS products and services will be able to set up an operational data analytics platform in a few hours and provide secure access to the data. The data can then be easily and quickly queried using an intuitive and standalone web user interface.

Try out ADA today to easily manage and gain insights from data.


About the authors

Aparajithan Vaidyanathan is a Principal Enterprise Solutions Architect at AWS. He helps enterprise customers migrate and modernize their workloads on the AWS Cloud. He is a Cloud Architect with 23+ years of experience designing and developing enterprise, large-scale, and distributed software systems. He specializes in machine learning and data analytics, with a focus on data and feature engineering. He is an aspiring marathon runner, and his hobbies include hiking, bike riding, and spending time with his wife and two boys.

Rashim Rahman is a Software Developer based out of Sydney, Australia with 10+ years of experience in software development and architecture. He works primarily on building large-scale open-source AWS solutions for common customer use cases and business problems. In his spare time, he enjoys sports and spending time with friends and family.

Hafiz Saadullah is a Principal Technical Product Manager at Amazon Web Services. Hafiz focuses on AWS Solutions, designed to help customers by addressing common business problems and use cases.

The art and science of data product portfolio management

Post Syndicated from Faris Haddad original https://aws.amazon.com/blogs/big-data/the-art-and-science-of-data-product-portfolio-management/

This post is the first in a series dedicated to the art and science of practical data mesh implementation (for an overview of data mesh, read the original whitepaper The data mesh shift). The series attempts to bridge the gap between the tenets of data mesh and its real-life implementation by deep-diving into the functional and non-functional capabilities essential to a working operating model, laying out the decisions that need to be made for each capability, and describing the key business and technical processes required to implement them. Taken together, the posts in this series lay out some possible operating models for data mesh within an organization.

Kudzu

Kudzu—or kuzu (クズ)—is native to Japan and southeast China. First introduced to the southeastern United States in 1876 as a promising solution for erosion control, it now represents a cautionary tale about unintended consequences, as Kudzu’s speed of growth outcompetes everything from native grasses to tree systems by growing over and shading them from the sunlight they need to photosynthesize—eventually leading to species extinction and loss of biodiversity. The story of Kudzu offers a powerful analogy to the dangers and consequences of implementing data mesh architectures without fully understanding or appreciating how they are intended to be used. When the “Kudzu” of unmanaged pseudo-data products (methods of sharing data that masquerade as data products while failing to fulfill the myriad obligations associated with them) has overwhelmed the local ecosystem of true data products, eradication is costly and prone to failure, and can represent significant wasted effort and resources, as well as lost time.

Desert

While Kudzu was taking over the south in the 1930s, desertification caused by extensive deforestation was overwhelming the Midwest, with large tracts of land becoming barren and residents forced to leave and find other places to make a living. In the same way, overly restrictive data governance practices that either prevent data products from taking root at all, or pare them back too aggressively (deforestation), can over time create “data deserts” that drive both the producers and consumers of data within an organization to look elsewhere for their data needs. At the same time, unstructured approaches to data mesh management that don’t have a vision for what types of products should exist and how to ensure they are developed are at high risk of creating the same effect through simple neglect. This is due to a common misconception about data mesh as a data strategy, which is that it is effectively self-organizing—meaning that once presented with the opportunity, data owners within the organization will spring to the responsibilities and obligations associated with publishing high-quality data products. In reality, the work of a data producer is often thankless, and without clear incentive strategies, organizations may end up with data deserts that create more data governance issues as producers and consumers go elsewhere to seek out the data they need to perform work.

Bonsai

Bonsai (盆栽) is an art form originating from an ancient Chinese tradition called penjing (盆景), and later shaped by the minimalist teachings of Zen Buddhism into the practice we know and recognize today. The patient practice of Bonsai offers useful analogies to the concepts and processes required to avoid the chaos of Kudzu as well as the specter of organizational data deserts. Bonsai artists carefully observe the naturally occurring buds that are produced by the tree and encourage those that add to the overall aesthetics of the tree, while pruning those that don’t work well with their neighbors. The same ideas apply equally well to data products within a data mesh—by encouraging the growth and adoption of those data products that add value to our data mesh, and continuously pruning those that do not, we maximize the value and sustainability of our data mesh implementations. In a similar vein, Bonsai artists must balance their vision for the shape of the tree with a respect for the natural characteristics and innate structure of the species they have chosen to work with—to ignore the biology of the tree would be disastrous to the longevity of the tree, as well as to the quality of the art itself. In the same way, organizations seeking to implement successful data mesh strategies must respect the nature and structure (legal, political, commercial, technology) of their organizations in their implementation.

Of the key capabilities proposed for the implementation of a sustainable data mesh operating model, the one that is most relevant to the problems we’ve described—and explore later in this post—is data product portfolio management.

Overview of data product portfolio management

Data mesh architectures are, by their nature, ideal for implementation within federated organizations, with decentralized ownership of data and clear legal, regulatory, or commercial boundaries between entities or lines of business. The same organizational characteristics that make data mesh architectures valuable, however, also put them at risk of turning into one of the twin nightmares of Kudzu or data deserts.

To define the shape and nature of an organizational data mesh, a number of key questions need to be answered, including but not limited to:

  • What are the key data domains within the organization? What are the key data products within these domains needed to solve current business problems? How do we iterate on this discovery process to add value while we are mapping our domains?
  • Who are the consumers in our organization, and what logical, regulatory, physical, or commercial boundaries might separate them from producers and their data products?
  • How do we encourage the development and maintenance of key data products in a decentralized organization?
  • How do we monitor data products against their SLAs, and ensure alerting and escalation on failure so that the organization is protected from bad data?
  • How do we enable those we see as being autonomous producers and consumers with the right skills, the right tools, and the right mindset to actually want to (and be able to) take more ownership of independently publishing data as a product and consuming it responsibly?
  • What is the lifecycle of a data product? When do new data products get created, and who is allowed to create them? When are data products deprecated, and who is accountable for the consequences to their consumers?
  • How do we define “risk” and “value” in the context of data products, and how can we measure this? Whose responsibility is it to justify the existence of a given data product?

To answer questions such as these and plan accordingly, organizations must implement data product portfolio management (DPPM). DPPM does not exist in a vacuum—by its nature, DPPM is closely related to and interdependent with enterprise architecture practices like business capability management and project portfolio management. DPPM itself may therefore also be considered, in part, an enterprise architecture practice.

Because DPPM is an enterprise architecture practice, responsibility for its implementation should reside within a function whose remit is appropriately global and cross-functional. This may be the CDO office for those organizations that have a CDO or equivalent central data function, or the enterprise architecture team in organizations that do not.

Goals of DPPM

The goals of DPPM can be summarized as follows:

  • Protect value – DPPM protects the value of the organizational data strategy by developing, implementing, and enforcing frameworks to measure the contribution of data products to organizational goals in objective terms. Examples may include associated revenue, savings, or reductions in operational losses. Earlier in their lifecycle, data products may be measured by alternative metrics, including adoption (number of consumers) and level of activity (releases, interaction with consumers, and so on). In the pursuit of this goal, the DPPM capability is accountable for engaging with the business to continuously prioritize where data as a product can add value and align delivery priority accordingly. Strategies for measuring value and prioritizing data products are explored later in this post.
  • Manage risk – All data products introduce risk to the organization—risk of wasted money and effort through non-adoption, risk of operational loss associated with improper use, and risk of failure on the part of the data product to meet requirements on availability, completeness, or quality. These risks are exacerbated in the case of proliferation of low-quality or unsupervised data products. DPPM seeks to understand and measure these risks on an individual and aggregated basis. This is a particularly challenging goal because what constitutes risk associated with the existence of a particular data product is determined largely by its consumers and is likely to change over time (though, like entropy, it is only ever likely to increase).
  • Guide evolution – The final goal of DPPM is to guide the evolution of the data product landscape to meet overarching organizational data goals, such as mutually exclusive and collectively exhaustive domains and data products, the identification and enablement of single-threaded ownership of product definitions, or the agile inclusion of new sources of data and creation of products to serve tactical or strategic business goals. Some principles for the management of data mesh evolution, and the evaluation of data products against organizational goals, are explored later in this post.

Challenges of DPPM

In this section, we explore some of the challenges of DPPM, and the pragmatic ways some of these challenges could be addressed.

Infancy

Data mesh as a concept is still relatively new. As such, there is little standardization associated with practical operating models for building and managing data mesh architectures, and no access to fully fledged out-of-the-box reference operating models, frameworks, or tools to support the practice of DPPM.

Some elements of DPPM are supported in disparate tools (for example, some data catalogs include basic community features that contribute to measuring value), but not in a holistic way. Over time, standardization of the processes associated with DPPM will likely occur as a side-effect of commoditization, driven by the popularity and adoption of new services that take on and automate more of the undifferentiated heavy lifting associated with mesh supervision. In the meantime, however, organizations adopting data mesh architectures are left largely to their own devices around how to operate them effectively.

Resistance

The purest expression of democracy is anarchy, and the more federated an organization is (itself a supporting factor in choosing data mesh architectures), the more resistance may be observed to any forms of centralized governance. This is a challenge for DPPM, because in some way it must come together in one place. Just as the Bonsai artist knows the vision for the entire tree, there must be a cohesive vision for and ability to guide the evolution of a data mesh, no matter how broadly federated and autonomous individual domains or data products might be.

Balancing this with the need to respect the natural shape (and culture) of an organization, however, requires organizations that implement DPPM to think about how to do so in a way that doesn’t conflict with the reality of the organization. This might mean, for example, that DPPM may need to happen at several layers—at minimum within data domains, possibly within lines of business, and then at an enterprise level through appropriate data committees, guilds, or other structures that bring stakeholders together. All of this complicates the processes and collaboration needed to perform DPPM effectively.

Maturity

Data mesh architectures, and therefore DPPM, presume relatively high levels of data maturity within an organization—a clear data strategy, understanding of data ownership and stewardship, principles and policies that govern the use of data, and a moderate-to-high level of education and training around data within the organization. An organization with low data maturity, or with a weak or immature enterprise architecture function, will face significant hurdles in implementing any data mesh architecture, let alone a strong and useful DPPM practice.

In reality, however, data maturity is not uniform across organizations. Even in seemingly low-maturity organizations, there are often teams who are more mature and have a higher appetite to engage. By leaning into these teams and showing value through them first, then using them as evangelists, organizations can gain maturity while benefitting earlier from the advantages of data mesh strategies.

The following sections explore the implementation of DPPM along the lines of people, process, and technology, as well as describing the key characteristics of data products—scope, value, risk, uniqueness, and fitness—and how they relate to data mesh practices.

People

To implement DPPM effectively, a wide variety of stakeholders in the organization may need to be involved in one capacity or another. The following table suggests some key roles, but it’s up to an individual organization to determine how and if these map to their own roles and functions.

Function | RACI | Role | Responsibility
Senior Leadership | A | Chief Data Officer | Ultimately accountable for organizational data strategy and implementation. Approves changes to DPPM principles and operating model. Acts as chair of, and appoints members to, the data council.
. | R | Data Council** | Stakeholder body representing organizational governance around data strategy. Acts as steering body for the governance of DPPM as a practice (KPI monitoring, maturity assessments, auditing, and so on). Approves changes to guidelines and methodologies. Approves changes to the data product portfolio (discussed later in this post). Approves and governs centrally funded and prioritized data product development activities.
Enterprise Architecture | AR | Head of Enterprise Architecture | Responsible for development and enforcement of data strategy. Accountable and responsible for the design and implementation of DPPM as an organizational capability.
. | R | Domain Architect | Responsible for implementing the screening, data product analysis, periodic evaluation, and optimal portfolio selection practices. Responsible for the development of methodologies and their selection criteria.
Legal & Compliance | C | Legal & Compliance Officer | Consults on permissibility of data products with reference to local regulation. Consults on permissibility of data sharing with reference to local regulation or commercial agreements.
. | C | Data Privacy Officer | Consults on permissibility of data use with reference to local data privacy law. Consults on permissibility of cross-entity or cross-border data sharing with reference to data privacy law.
Information Security | RC | Information Security Officer | Consults on maturity assessments (discussed later in this post) for information security-relevant data product capabilities. Approves changes to data product technology architecture. Approves changes to IAM procedures relating to data products.
Business Functions | A | Data Domain Owner | Ultimately accountable for the appropriate use of domain data, as well as its quality and availability. Accountable for domain data products. Approves changes to the domain data model and domain data product portfolio.
. | R | Data Domain Steward | Responsible for implementing data domain responsibilities, including operational (day-to-day) governance of domain data products. Approves use of domain data in new data products, and performs regular (such as yearly) attestation of data products using domain data.
. | A | Data Owner | Ultimately accountable for the appropriate use of owned data (for example, CRM data), as well as its quality and availability.
. | R | Data Steward | Responsible for implementing data responsibilities. Approves use of owned data in new data products, and performs regular (such as yearly) attestation of data products using owned data.
. | AR | Data Product Owner | Accountable and responsible for the design, development, and delivery of data products against their stated SLOs. Contributes to data product analysis and portfolio adjustment practices for own data products.

** The data council typically consists of permanent representatives from each function (data domain owners), enterprise architecture, and the chief data officer or equivalent.

Process

The following diagram illustrates the strategic, tactical, and operational practices associated with DPPM. Some considerations for the implementation of these practices are explored in more detail in this post, though their specific interpretation and implementation depend on the individual organization.

Boundaries

When reading this section, it’s important to bear in mind the impact of boundaries—although strategy development may be established as a global practice, other practices within DPPM must respect relevant organizational boundaries (which may be physical, geographical, operational, legal, commercial, or regulatory in nature). In some cases, the existence of boundaries may require some or all tactical and operational practices to be duplicated within each associated boundary. For example, an insurance company with a property and casualty legal entity in North America and a life entity in Germany may need to implement DPPM separately within each entity.

Strategy development

This practice deals with answering questions associated with the overall data mesh strategy, including the following:

  • The overall scope (data domains, participating entities, and so on) of the data mesh
  • The degree of freedom of participating entities in their definition and implementation of the data mesh (for example, a mesh of meshes vs. a single mesh)
  • The distribution of responsibilities for activities and capabilities associated with the data mesh (degree of democratization)
  • The definition and documentation of key performance indicators (KPIs) against which the data mesh should be governed (such as risk and value)
  • The governance operating model (including this practice)

Key deliverables include the following:

  • Organizational guidelines for operational processes around pre-screening and screening of data products
  • Well-defined KPIs that guide methodology development and selection for practices like data product analysis, screening, and optimal portfolio selection
  • Allocation of organizational resources (people, budget, time) to the implementation of tactical processes around methodology development, optimal portfolio selection, and portfolio adjustment

Key considerations

In this section, we discuss some key considerations for strategy development.

Data mesh structure

The following diagram illustrates the analogous relationship between data products in a data mesh and the structure of the mesh itself.

The following considerations relate to screening, data product analysis, and optimal portfolio selection.

  • Trunk (core data products) – Core data products are those that are central to the organization’s ability to function, and from which the majority of other data products are derived. These may be data products consumed in the implementation of key business activities, or associated with critical processes such as regulatory reporting and risk management. Organizational governance for these data products typically favors availability and data accuracy over agility.
  • Branch (cross-domain data products) – Cross-domain data products represent the most common cross-domain use cases for data (for example, joining customer data with product data). These data products may be widely used across business functions to support reporting and analytics, and—to a lesser extent—operational processes. Because these data products may consume a variety of sources, organizational governance may favor a balanced view on agility vs. reliability, accepting some degree of risk in return for being able to adapt to changes in data sources. Data product versioning can offer mitigation of risks associated with change.
  • Leaf (everything else) – These are the myriad data products that may arise within a data mesh, either as permanent additions to support individual teams and use cases or as temporary data products to fill data gaps or support time-limited initiatives. Because the number of these data products may be high and risks are typically limited to a single process or a small part of the organization, organizational governance typically favors a light touch and may prefer to govern through guidelines and best practices, rather than through active participation in the data product lifecycle.

Data products vs. data definitions

The following figure illustrates how data definitions are defined and inherited throughout the lineage of data products.

In a data mesh architecture, data products may inherit data from each other (one data product consumes another in its data pipeline) or independently publish data within (or related to) the same domain. For example, a customer data product may be inherited by a customer support data product, while a separate customer journey data product may directly publish customer-relevant data from independent sources. When no standards are applied to how domain data attributes are used and published, data products even within the same data domain may lose interoperability, because it becomes difficult or impossible to join them together for reporting or analytics purposes.

To prevent this, it can be useful to distinguish between data products and data definitions. Typically, organizations will select a single-threaded owner (often a data owner or steward, or a domain data owner or steward) who is responsible for defining minimal data definitions for common and reusable data entities within data domains. For example, a data owner responsible for the sales and marketing data domain may identify a customer data product as a reusable data entity within the domain and publish a minimal data definition that all producers of customer-relevant data must incorporate within their data products, to ensure that all data products associated with customer data are interoperable.

DPPM can assist in the identification and production of data definitions as part of its data product analysis activities, as well as enforce their incorporation as part of oversight of data product development.
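As a purely illustrative sketch, a minimal data definition for such a reusable customer entity could be expressed as a small set of mandatory attributes that every customer-related data product must expose. The table and attribute names below are hypothetical and only serve to make the concept concrete:

-- Hypothetical minimal data definition for the reusable "customer" entity.
-- Producers of customer-relevant data products would include at least these
-- attributes so that products within the domain remain joinable.
CREATE TABLE customer_minimal_definition (
    customer_id     varchar(50) NOT NULL,   -- domain-wide customer identifier
    customer_type   varchar(20) NOT NULL,   -- for example, retail or corporate
    country_code    char(2)     NOT NULL,   -- ISO 3166-1 alpha-2 country code
    effective_from  timestamp   NOT NULL,   -- start of record validity
    effective_to    timestamp               -- end of record validity (null if current)
);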

Service management thinking

These considerations relate to data product analysis, periodic evaluation, and methodology selection.

Data products are services provided to the organization or externally to customers and partners. As such, it may make sense to adapt a service management framework like ITIL, in combination with the ITIL Maturity Model, for use in evaluating the fitness of data products for their scope and audience, as well as in describing the roles, processes, and acceptable technologies that should form the operating model for any data product.

At the operational level, the stakeholders required to implement each practice may change depending on the scope of the data product. For example, the release management practice for a core data product may require involvement of the data council, whereas the same practice for a team data product may only involve the team or functional head. To avoid creating decision-making bottlenecks, organizations should aim to minimize the number of stakeholders in each case and focus on single-threaded owners wherever possible.

The following table proposes a subset of capabilities and how they might be applied to data products of different scopes. Suggested target maturity levels, between 1 and 5, are included for each scope (1 = Initial, 5 = Optimizing).

Capability | Core (4–5) | Cross-Domain (3–4) | Function / Team (2–3) | Personal (2)
Information Security Management | X | X | X | X
Knowledge Management | X | X | X | .
Release Management | X | X | X | .
Service-Level Management | X | X | X | .
Measurement and Reporting | X | X | . | .
Availability Management | X | X | . | .
Capacity and Performance Management | X | X | . | .
Incident Management | X | X | . | .
Monitoring and Event Management | X | X | . | .
Service Validation and Testing | X | X | . | .

Methodology development

This practice deals with the development of concrete, objective frameworks, metrics, and processes for the measurement of data product value and risk. Because the driving factors behind risk and value are not necessarily the same between products, it may be necessary to develop several methodologies or variants thereof.

Key deliverables include the following:

  • Well-defined frameworks for measuring risk and value of data products, as well as for determining the optimal portfolio of data products
  • Operationally feasible, measurable metrics associated with value and risk

Key considerations

A key consideration for assessing data products is that of consumer value or risk vs. uniqueness. The following diagram illustrates how value and risk of a data product are driven by its consumers.

Data products don’t inherently present risk or add value, but rather indirectly pose—in an aggregated fashion—the risk and value created by their consumers.

In a consumer-centric value and risk model, governance of consumers ensures that all data use meets the following requirements:

  • Is associated with a business case justifying the use of data (for example, new business, cost reduction through business process automation, and so on)
  • Is regularly evaluated with reference to the risk associated with the use case (for example, regulatory reporting)

The value and risk associated with the linked data products are then calculated as an aggregation. Where organizations already track use cases associated with data, either as part of data privacy governance or as a by-product of the access approval process, these existing systems and databases can be reused or extended.

Conversely, where data products overlap with each other, their value to the organization is reduced accordingly, because redundancies between data products represent an inefficient use of resources and increase organizational complexity associated with data quality management.

To ensure that the model is operationally feasible (see the key deliverables of methodology development), it may be sufficient to consider simple aggregations, rather than attempting to calculate value and risk attribution at a product or use case level.
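To make such an aggregation concrete, the following is a minimal sketch that assumes a hypothetical use case registry recording, for each consumer use case, the data products it consumes and its assessed value and risk. The table and column names are illustrative, not part of any specific product:

-- Hypothetical registry of consumer use cases and the data products they consume
CREATE TABLE data_use_case (
    use_case_id      varchar(50),
    data_product_id  varchar(50),
    annual_value     numeric(18,2),   -- estimated business value (revenue, savings, and so on)
    risk_score       numeric(5,2)     -- assessed risk of the use case
);

-- Aggregate consumer-driven value and risk for each data product
SELECT
    data_product_id,
    SUM(annual_value) AS aggregated_value,
    MAX(risk_score)   AS aggregated_risk,   -- a conservative roll-up; SUM or AVG are alternatives
    COUNT(*)          AS consuming_use_cases
FROM data_use_case
GROUP BY data_product_id;

Where use cases are already tracked for data privacy governance or access approvals, the same aggregation can be run against those existing systems instead of a new registry.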

Optimal portfolio selection

This practice deals with the determination of which combination of data products (existing, new, or potential) would best meet the organization’s current and known future needs. It takes input from data product analysis, data product proposals, and other enterprise architecture practices (for example, business architecture), and considers trade-offs between data-debt and time-to-value, as well as other factors such as redundancy between data products, to determine the optimal mix of permanent and temporary data products at any given point in time.

Because the number of data products in an organization may become significant over time, it may be useful to apply heuristics to the problem of optimal portfolio selection. For example, it may be sufficient to consider core and cross-domain data products (trunk and branches) during quarterly portfolio reviews, with other data products (leaves) audited on a yearly basis.

Key deliverables include the following:

  • A target state definition for the data mesh, including all relevant data products
  • An indication of organizational priorities for use by the portfolio adjustment practice

Key considerations

The following are key considerations regarding the data product half-life:

  • Long-term or strategic data products – These data products fill a long-term organizational need, are often associated with key source systems in various domains, and anchor the overall data strategy. Over time, as an organization’s data mesh matures, long-term data products should form the bulk of the mesh.
  • Time-bound data products – These data products fill a gap in data strategy and allow the organization to move on data opportunities until core data products can be updated. An example of this might be data products created and used in the context of mergers and acquisitions transactions and post-acquisition, to provide consistent data for reporting and business intelligence until mid-term and long-term application consolidation has taken place. Time-bound data products are considered as data-debt and should be managed accordingly.
  • Purpose-driven data products – These data products serve a narrow, finite purpose. Purpose-driven data products may or may not be time-bound, but are characterized primarily by a strict set of consumers known in advance. Examples of this might include:
    • Data products developed to support system-of-record harmonization between lines of business (for example, deduplication of customer records between insurance lines of business using separate CRM systems)
    • Data products created explicitly for the monitoring of other data products (data quality, update frequency, and so on)

Portfolio adjustment

This practice implements the feasibility analysis, planning and project management, and the communication and organizational change management activities associated with changes to the optimal portfolio. As part of this practice, a gap analysis is conducted between the current and target data product portfolio, and a set of required actions and estimated time and effort is prepared for review by the organization. During this process, data products may be marked for development (new data products to fill a need), change, consolidation (merging two or more data products into a single data product), or deprecation. Several iterations of optimal portfolio selection and portfolio adjustment may be required to find an appropriate balance between optimality and feasibility of implementation.

Key deliverables include the following:

  • A gap analysis between the current and target data product portfolio, as well as proposed remediation activities
  • High-level project plans and effort or budget assessments associated with required changes, for approval by relevant stakeholders (such as the data council)

Data product proposals

This practice organizes the collection and prioritization of requests for new, or changes to existing, data products within the organization. Its implementation may be adapted from or managed by existing demand management processes within the organization.

Key deliverables include a registry of demand against new or existing data products, including metadata on source systems, attributes, known use cases, proposed data product owners, and suggested organizational priority.

Methodology selection

This practice is associated with the identification and application of the most appropriate methodologies (such as value and risk) during data product analysis, screening, and optimal portfolio selection. The selection of an appropriate methodology for the type, maturity, and scope of a data product (or an entire portfolio) is a key element in avoiding either a “Kudzu” mesh or a “data desert.”

Key deliverables include reusable selection criteria for mapping methodologies to data products during data product analysis, screening, and optimal portfolio selection.

Pre-screening

This optional practice is primarily a mechanism to avoid unnecessary time and effort in the comparatively expensive practice of data product analysis by offering simple self-service applications of guidelines to the evaluation of data products. An example might include the automated approval of data products that fall under the classification of personal data products, requiring only attestation on the part of the requester that they will uphold the relevant portions of the guideline that governs such data products.

Key deliverables include tools and checklists for the self-service evaluation of data products against guidelines and automated registration of approved data products.

Data product analysis

This practice incorporates guidelines and methodologies, as well as (where available) metadata relating to data products (performance against SLOs, service management metrics, degree of overlap with other data products), to establish an understanding of the value and risk associated with individual data products, gaps between current and target capability maturities, and compliance with published product definitions and standards.

Key deliverables include a summary of findings for a particular data product, including scores for relevant value, risk, and maturity metrics, as well as operational gaps requiring remediation and recommendations on next steps (repair, enhance, decommission, and so on).

Screening

This optional practice is a mechanism to reduce complexity in optimal portfolio selection by ensuring the early removal of data products from consideration that fail to meet value or risk targets, or have been identified as redundant to other data products already available in the organization.

Key deliverables include a list of data products that should be slated for removal (direct-to-decommissioning).

Data product development

This practice is not performed directly under DPPM, but is managed in part by the portfolio adjustment practice, and may be governed by standards that are developed as part of DPPM. In the context of DPPM, this practice is primarily associated with ensuring that data products are developed according to the specifications agreed as part of portfolio adjustment.

Key deliverables include project management and software or service development deliverables and artefacts.

Data product decommissioning

This practice manages the decommissioning of data products and the migration of affected consumers to new or other data products where relevant. Unmanaged decommissioning of data products, especially those with many downstream consumers, can threaten the stability of the entire data mesh, as well as have significant consequences to business functions.

Key deliverables include a decommissioning plan, including stakeholder assessment and sign-off, timelines, migration plans for affected consumers, and back-out strategies.

Periodic evaluation

This practice manages the calendar and implementation of periodic reviews of the data mesh, both in its entirety as well as at the data product level, and is primarily an exercise in project management.

Key deliverables include the following:

  • A yearly review calendar, published and made available to all data product owners and affected stakeholders
  • Project management deliverables and artefacts, including evidence of evaluations having been performed against each data product

Technology

Although most practices within DPPM don’t rely heavily on technology and automation, some key supporting applications and services are required to implement DPPM effectively:

  • Data catalog – Core to the delivery of DPPM is the organizational data catalog. Beyond providing transparency into what data products exist within an organization, a data catalog can provide key insights into data lineage between data products (key to the implementation of portfolio adjustment) and adoption of data products by the organization. The data catalog can also be used to capture and make available both the documented and the realized SLOs for any given data product, and—through the use of a business glossary—assist in the identification of redundancy between data products.
  • Service management – Service management solutions (such as ServiceNow) used in the context of data product management offer important insights into the fitness of data products by capturing and tracking incidents, problems, requests, and other metrics against data products.
  • Demand management – A demand management solution supports self-service implementation and automation of data product proposal and pre-screening activities, as well as prioritization activities associated with selection and development of data products.

Conclusion

Although this post focused on implementing DPPM in the context of a data mesh, this capability—like data product thinking—is not exclusive to data mesh architectures. The practices outlined here can be applied at any scale to ensure that the production and use of data within the organization is always in line with its current and future needs, that governance is implemented in a consistent way, and that the organization can have Bonsai, not Kudzu.

For more information about data mesh and data management, refer to the following:

In upcoming posts, we will cover other aspects of data mesh operating models, including data mesh supervision and service management models for data product owners.


About the Authors


Maximilian Mayrhofer
is a Principal Solutions Architect working in the AWS Financial Services EMEA Go-to-Market team. He has over 12 years of experience in digital transformation within private banking and asset management. In his free time, he is an avid reader of science fiction and enjoys bouldering.


Faris Haddad
is the Data & Insights Lead in the AABG Strategic Pursuits team. He helps enterprises successfully become data-driven.

Near-real-time analytics using Amazon Redshift streaming ingestion with Amazon Kinesis Data Streams and Amazon DynamoDB

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/near-real-time-analytics-using-amazon-redshift-streaming-ingestion-with-amazon-kinesis-data-streams-and-amazon-dynamodb/

Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, easy, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it the most widely used cloud data warehouse. You can run and scale analytics in seconds on all your data without having to manage your data warehouse infrastructure.

You can use the Amazon Redshift streaming ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use SQL (Structured Query Language) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.

In this post, we discuss a solution that uses Amazon Redshift streaming ingestion to provide near-real-time analytics.

Overview of solution

We walk through an example pipeline to ingest data from an Amazon DynamoDB source table in near-real time using Kinesis Data Streams in combination with Amazon Redshift streaming ingestion. We also walk through using PartiQL in Amazon Redshift to unnest nested JSON documents and build fact and dimension tables that are used in your data warehouse refresh. The solution uses Kinesis Data Streams to capture item-level changes from an application DynamoDB table.

As shown in the following reference architecture, DynamoDB table data changes are streamed into Amazon Redshift through Kinesis Data Streams and Amazon Redshift streaming ingestion for near-real-time analytics dashboard visualization using Amazon QuickSight.

The process flow includes the following steps:

  1. Create a Kinesis data stream and turn on the data stream from DynamoDB to capture item-level changes in your DynamoDB table.
  2. Create a streaming materialized view in your Amazon Redshift cluster to consume live streaming data from the data stream.
  3. The streaming data is ingested as a JSON payload. Use a combination of a PartiQL statement and dot notation to unnest the JSON document into data columns of a staging table in Amazon Redshift.
  4. Create fact and dimension tables in the Amazon Redshift cluster and keep loading the latest data at regular intervals from the staging table using transformation logic.
  5. Establish connectivity between a QuickSight dashboard and Amazon Redshift to deliver visualization and insights.

Prerequisites

You must have the following:

Set up a Kinesis data stream

To configure your Kinesis data stream, complete the following steps:

  1. Create a Kinesis data stream called demo-data-stream. For instructions, refer to Step 1 in Set up streaming ETL pipelines.

Next, configure the stream to capture changes from the DynamoDB table:

  2. On the DynamoDB console, choose Tables in the navigation pane.
  3. Open your table.
  4. On the Exports and streams tab, choose Turn on under Amazon Kinesis data stream details.
  5. For Destination Kinesis data stream, choose demo-data-stream.
  6. Choose Turn on stream.

Item-level changes in the DynamoDB table should now be flowing to the Kinesis data stream.

  7. To verify if the data is entering the stream, on the Kinesis Data Streams console, open demo-data-stream.
  8. On the Monitoring tab, find the PutRecord success – average (Percent) and PutRecord – sum (Bytes) metrics to validate record ingestion.

Set up streaming ingestion

To set up streaming ingestion, complete the following steps:

  1. Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Steps 1 and 2 in Getting started with streaming ingestion from Amazon Kinesis Data Streams.
  2. Launch the Query Editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Amazon Redshift cluster for the next steps.
  3. Create an external schema:
CREATE EXTERNAL SCHEMA demo_schema
FROM KINESIS
IAM_ROLE { default | 'iam-role-arn' };
  4. To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session or cluster level.
  5. Create a materialized view to consume the stream data and store stream records in semi-structured SUPER format:
CREATE MATERIALIZED VIEW demo_stream_vw AS
    SELECT approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_parse(kinesis_data) as payload    
    FROM demo_schema."demo-data-stream";
  6. Refresh the view, which triggers Amazon Redshift to read from the stream and load data into the materialized view:
REFRESH MATERIALIZED VIEW demo_stream_vw;

You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions on how to create a materialized view with auto refresh.
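For example, the streaming materialized view created earlier could instead be defined with auto refresh enabled. The following is a minimal sketch that reuses the same stream and external schema from this post:

CREATE MATERIALIZED VIEW demo_stream_vw
AUTO REFRESH YES
AS
    SELECT approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_parse(kinesis_data) as payload
    FROM demo_schema."demo-data-stream";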

Unnest the JSON document

The following is a sample of a JSON document that was ingested from the Kinesis data stream to the payload column of the streaming materialized view demo_stream_vw:

{
  "awsRegion": "us-east-1",
  "eventID": "6d24680a-6d12-49e2-8a6b-86ffdc7306c1",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "sample-dynamoDB",
  "dynamodb": {
    "ApproximateCreationDateTime": 1657294923614,
    "Keys": {
      "pk": {
        "S": "CUSTOMER#CUST_123"
      },
      "sk": {
        "S": "TRANSACTION#2022-07-08T23:59:59Z#CUST_345"
      }
    },
    "NewImage": {
      "completionDateTime": {
        "S": "2022-07-08T23:59:59Z"
      },
      "OutofPockPercent": {
        "N": 50.00
      },
      "calculationRequirements": {
        "M": {
          "dependentIds": {
            "L": [
              {
                "M": {
                  "sk": {
                    "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_567"
                  },
                  "pk": {
                    "S": "CUSTOMER#CUST_123"
                  }
                }
              },
              {
                "M": {
                  "sk": {
                    "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_890"
                  },
                  "pk": {
                    "S": "CUSTOMER#CUST_123"
                  }
                }
              }
            ]
          }
        }
      },
      "Event": {
        "S": "SAMPLE"
      },
      "Provider": {
        "S": "PV-123"
      },
      "OutofPockAmount": {
        "N": 1000
      },
      "lastCalculationDateTime": {
        "S": "2022-07-08T00:00:00Z"
      },
      "sk": {
        "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_567"
      },
      "OutofPockMax": {
        "N": 2000
      },
      "pk": {
        "S": "CUSTOMER#CUST_123"
      }
    },
    "SizeBytes": 694
  },
  "eventSource": "aws:dynamodb"
}

We can use dot notation to unnest the JSON document, but we also need a PartiQL statement to handle arrays where applicable. For example, in the preceding JSON document, there is an array under the following element:

"dynamodb"."NewImage"."calculationRequirements"."M"."dependentIds"."L".

The following SQL query uses a combination of dot notation and a PartiQL statement to unnest the JSON document:

select 
substring(a."payload"."dynamodb"."Keys"."pk"."S"::varchar, position('#' in "payload"."dynamodb"."Keys"."pk"."S"::varchar)+1) as Customer_ID,
substring(a."payload"."dynamodb"."Keys"."sk"."S"::varchar, position('#TRANSACTION' in "payload"."dynamodb"."Keys"."sk"."S"::varchar)+1) as Transaction_ID,
substring(b."M"."sk"."S"::varchar, position('#CUSTOMER' in b."M"."sk"."S"::varchar)+1) Dependent_ID,
a."payload"."dynamodb"."NewImage"."OutofPockMax"."N"::int as OutofPocket_Max,
a."payload"."dynamodb"."NewImage"."OutofPockPercent"."N"::decimal(5,2) as OutofPocket_Percent,
a."payload"."dynamodb"."NewImage"."OutofPockAmount"."N"::int as OutofPock_Amount,
a."payload"."dynamodb"."NewImage"."Provider"."S"::varchar as Provider,
a."payload"."dynamodb"."NewImage"."completionDateTime"."S"::timestamptz as Completion_DateTime,
a."payload"."eventName"::varchar Event_Name,
a.approximate_arrival_timestamp
from demo_stream_vw a
left outer join a."payload"."dynamodb"."NewImage"."calculationRequirements"."M"."dependentIds"."L" b on true;

The query unnests the JSON document into a flat result set, with one row per dependent ID in the array.

Precompute the result set using a materialized view

Optionally, to precompute and store the unnested result set from the preceding query, you can create a materialized view and schedule it to refresh at regular intervals. In this post, we maintain the preceding unnested data in a materialized view called mv_demo_super_unnest, which will be refreshed at regular intervals and used for further processing.
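The full definition of mv_demo_super_unnest isn’t shown above; one possible way to define it is to wrap the preceding unnest query in a materialized view, as in the following sketch:

CREATE MATERIALIZED VIEW mv_demo_super_unnest AS
select 
substring(a."payload"."dynamodb"."Keys"."pk"."S"::varchar, position('#' in "payload"."dynamodb"."Keys"."pk"."S"::varchar)+1) as Customer_ID,
substring(a."payload"."dynamodb"."Keys"."sk"."S"::varchar, position('#TRANSACTION' in "payload"."dynamodb"."Keys"."sk"."S"::varchar)+1) as Transaction_ID,
substring(b."M"."sk"."S"::varchar, position('#CUSTOMER' in b."M"."sk"."S"::varchar)+1) Dependent_ID,
a."payload"."dynamodb"."NewImage"."OutofPockMax"."N"::int as OutofPocket_Max,
a."payload"."dynamodb"."NewImage"."OutofPockPercent"."N"::decimal(5,2) as OutofPocket_Percent,
a."payload"."dynamodb"."NewImage"."OutofPockAmount"."N"::int as OutofPock_Amount,
a."payload"."dynamodb"."NewImage"."Provider"."S"::varchar as Provider,
a."payload"."dynamodb"."NewImage"."completionDateTime"."S"::timestamptz as Completion_DateTime,
a."payload"."eventName"::varchar Event_Name,
a.approximate_arrival_timestamp
from demo_stream_vw a
left outer join a."payload"."dynamodb"."NewImage"."calculationRequirements"."M"."dependentIds"."L" b on true;

The stored procedure shown later in this post refreshes this materialized view before loading the fact table.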

To capture the latest data from the DynamoDB table, the Amazon Redshift streaming materialized view needs to be refreshed at regular intervals, and then the incremental data should be transformed and loaded into the final fact and dimension table. To avoid reprocessing the same data, a metadata table can be maintained at Amazon Redshift to keep track of each ELT process with status, start time, and end time, as explained in the following section.

Maintain an audit table in Amazon Redshift

The following is a sample DDL of a metadata table that is maintained for each process or job:

create table MetaData_ETL
(
JobName varchar(100),
StartDate timestamp, 
EndDate timestamp, 
Status varchar(50)
);

The following is a sample initial entry of the metadata audit table that can be maintained at job level. The insert statement is the initial entry for the ELT process to load the Customer_Transaction_Fact table:

insert into MetaData_ETL 
values
('Customer_Transaction_Fact_Load', current_timestamp, current_timestamp,'Ready' );

Build a fact table with the latest data

In this post, we demonstrate the loading of a fact table using specific transformation logic. We are skipping the dimension table load, which uses similar logic.

As a prerequisite, create the fact and dimension tables in a preferred schema. In the following example, we create the fact table Customer_Transaction_Fact in Amazon Redshift:

CREATE TABLE public.Customer_Transaction_Fact (
Transaction_ID character varying(500),
Customer_ID character varying(500),
OutofPocket_Percent numeric(5,2),
OutofPock_Amount integer,
OutofPocket_Max integer,
Provider character varying(500),
completion_datetime timestamp
);

Transform data using a stored procedure

We load this fact table from the unnested data using a stored procedure. For more information, refer to Creating stored procedures in Amazon Redshift.

Note that in this sample use case, we are using transformation logic to identify and load the latest value of each column for a customer transaction.

The stored procedure contains the following components:

  • In the first step of the stored procedure, the job entry in the MetaData_ETL table is updated to change the status to Running and StartDate as the current timestamp, which indicates that the fact load process is starting.
  • Refresh the materialized view mv_demo_super_unnest, which contains the unnested data.
  • In the following example, we load the fact table Customer_Transaction_Fact using the latest data from the streaming materialized view based on the column approximate_arrival_timestamp, which is available as a system column in the streaming materialized view. The value of approximate_arrival_timestamp is set when a Kinesis data stream successfully receives and stores a record.
  • The following logic in the stored procedure checks if the approximate_arrival_timestamp in mv_demo_super_unnest is greater than the EndDate timestamp in the MetaData_ETL audit table, so that it can only process the incremental data.
  • Additionally, while loading the fact table, we identify the latest non-null value of each column for every Transaction_ID, based on the order of the approximate_arrival_timestamp column, using the rank and min functions.
  • The transformed data is loaded into the intermediate staging table.
  • The impacted records with the same Transaction_ID values are deleted from the Customer_Transaction_Fact table and reloaded from the staging table.
  • In the last step of the stored procedure, the job entry in the MetaData_ETL table is updated to change the status to Complete and EndDate as the current timestamp, which indicates that the fact load process has completed successfully.

See the following code:

CREATE OR REPLACE PROCEDURE SP_Customer_Transaction_Fact()
AS $$
BEGIN

set enable_case_sensitive_identifier to true;

--Update metadata audit table entry to indicate that the fact load process is running
update MetaData_ETL
set status = 'Running',
StartDate = getdate()
where JobName = 'Customer_Transaction_Fact_Load';

refresh materialized view mv_demo_super_unnest;

drop table if exists Customer_Transaction_Fact_Stg;

--Create latest record by Merging records based on approximate_arrival_timestamp
create table Customer_Transaction_Fact_Stg as
select 
m.Transaction_ID,
min(case when m.rank_Customer_ID =1 then m.Customer_ID end) Customer_ID,
min(case when m.rank_OutofPocket_Percent =1 then m.OutofPocket_Percent end) OutofPocket_Percent,
min(case when m.rank_OutofPock_Amount =1 then m.OutofPock_Amount end) OutofPock_Amount,
min(case when m.rank_OutofPocket_Max =1 then m.OutofPocket_Max end) OutofPocket_Max,
min(case when m.rank_Provider =1 then m.Provider end) Provider,
min(case when m.rank_Completion_DateTime =1 then m.Completion_DateTime end) Completion_DateTime
from
(
select *,
rank() over(partition by Transaction_ID order by case when mqp.Customer_ID is not null then 1 end, approximate_arrival_timestamp desc ) rank_Customer_ID,
rank() over(partition by Transaction_ID order by case when mqp.OutofPocket_Percent is not null then 1 end, approximate_arrival_timestamp desc ) rank_OutofPocket_Percent,
rank() over(partition by Transaction_ID order by case when mqp.OutofPock_Amount is not null then 1 end, approximate_arrival_timestamp  desc )  rank_OutofPock_Amount,
rank() over(partition by Transaction_ID order by case when mqp.OutofPocket_Max is not null then 1 end, approximate_arrival_timestamp desc ) rank_OutofPocket_Max,
rank() over(partition by Transaction_ID order by case when mqp.Provider is not null then 1 end, approximate_arrival_timestamp  desc ) rank_Provider,
rank() over(partition by Transaction_ID order by case when mqp.Completion_DateTime is not null then 1 end, approximate_arrival_timestamp desc )  rank_Completion_DateTime
from mv_demo_super_unnest mqp
where upper(mqp.event_Name) <> 'REMOVE' and mqp.approximate_arrival_timestamp > (select mde.EndDate from MetaData_ETL mde where mde.JobName = 'Customer_Transaction_Fact_Load') 
) m
group by m.Transaction_ID 
order by m.Transaction_ID
;

--Delete only impacted Transaction_ID from Fact table
delete from Customer_Transaction_Fact  
where Transaction_ID in ( select mqp.Transaction_ID from Customer_Transaction_Fact_Stg mqp);

--Insert latest records from staging table to Fact table
insert into Customer_Transaction_Fact
select * from Customer_Transaction_Fact_Stg; 

--Update metadata audit table entry to indicate that the fact load process is completed
update MetaData_ETL
set status = 'Complete',
EndDate = getdate()
where JobName = 'Customer_Transaction_Fact_Load';
END;
$$ LANGUAGE plpgsql;
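With the objects above in place, an end-to-end refresh can be triggered with two statements (assuming the streaming materialized view is not configured for auto refresh):

-- Pull the latest records from the Kinesis data stream into the streaming materialized view
REFRESH MATERIALIZED VIEW demo_stream_vw;

-- Unnest the new records and incrementally load the fact table
CALL SP_Customer_Transaction_Fact();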

Additional considerations for implementation

There are several additional capabilities that you could utilize to modify this solution to meet your needs. Many customers utilize multiple AWS accounts, and it’s common that the Kinesis data stream may be in a different AWS account than the Amazon Redshift data warehouse. If this is the case, you can utilize an Amazon Redshift IAM role that assumes a role in the Kinesis data stream AWS account in order to read from the data stream. For more information, refer to Cross-account streaming ingestion for Amazon Redshift.
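As a sketch of what this could look like, the external schema can reference a chain of IAM roles: the role attached to the Amazon Redshift cluster first, followed by the role in the stream owner’s account that it assumes. The schema name, role names, and account IDs below are placeholders:

-- Placeholder ARNs: the first role is associated with the Amazon Redshift cluster
-- and assumes the second role, which grants access to the Kinesis data stream
-- in the other AWS account.
CREATE EXTERNAL SCHEMA demo_schema_cross_account
FROM KINESIS
IAM_ROLE 'arn:aws:iam::111111111111:role/redshift-streaming-role,arn:aws:iam::222222222222:role/kinesis-stream-access-role';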

Another common use case is that you need to schedule the refresh of your Amazon Redshift data warehouse jobs so that the data warehouse’s data is continuously updated. To do this, you can utilize Amazon EventBridge to schedule the jobs in your data warehouse to run on a regular basis. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. Another option is to use Amazon Redshift Query Editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.

If you have a requirement to load data from a DynamoDB table with existing data, refer to Loading data from DynamoDB into Amazon Redshift.

For more information on Amazon Redshift streaming ingestion capabilities, refer to Real-time analytics with Amazon Redshift streaming ingestion.

Clean up

To avoid unnecessary charges, clean up any resources that you built as part of this architecture that are no longer in use. This includes dropping the materialized views, stored procedure, external schema, and tables created as part of this post. Additionally, make sure you delete the DynamoDB table and the Kinesis data stream.

Conclusion

After following the solution in this post, you’re now able to build near-real-time analytics using Amazon Redshift streaming ingestion. We showed how you can ingest data from a DynamoDB source table using a Kinesis data stream in order to refresh your Amazon Redshift data warehouse. With the capabilities presented in this post, you should be able to increase the refresh rate of your Amazon Redshift data warehouse in order to provide the most up-to-date data in your data warehouse for your use case.


About the authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Matt Nispel is an Enterprise Solutions Architect at AWS. He has more than 10 years of experience building cloud architectures for large enterprise companies. At AWS, Matt helps customers rearchitect their applications to take full advantage of the cloud. Matt lives in Minneapolis, Minnesota, and in his free time enjoys spending time with friends and family.

Dan Dressel is a Senior Analytics Specialist Solutions Architect at AWS. He is passionate about databases, analytics, machine learning, and architecting solutions. In his spare time, he enjoys spending time with family, nature walking, and playing foosball.

Improved scalability and resiliency for Amazon EMR on EC2 clusters

Post Syndicated from Ravi Kumar Singh original https://aws.amazon.com/blogs/big-data/improved-scalability-and-resiliency-for-amazon-emr-on-ec2-clusters/

Amazon EMR is the cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning using open-source frameworks such as Apache Spark, Apache Hive, and Presto. Customers asked us for features that would further improve the resiliency and scalability of their Amazon EMR on EC2 clusters, including their large, long-running clusters. We have been hard at work to meet those needs. Over the past 12 months, we have worked backward from customer requirements and launched over 30 new features that improve the resiliency and scalability of your Amazon EMR on EC2 clusters. This post covers some of these key enhancements across three main areas:

  • Improved cluster utilization with optimized scaling experience
  • Minimized interruptions with enhanced resiliency and availability
  • Improved cluster resiliency with upgraded logging and debugging capabilities

Let’s dive into each of these areas.

Improved cluster utilization with optimized scaling experience

Customers use Amazon EMR to run diverse analytics workloads with varying SLAs, ranging from near-real-time streaming jobs to exploratory interactive workloads and everything in between. To cater to these dynamic workloads, you can resize your clusters either manually or by enabling automatic scaling. You can also use the Amazon EMR managed scaling feature to automatically resize your clusters for optimal performance at the lowest possible cost. To ensure swift cluster resizes, we implemented multiple improvements that are available in the latest Amazon EMR releases:

  • Enhanced resiliency of cluster scaling workflow to EC2 Spot Instance interruptions – Many Amazon EMR customers use EC2 Spot Instances for their Amazon EMR on EC2 clusters to reduce costs. Spot Instances are spare Amazon Elastic Compute Cloud (Amazon EC2) compute capacity offered at discounts of up to 90% compared to On-Demand pricing. However, Amazon EC2 can reclaim Spot capacity with a two-minute warning, which can lead to interruptions in workload. We identified an issue where the cluster’s scaling operation gets stuck when over a hundred core nodes launched on Spot Instances are reclaimed by Amazon EC2 throughout the life of the cluster. Starting with Amazon EMR version 6.8.0, we mitigated this issue by fixing a gap in the process HDFS uses to decommission nodes that caused the scaling operations to get stuck. We contributed this improvement back to the open-source community, enabling seamless recovery and efficient scaling in the event of Spot interruptions.
  • Improve cluster utilization by recommissioning recently decommissioned nodes for Spark workloads within seconds – Amazon EMR allows you to scale down your cluster without affecting your workload by gracefully decommissioning core and task nodes. Furthermore, to prevent task failures, Apache Spark ensures that decommissioning nodes are not assigned any new tasks. However, if a new job is submitted immediately before these nodes are fully decommissioned, Amazon EMR will trigger a scale-up operation for the cluster. This results in these decommissioning nodes being immediately recommissioned and added back into the cluster. Due to a gap in Apache Spark’s recommissioning logic, these recommissioned nodes would not accept new Spark tasks for up to 60 minutes. We enhanced the recommissioning logic, which ensures recommissioned nodes would start accepting new tasks within seconds, thereby improving cluster utilization. This improvement is available in Amazon EMR release 6.11 and higher.
  • Minimized cluster scaling interruptions due to disk over-utilization – The YARN ResourceManager exclude file is a key component of Apache Hadoop that Amazon EMR uses to centrally manage cluster resources for multiple data-processing frameworks. This exclude file contains a list of nodes to be removed from the cluster to facilitate a cluster scale-down operation. With Amazon EMR release 6.11.0, we improved the cluster scaling workflow to reduce scale-down failures. This improvement minimizes failures due to partial updates or corruption in the exclude file caused by low disk space. Additionally, we built a robust file recovery mechanism to restore the exclude file in case of corruption, ensuring uninterrupted cluster scaling operations.

Minimized interruptions with enhanced resiliency and availability

Amazon EMR offers high availability and fault tolerance for your big data workloads. Let’s look at a few key improvements we launched in this area:

  • Improved fault tolerance to hardware reconfiguration – Amazon EMR offers the flexibility to decouple storage and compute. We observed that customers often increase the size of or add incremental block-level storage to their EC2 instances as their data processing volume and concurrency grow. Starting with Amazon EMR release 6.11.0, we made the EMR cluster’s local storage file system more resilient to unpredictable instance reconfigurations such as instance restarts. By addressing scenarios where an instance restart could result in the block storage device name changing, we eliminated the risk of the cluster becoming inoperable or losing data.
  • Reduce cluster startup time for Kerberos-enabled EMR clusters with long-running bootstrap actions – Multiple customers use Kerberos for authentication and run long-running bootstrap actions on their EMR clusters. In Amazon EMR 6.9.0 and higher releases, we fixed a timing sequence mismatch issue that occurs between Apache BigTop and the Amazon EMR on EC2 cluster startup sequence. This timing sequence mismatch occurs when a system attempts to perform two or more operations at the same time instead of doing them in the proper sequence. This issue caused certain cluster configurations to experience instance startup timeouts. We contributed a fix to the open-source community and made additional improvements to the Amazon EMR startup sequence to prevent this condition, resulting in cluster start time improvements of up to 200% for such clusters.

Improved cluster resiliency with upgraded logging and debugging capabilities

Effective log management is essential to ensure log availability and maintain the health of EMR clusters. This becomes especially critical when you’re running multiple custom client tools and third-party applications on your Amazon EMR on EC2 clusters. Customers depend on EMR logs, in addition to EMR events, to monitor cluster and workload health, troubleshoot urgent issues, simplify security audit, and enhance compliance. Let’s look at a few key enhancements we made in this area:

  • Upgraded on-cluster log management daemon – Amazon EMR now automatically restarts the log management daemon if it’s interrupted. The Amazon EMR on-cluster log management daemon archives logs to Amazon Simple Storage Service (Amazon S3) and deletes them from instance storage. This minimizes cluster failures due to disk over-utilization, while allowing the log files to remain accessible even after the cluster or node stops. This upgrade is available in Amazon EMR release 6.10.0 and higher. For more information, see Configure cluster logging and debugging.
  • Enhanced cluster stability with improved log rotation and monitoring – Many of our customers have long-running clusters that have been operating for years. Some open-source application logs such as Hive and Kerberos logs that are never rotated can continue to grow on these long-running clusters. This could lead to disk over-utilization and eventually result in cluster failures. We enabled log rotation for such log files to minimize disk, memory, and CPU over-utilization scenarios. Furthermore, we expanded our log monitoring to include additional log folders. These changes, available starting with Amazon EMR version 6.10.0, minimize situations where EMR cluster resources are over-utilized, while ensuring log files are archived to Amazon S3 for a wider variety of use cases.

Conclusion

In this post, we highlighted the improvements that we made in Amazon EMR on EC2 with the goal to make your EMR clusters more resilient and stable. We focused on improving cluster utilization with the improved and optimized scaling experience for EMR workloads, minimized interruptions with enhanced resiliency and availability for Amazon EMR on EC2 clusters, and improved cluster resiliency with upgraded logging and debugging capabilities. We will continue to deliver further enhancements with new Amazon EMR releases. We invite you to try new features and capabilities in the latest Amazon EMR releases and get in touch with us through your AWS account team to share your valuable feedback and comments. To learn more and get started with Amazon EMR, check out the tutorial Getting started with Amazon EMR.


About the Authors

Ravi Kumar is a Senior Product Manager for Amazon EMR at Amazon Web Services.

Kevin Wikant is a Software Development Engineer for Amazon EMR at Amazon Web Services.

How Amazon Finance Automation built a data mesh to support distributed data ownership and centralize governance

Post Syndicated from Nitin Arora original https://aws.amazon.com/blogs/big-data/how-amazon-finance-automation-built-a-data-mesh-to-support-distributed-data-ownership-and-centralize-governance/

Amazon Finance Automation (FinAuto) is the tech organization of Amazon Finance Operations (FinOps). Its mission is to enable FinOps to support the growth and expansion of Amazon businesses. It works as a force multiplier through automation and self-service, while providing accurate and on-time payments and collections. FinAuto has a unique position to look across FinOps and provide solutions that help satisfy multiple use cases with accurate, consistent, and governed delivery of data and related services.

In this post, we discuss how the Amazon Finance Automation team used AWS Lake Formation and the AWS Glue Data Catalog to build a data mesh architecture that simplified data governance at scale and provided seamless data access for analytics, AI, and machine learning (ML) use cases.

Challenges

Amazon businesses have grown over the years. In the early days, financial transactions could be stored and processed on a single relational database. In today’s business world, however, even a subset of the financial space dedicated to entities such as Accounts Payable (AP) and Accounts Receivable (AR) requires separate systems handling terabytes of data per day. Within FinOps, we curate more than 300 datasets and consume many more raw datasets from dozens of systems. These datasets are then used to power front-end systems, ML pipelines, and data engineering teams.

This exponential growth necessitated a data landscape that was geared towards keeping FinOps operating. However, as we added more transactional systems, data started to grow in operational data stores. Data copies were common, with duplicate pipelines creating redundant and often out-of-sync domain datasets. Multiple curated data assets were available with similar attributes. To resolve these challenges, FinAuto decided to build a data services layer based on a data mesh architecture, in which data domain owners retain ownership of their datasets while consumers get governed access to the data.

Solution overview

Being customer focused, we started by understanding our data producers’ and consumers’ needs and priorities. Consumers prioritized data discoverability, fast data access, low latency, and high accuracy of data. Producers prioritized ownership, governance, access management, and reuse of their datasets. These inputs reinforced the need for a unified data strategy across the FinOps teams. We decided to build a scalable data management product based on the best practices of modern data architecture. Our source system and domain teams were mapped as data producers, and they retain ownership of the datasets. FinAuto provided the data services tools and controls necessary for data owners to apply data classification, access permissions, and usage policies. It was necessary for domain owners to continue this responsibility because they had visibility into the business rules and classifications and could apply them to the dataset. This enabled producers to publish data products that were curated and authoritative assets for their domain. For example, the AR team created and governed their cash application dataset in the AWS Glue Data Catalog in their AWS account.

With many such partners building their data products, we needed a way to centralize data discovery, access management, and vending of these data products. So we built a global data catalog in a central governance account based on the AWS Glue Data Catalog. The FinAuto team built AWS Cloud Development Kit (AWS CDK), AWS CloudFormation, and API tools to maintain a metadata store that ingests from domain owner catalogs into the global catalog. This global catalog captures new or updated partitions from the data producer AWS Glue Data Catalogs. The global catalog is also periodically refreshed in full to resolve any issues from the metadata sync process and maintain resiliency. With this structure in place, we then needed to add governance and access management. We selected AWS Lake Formation in our central governance account to help secure the data catalog, and added secure vending mechanisms around it. We also built a front-end discovery and access control application where consumers can browse datasets and request access. When a consumer requests access, the application validates the request and routes it to the respective producer via internal tickets for approval. Only after the data producer approves the request are permissions provisioned in the central governance account through Lake Formation.
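
To illustrate the sync mechanism, the following is a minimal boto3 sketch of copying table definitions from a producer catalog into the global catalog. The account IDs, database name, and cross-account access setup are hypothetical; the actual FinAuto tooling is built with AWS CDK, CloudFormation, and internal APIs, and also handles partitions and the periodic full refresh.

```python
import boto3

# Hypothetical account IDs and database name, for illustration only
PRODUCER_CATALOG_ID = "111111111111"   # producer AWS account ID
CENTRAL_CATALOG_ID = "222222222222"    # central governance account ID
DATABASE_NAME = "ar_cash_application"

producer_glue = boto3.client("glue")   # assumed to have read access to the producer catalog
central_glue = boto3.client("glue")    # assumed to run in (or assume a role in) the central account

def sync_database_metadata():
    """Copy table definitions from a producer catalog into the global catalog."""
    paginator = producer_glue.get_paginator("get_tables")
    for page in paginator.paginate(CatalogId=PRODUCER_CATALOG_ID, DatabaseName=DATABASE_NAME):
        for table in page["TableList"]:
            table_input = {
                "Name": table["Name"],
                "StorageDescriptor": table["StorageDescriptor"],
                "PartitionKeys": table.get("PartitionKeys", []),
                "TableType": table.get("TableType", "EXTERNAL_TABLE"),
                "Parameters": table.get("Parameters", {}),
            }
            try:
                # Update the definition if the table already exists in the global catalog
                central_glue.update_table(CatalogId=CENTRAL_CATALOG_ID,
                                          DatabaseName=DATABASE_NAME,
                                          TableInput=table_input)
            except central_glue.exceptions.EntityNotFoundException:
                # Otherwise register it for the first time
                central_glue.create_table(CatalogId=CENTRAL_CATALOG_ID,
                                          DatabaseName=DATABASE_NAME,
                                          TableInput=table_input)
```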

Solution tenets

A data mesh architecture has its own advantages and challenges. By democratizing the data product creation, we removed dependencies on a central team. We made reuse of data possible with data discoverability and minimized data duplicates. This also helped remove data movement pipelines, thereby reducing data transfer and maintenance costs.

We realized, however, that our implementation could potentially impact day-to-day tasks and inhibit adoption. For example, data producers need to onboard their dataset to the global catalog and complete their permissions management before they can share it with consumers. To overcome this obstacle, we prioritized self-service tools and automation with a reliable and simple-to-use interface. Producer and consumer onboarding, data access requests, approvals, and governance all became quicker through the self-service tools in our application.

Solution architecture

Within Amazon, we isolate different teams and business processes with separate AWS accounts. From a security perspective, the account boundary is one of the strongest security boundaries in AWS. Because of this, the global catalog resides in its own locked-down AWS account.

The following diagram shows AWS account boundaries for producers, consumers, and the central catalog. It also describes the steps involved for data producers to register their datasets as well as how data consumers get access. Most of these steps are automated through convenience scripts with both AWS CDK and CloudFormation templates for our producers and consumers to use.

Solution Architecture Diagram

The workflow contains the following steps:

  1. Data is saved by the producer in their own Amazon Simple Storage Service (Amazon S3) buckets.
  2. Data source locations hosted by the producer are created within the producer’s AWS Glue Data Catalog.
  3. Data source locations are registered with Lake Formation.
  4. An onboarding AWS CDK script creates a role for the central catalog to use to read metadata and generate the tables in the global catalog.
  5. The metadata sync is set up to continuously sync data schema and partition updates to the central data catalog.
  6. When a consumer requests table access from the central data catalog, the producer grants Lake Formation permissions to the consumer account AWS Identity and Access Management (IAM) role, and tables become visible in the consumer account (see the sketch after this list for how steps 6 and 7 can look).
  7. The consumer account accepts the AWS Resource Access Manager (AWS RAM) share and creates resource links in Lake Formation.
  8. The consumer data lake admin provides grants to IAM users and roles mapping to data consumers within the account.
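
The following is a minimal boto3 sketch of steps 6 and 7: the cross-account Lake Formation grant in the central governance account, followed by the AWS RAM share acceptance and resource link creation in the consumer account. All account IDs, database names, and table names are hypothetical, and the actual workflow runs through the convenience scripts and internal ticketing described earlier.

```python
import boto3

CENTRAL_CATALOG_ID = "222222222222"    # hypothetical central governance account
CONSUMER_ACCOUNT_ID = "333333333333"   # hypothetical consumer account
DATABASE_NAME = "ar_cash_application"
TABLE_NAME = "invoice_aging"

# Step 6: in the central governance account, grant the consumer account SELECT on the table.
# The cross-account grant causes Lake Formation to create an AWS RAM resource share.
lakeformation = boto3.client("lakeformation")
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": CONSUMER_ACCOUNT_ID},
    Resource={"Table": {"CatalogId": CENTRAL_CATALOG_ID,
                        "DatabaseName": DATABASE_NAME,
                        "Name": TABLE_NAME}},
    Permissions=["SELECT"],
)

# Step 7: in the consumer account, accept the pending AWS RAM invitation and create a
# resource link that points back to the shared table in the central catalog.
ram = boto3.client("ram")
for invitation in ram.get_resource_share_invitations()["resourceShareInvitations"]:
    if invitation["status"] == "PENDING":
        ram.accept_resource_share_invitation(
            resourceShareInvitationArn=invitation["resourceShareInvitationArn"]
        )

glue = boto3.client("glue")
glue.create_table(
    DatabaseName="shared_finance",     # hypothetical local database in the consumer account
    TableInput={
        "Name": TABLE_NAME,
        "TargetTable": {
            "CatalogId": CENTRAL_CATALOG_ID,
            "DatabaseName": DATABASE_NAME,
            "Name": TABLE_NAME,
        },
    },
)
```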

The global catalog

The basic building block of our business-focused solutions is the data product. A data product is a single domain attribute that the business understands as accurate, current, and available. This could be a dataset (a table) representing a business attribute like a global AR invoice, invoice aging, aggregated invoices by a line of business, or a current ledger balance. These attributes are calculated by the domain team and are available for consumers who need that attribute, without duplicating pipelines to recreate it. Data products, along with raw datasets, reside within their data owner’s AWS account. Data producers register their data catalog’s metadata to the central catalog. We have services that review source catalogs to identify and recommend classification of sensitive data columns such as name, email address, customer ID, and bank account numbers. Producers can review and accept those recommendations, which results in corresponding tags applied to the columns.
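
As an illustration of how such a column-level classification could be applied, the following sketch uses the Lake Formation AddLFTagsToResource API. The database, table, and column names are hypothetical, and the classification LF-tag is assumed to have been created beforehand.

```python
import boto3

lakeformation = boto3.client("lakeformation")

# Hypothetical names; the "classification" LF-tag is assumed to already exist
# (for example, created earlier with create_lf_tag).
lakeformation.add_lf_tags_to_resource(
    Resource={
        "TableWithColumns": {
            "DatabaseName": "ar_cash_application",
            "Name": "customer_invoices",
            "ColumnNames": ["customer_email", "bank_account_number"],
        }
    },
    LFTags=[{"TagKey": "classification", "TagValues": ["sensitive"]}],
)
```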

Producer experience

Producers onboard their accounts when they want to publish a data product. Our job is to sync the metadata between the AWS Glue Data Catalog in the producer account and the central catalog account, and to register the Amazon S3 data location with Lake Formation. Producers and data owners can use Lake Formation for fine-grained access controls on the table. The data product is also now searchable and discoverable via the central catalog application.
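
A minimal sketch of the data location registration piece is shown below, assuming a hypothetical producer bucket and the Lake Formation service-linked role.

```python
import boto3

lakeformation = boto3.client("lakeformation")

# Hypothetical bucket path; registering the location lets Lake Formation enforce
# fine-grained permissions on tables stored there.
lakeformation.register_resource(
    ResourceArn="arn:aws:s3:::finauto-ar-data-lake/cash-application/",
    UseServiceLinkedRole=True,
)
```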

Consumer experience

When a data consumer discovers the data product that they’re interested in, they submit a data access request from the application UI. Internally, we route the request to the data owner for the disposition of the request (approval or rejection). We then create an internal ticket to track the request for auditing and traceability. If the data owner approves the request, we run automation to create an AWS RAM resource share to share with the consumer account covering the AWS Glue database and tables approved for access. These consumers can now query the datasets using the AWS analytics services of their choice like Amazon Redshift Spectrum, Amazon Athena, and Amazon EMR.
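
As a simple illustration of the consumer side, the following boto3 sketch runs an Athena query against a shared table and polls for the result. The database, table, and results bucket names are hypothetical.

```python
import time
import boto3

athena = boto3.client("athena")

# Hypothetical database, resource-link table, and results bucket
query = athena.start_query_execution(
    QueryString="SELECT invoice_id, amount_due FROM ar_cash_application.invoice_aging LIMIT 10",
    QueryExecutionContext={"Database": "ar_cash_application"},
    ResultConfiguration={"OutputLocation": "s3://finauto-athena-results/"},
)

query_id = query["QueryExecutionId"]
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    print(f"Returned {len(rows) - 1} data rows")  # first row contains column headers
```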

Operational excellence

Along with building the data mesh, it’s also important to verify that we can operate with efficiency and reliability. We recognize that the metadata sync process is at the heart of this global data catalog. As such, we are hypervigilant of this process and have built alarms, notifications, and dashboards to verify that this process doesn’t fail silently and create a single point of failure for the global data catalog. We also have a backup repair service that syncs the metadata from producer catalogs into the central governance account catalog periodically. This is a self-healing mechanism to maintain reliability and resiliency.
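
As an example of this kind of guardrail, the following sketch creates a CloudWatch alarm on a custom metric emitted by the sync job, treating missing data as breaching so a silent failure also alarms. The namespace, metric name, and SNS topic are hypothetical.

```python
import boto3

cloudwatch = boto3.client("cloudwatch")

# Hypothetical custom metric emitted by the metadata sync job
cloudwatch.put_metric_alarm(
    AlarmName="global-catalog-metadata-sync-failures",
    Namespace="FinAuto/DataMesh",
    MetricName="MetadataSyncFailures",
    Statistic="Sum",
    Period=300,
    EvaluationPeriods=1,
    Threshold=1,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    TreatMissingData="breaching",  # no metric emitted means the sync ran silently or not at all
    AlarmActions=["arn:aws:sns:us-east-1:222222222222:data-mesh-oncall"],
)
```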

Empowering customers with the data mesh

The FinAuto data mesh hosts around 850 discoverable and shareable datasets from multiple partner accounts. There are more than 300 curated data products to which producers can provide access and apply governance with fine-grained access controls. Our consumers use AWS analytics services such as Redshift Spectrum, Athena, Amazon EMR, and Amazon QuickSight to access their data. Standardized data vending from the data mesh, combined with self-service capabilities, allows teams to innovate faster without depending on central technical teams, and to get access to data faster with automation that continuously improves the process.

By serving the FinOps team’s data needs with high availability and security, we enabled them to effectively support operations and reporting. Data science teams can now use the data mesh for their finance-related AI/ML use cases such as fraud detection, credit risk modeling, and account grouping. Our finance operations analysts are now able to dive deep into their customer issues, which matters most to them.

Conclusion

FinOps implemented a data mesh architecture with Lake Formation to improve data governance with fine-grained access controls. With these improvements, the FinOps team is now able to innovate faster with access to the right data at the right time in a self-serve manner to drive business outcomes. The FinOps team will continue to innovate in this space with AWS services to further provide for customer needs.

To learn more about how to use Lake Formation to build a data mesh architecture, see Design a data mesh architecture using AWS Lake Formation and AWS Glue.


About the Authors

Nitin Arora is a Sr. Software Development Manager for Finance Automation in Amazon. He has over 18 years of experience building business critical, scalable, high-performance software. Nitin leads several data and analytics initiatives within Finance, which includes building Data Mesh. In his spare time, he enjoys listening to music and reading.

Pradeep Misra is a Specialist Solutions Architect at AWS. He works across Amazon to architect and design modern distributed analytics and AI/ML platform solutions. He is passionate about solving customer challenges using data, analytics, and AI/ML. Outside of work, Pradeep likes exploring new places, trying new cuisines, and playing board games with his family. He also likes doing science experiments with his daughters.

Rajesh Rao is a Sr. Technical Program Manager in Amazon Finance. He works with Data Services teams within Amazon to build and deliver data processing and data analytics solutions for Financial Operations teams. He is passionate about delivering innovative and optimal solutions using AWS to enable data-driven business outcomes for his customers.

Andrew Long, the lead developer for data mesh, has designed and built many of the big data processing systems that have fueled Amazon’s financial data processing infrastructure. His work encompasses a range of areas, including S3-based table formats for Spark, diverse Spark performance optimizations, distributed orchestration engines, and the development of data cataloging systems. Additionally, Andrew finds pleasure in sharing his knowledge of partner acrobatics.

Kumar Satyen Gaurav is an experienced Software Development Manager at Amazon, with over 16 years of expertise in big data analytics and software development. He leads a team of engineers to build products and services using AWS big data technologies, providing key business insights for Amazon Finance Operations across diverse business verticals. Beyond work, he finds joy in reading, traveling, and learning the strategic challenges of chess.

How AWS helped Altron Group accelerate their vision for optimized customer engagement

Post Syndicated from Jason Yung original https://aws.amazon.com/blogs/big-data/how-aws-helped-altron-group-accelerate-their-vision-for-optimized-customer-engagement/

This is a guest post co-authored by Jacques Steyn, Senior Manager Professional Services at Altron Group.

Altron is a pioneer in data-driven solutions, combining technical expertise with in-depth customer understanding to deliver highly differentiated technology solutions to their customers. Alongside their partner AWS, they participated in AWS Data-Driven Everything (D2E) workshops and a bespoke AWS Immersion Day workshop that catered to their needs to improve their engagement with their customers.

This post discusses the journey that took Altron from their initial goals, to technical implementation, to the business value created from understanding their customers and their unique opportunities better. They were able to think big but start small with a working solution involving rich business intelligence (BI) and insights provided to their key business areas.

Data-Driven Everything engagement

Altron has provided information technology services since 1965 across South Africa, the Middle East, and Australia. Although the group saw strong results at the 2022 year-end, the region continues to experience challenging operating conditions, with global supply chains disrupted, electronic component shortages, and scarcity of IT talent.

To reflect the needs of their customers spread across different geographies and industries, Altron has organized their operating model across individual Operating Companies (OpCos). These are run autonomously with different sales teams, creating siloed operations and engagement with customers and making it difficult to have a holistic and unified sales motion.

To succeed further, their vision requires data to be accessible and actionable to all, with key roles and responsibilities defined for those who produce and consume data, as shown in the following figure. This allows for transparency, speed to action, and collaboration across the group while enabling the platform team to evangelize the use of data.

Altron engaged with AWS to seek advice on their data strategy and cloud modernization to bring their vision to fruition. The D2E program was selected to help identify the best way to think big but start small by working collaboratively to ideate on the opportunities to build data as a product, particularly focused on federating customer profile data in an agile and scalable approach.

Amazon mechanisms such as Working Backwards were employed to devise the most delightful and meaningful solution and put customers at the heart of the experience. The workshop helped the team conclude that starting with the Systems Integration (SI) OpCo as the first flywheel turn would be the best way to start small and prototype the initial data foundation collaboratively with AWS Solutions Architects.

Preparing for an AWS Immersion Day workshop

The typical preparation of an AWS Immersion Day involves identifying examples of common use case patterns and utilizing demonstration data. To maximize its success, the Immersion Day was stretched across multiple days as a hands-on workshop to enable Altron to bring their own data, build a robust data pipeline, and scale their long-term architecture. In addition, AWS and Altron identified and resolved any external dependencies, such as network connectivity to data sources and targets, where Altron was able to put the connectivity to the sources in place.

Identifying key use cases

After a number of preparation meetings to discuss business and technical aspects of the use case, AWS and Altron identified two use cases to resolve their two business challenges:

  • Business intelligence for business-to-business accounts – Altron wanted to focus on their business-to-business (B2B) accounts and customer data. In particular, they wanted to enable their account managers, sales executives, and analysts to use actual data and facts to get a 360 view of their accounts.
    • Goals – Grow revenue, increase the conversion ratio of opportunities, reduce the average sales cycle, improve the customer renewal rate.
    • Target – Dashboards to be refreshed on a daily basis that would provide insights on sales, gross profit, sales pipelines, and customers.
  • Data quality for account and customer data – Altron wanted to enable data quality and data governance best practices.
    • Goals – Lay the foundation for a data platform that can be used in the future by internal and external stakeholders.

Conducting the Immersion Day workshop

Altron set aside 4 days for their Immersion Day, during which AWS assigned a dedicated Solutions Architect to work alongside them to build the following prototype architecture:

This solution includes the following components:

  1. AWS Glue is a serverless data integration service that makes it simple to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning, and application development. The Altron team created an AWS Glue crawler and configured it to run against Azure SQL to discover its tables. The AWS Glue crawler populates the table definition with its schema in AWS Glue Data Catalog.
  2. This step consists of two components:
    1. A set of AWS Glue PySpark jobs reads the source tables from Azure SQL and writes the resulting data to the Amazon Simple Storage Service (Amazon S3) raw zone (see the sketch after this list). Basic formatting and readability of the data is standardized here. The jobs run on a scheduled basis, according to the upstream data availability (which currently is daily).
    2. Users are able to manually upload reference files (CSV and Excel format) via the AWS Management Console directly to the Amazon S3 buckets. Depending on the frequency of upload, the Altron team will consider automated mechanisms and remove the manual upload step.
  3. The reporting zone is based on a set of Amazon Athena views, which are consumed for BI purposes. The Altron team used Athena to explore the source tables and create the views in SQL. Depending on their needs, the Altron team will materialize these views or create corresponding AWS Glue jobs.
  4. Athena exposes the content of the reporting zone for consumption.
  5. The content of the reporting zone is ingested via SPICE in Amazon QuickSight. BI users create dashboards and reports in QuickSight. Business users can access QuickSight dashboards from their mobile devices through the QuickSight native application, configured to use single sign-on (SSO).
  6. An AWS Step Functions state machine orchestrates the run of the AWS Glue jobs. The Altron team will expand the state machine to include automated refresh of QuickSight SPICE datasets.
  7. To verify the data quality of the sources through statistically relevant metrics, AWS Glue Data Quality runs data quality tasks on relevant AWS Glue tables. This can be run manually or scheduled via Amazon EventBridge (optional).
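
The following is a minimal sketch of the kind of AWS Glue PySpark job described in step 2a: it reads a table that the crawler registered from Azure SQL and lands it as Parquet in the raw zone. The database, table, and bucket names are hypothetical.

```python
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read a source table that the AWS Glue crawler registered from Azure SQL.
# Database and table names are hypothetical.
source = glue_context.create_dynamic_frame.from_catalog(
    database="altron_azure_sql",
    table_name="sales_opportunities",
)

# Standardize basic formatting, then land the data as Parquet in the raw zone.
glue_context.write_dynamic_frame.from_options(
    frame=source,
    connection_type="s3",
    connection_options={"path": "s3://altron-data-lake/raw/sales_opportunities/"},
    format="parquet",
)

job.commit()
```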

Generating business outcomes

In 4 days, the Altron SI team left the Immersion Day workshop with the following:

  • A data pipeline ingesting data from 21 sources (SQL tables and files) and combining them into three mastered and harmonized views that are cataloged for Altron’s B2B accounts.
  • A set of QuickSight dashboards to be consumed via browser and mobile.
  • Foundations for a data lake with data governance controls and data quality checks. The datasets used for the workshop originate from different systems; by integrating the datasets during the workshop implementation, the Altron team can have a comprehensive overview of their customers.

Altron’s sales teams are now able to quickly refresh dashboards encompassing previously disparate datasets that are now centralized to get insights about sales pipelines and forecasts on their desktop or mobile. The technical teams are equally adept at adjusting to business needs by autonomously onboarding new data sources and further enriching the user experience and trust in the data.

Conclusion

In this post, we walked you through the journey the Altron team took with AWS. Identifying the opportunities that were most pressing to Altron, applying a Working Backwards approach, and coming up with a best-fit architecture led to the subsequent AWS Immersion Day, during which the team implemented a working prototype that helped them become more data-driven.

With their new focus on AWS skills and mechanisms, increasing trust in their internal data, and understanding the importance of driving change in data literacy and mindset, Altron is better set up for success to best serve their customers in the region.

To find out more about how Altron and AWS can help work backward on your data strategy and employ the agile methodologies discussed in this post, check out Data Management. To learn more about how AWS can help you turn your ideas into solutions, visit the D2E website and the series of AWS Immersion Days that you can choose from. For more hands-on bespoke options, contact your AWS Account Manager, who can provide more details.

Special thanks to everyone at Altron Group who helped contribute to the success of the D2E and Build Lab workshops:

  • The Analysts (Liesl Kok, Carmen Kotze)
  • Data Engineers (Banele Ngemntu, James Owen, Andrew Corry, Thembelani Mdlankomo)
  • QuickSight BI Developers (Ricardo De Gavino Dias, Simei Antoniades)
  • Cloud Administrator (Shamiel Galant)

About the authors

Jacques Steyn runs the Altron Data Analytics Professional Services. He has been leading the building of data warehouses and analytic solutions for the past 20 years. In his free time, he spends time with his family, whether it be on the golf course, walking in the mountains, or camping in South Africa, Botswana, and Namibia.

Jason Yung is a Principal Analytics Specialist with Amazon Web Services. Working with Senior Executives across the Europe and Asia-Pacific Regions, he helps customers become data-driven by understanding their use cases and articulating business value through Amazon mechanisms. In his free time, he spends time looking after a very active 1-year-old daughter, alongside juggling geeky activities with respectable hobbies such as cooking sub-par food.

Michele Lamarca is a Senior Solutions Architect with Amazon Web Services. He helps architect and run Solutions Accelerators in Europe to enable customers to become hands-on with AWS services and build prototypes quickly to release the value of data in the organization. In his free time, he reads books and tries (hopelessly) to improve his jazz piano skills.

Hamza is a Specialist Solutions Architect with Amazon Web Services. He runs Solutions Accelerators in EMEA regions to help customers accelerate their journey to move from an idea into a solution in production. In his free time, he spends time with his family, meets with friends, swims in the municipal swimming pool, and learns new skills.

With a zero-ETL approach, AWS is helping builders realize near-real-time analytics

Post Syndicated from Swami Sivasubramanian original https://aws.amazon.com/blogs/big-data/realizing-near-real-time-analytics-with-a-zero-etl-future/

Data is at the center of every application, process, and business decision. When data is used to improve customer experiences and drive innovation, it can lead to business growth. According to Forrester, advanced insights-driven businesses are 8.5 times more likely than beginners to report at least 20% revenue growth. However, to realize this growth, managing and preparing the data for analysis has to get easier.

That’s why AWS is investing in a zero-ETL future so that builders can focus more on creating value from data, instead of preparing data for analysis.

Challenges with ETL

What is ETL? Extract, Transform, Load is the process data engineers use to combine data from different sources. ETL can be challenging, time-consuming, and costly. Firstly, it invariably requires data engineers to create custom code. Next, DevOps engineers have to deploy and manage the infrastructure to make sure the pipelines scale with the workload. In case the data sources change, data engineers have to manually make changes in their code and deploy it again. While all of this is happening—a process that can take days—data analysts can’t run interactive analysis or build dashboards, data scientists can’t build machine learning (ML) models or run predictions, and end-users can’t make data-driven decisions.

Furthermore, the time required to build or change pipelines makes the data unfit for near-real-time use cases such as detecting fraudulent transactions, placing online ads, and tracking passenger train schedules. In these scenarios, the opportunity to improve customer experiences, address new business opportunities, or lower business risks can simply be lost.

On the flip side, when organizations can quickly and seamlessly integrate data that is stored and analyzed in different tools and systems, they get a better understanding of their customers and business. As a result, they can make data-driven predictions with more confidence, improve customer experiences, and promote data-driven insights across the business.

AWS is bringing its zero-ETL vision to life

We have been making steady progress towards bringing our zero-ETL vision to life. For example, customers told us that they want to ingest streaming data into their data stores for doing analytics—all without delving into the complexities of ETL.

With Amazon Redshift Streaming Ingestion, organizations can configure Amazon Redshift to directly ingest high-throughput streaming data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) or Amazon Kinesis Data Streams and make it available for near-real-time analytics in just a few seconds. They can connect to multiple data streams and pull data directly into Amazon Redshift without staging it in Amazon Simple Storage Service (Amazon S3). After running analytics, the insights can be made available broadly across the organization with Amazon QuickSight, a cloud-native, serverless business intelligence service. QuickSight makes it incredibly simple and intuitive to get to answers with Amazon QuickSight Q, which allows users to ask business questions about their data in natural language and receive answers quickly through data visualizations.
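
As an illustration, the following sketch uses the Amazon Redshift Data API to set up streaming ingestion from a Kinesis data stream into an auto-refreshing materialized view. The workgroup, database, IAM role, and stream names are hypothetical, and the exact SQL may vary with your schema and Amazon Redshift version.

```python
import boto3

redshift_data = boto3.client("redshift-data")

# Hypothetical Redshift Serverless workgroup, database, IAM role, and Kinesis stream
sqls = [
    """
    CREATE EXTERNAL SCHEMA clickstream
    FROM KINESIS
    IAM_ROLE 'arn:aws:iam::111111111111:role/redshift-streaming-role'
    """,
    """
    CREATE MATERIALIZED VIEW clickstream_mv AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           JSON_PARSE(FROM_VARBYTE(kinesis_data, 'utf-8')) AS event
    FROM clickstream."click-events"
    """,
]

for sql in sqls:
    # Each statement runs asynchronously; describe_statement can be used to check status.
    redshift_data.execute_statement(
        WorkgroupName="analytics-wg",
        Database="dev",
        Sql=sql,
    )
```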

Another example of AWS’s investment in zero-ETL is providing the ability to query a variety of data sources without having to worry about data movement. Using federated query in Amazon Redshift and Amazon Athena, organizations can run queries across data stored in their operational databases, data warehouses, and data lakes so that they can create insights from across multiple data sources with no data movement. Data analysts and data engineers can use familiar SQL commands to join data across several data sources for quick analysis, and store the results in Amazon S3 for subsequent use. This provides a flexible way to ingest data while avoiding complex ETL pipelines.
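
As a sketch of what this looks like in practice, the following starts an Athena query that joins a table in the AWS Glue Data Catalog with a table exposed through a previously registered federated data source. The catalog, database, table, and bucket names are hypothetical, and the federated connector is assumed to be already deployed and registered.

```python
import boto3

athena = boto3.client("athena")

# Join an S3-backed table in the AWS Glue Data Catalog with an operational table exposed
# through a registered federated data source (catalog name is hypothetical).
federated_sql = """
SELECT o.order_id, o.order_total, c.segment
FROM "postgres_ops"."public"."orders" o
JOIN "awsdatacatalog"."sales_lake"."customer_segments" c
  ON o.customer_id = c.customer_id
"""

athena.start_query_execution(
    QueryString=federated_sql,
    ResultConfiguration={"OutputLocation": "s3://my-athena-results/"},
)
```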

More recently, AWS introduced Amazon Aurora zero-ETL integration with Amazon Redshift at AWS re:Invent 2022.

We learned from customers that they spend significant time and resources building and managing ETL pipelines between transactional databases and data warehouses. For example, let’s say a global manufacturing company with factories in a dozen countries uses a cluster of Aurora databases to store order and inventory data in each of those countries. When the company executives want to view all of the orders and inventory, the data engineers would have to build individual data pipelines from each of the Aurora clusters to a central data warehouse so that the data analysts can query the combined dataset. To do this, the data integration team has to write code to connect to 12 different clusters and manage and test 12 production pipelines. After the team deploys the code, it has to constantly monitor and scale the pipelines to optimize performance, and when anything changes, they have to make updates across 12 different places. It is quite a lot of repetitive work.

No more need for custom ETL pipelines between Aurora and Amazon Redshift

The Aurora zero-ETL integration with Amazon Redshift brings together the transactional data of Aurora with the analytics capabilities of Amazon Redshift. It minimizes the work of building and managing custom ETL pipelines between Aurora and Amazon Redshift. Unlike traditional systems where data is siloed in one database and the user has to make a trade-off between unified analysis and performance, data engineers can replicate data from multiple Aurora database clusters into the same or a new Amazon Redshift instance to derive holistic insights across many applications or partitions. Updates in Aurora are automatically and continuously propagated to Amazon Redshift, so data engineers have the most recent information in near-real time. The entire system can be serverless and dynamically scales up and down based on data volume, so there’s no infrastructure to manage. Now, organizations get the best of both worlds: fast, scalable transactions in Aurora together with fast, scalable analytics in Amazon Redshift, all in one seamless system. With near-real-time access to transactional data, organizations can use Amazon Redshift’s analytics capabilities, such as built-in ML, materialized views, data sharing, and federated access to multiple data stores and data lakes, to derive insights from transactional and other data.

Improving zero-ETL performance is a continuous goal for AWS. For instance, one of our early zero-ETL preview customers observed that the hundreds of thousands of transactions produced every minute from their Amazon Aurora MySQL databases appeared in their Amazon Redshift warehouse in less than 10 seconds. Previously, they had more than a 2-hour delay moving data from their ETL pipeline into Amazon Redshift. With the zero-ETL integration between Aurora and Amazon Redshift, they are now able to achieve near-real-time analytics.

This integration is now available in Public Preview. To learn more, refer to Getting started guide for near-real-time analytics using Amazon Aurora zero-ETL integration with Amazon Redshift.

Zero-ETL makes data available to data engineers at the point of use through direct integrations between services and direct querying across a variety of data stores. This frees the data engineers to focus on creating value from the data, instead of spending time and resources building pipelines. AWS will continue investing in its zero-ETL vision so that organizations can accelerate their use of data to drive business growth.


About the Author

Swami Sivasubramanian is the Vice President of AWS Data and Machine Learning.

Choosing an open table format for your transactional data lake on AWS

Post Syndicated from Shana Schipers original https://aws.amazon.com/blogs/big-data/choosing-an-open-table-format-for-your-transactional-data-lake-on-aws/

A modern data architecture enables companies to ingest virtually any type of data through automated pipelines into a data lake, which provides highly durable and cost-effective object storage at petabyte or exabyte scale. This data is then projected into analytics services such as data warehouses, search systems, stream processors, query editors, notebooks, and machine learning (ML) models through direct access, real-time, and batch workflows. Data in customers’ data lakes is used to fulfill a multitude of use cases, from real-time fraud detection for financial services companies to inventory and real-time marketing campaigns for retailers, and flight and hotel room availability for the hospitality industry. Across all use cases, permissions, data governance, and data protection are table stakes, and customers require a high level of control over data security, encryption, and lifecycle management.

This post shows how open-source transactional table formats (or open table formats) can help you solve advanced use cases around performance, cost, governance, and privacy in your data lakes. We also provide insights into the features and capabilities of the most common open table formats available to support various use cases.

You can use this post for guidance when looking to select an open table format for your data lake workloads, facilitating the decision-making process and potentially narrowing down the available options. The content of this post is based on the latest open-source releases of the reviewed formats at the time of writing: Apache Hudi v0.13.0, Apache Iceberg 1.2.0, and Delta Lake 2.3.0.

Advanced use cases in modern data lakes

Data lakes offer one of the best options for cost, scalability, and flexibility to store data, allowing you to retain large volumes of structured and unstructured data at a low cost, and to use this data for different types of analytics workloads—from business intelligence reporting to big data processing, real-time analytics, and ML—to help guide better decisions.

Despite these capabilities, data lakes are not databases, and object storage does not provide support for ACID processing semantics, which you may require to effectively optimize and manage your data at scale across hundreds or thousands of users using a multitude of different technologies. For example:

  • Performing efficient record-level updates and deletes as data changes in your business
  • Managing query performance as tables grow to millions of files and hundreds of thousands of partitions
  • Ensuring data consistency across multiple concurrent writers and readers
  • Preventing data corruption from write operations failing partway through
  • Evolving table schemas over time without (partially) rewriting datasets

These challenges have become particularly prevalent in use cases such as CDC (change data capture) from relational database sources, privacy regulations requiring deletion of data, and streaming data ingestion, which can result in many small files. Typical data lake file formats such as CSV, JSON, Parquet, or ORC only allow for writes of entire files, making the aforementioned requirements hard to implement, time-consuming, and costly.

To help overcome these challenges, open table formats provide additional database-like functionality that simplifies the optimization and management overhead of data lakes, while still supporting storage on cost-effective systems like Amazon Simple Storage Service (Amazon S3). These features include:

  • ACID transactions – Allowing a write to completely succeed or be rolled back in its entirety
  • Record-level operations – Allowing for single rows to be inserted, updated, or deleted
  • Indexes – Improving performance in addition to data lake techniques like partitioning
  • Concurrency control – Allowing for multiple processes to read and write the same data at the same time
  • Schema evolution – Allowing for columns of a table to be added or modified over the life of a table
  • Time travel – Enabling you to query data as of a point in time in the past

In general, open table formats implement these features by storing multiple versions of a single record across many underlying files, and use a tracking and indexing mechanism that allows an analytics engine to see or modify the correct version of the records they are accessing. When records are updated or deleted, the changed information is stored in new files, and the files for a given record are retrieved during an operation, which is then reconciled by the open table format software. This is a powerful architecture that is used in many transactional systems, but in data lakes, this can have some side effects that have to be addressed to help you align with performance and compliance requirements. For instance, when data is deleted from an open table format, in some cases only a delete marker is stored, with the original data retained until a compaction or vacuum operation is performed, which performs a hard deletion. For updates, previous versions of the old values of a record may be retained until a similar process is run. This can mean that data that should be deleted isn’t, or that you store a significantly larger number of files than you intend to, increasing storage cost and slowing down read performance. Regular compaction and vacuuming must be run, either as part of the way the open table format works, or separately as a maintenance procedure.
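
As an illustration of the kind of maintenance involved, the following PySpark sketch runs compaction and snapshot cleanup procedures for an Iceberg table and a vacuum for a Delta table. The catalog, table, and path names are hypothetical, Spark is assumed to be configured for both formats, and the available procedures and options depend on the format version you run; Hudi can perform equivalent services automatically in the writer or through its own call procedures.

```python
from pyspark.sql import SparkSession

# Assumes a Spark session already configured with the relevant Iceberg catalog and
# Delta Lake extensions; catalog, database, table, and path names are hypothetical.
spark = SparkSession.builder.appName("table-maintenance").getOrCreate()

# Iceberg: compact small data files and expire snapshots that are no longer needed.
spark.sql("CALL glue_catalog.system.rewrite_data_files(table => 'sales.orders')")
spark.sql("CALL glue_catalog.system.expire_snapshots(table => 'sales.orders')")

# Delta Lake: remove data files no longer referenced by the table
# (a hard delete after the retention period).
spark.sql("VACUUM delta.`s3://my-data-lake/sales/orders_delta/` RETAIN 168 HOURS")
```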

The three most common and prevalent open table formats are Apache Hudi, Apache Iceberg, and Delta Lake. AWS supports all three of these open table formats, and in this post, we review the features and capabilities of each, how they can be used to implement the most common transactional data lake use cases, and which features and capabilities are available in AWS’s analytics services. Innovation around these table formats is happening at an extremely rapid pace, and there are likely preview or beta features available in these file formats that aren’t covered here. All due care has been taken to provide the correct information as of time of writing, but we also expect this information to change quickly, and we’ll update this post frequently to contain the most accurate information. Also, this post focuses only on the open-source versions of the covered table formats, and doesn’t speak to extensions or proprietary features available from individual third-party vendors.

How to use this post

We encourage you to use the high-level guidance in this post with the mapping of functional fit and supported integrations for your use cases. Combine both aspects to identify what table format is likely a good fit for a specific use case, and then prioritize your proof of concept efforts accordingly. Most organizations have a variety of workloads that can benefit from an open table format, but today no single table format is a “one size fits all.” You may wish to select a specific open table format on a case-by-case basis to get the best performance and features for your requirements, or you may wish to standardize on a single format and understand the trade-offs that you may encounter as your use cases evolve.

This post doesn’t promote a single table format for any given use case. The functional evaluations are only intended to help speed up your decision-making process by highlighting key features and attention points for each table format with each use case. It is crucial that you perform testing to ensure that a table format meets your specific use case requirements.

This post is not intended to provide detailed technical guidance (for example, best practices) or benchmarking of each of the specific table formats; these are available in AWS Technical Guides and in benchmarks from the open-source community, respectively.

Choosing an open table format

When choosing an open table format for your data lake, we believe that there are two critical aspects that should be evaluated:

  • Functional fit – Does the table format offer the features required to efficiently implement your use case with the required performance? Although they all offer common features, each table format has a different underlying technical design and may support unique features. Each format can handle a range of use cases, but they also offer specific advantages or trade-offs, and may be more efficient in certain scenarios as a result of its design.
  • Supported integrations – Does the table format integrate seamlessly with your data environment? When evaluating a table format, it’s important to consider supported engine integrations on dimensions such as support for reads/writes, data catalog integration, supported access control tools, and so on that you have in your organization. This applies to both integration with AWS services and with third-party tools.

General features and considerations

The following table summarizes general features and considerations for each file format that you may want to take into account, regardless of your use case. In addition to this, it is also important to take into account other aspects such as the complexity of the table format and in-house skills.

. Apache Hudi Apache Iceberg Delta Lake
Primary API
  • Spark DataFrame
  • SQL
  • Spark DataFrame
Write modes
  • Copy On Write approach only
Supported data file formats
  • Parquet
  • ORC
  • HFile
  • Parquet
  • ORC
  • Avro
  • Parquet
File layout management
  • Compaction to reorganize data (sort) and merge small files together
Query optimization
S3 optimizations
  • Metadata reduces file listing operations
Table maintenance
  • Automatic within writer
  • Separate processes
  • Separate processes
  • Separate processes
Time travel
Schema evolution
Operations
  • Hudi CLI for table management, troubleshooting, and table inspection
  • No out-of-the-box options
Monitoring
  • No out-of-the-box options that are integrated with AWS services
  • No out-of-the-box options that are integrated with AWS services
Data Encryption
  • Server-side encryption on Amazon S3 supported
  • Server-side encryption on Amazon S3 supported
Configuration Options
  • High configurability:

Extensive configuration options for customizing read/write behavior (such as index type or merge logic) and automatically performed maintenance and optimizations (such as file sizing, compaction, and cleaning)

  • Medium configurability:

Configuration options for basic read/write behavior (Merge On Read or Copy On Write operation modes)

  • Low configurability:

Limited configuration options for table properties (for example, indexed columns)

Other
  • Savepoints allow you to restore tables to a previous version without having to retain the entire history of files
  • Iceberg supports S3 Access Points in Spark, allowing you to implement failover across AWS Regions using a combination of S3 access points, S3 cross-Region replication, and the Iceberg Register Table API
  • Shallow clones allow you to efficiently run tests or experiments on Delta tables in production, without creating copies of the dataset or affecting the original table.
AWS Analytics Services Support*
Amazon EMR Read and write Read and write Read and write
AWS Glue Read and write Read and write Read and write
Amazon Athena (SQL) Read Read and write Read
Amazon Redshift (Spectrum) Read Currently not supported Read
AWS Glue Data Catalog Yes Yes Yes

* For table format support in third-party tools, consult the official documentation for the respective tool.
Amazon Redshift only supports Delta Symlink tables (see Creating external tables for data managed in Delta Lake for more information).
Refer to Working with other AWS services in the Lake Formation documentation for an overview of table format support when using Lake Formation with other AWS services.

Functional fit for common use cases

Now let’s dive deep into specific use cases to understand the capabilities of each open table format.

Getting data into your data lake

In this section, we discuss the capabilities of each open table format for streaming ingestion, batch load and change data capture (CDC) use cases.

Streaming ingestion

Streaming ingestion allows you to write changes from a queue, topic, or stream into your data lake. Although your specific requirements may vary based on the type of use case, streaming data ingestion typically requires the following features:

  • Low-latency writes – Supporting record-level inserts, updates, and deletes, for example to support late-arriving data
  • File size management – Enabling you to create files that are sized for optimal read performance (rather than creating one or more files per streaming batch, which can result in millions of tiny files)
  • Support for concurrent readers and writers – Including schema changes and table maintenance
  • Automatic table management services – Enabling you to maintain consistent read performance

In this section, we talk about streaming ingestion where records are just inserted into files, and you aren’t trying to update or delete previous records based on changes. A typical example of this is time series data (for example sensor readings), where each event is added as a new record to the dataset. The following table summarizes the features.

. Apache Hudi Apache Iceberg Delta Lake
Functional fit
Considerations Hudi’s default configurations are tailored for upserts, and need to be tuned for append-only streaming workloads. For example, Hudi’s automatic file sizing in the writer minimizes operational effort/complexity required to maintain read performance over time, but can add a performance overhead at write time. If write speed is of critical importance, it can be beneficial to turn off Hudi’s file sizing, write new data files for each batch (or micro-batch), then run clustering later to create better sized files for read performance (using a similar approach as Iceberg or Delta).
  • Iceberg doesn’t optimize file sizes or run automatic table services (for example, compaction or clustering) when writing, so streaming ingestion will create many small data and metadata files. Frequent table maintenance needs to be performed to prevent read performance from degrading over time.
  • Delta doesn’t optimize file sizes or run automatic table services (for example, compaction or clustering) when writing, so streaming ingestion will create many small data and metadata files. Frequent table maintenance needs to be performed to prevent read performance from degrading over time.
Supported AWS integrations
  • Amazon EMR (Spark Structured Streaming (streaming sink and forEachBatch), Flink, Hudi DeltaStreamer)
  • AWS Glue (Spark Structured Streaming (streaming sink and forEachBatch), Hudi DeltaStreamer)
  • Amazon Kinesis Data Analytics
  • Amazon Managed Streaming for Apache Kafka (MSK Connect)
  • Amazon EMR (Spark Structured Streaming (only forEachBatch), Flink)
  • AWS Glue (Spark Structured Streaming (only forEachBatch))
  • Amazon Kinesis Data Analytics
Conclusion Good functional fit for all append-only streaming when configuration tuning for append-only workloads is acceptable. Good fit for append-only streaming with larger micro-batch windows, and when operational overhead of table management is acceptable. Good fit for append-only streaming with larger micro-batch windows, and when operational overhead of table management is acceptable.

When streaming data with updates and deletes into a data lake, a key priority is to have fast upserts and deletes by being able to efficiently identify impacted files to be updated.

. Apache Hudi Apache Iceberg Delta Lake
Functional fit
  • Iceberg offers a Merge On Read strategy to enable fast writes.
  • Streaming upserts into Iceberg tables are natively supported with Flink, and Spark can implement streaming ingestion with updates and deletes using a micro-batch approach with MERGE INTO.
  • Using column statistics, Iceberg offers efficient updates on tables that are sorted on a “key” column.
  • Streaming ingestion with updates and deletes into OSS Delta Lake tables can be implemented using a micro-batch approach with MERGE INTO.
  • Using data skipping with column statistics, Delta offers efficient updates on tables that are sorted on a “key” column.
Considerations
  • Hudi’s automatic optimizations in the writer (for example, file sizing) add performance overhead at write time.
  • Reading from Merge On Read tables is generally slower than Copy On Write tables due to log files. Frequent compaction can be used to optimize read performance.
  • Iceberg uses a MERGE INTO approach (a join) for upserting data. This is more resource intensive and less performant for streaming data ingestion with frequent commits on (large unsorted) tables, because full table or partition scans would be performed on unsorted tables.
  • Iceberg does not optimize file sizes or run automatic table services (for example, compaction) when writing, so streaming ingestion will create many small data and metadata files. Frequent table maintenance needs to be performed to prevent read performance from degrading over time.
  • Reading from tables using the Merge On Read approach is generally slower than tables using only the Copy On Write approach due to delete files. Frequent compaction can be used to optimize read performance.
  • Iceberg Merge On Read currently does not support dynamic file pruning using its column statistics during merges and updates. This has impact on write performance, resulting in full table joins.
  • Delta uses a Copy On Write strategy that is not optimized for fast (streaming) writes, as it rewrites entire files for record updates.
  • Delta uses a MERGE INTO approach (a join). This is more resource intensive (less performant) and not suited for streaming data ingestion with frequent commits on large unsorted tables, because full table or partition scans would be performed on unsorted tables.
  • No auto file sizing is performed; separate table management processes are required (which can impact writes).
Supported AWS integrations
  • Amazon EMR (Spark Structured Streaming (streaming sink and forEachBatch), Flink, Hudi DeltaStreamer)
  • AWS Glue (Spark Structured Streaming (streaming sink and forEachBatch), Hudi DeltaStreamer)
  • Amazon Kinesis Data Analytics
  • Amazon Managed Streaming for Apache Kafka (MSK Connect)
  • Amazon EMR (Spark Structured Streaming (only forEachBatch), Flink)
  • Amazon Kinesis Data Analytics
  • Amazon EMR (Spark Structured Streaming (only forEachBatch))
  • AWS Glue (Spark Structured Streaming (only forEachBatch))
  • Amazon Kinesis Data Analytics
Conclusion Good fit for lower-latency streaming with updates and deletes thanks to native support for streaming upserts, indexes for upserts, and automatic file sizing and compaction. Good fit for streaming with larger micro-batch windows and when the operational overhead of table management is acceptable. Can be used for streaming data ingestion with updates/deletes if latency is not a concern, because a Copy-On-Write strategy may not deliver the write performance required by low latency streaming use cases.

Change data capture

Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream process or system—in this case, delivering CDC data from databases into Amazon S3.

In addition to the aforementioned general streaming requirements, the following are key requirements for efficient CDC processing:

  • Efficient record-level updates and deletes – With the ability to efficiently identify files to be modified (which is important to support late-arriving data).
  • Native support for CDC – With the following options:
  • CDC record support in the table format – The table format understands how to process CDC-generated records and no custom preprocessing is required for writing CDC records to the table.
  • CDC tools natively supporting the table format – CDC tools understand how to process CDC-generated records and apply them to the target tables. In this case, the CDC engine writes to the target table without another engine in between.

Without support for either of these two CDC options, processing and applying CDC records correctly to a target table requires custom code. Each CDC tool typically has its own CDC record format (or payload). For example, Debezium and AWS Database Migration Service (AWS DMS) each have their own specific record formats, and need to be transformed differently. This must be considered when you are operating CDC at scale across many tables. A minimal sketch of this kind of preprocessing follows.
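
The following simplified sketch flattens a Debezium-style change record into a row that could then be deduplicated and applied with a merge. The record shape is trimmed for brevity, and the helper is purely illustrative of the preprocessing an engine without native CDC payload support would need.

```python
# A simplified, illustrative Debezium-style change record (fields trimmed for brevity).
debezium_record = {
    "payload": {
        "op": "u",                                  # c = insert, u = update, d = delete
        "ts_ms": 1690000000000,
        "before": {"order_id": 42, "status": "OPEN"},
        "after": {"order_id": 42, "status": "SHIPPED"},
    }
}

def to_upsert_row(record):
    """Flatten a Debezium-style record into a row suitable for a MERGE INTO / upsert."""
    payload = record["payload"]
    if payload["op"] == "d":
        # Deletes carry the last known image in "before"; mark the row as deleted.
        row = dict(payload["before"])
        row["_deleted"] = True
    else:
        row = dict(payload["after"])
        row["_deleted"] = False
    # Keep the change timestamp so late or out-of-order records can be deduplicated,
    # retaining only the latest version per primary key before applying the merge.
    row["_change_ts"] = payload["ts_ms"]
    return row
```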

All three table formats allow you to implement CDC from a source database into a target table. The difference for CDC with each format lies mainly in the ease of implementing CDC pipelines and supported integrations.

. Apache Hudi Apache Iceberg Delta Lake
Functional fit
  • Hudi’s DeltaStreamer utility provides a no-code/low-code option to efficiently ingest CDC records from different sources into Hudi tables.
  • Upserts using indexes allow you to quickly identify the target files for updates, without having to perform a full table join.
  • Unique record keys and deduplication natively enforce source databases’ primary keys and prevent duplicates in the data lake.
  • Out of order records are handled via the pre-combine feature.
  • Native support (through record payload formats) is offered for CDC formats like AWS DMS and Debezium, eliminating the need to write custom CDC preprocessing logic in the writer application to correctly interpret and apply CDC records to the target table. Writing CDC records to Hudi tables is as simple as writing any other records to a Hudi table.
  • Partial updates are supported, so the CDC payload format does not need to include all record columns.
  • Flink CDC is the most convenient way to set up CDC from downstream data sources into Iceberg tables. It supports upsert mode and can interpret CDC formats such as Debezium natively.
  • Using column statistics, Iceberg offers efficient updates on tables that are sorted on a “key” column.
  • CDC into Delta tables can be implemented using third-party tools or using Spark with custom processing logic.
  • Using data skipping with column statistics, Delta offers efficient updates on tables that are sorted on a “key” column.
Considerations
  • Natively supported payload formats can be found in the Hudi code repo. For other formats, consider creating a custom payload or adding custom logic to the writer application to correctly process and apply CDC records of that format to target Hudi tables.
  • Iceberg uses a MERGE INTO approach (a join) for upserting data. This is more resource intensive and less performant, particularly on large unsorted tables where a MERGE INTO operation could require a full table scan.
  • Regular compaction should be implemented to maintain sort order over time in order to prevent MERGE INTO performance degrading.
  • Iceberg has no native support for CDC payload formats (for example, AWS DMS or Debezium). When using other engines than Flink CDC (such as Spark), custom logic needs to be added to the writer application in order to correctly process and apply CDC records to target Iceberg tables (for example, deduplication or ordering based on operation).
  • Deduplication to enforce primary key constraints needs to be handled in the Iceberg writer application.
  • No support for handling out-of-order records.
  • Delta does not use indexes for upserts, but uses a MERGE INTO approach instead (a join). This is more resource intensive and less performant on large unsorted tables because those would require full table or partition scans.
  • Regular clustering should be implemented to maintain sort order over time in order to prevent MERGE INTO performance degrading.
  • Delta Lake has no native support for CDC payload formats (for example, AWS DMS or Debezium). When using Spark for ingestion, custom logic needs to be added to the writer application in order to correctly process and apply CDC records to target Delta tables (for example, deduplication or ordering based on operation).
  • Record updates on unsorted Delta tables result in full table or partition scans.
  • No support for handling out-of-order records.
Natively supported CDC formats
  • Apache Hudi: AWS DMS, Debezium
  • Apache Iceberg: None
  • Delta Lake: None
CDC tool integrations
  • Apache Hudi: DeltaStreamer, Flink CDC, Debezium
  • Apache Iceberg: Flink CDC, Debezium
  • Delta Lake: Debezium
Conclusion All three formats can implement CDC workloads. Apache Hudi offers the best overall technical fit for CDC workloads as well as the most options for efficient CDC pipeline design: no-code/low-code with DeltaStreamer, third-party CDC tools offering native Hudi integration, or a Spark/Flink engine using CDC record payloads offered in Hudi.
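
As an illustration of the record payload approach, the following PySpark sketch shows how AWS DMS change records might be written to a Hudi table. It is a minimal sketch, assuming a Spark environment with Hudi support already enabled (for example, AWS Glue with native Hudi support); the S3 paths and column names are hypothetical, and the payload class package can differ between Hudi versions, so verify it against your release.

from pyspark.sql import SparkSession

# Format-specific jars and extensions are assumed to be provided by the environment
spark = SparkSession.builder.getOrCreate()

# Hypothetical location where AWS DMS lands full-load and CDC files
cdc_df = spark.read.parquet("s3://example-bucket/dms-output/sales/orders/")

hudi_options = {
    "hoodie.table.name": "orders",
    "hoodie.datasource.write.recordkey.field": "order_id",    # source primary key
    "hoodie.datasource.write.precombine.field": "update_ts",  # resolves out-of-order records
    "hoodie.datasource.write.operation": "upsert",
    # Payload that interprets the DMS 'Op' column (I/U/D); the class package varies by Hudi version
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.AWSDmsAvroPayload",
}

(cdc_df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3://example-bucket/lake/orders/"))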

Batch loads

If your use case requires only periodic writes but frequent reads, you may want to use batch loads and optimize for read performance.

Batch loading data with updates and deletes is perhaps the simplest use case to implement with any of the three table formats. Batch loads typically don’t require low latency, allowing them to benefit from the operational simplicity of a Copy On Write strategy. With Copy On Write, data files are rewritten to apply updates and add new records, minimizing the complexity of having to run compaction or optimization table services on the table.
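
As a minimal sketch of such a batch load, the following Spark SQL statement upserts a daily batch into a catalog table; the same MERGE INTO pattern applies to both Iceberg and Delta tables, while Hudi can use its upsert write operation instead. The table, column, and path names below are assumptions for illustration.

from pyspark.sql import SparkSession

# Assumes a Spark session already configured for your table format (Iceberg or Delta)
spark = SparkSession.builder.getOrCreate()

# Hypothetical daily batch exported from an operational system
daily_batch = spark.read.parquet("s3://example-bucket/staging/orders/2023-06-01/")
daily_batch.createOrReplaceTempView("daily_batch")

# Upsert the batch into the target table on its business key
spark.sql("""
    MERGE INTO lake.orders AS target
    USING daily_batch AS source
    ON target.order_id = source.order_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")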

Apache Hudi | Apache Iceberg | Delta Lake
Functional fit
  • Copy On Write is supported.
  • Automatic file sizing while writing is supported, including optimizing previously written small files by adding new records to them.
  • Multiple index types are provided to optimize update performance for different workload patterns.
  • Copy On Write is supported.
  • File size management is performed within each incoming data batch (but it is not possible to optimize previously written data files by adding new records to them).
  • Copy On Write is supported.
  • File size can be indirectly managed within each data batch by setting the max number of records per file (but it is not possible to optimize previously written data files by adding new records to them).
Considerations
  • Configuring Hudi according to your workload pattern is imperative for good performance (see Apache Hudi on AWS for guidance).
  • Data deduplication needs to be handled in the writer application.
  • If a single data batch does not contain sufficient data to reach a target file size, compaction can be performed to merge smaller files together afterwards.
  • Ensuring data is sorted on a “key” column is imperative for good update performance. Regular sorting compaction should be considered to maintain sorted data over time.
  • Data deduplication needs to be handled in the writer application.
  • If a single data batch does not contain sufficient data to reach a target file size, compaction can be performed to merge smaller files together afterwards.
  • Ensuring data is sorted on a “key” column is imperative for good update performance. Regular clustering should be considered to maintain sorted data over time.
Supported AWS integrations
  • Apache Hudi: Amazon EMR (Spark), AWS Glue (Spark)
  • Apache Iceberg: Amazon EMR (Spark, Presto, Trino, Hive), AWS Glue (Spark), Amazon Athena (SQL)
  • Delta Lake: Amazon EMR (Spark, Trino), AWS Glue (Spark)
Conclusion All three formats are well suited for batch loads. Apache Hudi supports the most configuration options and may increase the effort to get started, but provides lower operational effort due to automatic table management. On the other hand, Iceberg and Delta are simpler to get started with, but require some operational overhead for table maintenance.

Working with open table formats

In this section, we discuss the capabilities of each open table format for common use cases: optimizing read performance, incremental data processing, and processing deletes to comply with privacy regulations.

Optimizing read performance

The preceding sections primarily focused on write performance for specific use cases. Now let’s explore how each open table format can support optimal read performance. Although there are some cases where data is optimized purely for writes, read performance is typically a very important dimension on which you should evaluate an open table format.

Open table format features that improve query performance include the following:

  • Indexes, (column) statistics, and other metadata – Improves query planning and file pruning, resulting in reduced data scanned
  • File layout optimization – Improves query performance through the following:
  • File size management – Properly sized files provide better query performance
  • Data colocation (through clustering) according to query patterns – Reduces the amount of data scanned by queries
Apache Hudi | Apache Iceberg | Delta Lake
Functional fit
  • Auto file sizing when writing results in good file sizes for read performance. On Merge On Read tables, automatic compaction and clustering improves read performance.
  • Metadata tables eliminate slow S3 file listing operations. Column statistics in the metadata table can be used for better file pruning in query planning (data skipping feature).
  • Clustering data for better data colocation with hierarchical sorting or z-ordering.
  • Hidden partitioning prevents unintentional full table scans by users, without requiring them to specify partition columns explicitly.
  • Column and partition statistics in manifest files speed up query planning and file pruning, and eliminate S3 file listing operations.
  • Optimized file layout for S3 object storage using random prefixes is supported, which minimizes chances of S3 throttling.
  • Clustering data for better data colocation with hierarchical sorting or z-ordering.
  • File size can be indirectly managed within each data batch by setting the max number of records per file (but not optimizing previously written data files by adding new records to existing files).
  • Generated columns avoid full table scans.
  • Data skipping is automatically used in Spark.
  • Clustering data for better data colocation using z-ordering.
Considerations
  • Data skipping using metadata column stats has to be supported in the query engine (currently only in Apache Spark).
  • Snapshot queries on Merge On Read tables have higher query latencies than on Copy On Write tables. This latency impact can be reduced by increasing the compaction frequency.
  • Separate table maintenance needs to be performed to maintain read performance over time.
  • Reading from tables using a Merge On Read approach is generally slower than tables using only a Copy On Write approach due to delete files. Frequent compaction can be used to optimize read performance.
  • Currently, only Apache Spark can use data skipping.
  • Separate table maintenance needs to be performed to maintain read performance over time.
Optimization & Maintenance Processes
  • Compaction of log files in Merge On Read tables can be run as part of the writing application or as a separate job using Spark on Amazon EMR or AWS Glue. Compaction does not interfere with other jobs or queries.
  • Clustering runs as part of the writing application or in a separate job using Spark on Amazon EMR or AWS Glue because clustering can interfere with other transactions.
  • See Apache Hudi on AWS for guidance.
  • Compaction API in Delta Lake can group small files or cluster data, and it can interfere with other transactions.
  • This process has to be scheduled separately by the user on a time or event basis.
  • Spark can be used to perform compaction in services like Amazon EMR or AWS Glue.
Conclusion For achieving good read performance, it’s important that your query engine supports the optimization features offered by the table formats. When using Spark, all three formats provide good read performance when properly configured. When using Trino (and therefore Athena as well), Iceberg will likely provide better query performance because the data skipping feature of Hudi and Delta is not supported in the Trino engine. Make sure to evaluate this feature support for your query engine of choice.
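
As a hedged example of the maintenance mentioned above, the following sketch compacts and sorts an Iceberg table and Z-orders a Delta table from Spark. Procedure names and arguments depend on the format version and catalog configuration in your environment, and the catalog, table, and column names are assumptions.

from pyspark.sql import SparkSession

# Assumes a Spark session configured with the relevant catalogs and format support
spark = SparkSession.builder.getOrCreate()

# Iceberg: rewrite small files and sort on a key column to improve data skipping
spark.sql("""
    CALL glue_catalog.system.rewrite_data_files(
        table => 'iceberg_db.orders',
        strategy => 'sort',
        sort_order => 'order_id'
    )
""")

# Delta Lake: compact files and co-locate related records with Z-ordering
spark.sql("OPTIMIZE delta_db.orders ZORDER BY (order_id)")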

Incremental processing of data on the data lake

At a high level, incremental data processing is the movement of new or fresh data from a source to a destination. To implement incremental extract, transform, and load (ETL) workloads efficiently, we need to be able to retrieve only the data records that have been changed or added since a certain point in time (incrementally) so we don’t need to reprocess unnecessary data (such as entire partitions). When your data source is an open table format table, we can take advantage of incremental queries to facilitate more efficient reads in these table formats.
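
For example, a Hudi incremental query can be consumed from Spark as in the following sketch, which returns only records changed after a given commit time; the table path and commit timestamp are assumptions. The comparison that follows covers the incremental capabilities of all three formats.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Hudi support assumed in the environment

# Read only the records written after the given commit time
incremental_df = (spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20230601000000")
    .load("s3://example-bucket/lake/orders/"))

# Downstream ETL now processes just the changed records
incremental_df.createOrReplaceTempView("orders_increment")
spark.sql("SELECT symbol, count(*) AS trades FROM orders_increment GROUP BY symbol").show()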

Apache Hudi | Apache Iceberg | Delta Lake
Functional fit
  • Full incremental pipelines can be built using Hudi’s incremental queries, which capture record-level changes on a Hudi table (including updates and deletes) without the need to store and manage change data records.
  • Hudi’s DeltaStreamer utility offers simple no-code/low-code options to build incremental Hudi pipelines.
  • Iceberg incremental queries can only read new records (no updates) from upstream Iceberg tables and replicate to downstream tables.
  • Incremental pipelines with record-level changes (including updates and deletes) can be implemented using the changelog view procedure.
  • Full incremental pipelines can be built using Delta’s Change Data Feed (CDF) feature, which captures record-level changes (including updates and deletes) using change data records.
Considerations
  • ETL engine used needs to support Hudi’s incremental query type.
  • A view has to be created to incrementally read data between two table snapshots containing updates and deletes.
  • A new view has to be created (or recreated) for reading changes from new snapshots.
  • Record-level changes can only be captured from the moment CDF is turned on.
  • CDF stores change data records on storage, so a storage overhead is incurred and lifecycle management and cleaning of change data records is required.
Supported AWS integrations
Apache Hudi – incremental queries are supported in:

  • Amazon EMR (Spark, Flink, Hive, Hudi DeltaStreamer)
  • AWS Glue (Spark, Hudi DeltaStreamer)
  • Amazon Kinesis Data Analytics

Apache Iceberg – incremental queries are supported in:

  • Amazon EMR (Spark, Flink)
  • AWS Glue (Spark)
  • Amazon Kinesis Data Analytics

Apache Iceberg – the changelog (CDC) view is supported in:

  • Amazon EMR (Spark)
  • AWS Glue (Spark)

Delta Lake – CDF is supported in:

  • Amazon EMR (Spark)
  • AWS Glue (Spark)
Conclusion Apache Hudi is the best functional fit for incremental ETL pipelines using a variety of engines, without any storage overhead; Apache Iceberg is a good fit for implementing incremental pipelines using Spark if the overhead of creating views is acceptable; Delta Lake is a good fit for implementing incremental pipelines using Spark if the additional storage overhead is acceptable.

Processing deletes to comply with privacy regulations

Due to privacy regulations like the General Data Protection Regulation (GDPR) and California Consumer Privacy Act (CCPA), companies across many industries need to perform record-level deletes on their data lake for “right to be forgotten” or to correctly store changes to consent on how their customers’ data can be used.

The ability to perform record-level deletes without rewriting entire (or large parts of) datasets is the main requirement for this use case. For compliance regulations, it’s important to perform hard deletes (deleting records from the table and physically removing them from Amazon S3).

Apache Hudi | Apache Iceberg | Delta Lake
Functional fit
  • Apache Hudi: Hard deletes are performed by Hudi's automatic cleaner service.
  • Apache Iceberg: Hard deletes can be implemented as a separate process.
  • Delta Lake: Hard deletes can be implemented as a separate process.
Considerations
  • Apache Hudi: The Hudi cleaner needs to be configured according to compliance requirements to automatically remove older file versions in time (within a compliance window), otherwise time travel or rollback operations could recover deleted records.
  • Apache Iceberg: Previous snapshots need to be (manually) expired after the delete operation, otherwise time travel operations could recover deleted records.
  • Delta Lake: The vacuum operation needs to be run after the delete, otherwise time travel operations could recover deleted records.
Conclusion This use case can be implemented using all three formats, and in each case, you must ensure that your configuration or background pipelines implement the cleanup procedures required to meet your data retention requirements.
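
As an illustration, the following sketch pairs a record-level delete with the cleanup step Iceberg and Delta need so the data is physically removed from Amazon S3 (for Hudi, the cleaner service performs this cleanup automatically once configured). The retention values, catalogs, and table names are assumptions and must be aligned with your compliance policy.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # format support assumed in the environment

# Delta Lake: delete the records, then vacuum old file versions so they are removed from S3
spark.sql("DELETE FROM delta_db.customers WHERE customer_id = '12345'")
spark.sql("VACUUM delta_db.customers RETAIN 168 HOURS")  # example 7-day window

# Iceberg: delete the records, then expire older snapshots that still reference the deleted data
spark.sql("DELETE FROM glue_catalog.iceberg_db.customers WHERE customer_id = '12345'")
spark.sql("""
    CALL glue_catalog.system.expire_snapshots(
        table => 'iceberg_db.customers',
        retain_last => 1
    )
""")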

Conclusion

Today, no single table format is the best fit for all use cases, and each format has its own unique strengths for specific requirements. It’s important to determine which requirements and use cases are most crucial and select the table format that best meets those needs.

To speed up the selection process of the right table format for your workload, we recommend the following actions:

  • Identify what table format is likely a good fit for your workload using the high-level guidance provided in this post
  • Perform a proof of concept with the identified table format from the previous step to validate its fit for your specific workload and requirements

Keep in mind that these open table formats are open source and rapidly evolve with new features and enhanced or new integrations, so it can be valuable to also take into consideration product roadmaps when deciding on the format for your workloads.

AWS will continue to innovate on behalf of our customers to support these powerful file formats and to help you be successful with your advanced use cases for analytics in the cloud. For more support on building transactional data lakes on AWS, get in touch with your AWS Account Team, AWS Support, or review the following resources:


About the Authors

Shana Schipers is an Analytics Specialist Solutions Architect at AWS, focusing on big data. She supports customers worldwide in building transactional data lakes using open table formats like Apache Hudi, Apache Iceberg and Delta Lake on AWS.

Ian Meyers is a Director of Product Management for AWS Analytics Services. He works with many of AWS' largest customers on emerging technology needs, and leads several data and analytics initiatives within AWS including support for Data Mesh.


Carlos Rodrigues is a Big Data Specialist Solutions Architect at AWS. He helps customers worldwide build transactional data lakes on AWS using open table formats like Apache Hudi and Apache Iceberg.

How Cargotec uses metadata replication to enable cross-account data sharing

Post Syndicated from Sumesh M R original https://aws.amazon.com/blogs/big-data/how-cargotec-uses-metadata-replication-to-enable-cross-account-data-sharing/

This is a guest blog post co-written with Sumesh M R from Cargotec and Tero Karttunen from Knowit Finland.

Cargotec (Nasdaq Helsinki: CGCBV) is a Finnish company that specializes in cargo handling solutions and services. It is headquartered in Helsinki, Finland, and operates globally in over 100 countries. With its leading cargo handling solutions and services, the company is a pioneer in its field. Through its unique position in ports, at sea, and on roads, it optimizes global cargo flows and creates sustainable customer value.

Cargotec captures terabytes of IoT telemetry data from their machinery operated by numerous customers across the globe. This data needs to be ingested into a data lake, transformed, and made available for analytics, machine learning (ML), and visualization. For this, Cargotec built an Amazon Simple Storage Service (Amazon S3) data lake and cataloged the data assets in AWS Glue Data Catalog. They chose AWS Glue as their preferred data integration tool due to its serverless nature, low maintenance, ability to control compute resources in advance, and scale when needed.

In this blog, we discuss the technical challenges faced by Cargotec in replicating their AWS Glue metadata across AWS accounts, and how they navigated these challenges successfully to enable cross-account data sharing.  By sharing their story, we hope to inspire readers facing similar challenges and provide insights into how our services can be customized to meet your specific needs.

Challenges

Like many customers, Cargotec’s data lake is distributed across multiple AWS accounts that are owned by different teams. Cargotec wanted to find a solution to share datasets across accounts and use Amazon Athena to query them. To share the datasets, they needed a way to share access to the data and access to catalog metadata in the form of tables and views. Cargotec’s use cases also required them to create views that span tables and views across catalogs. Cargotec’s implementation covers three discrete AWS accounts, 25 databases, 150 tables, and 10 views.

Solution overview

Cargotec required a single catalog per account that contained metadata from their other AWS accounts. The solution that best fit their needs was to replicate metadata using an in-house version of a publicly available utility called the Metastore Migration utility. Cargotec extended the utility by changing the overall orchestration layer, adding an Amazon SNS notification and an AWS Lambda function. The approach was to programmatically copy and make available each catalog entity (databases, tables, and views) to all consumer accounts. This makes the tables or views local to the account where the query is being run, while the data still remains in its source S3 bucket.

Cargotec’s solution architecture

The following diagram summarizes the architecture and overall flow of events in Cargotec’s design.

Solution Architecture

Catalog entries from a source account are programmatically replicated to multiple target accounts using the following series of steps.

  1. An AWS Glue job (metadata exporter) runs daily on the source account. It reads the table and partition information from the source AWS Glue Data Catalog. Since the target account is used for analytical purposes and does not require real-time schema changes, the metadata exporter runs only once a day. Cargotec uses partition projection, which ensures that the new partitions are available in real-time.
  2. The job then writes the metadata to an S3 bucket in the same account. Please note that the solution doesn’t involve movement of the data across accounts. The target accounts read data from the source account S3 buckets. For guidance on setting up the right permissions, please see the Amazon Athena User Guide.
  3. After the metadata export has been completed, the AWS Glue job pushes a notification to an Amazon Simple Notification Service (Amazon SNS) topic. This message contains the S3 path to the latest metadata export. The SNS notification is Cargotec’s customization to the existing open-source utility.
  4. Every target account runs an AWS Lambda function that is notified when the source account SNS topic receives a push. In short, there are multiple subscriber Lambda functions (one per target account) for the source account SNS topics that get triggered when an export job is completed.
  5. Once triggered, the Lambda function then initiates an AWS Glue job (metadata importer) on the respective target account. The job receives as input the source account’s S3 path to the metadata that has been recently exported.
  6. Based on the path provided, the metadata importer reads the exported metadata from the source S3 bucket.
  7. The metadata importer now uses this information to create or update the corresponding catalog information in the target account.

All along the way, any errors are published to a separate SNS topic for logging and monitoring purposes. With this approach, Cargotec was able to create and consume views that span tables and views from multiple catalogs spread across different AWS accounts.
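
The notification in step 3 can be as simple as the following boto3 sketch at the end of the metadata exporter job; the topic ARN and message structure shown here are assumptions for illustration.

import json
import boto3

sns = boto3.client("sns")

# Published by the metadata exporter once the export files have been written to Amazon S3
sns.publish(
    TopicArn="arn:aws:sns:eu-west-1:111122223333:glue-metadata-export",  # hypothetical topic
    Subject="AWS Glue Data Catalog export completed",
    Message=json.dumps({"export_path": "s3://source-metadata-bucket/exports/2023-06-01/"}),
)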

Implementation

The core of the catalog replication utility is two AWS Glue scripts:

  • Metadata exporter – An AWS Glue job that reads the source data catalog and creates an export of the databases, tables, and partitions in an S3 bucket in the source account.
  • Metadata importer – An AWS Glue job that reads the export that was created by the metadata exporter and applies the metadata to target databases. This code is triggered by a Lambda function once files are written to S3. The job runs in the target account.

Metadata exporter

This section provides details on the AWS Glue job that exports the AWS Glue Data Catalog into an S3 location. The source code for the application is hosted on the AWS Glue GitHub repository. Though it may need to be customized to suit your needs, we will go over the core components of the code in this blog.

Metadata exporter inputs

The application takes a few job input parameters as described below:

  • --mode key accepts either to-s3 or to-jdbc. The latter is used when the code is moving the metadata directly into a JDBC Hive Metastore. In the case of Cargotec, since we are moving the metadata to files on S3, the value for --mode will remain to-s3.
  • --output-path accepts an S3 location to which the exported metadata should be written. The code creates subdirectories corresponding to databases, tables, and partitions.
  • --database-names accepts a semicolon-separated list of databases on the source catalog that need to be replicated to the target

Reading the catalog

The metadata about the database, tables, and partitions are read from the AWS Glue catalog.

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type='com.amazonaws.services.glue.connections.DataCatalogConnection',
    connection_options={
        'catalog.name': 'datacatalog',
        'catalog.database': database,
        'catalog.region': region
    })

The above code snippet reads the metadata into an AWS Glue DynamicFrame. The frame is then converted to a Spark DataFrame, which is filtered into individual DataFrames depending on whether each record describes a database, a table, or a partition. A schema is attached to each DataFrame using one of the following definitions:

DATACATALOG_DATABASE_SCHEMA = StructType([
        StructField('items', ArrayType(
            DATACATALOG_DATABASE_ITEM_SCHEMA, False),
                    True),
        StructField('type', StringType(), False)
    ])
DATACATALOG_TABLE_SCHEMA = StructType([
        StructField('database', StringType(), False),
        StructField('type', StringType(), False),
        StructField('items', ArrayType(DATACATALOG_TABLE_ITEM_SCHEMA, False), True)
    ])
DATACATALOG_PARTITION_SCHEMA = StructType([
        StructField('database', StringType(), False),
        StructField('table', StringType(), False),
        StructField('items', ArrayType(DATACATALOG_PARTITION_ITEM_SCHEMA, False), True),
        StructField('type', StringType(), False)
    ])

For details on the individual item schema, refer to the schema definition on GitHub.

Persisting the metadata

After the metadata is converted to DataFrames with schemas attached, it is persisted to the S3 location specified by the --output-path parameter:

databases.write.format('json').mode('overwrite').save(output_path + 'databases')
tables.write.format('json').mode('overwrite').save(output_path + 'tables')
partitions.write.format('json').mode('overwrite').save(output_path + 'partitions')

Exploring the output

Navigate to the S3 bucket that contains the output location, and you should be able to see the output metadata in JSON format. An example export for a table would look like the following code snippet.

{
    "database": "default",
    "type": "table",
    "item": {
        "createTime": "1651241372000",
        "lastAccessTime": "0",
        "owner": "spark",
        "retention": 0,
        "name": "an_example_table",
        "tableType": "EXTERNAL_TABLE",
        "parameters": {
            "totalSize": "2734148",
            "EXTERNAL": "TRUE",
            "last_commit_time_sync": "20220429140907",
            "spark.sql.sources.schema.part.0": "{redacted_schema}",
            "numFiles": "1",
            "transient_lastDdlTime": "1651241371",
            "spark.sql.sources.schema.numParts": "1",
            "spark.sql.sources.provider": "hudi"
        },
        "partitionKeys": [],
        "storageDescriptor": {
            "inputFormat": "org.apache.hudi.hadoop.HoodieParquetInputFormat",
            "compressed": false,
            "storedAsSubDirectories": false,
            "location": "s3://redacted_bucket_name/table/an_example_table",
            "numberOfBuckets": -1,
            "outputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "bucketColumns": [],
            "columns": [{
                    "name": "_hoodie_commit_time",
                    "type": "string"
                },
                {
                    "name": "_hoodie_commit_seqno",
                    "type": "string"
                }
            ],
            "parameters": {},
            "serdeInfo": {
                "serializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
                "parameters": {
                    "hoodie.query.as.ro.table": "false",
                    "path": "s3://redacted_bucket_name/table/an_example_table",
                    "serialization.format": "1"
                }
            },
            "skewedInfo": {
                "skewedColumnNames": [],
                "skewedColumnValueLocationMaps": {},
                "skewedColumnValues": []
            },
            "sortColumns": []
        }
    }
}

Once the export job is complete, the output S3 path will be pushed to an SNS topic. A Lambda function at the target account processes this message and invokes the import AWS Glue job by passing the S3 import location.
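
A minimal sketch of that Lambda function is shown below; the job name, message fields, and argument values are illustrative assumptions that follow the importer inputs described in the next section.

import json
import boto3

glue = boto3.client("glue")

def lambda_handler(event, context):
    # The SNS message contains the S3 path of the latest metadata export
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    export_path = message["export_path"]

    # Start the metadata importer job in the target account (job and argument names are assumptions)
    response = glue.start_job_run(
        JobName="metadata-importer",
        Arguments={
            "--mode": "from-s3",
            "--region": "eu-west-1",
            "--database-input-path": f"{export_path}databases",
            "--table-input-path": f"{export_path}tables",
            "--partition-input-path": f"{export_path}partitions",
        },
    )
    return response["JobRunId"]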

Metadata importer

The import job runs on the target account. The code for the job is available on GitHub. As with the exporter, you may need to customize it to suit your specific requirements, but the code as-is should work for most scenarios.

Metadata importer inputs

The inputs to the application are provided as job parameters. Below is a list of parameters that are used for the import process:

  • --mode key accepts either from-s3 or from-jdbc. The latter is used when migration is from a JDBC source to the AWS Glue Data Catalog. At Cargotec, the metadata is already written to Amazon S3, and hence the value for this key is always set to from-s3.
  • --region key accepts a valid AWS Region for the AWS Glue Catalog. The target Region is specified using this key.
  • --database-input-path key accepts the path to the file containing the database metadata. This is the output of the previous import job.
  • --table-input-path key accepts the path to the file containing the table metadata. This is the output of the previous import job.
  • --partition-input-path key accepts the path to the file containing the partition metadata. This is the output of the previous import job.

Reading the metadata

The metadata, as previously discussed, is stored as files on Amazon S3. The files are read into individual Spark DataFrames with their respective schema information:

databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)

Loading the catalog

Once the Spark DataFrames are read, they are converted to AWS Glue DynamicFrames and then loaded into the catalog, as shown in the following snippet.

glue_context.write_dynamic_frame.from_options(
        frame=dyf_databases, 
        connection_type='catalog',
        connection_options={
               'catalog.name': datacatalog_name, 
               'catalog.region': region
         }
)
glue_context.write_dynamic_frame.from_options(
        frame=dyf_tables, 
        connection_type='catalog',
        connection_options={
                'catalog.name': datacatalog_name, 
                'catalog.region': region
        }
)
glue_context.write_dynamic_frame.from_options(
        frame=dyf_partitions, 
        connection_type='catalog',
        connection_options={
                 'catalog.name': datacatalog_name, 
                 'catalog.region': region
         }
)

Once the job concludes, you can query the target AWS Glue catalog to ensure the tables from the source have been synced with the destination. To keep things simple and easy to manage, instead of implementing a mechanism to identify tables that change over time, Cargotec updates the catalog information of all databases or tables that are configured in the export job.

Considerations

Though the setup works effectively for Cargotec’s current business requirements, there are a few drawbacks to this approach, which are highlighted below:

  1. The solution involves custom code. Customizations were made to the existing open-source utility to publish an SNS notification once an export is complete, and a Lambda function was added to trigger the import process.
  2. The export process on the source account is a scheduled job. Hence there is no real-time sync between the source and target accounts. This was not a requirement for Cargotec’s business process.
  3. For tables that don’t use Athena partition projection, query results may be outdated until the new partitions are added to the metastore through MSCK REPAIR TABLE, ALTER TABLE ADD PARTITION, AWS Glue crawler, and so on.
  4. The current approach requires syncing all the tables across the source and target. If the requirement is to capture only the ones that changed instead of a scheduled daily export, the design needs to change and could benefit from the Amazon EventBridge integration with AWS Glue. An example implementation of using AWS Glue APIs to identify changes is shown in Identify source schema changes using AWS Glue.

Conclusion

In this blog post, we have explored a solution for cross-account sharing of data and tables that makes it possible for Cargotec to create views that combine data from multiple AWS accounts. We’re excited to share Cargotec’s success and believe the post has provided you with valuable insights and inspiration for your own projects.

We encourage you to explore our range of services and see how they can help you achieve your goals. Lastly, for more data and analytics blogs, feel free to bookmark the AWS Blogs.


About the Authors

Sumesh M R is a Full Stack Machine Learning Architect at Cargotec. He has several years of software engineering and ML experience. Sumesh is an expert in Amazon SageMaker and other AWS ML and analytics services. He is passionate about data science and loves to explore the latest ML libraries and techniques. Before joining Cargotec, he worked as a Solution Architect at TCS. In his spare time, he loves to play cricket and badminton.

 Tero Karttunen is a Senior Cloud Architect at Knowit Finland. He advises clients on architecting and adopting Data Architectures that best serve their Data Analytics and Machine Learning needs. He has helped Cargotec in their data journey for more than two years. Outside of work, he enjoys running, winter sports, and role-playing games.

Arun A K is a Big Data Specialist Solutions Architect at AWS.  He works with customers to provide architectural guidance for running analytics solutions on AWS Glue, AWS Lake Formation, Amazon Athena, and Amazon EMR. In his free time, he likes to spend time with his friends and family.

Best practices to optimize your Amazon EC2 Spot Instances usage

Post Syndicated from Sheila Busser original https://aws.amazon.com/blogs/compute/best-practices-to-optimize-your-amazon-ec2-spot-instances-usage/

This blog post is written by Pranaya Anshu, EC2 PMM, and Sid Ambatipudi, EC2 Compute GTM Specialist.

Amazon EC2 Spot Instances are a powerful tool that thousands of customers use to optimize their compute costs. The National Football League (NFL) is an example of a customer using Spot Instances, leveraging 4,000 EC2 Spot Instances across more than 20 instance types to build its season schedule. By using Spot Instances, it saves $2 million every season! Virtually any organization – small or big – can benefit from using Spot Instances by following best practices.

Overview of Spot Instances

Spot Instances let you take advantage of unused EC2 capacity in the AWS cloud and are available at up to a 90% discount compared to On-Demand prices. Through Spot Instances, you can take advantage of the massive operating scale of AWS and run hyperscale workloads at a significant cost saving. In exchange for these discounts, AWS has the option to reclaim Spot Instances when EC2 requires the capacity. AWS provides a two-minute notification before reclaiming Spot Instances, allowing workloads running on those instances to be gracefully shut down.

In this blog post, we explore four best practices that can help you optimize your Spot Instances usage and minimize the impact of Spot Instance interruptions: diversifying your instances, considering attribute-based instance type selection, leveraging Spot placement scores, and using the price-capacity-optimized allocation strategy. By applying these best practices, you’ll be able to leverage Spot Instances for appropriate workloads and ultimately reduce your compute costs. Note that for the purposes of this blog, we focus on the integration of Spot Instances with Amazon EC2 Auto Scaling groups.

Pre-requisites

Spot Instances can be used for various stateless, fault-tolerant, or flexible applications such as big data, containerized workloads, CI/CD, web servers, high-performance computing (HPC), and AI/ML workloads. However, as previously mentioned, AWS can interrupt Spot Instances with a two-minute notification, so it is best not to use Spot Instances for workloads that cannot handle individual instance interruption — that is, workloads that are inflexible, stateful, fault-intolerant, or tightly coupled.

Best practices

  1. Diversify your instances

The fundamental best practice when using Spot Instances is to be flexible. A Spot capacity pool is a set of unused EC2 instances of the same instance type (for example, m6i.large) within the same AWS Region and Availability Zone (for example, us-east-1a). When you request Spot Instances, you are requesting instances from a specific Spot capacity pool. Since Spot Instances are spare EC2 capacity, you want to base your selection (request) on as many spare pools of capacity as possible in order to increase your likelihood of getting Spot Instances. You should diversify across instance sizes, generations, instance types, and Availability Zones to maximize your savings with Spot Instances. For example, if you are currently using c5a.large in us-east-1a, consider including c6a instances (newer generation of instances), c5a.xlarge (larger size), or us-east-1b (different Availability Zone) to increase your overall flexibility. Instance diversification is beneficial not only for selecting Spot Instances, but also for scaling, resilience, and cost optimization.

To get hands-on experience with Spot Instances and to practice instance diversification, check out Amazon EC2 Spot Instances workshops. And once you’ve diversified your instances, you can leverage AWS Fault Injection Simulator (AWS FIS) to test your applications’ resilience to Spot Instance interruptions to ensure that they can maintain target capacity while still benefiting from the cost savings offered by Spot Instances. To learn more about stress testing your applications, check out the Back to Basics: Chaos Engineering with AWS Fault Injection Simulator video and AWS FIS documentation.

  2. Consider attribute-based instance type selection

We have established that flexibility is key when it comes to getting the most out of Spot Instances. Similarly, we have said that in order to access your desired Spot Instances capacity, you should select multiple instance types. While building and maintaining instance type configurations in a flexible way may seem daunting or time-consuming, it doesn’t have to be if you use attribute-based instance type selection. With attribute-based instance type selection, you can specify instance attributes — for example, CPU, memory, and storage — and EC2 Auto Scaling will automatically identify and launch instances that meet your defined attributes. This removes the manual-lift of configuring and updating instance types. Moreover, this selection method enables you to automatically use newly released instance types as they become available so that you can continuously have access to an increasingly broad range of Spot Instance capacity. Attribute-based instance type selection is ideal for workloads and frameworks that are instance agnostic, such as HPC and big data workloads, and can help to reduce the work involved with selecting specific instance types to meet specific requirements.

For more information on how to configure attribute-based instance selection for your EC2 Auto Scaling group, refer to Create an Auto Scaling Group Using Attribute-Based Instance Type Selection documentation. To learn more about attribute-based instance type selection, read the Attribute-Based Instance Type Selection for EC2 Auto Scaling and EC2 Fleet news blog or check out the Using Attribute-Based Instance Type Selection and Mixed Instance Groups section of the Launching Spot Instances workshop.

  3. Leverage Spot placement scores

Now that we’ve stressed the importance of flexibility when it comes to Spot Instances and covered the best way to select instances, let’s dive into how to find preferred times and locations to launch Spot Instances. Because Spot Instances are unused EC2 capacity, Spot Instances capacity fluctuates. Correspondingly, it is possible that you won’t always get the exact capacity at a specific time that you need through Spot Instances. Spot placement scores are a feature of Spot Instances that indicates how likely it is that you will be able to get the Spot capacity that you require in a specific Region or Availability Zone. Your Spot placement score can help you reduce Spot Instance interruptions, acquire greater capacity, and identify optimal configurations to run workloads on Spot Instances. However, it is important to note that Spot placement scores serve only as point-in-time recommendations (scores can vary depending on current capacity) and do not provide any guarantees in terms of available capacity or risk of interruption.  To learn more about how Spot placement scores work and to get started with them, see the Identifying Optimal Locations for Flexible Workloads With Spot Placement Score blog and Spot placement scores documentation.

As a near real-time tool, Spot placement scores are often integrated into deployment automation. However, because of its logging and graphic capabilities, you may find it to be a valuable resource even before you launch a workload in the cloud. If you are looking to understand historical Spot placement scores for your workload, you should check out the Spot placement score tracker, a tool that automates the capture of Spot placement scores and stores Spot placement score metrics in Amazon CloudWatch. The tracker is available through AWS Labs, a GitHub repository hosting tools. Learn more about the tracker through the Optimizing Amazon EC2 Spot Instances with Spot Placement Scores blog.
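
To explore scores programmatically, a sketch like the following can compare Regions before you launch a workload; the instance types, target capacity, and Regions below are illustrative assumptions.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Ask how likely a diversified Spot request of this size is to succeed in each Region
response = ec2.get_spot_placement_scores(
    InstanceTypes=["c5.large", "c5a.large", "c6i.large", "c6a.large"],
    TargetCapacity=100,
    TargetCapacityUnitType="units",
    RegionNames=["us-east-1", "us-east-2", "eu-west-1"],
)

for score in response["SpotPlacementScores"]:
    print(score["Region"], score["Score"])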

When considering ideal times to launch Spot Instances and exploring different options via Spot placement scores, be sure to consider running Spot Instances at off-peak hours – or hours when there is less demand for EC2 Instances. As you may assume, there is less unused capacity – Spot Instances – available during typical business hours than after business hours. So, in order to leverage as much Spot capacity as you can, explore the possibility of running your workload at hours when there is reduced demand for EC2 instances and thus greater availability of Spot Instances. Similarly, consider running your Spot Instances in “off-peak Regions” – or Regions that are not experiencing business hours at that certain time.

On a related note, to maximize your usage of Spot Instances, you should consider using previous generation of instances if they meet your workload needs. This is because, as with off-peak vs peak hours, there is typically greater capacity available for previous generation instances than current generation instances, as most people tend to use current generation instances for their compute needs.

  4. Use the price-capacity-optimized allocation strategy

Once you’ve selected a diversified and flexible set of instances, you should select your allocation strategy. When launching instances, your Auto Scaling group uses the allocation strategy that you specify to pick the specific Spot pools from all your possible pools. Spot offers four allocation strategies: price-capacity-optimized, capacity-optimized, capacity-optimized-prioritized, and lowest-price. Each of these allocation strategies selects Spot Instances in pools based on price, capacity, a prioritized list of instances, or a combination of these factors.

The price-capacity-optimized strategy launched in November 2022. This strategy makes Spot Instance allocation decisions based on the most capacity at the lowest price. It essentially enables Auto Scaling groups to identify the Spot pools with the highest capacity availability for the number of instances that are launching. In other words, if you select this allocation strategy, we will find the Spot capacity pools that we believe have the lowest chance of interruption in the near term. Your Auto Scaling groups then request Spot Instances from the lowest priced of these pools.

We recommend you leverage the price-capacity-optimized allocation strategy for the majority of your workloads that run on Spot Instances. To see how the price-capacity-optimized allocation strategy selects Spot Instances in comparison with lowest-price and capacity-optimized allocation strategies, read the Introducing the Price-Capacity-Optimized Allocation Strategy for EC2 Spot Instances blog post.
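
Putting the last two practices together, the following boto3 sketch creates an Auto Scaling group that combines attribute-based instance type selection with the price-capacity-optimized allocation strategy; the launch template, group name, subnets, and attribute values are assumptions for illustration.

import boto3

autoscaling = boto3.client("autoscaling")

autoscaling.create_auto_scaling_group(
    AutoScalingGroupName="spot-flexible-asg",       # hypothetical group name
    MinSize=0,
    MaxSize=100,
    DesiredCapacity=10,
    VPCZoneIdentifier="subnet-1111,subnet-2222,subnet-3333",  # hypothetical subnets
    MixedInstancesPolicy={
        "LaunchTemplate": {
            "LaunchTemplateSpecification": {
                "LaunchTemplateName": "my-launch-template",  # hypothetical launch template
                "Version": "$Latest",
            },
            "Overrides": [
                {
                    # Attribute-based instance type selection instead of a fixed instance list
                    "InstanceRequirements": {
                        "VCpuCount": {"Min": 2, "Max": 8},
                        "MemoryMiB": {"Min": 4096},
                    }
                }
            ],
        },
        "InstancesDistribution": {
            "OnDemandPercentageAboveBaseCapacity": 0,
            "SpotAllocationStrategy": "price-capacity-optimized",
        },
    },
)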

Clean-up

If you’ve explored the different Spot Instances workshops we recommended throughout this blog post and spun up resources, please remember to delete resources that you are no longer using to avoid incurring future costs.

Conclusion

Spot Instances can be leveraged to reduce costs across a wide variety of use cases, including containers, big data, machine learning, HPC, and CI/CD workloads. In this blog, we discussed four Spot Instance best practices that can help you optimize your Spot Instance usage to maximize savings: diversifying your instances, considering attribute-based instance type selection, leveraging Spot placement scores, and using the price-capacity-optimized allocation strategy.

To learn more about Spot Instances, check out Spot Instances getting started resources. Or to learn of other ways of reducing costs and improving performance, including leveraging other flexible purchase models such as AWS Savings Plans, read the Increase Your Application Performance at Lower Costs eBook or watch the Seven Steps to Lower Costs While Improving Application Performance webinar.

Ten new visual transforms in AWS Glue Studio

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/ten-new-visual-transforms-in-aws-glue-studio/

AWS Glue Studio is a graphical interface that makes it easy to create, run, and monitor extract, transform, and load (ETL) jobs in AWS Glue. It allows you to visually compose data transformation workflows using nodes that represent different data handling steps, which later are converted automatically into code to run.

AWS Glue Studio recently released 10 more visual transforms to allow creating more advanced jobs in a visual way without coding skills. In this post, we discuss potential use cases that reflect common ETL needs.

The new transforms that will be demonstrated in this post are: Concatenate, Split String, Array To Columns, Add Current Timestamp, Pivot Rows To Columns, Unpivot Columns To Rows, Lookup, Explode Array Or Map Into Columns, Derived Column, and Autobalance Processing.

Solution overview

In this use case, we have some JSON files with stock option operations. We want to make some transformations before storing the data to make it easier to analyze, and we also want to produce a separate dataset summary.

In this dataset, each row represents a trade of option contracts. Options are financial instruments that provide the right—but not the obligation—to buy or sell stock shares at a fixed price (called the strike price) before a defined expiration date.

Input data

The data follows the following schema:

  • order_id – A unique ID
  • symbol – A code generally based on a few letters to identify the corporation that emits the underlying stock shares
  • instrument – The name that identifies the specific option being bought or sold
  • currency – The ISO currency code in which the price is expressed
  • price – The amount that was paid for the purchase of each option contract (on most exchanges, one contract allows you to buy or sell 100 stock shares)
  • exchange – The code of the exchange center or venue where the option was traded
  • sold – A list of the number of contracts that were allocated to fill the sell order when this is a sell trade
  • bought – A list of the number of contracts that were allocated to fill the buy order when this is a buy trade

The following is a sample of the synthetic data generated for this post:

{"order_id": 1679931512485, "symbol": "AMZN", "instrument": "AMZN MAR 24 23 102 PUT", "currency": "usd", "price": 17.18, "exchange": "EDGX", "bought": [18, 38]}
{"order_id": 1679931512486, "symbol": "BMW.DE", "instrument": "BMW.DE MAR 24 23 96 PUT", "currency": "eur", "price": 2.98, "exchange": "XETR", "bought": [28]}
{"order_id": 1679931512487, "symbol": "BMW.DE", "instrument": "BMW.DE APR 28 23 101 CALL", "currency": "eur", "price": 14.71, "exchange": "XETR", "sold": [9, 59, 54]}
{"order_id": 1679931512489, "symbol": "JPM", "instrument": "JPM JUN 30 23 140 CALL", "currency": "usd", "price": 11.83, "exchange": "EDGX", "bought": [33, 42, 55, 67]}
{"order_id": 1679931512490, "symbol": "SIE.DE", "instrument": "SIE.DE MAR 24 23 149 CALL", "currency": "eur", "price": 13.68, "exchange": "XETR", "bought": [96, 89, 82]}
{"order_id": 1679931512491, "symbol": "NKE", "instrument": "NKE MAR 24 23 112 CALL", "currency": "usd", "price": 3.23, "exchange": "EDGX", "sold": [67]}
{"order_id": 1679931512492, "symbol": "AMZN", "instrument": "AMZN MAY 26 23 95 CALL", "currency": "usd", "price": 11.44, "exchange": "EDGX", "sold": [41, 62, 12]}
{"order_id": 1679931512493, "symbol": "JPM", "instrument": "JPM MAR 24 23 121 PUT", "currency": "usd", "price": 1.0, "exchange": "EDGX", "bought": [61, 34]}
{"order_id": 1679931512494, "symbol": "SAP.DE", "instrument": "SAP.DE MAR 24 23 132 CALL", "currency": "eur", "price": 15.9, "exchange": "XETR", "bought": [69, 33]}

ETL requirements

This data has a number of unique characteristics, as often found on older systems, that make the data harder to use.

The following are the ETL requirements:

  • The instrument name has valuable information that is intended for humans to understand; we want to normalize it into separate columns for easier analysis.
  • The attributes bought and sold are mutually exclusive; we can consolidate them into a single column with the contract numbers and have another column indicating if the contracts where bought or sold in this order.
  • We want to keep the information about the individual contract allocations but as individual rows instead of forcing users to deal with an array of numbers. We could add up the numbers, but we would lose information about how the order was filled (indicating market liquidity). Instead, we choose to denormalize the table so each row has a single number of contracts, splitting orders with multiple numbers into separate rows. In a compressed columnar format, the extra dataset size of this repetition is often small when compression is applied, so it’s acceptable to make the dataset easier to query.
  • We want to generate a summary table of volume for each option type (call and put) for each stock. This provides an indication of the market sentiment for each stock and the market in general (greed vs. fear).
  • To enable overall trade summaries, we want to provide for each operation the grand total and standardize the currency to US dollars, using an approximate conversion reference.
  • We want to add the date when these transformations took place. This could be useful, for instance, to have a reference on when was the currency conversion made.

Based on those requirements, the job will produce two outputs:

  • A CSV file with a summary of the number of contracts for each symbol and type
  • A catalog table to keep a history of the order, after doing the transformations indicated
    Data schema

Prerequisites

You will need your own S3 bucket to follow along with this use case. To create a new bucket, refer to Creating a bucket.

Generate synthetic data

To follow along with this post (or experiment with this kind of data on your own), you can generate this dataset synthetically. The following Python script can be run on a Python environment with Boto3 installed and access to Amazon Simple Storage Service (Amazon S3).

To generate the data, complete the following steps:

  1. On AWS Glue Studio, create a new job with the option Python shell script editor.
  2. Give the job a name and on the Job details tab, select a suitable role and a name for the Python script.
  3. In the Job details section, expand Advanced properties and scroll down to Job parameters.
  4. Enter a parameter named --bucket and assign as the value the name of the bucket you want to use to store the sample data.
  5. Enter the following script into the AWS Glue shell editor:
    import argparse
    import boto3
    from datetime import datetime
    import io
    import json
    import random
    import sys
    
    # Configuration
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket')
    args, ignore = parser.parse_known_args()
    if not args.bucket:
        raise Exception("This script requires an argument --bucket with the value specifying the S3 bucket where to store the files generated")
    
    data_bucket = args.bucket
    data_path = "transformsblog/inputdata"
    samples_per_file = 1000
    
    # Create a single file with synthetic data samples
    s3 = boto3.client('s3')
    buff = io.BytesIO()
    
    sample_stocks = [("AMZN", 95, "usd"), ("NKE", 120, "usd"), ("JPM", 130, "usd"), ("KO", 130, "usd"),
                     ("BMW.DE", 95, "eur"), ("SIE.DE", 140, "eur"), ("SAP.DE", 115, "eur")]
    option_type = ["PUT", "CALL"]
    operations = ["sold", "bought"]
    dates = ["MAR 24 23", "APR 28 23", "MAY 26 23", "JUN 30 23"]
    for i in range(samples_per_file):
        stock = random.choice(sample_stocks)
        symbol = stock[0]
        ref_price = stock[1]
        currency = stock[2]
        strike_price = round(ref_price * 0.9 + ref_price * random.uniform(0.01, 0.3))
        sample = {
            "order_id": int(datetime.now().timestamp() * 1000) + i,
            "symbol": stock[0],
            "instrument":f"{symbol} {random.choice(dates)} {strike_price} {random.choice(option_type)}",
            "currency": currency,
            "price": round(random.uniform(0.5, 20.1), 2),
            "exchange": "EDGX" if currency == "usd" else "XETR"
         }
        sample[random.choice(operations)] = [random.randrange(1,100) for i in range(random.randrange(1,5))]
        buff.write(json.dumps(sample).encode())
        buff.write("\n".encode())
    
    s3.put_object(Body=buff.getvalue(), Bucket=data_bucket, Key=f"{data_path}/{int(datetime.now().timestamp())}.json")

  6. Run the job and wait until it shows as successfully completed on the Runs tab (it should take just a few seconds).

Each run will generate a JSON file with 1,000 rows under the bucket specified and prefix transformsblog/inputdata/. You can run the job multiple times if you want to test with more input files.
Each line in the synthetic data is a data row representing a JSON object like the following:

{
 "order_id":1681986991888,
 "symbol":"AMZN",
 "instrument":"AMZN APR 28 23 100 PUT",
 "currency":"usd",
 "price":2.89,
 "exchange":"EDGX",
 "sold":[88,49]
}

Create the AWS Glue visual job

To create the AWS Glue visual job, complete the following steps:

  1. Go to AWS Glue Studio and create a job using the option Visual with a blank canvas.
  2. Edit Untitled job to give it a name and assign a role suitable for AWS Glue on the Job details tab.
  3. Add an S3 data source (you can name it JSON files source) and enter the S3 URL under which the files are stored (for example, s3://<your bucket name>/transformsblog/inputdata/), then select JSON as the data format.
  4. Select Infer schema so it sets the output schema based on the data.

From this source node, you’ll keep chaining transforms. When adding each transform, make sure the selected node is the last one added so it gets assigned as the parent, unless indicated otherwise in the instructions.

If you didn’t select the right parent, you can always edit the parent by selecting it and choosing another parent in the configuration pane.

Node parent configuration

For each node added, you’ll give it a specific name (so the node purpose shows in the graph) and configuration on the Transform tab.

Every time a transform changes the schema (for instance, add a new column), the output schema needs to be updated so it’s visible to the downstream transforms. You can manually edit the output schema, but it’s more practical and safer to do it using the data preview.
Additionally, that way you can verify that the transformations are working as expected so far. To do so, open the Data preview tab with the transform selected and start a preview session. After you have verified the transformed data looks as expected, go to the Output schema tab and choose Use data preview schema to update the schema automatically.

As you add new kinds of transforms, the preview might show a message about a missing dependency. When this happens, choose End Session and then start a new one, so the preview picks up the new kind of node.

Extract instrument information

Let’s start by dealing with the information on the instrument name to normalize it into columns that are easier to access in the resulting output table.

  1. Add a Split String node and name it Split instrument, which will tokenize the instrument column using a whitespace regex: \s+ (a single space would do in this case, but this way is more flexible and visually clearer).
  2. We want to keep the original instrument information as is, so enter a new column name for the split array: instrument_arr.
    Split config
  3. Add an Array To Columns node and name it Instrument columns to convert the array column just created into new fields, except for symbol, for which we already have a column.
  4. Select the column instrument_arr, skip the first token and tell it to extract the output columns month, day, year, strike_price, type using indexes 2, 3, 4, 5, 6 (the spaces after the commas are for readability, they don’t impact the configuration).
    Array config

The extracted year is expressed with two digits only; let's add a stopgap that assumes it's in this century when only two digits are used.

  1. Add a Derived Column node and name it Four digits year.
  2. Enter year as the derived column so it overrides it, and enter the following SQL expression:
    CASE WHEN length(year) = 2 THEN ('20' || year) ELSE year END
    Year derived column config

For convenience, we build an expiration_date field that a user can use as a reference for the last date the option can be exercised.

  1. Add a Concatenate Columns node and name it Build expiration date.
  2. Name the new column expiration_date, select the columns year, month, and day (in that order), and a hyphen as spacer.
    Concatenated date config

The diagram so far should look like the following example.

DAG

The data preview of the new columns so far should look like the following screenshot.

Data preview

Normalize the number of contracts

Each of the rows in the data indicates the number of contracts of each option that were bought or sold and the batches on which the orders were filled. Without losing the information about the individual batches, we want to have each amount on an individual row with a single amount value, while the rest of the information is replicated in each row produced.

First, let’s merge the amounts into a single column.

  1. Add an Unpivot Columns Into Rows node and name it Unpivot actions.
  2. Choose the columns bought and sold to unpivot and store the names and values in columns named action and contracts, respectively.
    Unpivot config
    Notice in the preview that the new column contracts is still an array of numbers after this transformation.
  1. Add an Explode Array Or Map Into Rows node and name it Explode contracts.
  2. Choose the contracts column and enter contracts as the new column to override it (we don’t need to keep the original array).

The preview now shows that each row has a single contracts amount, and the rest of the fields are the same.

This also means that order_id is no longer a unique key. For your own use cases, you need to decide how to model your data and if you want to denormalize or not.
Explode config

The following screenshot is an example of what the new columns look like after the transformations so far.
Data preview
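
For reference, the same unpivot and explode can be expressed outside the visual editor in PySpark. The following is only a minimal sketch (not part of the visual job), assuming the input is readable with spark.read.json and contains the bought and sold array columns from the sample data:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("unpivot-explode-sketch").getOrCreate()
# Adjust read options to match how your sample files are laid out
df = spark.read.json("s3://<your bucket name>/transformsblog/inputdata/")

# Unpivot bought/sold into (action, contracts_arr) pairs, keeping the other columns
unpivoted = df.selectExpr(
    "*", "stack(2, 'bought', bought, 'sold', sold) as (action, contracts_arr)"
).drop("bought", "sold")

# Explode each array so every row holds a single contracts amount
exploded = unpivoted.withColumn("contracts", F.explode("contracts_arr")).drop("contracts_arr")

As in the visual job, each row now carries a single contracts value, so order_id is no longer unique.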

Create a summary table

Now you create a summary table with the number of contracts traded for each type and each stock symbol.

Let’s assume for illustration purposes that the files processed belong to a single day, so this summary gives the business users information about what the market interest and sentiment are that day.

  1. Add a Select Fields node and select the following columns to keep for the summary: symbol, type, and contracts.
    Selected fields
  2. Add a Pivot Rows Into Columns node and name it Pivot summary.
  3. Aggregate on the contracts column using sum and choose to convert the type column.
    Pivot config

Normally, you would store it in an external database or file for reference; in this example, we save it as a CSV file on Amazon S3 (a PySpark equivalent of this summary is sketched at the end of this section).

  1. Add an Autobalance Processing node and name it Single output file.
  2. Although that transform type is normally used to optimize the parallelism, here we use it to reduce the output to a single file. Therefore, enter 1 in the number of partitions configuration.
    Autobalance config
  3. Add an S3 target and name it CSV Contract summary.
  4. Choose CSV as the data format and enter an S3 path where the job role is allowed to store files.

The last part of the job should now look like the following example.
DAG

  1. Save and run the job. Use the Runs tab to check when it has finished successfully.
    You’ll find a file under that path that is a CSV, despite not having that extension. You’ll probably need to add the extension after downloading it to open it.
    On a tool that can read the CSV, the summary should look something like the following example.
    Spreadsheet
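
For comparison, the same summary could be produced in PySpark with a groupBy and pivot aggregation. This sketch reuses the exploded DataFrame from the earlier sketch, and the output prefix is a placeholder:

# Sum the contracts per symbol, pivoting the type values (for example, call and put) into columns
summary = exploded.groupBy("symbol").pivot("type").sum("contracts")

# Coalesce to a single partition so one CSV file is written, mirroring the Autobalance step
(summary.coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv("s3://<your bucket name>/transformsblog/summary/"))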

Clean up temporary columns

In preparation for saving the orders into a historical table for future analysis, let’s clean up some temporary columns created along the way.

  1. Add a Drop Fields node with the Explode contracts node selected as its parent (we are branching the data pipeline to generate a separate output).
  2. Select the fields to be dropped: instrument_arr, month, day, and year.
    The rest we want to keep so they are saved in the historical table we’ll create later.
    Drop fields

Currency standardization

This synthetic data contains fictional operations on two currencies, but in a real system you could get currencies from markets all over the world. It's useful to standardize the currencies handled into a single reference currency so they can easily be compared and aggregated for reporting and analysis.

We use Amazon Athena to simulate a table with approximate currency conversions that gets updated periodically (here we assume the orders are processed soon enough that the conversion rate is a reasonable representative for comparison purposes).

  1. Open the Athena console in the same Region where you’re using AWS Glue.
  2. Run the following query to create the table by setting an S3 location where both your Athena and AWS Glue roles can read and write. Also, you might want to store the table in a different database than default (if you do that, update the table qualified name accordingly in the examples provided).
    CREATE EXTERNAL TABLE default.exchange_rates(currency string, exchange_rate double)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION 's3://<enter some bucket>/exchange_rates/';

  3. Enter a few sample conversions into the table:
    INSERT INTO default.exchange_rates VALUES ('usd', 1.0), ('eur', 1.09), ('gbp', 1.24);
  4. You should now be able to view the table with the following query:
    SELECT * FROM default.exchange_rates
  5. Back on the AWS Glue visual job, add a Lookup node (as a child of Drop Fields) and name it Exchange rate.
  6. Enter the qualified name of the table you just created, use currency as the key, and select the exchange_rate field to use (a PySpark equivalent of this lookup and conversion is sketched after these steps).
    Because the field is named the same in both the data and the lookup table, we can just enter the name currency and don't need to define a mapping.
    Lookup config
    At the time of this writing, the Lookup transform is not supported in the data preview and it will show an error that the table doesn’t exist. This is only for the data preview and doesn’t prevent the job from running correctly. The few remaining steps of the post don’t require you to update the schema. If you need to run a data preview on other nodes, you can remove the lookup node temporarily and then put it back.
  7. Add a Derived Column node and name it Total in usd.
  8. Name the derived column total_usd and use the following SQL expression:
    round(contracts * price * exchange_rate, 2)
    Currency conversion config
  9. Add an Add Current Timestamp node and name the column ingest_date.
  10. Use the format %Y-%m-%d for your timestamp (for demonstration purposes, we are just using the date; you can make it more precise if you want to).
    Timestamp config
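
The following is a rough PySpark equivalent of the lookup, currency conversion, and timestamp steps above. It's only a sketch: it reuses the exploded DataFrame from the earlier sketches and assumes the exchange rates table is registered in the Glue Data Catalog used as the Spark metastore:

import pyspark.sql.functions as F

# Look up the exchange rate by joining on the shared currency column
rates = spark.table("default.exchange_rates")
converted = (
    exploded.join(rates, on="currency", how="left")
    # Total in the reference currency, rounded to 2 decimals
    .withColumn("total_usd", F.round(F.col("contracts") * F.col("price") * F.col("exchange_rate"), 2))
    # Ingestion date in the same %Y-%m-%d format used in the visual job
    .withColumn("ingest_date", F.date_format(F.current_timestamp(), "yyyy-MM-dd"))
)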

Save the historical orders table

To save the historical orders table, complete the following steps:

  1. Add an S3 target node and name it Orders table.
  2. Configure Parquet format with snappy compression, and provide an S3 target path under which to store the results (separate from the summary).
  3. Select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  4. Enter a target database and a name for the new table, for instance: option_orders.
    Table sink config

The last part of the diagram should now look similar to the following, with two branches for the two separate outputs.
DAG

After you run the job successfully, you can use a tool like Athena to review the data the job has produced by querying the new table. You can find the table on the Athena list and choose Preview table or just run a SELECT query (updating the table name to the name and catalog you used):

SELECT * FROM default.option_orders limit 10

Your table content should look similar to the following screenshot.
Table content

Clean up

If you don’t want to keep this example, delete the two jobs you created, the two tables in Athena, and the S3 paths where the input and output files were stored.

Conclusion

In this post, we showed how the new transforms in AWS Glue Studio can help you perform more advanced transformations with minimal configuration. This means you can implement more ETL use cases without having to write and maintain any code. The new transforms are already available in AWS Glue Studio, so you can start using them in your visual jobs today.


About the author

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

How Encored Technologies built serverless event-driven data pipelines with AWS

Post Syndicated from Younggu Yun original https://aws.amazon.com/blogs/big-data/how-encored-technologies-built-serverless-event-driven-data-pipelines-with-aws/

This post is a guest post co-written with SeonJeong Lee, JaeRyun Yim, and HyeonSeok Yang from Encored Technologies.

Encored Technologies (Encored) is an energy IT company in Korea that helps their customers generate higher revenue and reduce operational costs in renewable energy industries by providing various AI-based solutions. Encored develops machine learning (ML) applications predicting and optimizing various energy-related processes, and their key initiative is to predict the amount of power generated at renewable energy power plants.

In this post, we share how Encored runs data engineering pipelines for containerized ML applications on AWS and how they use AWS Lambda to achieve performance improvement, cost reduction, and operational efficiency. We also demonstrate how to use AWS services to ingest and process GRIB (GRIdded Binary) format data, which is a file format commonly used in meteorology to store and exchange weather and climate data in a compressed binary form. It allows for efficient data storage and transmission, as well as easy manipulation of the data using specialized software.

Business and technical challenge

Encored is expanding their business into multiple countries to provide power trading services for end customers. The amount of data and the number of power plants they need to collect data from are rapidly increasing over time. For example, the volume of data required for training one of the ML models is more than 200 TB. To meet the growing requirements of the business, the data science and platform team needed to speed up the process of delivering model outputs. As a solution, Encored aimed to migrate existing data and run ML applications in the AWS Cloud environment to efficiently run a scalable and robust end-to-end data and ML pipeline.

Solution overview

The primary objective of the solution is to develop an optimized data ingestion pipeline that addresses the scaling challenges related to data ingestion. During its previous deployment in an on-premises environment, the time taken to process data from ingestion to preparing the training dataset exceeded the required service level agreement (SLA). One of the input datasets required for ML models is weather data supplied by the Korea Meteorological Administration (KMA). In order to use the GRIB datasets for the ML models, Encored needed to prepare the raw data to make it suitable for building and training ML models. The first step was to convert GRIB to the Parquet file format.

Encored used Lambda to run an existing data ingestion pipeline built in a Linux-based container image. Lambda is a compute service that lets you run code without provisioning or managing servers. Lambda runs your code on a high-availability compute infrastructure and performs all of the administration of the compute resources, including server and operating system maintenance, capacity provisioning and automatic scaling, and logging. A Lambda function is triggered to ingest and process GRIB data files when they are uploaded to Amazon Simple Storage Service (Amazon S3). Once the files are processed, they are stored in Parquet format in another S3 bucket. Encored receives GRIB files throughout the day, and whenever new files arrive, the Lambda function runs a container image registered in Amazon Elastic Container Registry (Amazon ECR). This event-based pipeline triggers a customized data pipeline that is packaged in a container-based solution. Leveraging AWS Lambda, this solution is cost-effective, scalable, and high-performing. Encored uses Python as their preferred language.

The following diagram illustrates the solution architecture.

Lambda-data-pipeline

For data-intensive tasks such as extract, transform, and load (ETL) jobs and ML inference, Lambda is an ideal solution because it offers several key benefits, including rapid scaling to meet demand, automatic scaling to zero when not in use, and S3 event triggers that can initiate actions in response to object-created events. All this contributes to building a scalable and cost-effective data event-driven pipeline. In addition to these benefits, Lambda allows you to configure ephemeral storage (/tmp) between 512–10,240 MB. Encored used this storage for their data application when reading or writing data, enabling them to optimize performance and cost-effectiveness. Furthermore, Lambda’s pay-per-use pricing model means that users only pay for the compute time in use, making it a cost-effective solution for a wide range of use cases.
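
If you want to adjust the ephemeral storage of a function programmatically, a minimal boto3 sketch might look like the following (the function name is a placeholder):

import boto3

lambda_client = boto3.client("lambda")

# Increase /tmp to 10 GB for a data-intensive function (size is in MB, 512-10240)
lambda_client.update_function_configuration(
    FunctionName="grib-to-parquet",  # hypothetical function name
    EphemeralStorage={"Size": 10240},
)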

Prerequisites

For this walkthrough, you should have the following:

Build your application required for your Docker image

The first step is to develop an application that can ingest and process files. This application reads the bucket name and object key passed from the trigger added to the Lambda function. The processing logic involves three parts: downloading the file from Amazon S3 into ephemeral storage (/tmp), parsing the GRIB formatted data, and saving the parsed data to Parquet format.

The customer has a Python script (for example, app.py) that performs these tasks as follows:

import os
import tempfile
import boto3
import numpy as np
import pandas as pd
import pygrib

s3_client = boto3.client('s3')
def handler(event, context):
    try:
        # Get trigger file name
        bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
        s3_file_name = event["Records"][0]["s3"]["object"]["key"]

        # Handle temp files: all temp objects are deleted when the with-clause is closed
        with tempfile.NamedTemporaryFile(delete=True) as tmp_file:
            # Step1> Download file from s3 into temp area
            s3_file_basename = os.path.basename(s3_file_name)
            s3_file_dirname = os.path.dirname(s3_file_name)
            local_filename = tmp_file.name
            s3_client.download_file(
                Bucket=bucket_name,
                Key=f"{s3_file_dirname}/{s3_file_basename}",
                Filename=local_filename
            )

            # Step2> Parse – GRIB2 
            grbs = pygrib.open(local_filename)
            list_of_name = []
            list_of_values = []
            for grb in grbs:
                list_of_name.append(grb.name)
                list_of_values.append(grb.values)
            _, lat, lon = grb.data()
            list_of_name += ["lat", "lon"]
            list_of_values += [lat, lon]
            grbs.close()

            dat = pd.DataFrame(
                np.transpose(np.stack(list_of_values).reshape(len(list_of_values), -1)),
                columns=list_of_name,
            )

        # Step3> Write the parsed data to Parquet on Amazon S3
        # (placeholder destination; replace with your own bucket and prefix)
        s3_dest_uri = f"s3://<your destination bucket>/parquet/{s3_file_basename}.parquet"
        dat.to_parquet(s3_dest_uri, compression="snappy")

    except Exception as err:
        print(err)

Prepare a Docker file

The second step is to create a Docker image using an AWS base image. To achieve this, you can create a new Dockerfile using a text editor on your local machine. The AWS base image used in this Dockerfile already defines two environment variables:

  • LAMBDA_TASK_ROOT=/var/task
  • LAMBDA_RUNTIME_DIR=/var/runtime

It’s important to install any dependencies under the ${LAMBDA_TASK_ROOT} directory alongside the function handler to ensure that the Lambda runtime can locate them when the function is invoked. Refer to the available Lambda base images for custom runtime for more information.

FROM public.ecr.aws/lambda/python:3.8

# Install the function's dependencies using file requirements.txt
# from your project folder.

COPY requirements.txt  .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

# Copy function code
COPY app.py ${LAMBDA_TASK_ROOT}

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]

Build a Docker image

The third step is to build your Docker image using the docker build command. When running this command, make sure to enter a name for the image. For example:

docker build -t process-grib .

In this example, the name of the image is process-grib. You can choose any name you like for your Docker image.

Upload the image to the Amazon ECR repository

Your container image needs to reside in an Amazon Elastic Container Registry (Amazon ECR) repository. Amazon ECR is a fully managed container registry offering high-performance hosting, so you can reliably deploy application images and artifacts anywhere. For instructions on creating an ECR repository, refer to Creating a private repository.

The first step is to authenticate the Docker CLI to your ECR registry as follows:

aws ecr get-login-password --region ap-northeast-2 | docker login --username AWS --password-stdin 123456789012.dkr.ecr.ap-northeast-2.amazonaws.com 

The second step is to tag your image to match your repository name, and deploy the image to Amazon ECR using the docker push command:

docker tag process-grib:latest 123456789012.dkr.ecr.ap-northeast-2.amazonaws.com/process-grib:latest
docker push 123456789012.dkr.ecr.ap-northeast-2.amazonaws.com/process-grib:latest

Deploy Lambda functions as container images

To create your Lambda function, complete the following steps:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose Create function.
  3. Choose the Container image option.
  4. For Function name, enter a name.
  5. For Container image URI, provide a container image. You can enter the ECR image URI or browse for the ECR image.
  6. Under Container image overrides, you can override configuration settings such as the entry point or working directory that are included in the Dockerfile.
  7. Under Permissions, expand Change default execution role.
  8. Choose to create a new role or use an existing role.
  9. Choose Create function.
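
If you prefer to script the deployment instead of using the console, the following boto3 sketch shows the equivalent call (the names, ARNs, and image URI are placeholders):

import boto3

lambda_client = boto3.client("lambda")

# Create the function from the container image pushed to Amazon ECR
lambda_client.create_function(
    FunctionName="grib-to-parquet",  # hypothetical function name
    PackageType="Image",
    Code={"ImageUri": "123456789012.dkr.ecr.ap-northeast-2.amazonaws.com/process-grib:latest"},
    Role="arn:aws:iam::123456789012:role/lambda-grib-execution-role",  # hypothetical role
    Timeout=900,
    MemorySize=4096,
    EphemeralStorage={"Size": 10240},
)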

Key considerations

To handle a large amount of data concurrently and quickly, Encored needed to store GRIB formatted files in the ephemeral storage (/tmp) that comes with Lambda. To achieve this requirement, Encored used tempfile.NamedTemporaryFile, which allows users to easily create temporary files that are automatically deleted when no longer needed. With Lambda, you can configure ephemeral storage between 512 MB–10,240 MB for reading or writing data, allowing you to run ETL jobs, ML inference, or other data-intensive workloads.

Business outcome

Hyoseop Lee (CTO at Encored Technologies) said, “Encored has experienced positive outcomes since migrating to AWS Cloud. Initially, there was a perception that running workloads on AWS would be more expensive than using an on-premises environment. However, we discovered that this was not the case once we started running our applications on AWS. One of the most fascinating aspects of AWS services is the flexible architecture options it provides for processing, storing, and accessing large volumes of data that are only required infrequently.”

Conclusion

In this post, we covered how Encored built serverless data pipelines with Lambda and Amazon ECR to achieve performance improvement, cost reduction, and operational efficiency.

Encored successfully built an architecture that will support their global expansion and enhance technical capabilities through AWS services and the AWS Data Lab program. Based on the architecture and various internal datasets Encored has consolidated and curated, Encored plans to provide renewable energy forecasting and energy trading services.

Thanks for reading this post and hopefully you found it useful. To accelerate your digital transformation with ML, AWS is available to support you by providing prescriptive architectural guidance on a particular use case, sharing best practices, and removing technical roadblocks. You’ll leave the engagement with an architecture or working prototype that is custom fit to your needs, a path to production, and deeper knowledge of AWS services. Please contact your AWS Account Manager or Solutions Architect to get started. If you don’t have an AWS Account Manager, please contact Sales.

To learn more about ML inference use cases with Lambda, check out the following blog posts:

These resources will provide you with valuable insights and practical examples of how to use Lambda for ML inference.


About the Authors

leeSeonJeong Lee is the Head of Algorithms at Encored. She is a data practitioner who finds peace of mind from beautiful codes and formulas.

yimJaeRyun Yim is a Senior Data Scientist at Encored. He is striving to improve both work and life by focusing on simplicity and essence in my work.

yangHyeonSeok Yang is the platform team lead at Encored. He always strives to work with passion and spirit to keep challenging like a junior developer, and become a role model for others.

youngguYounggu Yun works at AWS Data Lab in Korea. His role involves helping customers across the APAC region meet their business objectives and overcome technical challenges by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together.

How SOCAR handles large IoT data with Amazon MSK and Amazon ElastiCache for Redis

Post Syndicated from Younggu Yun original https://aws.amazon.com/blogs/big-data/how-socar-handles-large-iot-data-with-amazon-msk-and-amazon-elasticache-for-redis/

This is a guest blog post co-written with SangSu Park and JaeHong Ahn from SOCAR. 

As companies continue to expand their digital footprint, the importance of real-time data processing and analysis cannot be overstated. The ability to quickly measure and draw insights from data is critical in today’s business landscape, where rapid decision-making is key. With this capability, businesses can stay ahead of the curve and develop new initiatives that drive success.

This post is a continuation of How SOCAR built a streaming data pipeline to process IoT data for real-time analytics and control. In this post, we provide a detailed overview of streaming messages with Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon ElastiCache for Redis, covering technical aspects and design considerations that are essential for achieving optimal results.

SOCAR is the leading Korean mobility company with strong competitiveness in car-sharing. SOCAR wanted to design and build a solution for a new Fleet Management System (FMS). This system involves the collection, processing, storage, and analysis of Internet of Things (IoT) streaming data from various vehicle devices, as well as historical operational data such as location, speed, fuel level, and component status.

This post demonstrates a solution for SOCAR’s production application that allows them to load streaming data from Amazon MSK into ElastiCache for Redis, optimizing the speed and efficiency of their data processing pipeline. We also discuss the key features, considerations, and design of the solution.

Background

SOCAR operates about 20,000 cars and is planning to include other large vehicle types such as commercial vehicles and courier trucks. SOCAR has deployed in-car devices that capture data using AWS IoT Core. This data was then stored in Amazon Relational Database Service (Amazon RDS). The challenge with this approach included inefficient performance and high resource usage. Therefore, SOCAR looked for purpose-built databases tailored to the needs of their application and usage patterns while meeting SOCAR's future business and technical requirements. The key requirements for SOCAR included achieving maximum performance for real-time data analytics, which required storing data in an in-memory data store.

After careful consideration, ElastiCache for Redis was selected as the optimal solution due to its ability to handle complex data aggregation rules with ease. One of the challenges faced was loading data from Amazon MSK into the database, because there was no built-in Kafka connector and consumer available for this task. This post focuses on the development of a Kafka consumer application that was designed to tackle this challenge by enabling performant data loading from Amazon MSK to Redis.

Solution overview

Extracting valuable insights from streaming data can be a challenge for businesses with diverse use cases and workloads. That’s why SOCAR built a solution to seamlessly bring data from Amazon MSK into multiple purpose-built databases, while also empowering users to transform data as needed. With fully managed Apache Kafka, Amazon MSK provides a reliable and efficient platform for ingesting and processing real-time data.

The following figure shows an example of the data flow at SOCAR.

solution overview

This architecture consists of three components:

  • Streaming data – Amazon MSK serves as a scalable and reliable platform for streaming data, capable of receiving and storing messages from a variety of sources, including AWS IoT Core, with messages organized into multiple topics and partitions
  • Consumer application – With a consumer application, users can seamlessly bring data from Amazon MSK into a target database or data storage while also defining data transformation rules as needed
  • Target databases – With the consumer application, the SOCAR team was able to load data from Amazon MSK into two separate databases, each serving a specific workload

Although this post focuses on a specific use case with ElastiCache for Redis as the target database and a single topic called gps, the consumer application we describe can handle additional topics and messages, as well as different streaming sources and target databases such as Amazon DynamoDB. Our post covers the most important aspects of the consumer application, including its features and components, design considerations, and a detailed guide to the code implementation.

Components of the consumer application

The consumer application comprises three main parts that work together to consume, transform, and load messages from Amazon MSK into a target database. The following diagram shows an example of data transformations in the handler component.

consumer-application

The details of each component are as follows:

  • Consumer – This consumes messages from Amazon MSK and then forwards the messages to a downstream handler.
  • Loader – This is where users specify a target database. For example, SOCAR’s target databases include ElastiCache for Redis and DynamoDB.
  • Handler – This is where users can apply data transformation rules to the incoming messages before loading them into a target database.

Features of the consumer application

This connection has three features:

  • Scalability – This solution is designed to be scalable, ensuring that the consumer application can handle an increasing volume of data and accommodate additional applications in the future. For instance, SOCAR sought to develop a solution capable of handling not only the current data from approximately 20,000 vehicles but also a larger volume of messages as the business and data continue to grow rapidly.
  • Performance – With this consumer application, users can achieve consistent performance, even as the volume of source messages and target databases increases. The application supports multithreading, allowing for concurrent data processing, and can handle unexpected spikes in data volume by easily increasing compute resources.
  • Flexibility – This consumer application can be reused for any new topics without having to build the entire consumer application again. The consumer application can be used to ingest new messages with different configuration values in the handler. SOCAR deployed multiple handlers to ingest many different messages. Also, this consumer application allows users to add additional target locations. For example, SOCAR initially developed a solution for ElastiCache for Redis and then replicated the consumer application for DynamoDB.

Design considerations of the consumer application

Note the following design considerations for the consumer application:

  • Scale out – A key design principle of this solution is scalability. To achieve this, the consumer application runs with Amazon Elastic Kubernetes Service (Amazon EKS) because it can allow users to increase and replicate consumer applications easily.
  • Consumption patterns – To receive, store, and consume data efficiently, it’s important to design Kafka topics depending on messages and consumption patterns. Depending on messages consumed at the end, messages can be received into multiple topics of different schemas. For example, SOCAR has many different topics that are consumed by different workloads.
  • Purpose-built database – The consumer application supports loading data into multiple target options based on the specific use case. For example, SOCAR stored real-time IoT data in ElastiCache for Redis to power real-time dashboard and web applications, while storing recent trip information in DynamoDB that didn’t require real-time processing.

Walkthrough overview

The producer of this solution is AWS IoT Core, which sends out messages into a topic called gps. The target database of this solution is ElastiCache for Redis. ElastiCache for Redis is a fast in-memory data store that provides sub-millisecond latency to power internet-scale, real-time applications. Built on open-source Redis and compatible with the Redis APIs, ElastiCache for Redis combines the speed, simplicity, and versatility of open-source Redis with the manageability, security, and scalability from Amazon to power the most demanding real-time applications.

The target location can be either another database or storage depending on the use case and workload. SOCAR uses Amazon EKS to operate the containerized solution to achieve scalability, performance, and flexibility. Amazon EKS is a managed Kubernetes service to run Kubernetes in the AWS Cloud. Amazon EKS automatically manages the availability and scalability of the Kubernetes control plane nodes responsible for scheduling containers, managing application availability, storing cluster data, and other key tasks.

For the programming language, the SOCAR team decided to use the Go programming language, utilizing both the AWS SDK for Go and Goroutines, lightweight logical or virtual threads managed by the Go runtime, which make it easy to manage multiple threads. The AWS SDK for Go simplifies the use of AWS services by providing a set of libraries that are consistent and familiar for Go developers.

In the following sections, we walk through the steps to implement the solution:

  1. Create a consumer.
  2. Create a loader.
  3. Create a handler.
  4. Build a consumer application with the consumer, loader, and handler.
  5. Deploy the consumer application.

Prerequisites

For this walkthrough, you should have the following:

Create a consumer

In this example, we use a topic called gps, and the consumer includes a Kafka client that receives messages from the topic. SOCAR created a struct and built a consumer (called NewConsumer in the code) to make it extendable. With this approach, any additional parameters and rules can be added easily.

To authenticate with Amazon MSK, SOCAR uses IAM. Because SOCAR already uses IAM to authenticate other resources, such as Amazon EKS, it uses the same IAM role for both Amazon MSK and Apache Kafka actions, authenticating clients through the aws_msk_iam_v2 SASL mechanism.

The following code creates the consumer:

type Consumer struct {
	logger      *zerolog.Logger
	kafkaReader *kafka.Reader
}

func NewConsumer(logger *zerolog.Logger, awsCfg aws.Config, brokers []string, consumerGroupID, topic string) *Consumer {
	return &Consumer{
		logger: logger,
		kafkaReader: kafka.NewReader(kafka.ReaderConfig{
			Dialer: &kafka.Dialer{
				TLS:           &tls.Config{MinVersion: tls.VersionTLS12},
				Timeout:       10 * time.Second,
				DualStack:     true,
				SASLMechanism: aws_msk_iam_v2.NewMechanism(awsCfg),
			},
			Brokers:     brokers, //
			GroupID:     consumerGroupID, //
			Topic:       topic, //
			StartOffset: kafka.LastOffset, //
		}),
	}
}

func (consumer *Consumer) Close() error {
	var err error = nil
	if consumer.kafkaReader != nil {
		err = consumer.kafkaReader.Close()
		consumer.logger.Info().Msg("closed kafka reader")
	}
	return err
}

func (consumer *Consumer) Consume(ctx context.Context) (kafka.Message, error) {
	return consumer.kafkaReader.ReadMessage(ctx)
}

Create a loader

The loader function, represented by the Loader struct, is responsible for loading messages to the target location, which in this case is ElastiCache for Redis. The NewLoader function initializes a new instance of the Loader struct with a logger and a Redis cluster client, which is used to communicate with the ElastiCache cluster. The redis.NewClusterClient object is initialized using the NewRedisClient function, which uses IAM to authenticate the client for Redis actions. This ensures secure and authorized access to the ElastiCache cluster. The Loader struct also contains a Close method to release the Redis client and free up resources.

The following code creates a loader:

type Loader struct {
	logger      *zerolog.Logger
	redisClient *redis.ClusterClient
}

func NewLoader(logger *zerolog.Logger, redisClient *redis.ClusterClient) *Loader {
	return &Loader{
		logger:      logger,
		redisClient: redisClient,
	}
}

// Close releases the Redis cluster client held by the loader.
func (loader *Loader) Close() error {
	var err error = nil
	if loader.redisClient != nil {
		err = loader.redisClient.Close()
		loader.logger.Info().Msg("closed redis client")
	}
	return err
}

func NewRedisClient(ctx context.Context, awsCfg aws.Config, addrs []string, replicationGroupID, username string) (*redis.ClusterClient, error) {
	redisClient := redis.NewClusterClient(&redis.ClusterOptions{
		NewClient: func(opt *redis.Options) *redis.Client {
			return redis.NewClient(&redis.Options{
				Addr: opt.Addr,
				CredentialsProvider: func() (username string, password string) {
					token, err := BuildRedisIAMAuthToken(ctx, awsCfg, replicationGroupID, opt.Username)
					if err != nil {
						panic(err)
					}
					return opt.Username, token
				},
				PoolSize:    opt.PoolSize,
				PoolTimeout: opt.PoolTimeout,
				TLSConfig:   &tls.Config{InsecureSkipVerify: true},
			})
		},
		Addrs:       addrs,
		Username:    username,
		PoolSize:    100,
		PoolTimeout: 1 * time.Minute,
	})
	pong, err := redisClient.Ping(ctx).Result()
	if err != nil {
		return nil, err
	}
	if pong != "PONG" {
		return nil, fmt.Errorf("failed to verify connection to redis server")
	}
	return redisClient, nil
}

Create a handler

A handler is used to include business rules and data transformation logic that prepares data before loading it into the target location. It acts as a bridge between a consumer and a loader. In this example, the topic name is cars.gps.json, and the message includes two keys, lng and lat, with data type Float64. The business logic can be defined in a function like handlerFuncGpsToRedis and then applied as follows:

type (
	handlerFunc    func(ctx context.Context, loader *Loader, key, value []byte) error
	handlerFuncMap map[string]handlerFunc
)

var HandlerRedis = handlerFuncMap{
	"cars.gps.json": handlerFuncGpsToRedis,
}

func GetHandlerFunc(funcMap handlerFuncMap, topic string) (handlerFunc, error) {
	handlerFunc, exist := funcMap[topic]
	if !exist {
		return nil, fmt.Errorf("failed to find handler func for '%s'", topic)
	}
	return handlerFunc, nil
}

func handlerFuncGpsToRedis(ctx context.Context, loader *Loader, key, value []byte) error {
	// unmarshal raw data to map
	data := map[string]interface{}{}
	err := json.Unmarshal(value, &data)
	if err != nil {
		return err
	}

	// prepare things to load on redis as geolocation
	name := string(key)
	lng, err := getFloat64ValueFromMap(data, "lng")
	if err != nil {
		return err
	}
	lat, err := getFloat64ValueFromMap(data, "lat")
	if err != nil {
		return err
	}

	// add geolocation to redis
	return loader.RedisGeoAdd(ctx, "cars#gps", name, lng, lat)
}

Build a consumer application with the consumer, loader, and handler

Now you have created the consumer, loader, and handler. The next step is to build a consumer application using them. In a consumer application, you read messages from your stream with a consumer, transform them using a handler, and then load transformed messages into a target location with a loader. These three components are parameterized in a consumer application function such as the one shown in the following code:

type Connector struct {
	ctx    context.Context
	logger *zerolog.Logger

	consumer *Consumer
	handler  handlerFuncMap
	loader   *Loader
}

func NewConnector(ctx context.Context, logger *zerolog.Logger, consumer *Consumer, handler handlerFuncMap, loader *Loader) *Connector {
	return &Connector{
		ctx:    ctx,
		logger: logger,

		consumer: consumer,
		handler:  handler,
		loader:   loader,
	}
}

func (connector *Connector) Close() error {
	var err error = nil
	if connector.consumer != nil {
		err = connector.consumer.Close()
	}
	if connector.loader != nil {
		err = connector.loader.Close()
	}
	return err
}

func (connector *Connector) Run() error {
	wg := sync.WaitGroup{}
	defer wg.Wait()
	handlerFunc, err := GetHandlerFunc(connector.handler, connector.consumer.kafkaReader.Config().Topic)
	if err != nil {
		return err
	}
	for {
		msg, err := connector.consumer.Consume(connector.ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				break
			}
			// skip messages that could not be consumed for other reasons
			connector.logger.Err(err).Msg("failed to consume message")
			continue
		}

		wg.Add(1)
		go func(key, value []byte) {
			defer wg.Done()
			err = handlerFunc(connector.ctx, connector.loader, key, value)
			if err != nil {
				connector.logger.Err(err).Msg("")
			}
		}(msg.Key, msg.Value)
	}
	return nil
}

Deploy the consumer application

To achieve maximum parallelism, SOCAR containerizes the consumer application and deploys it into multiple pods on Amazon EKS. Each consumer application contains a unique consumer, loader, and handler. For example, if you need to receive messages from a single topic with five partitions, you can deploy five identical consumer applications, each running in its own pod. Similarly, if you have two topics with three partitions each, you should deploy two consumer applications, resulting in a total of six pods. It's a best practice to run one consumer application per topic, and the number of pods should match the number of partitions to enable concurrent message processing. The pod count can be specified in the Kubernetes deployment configuration.

There are two stages in the Dockerfile. The first stage is the builder, which installs build tools and dependencies, and builds the application. The second stage is the runner, which uses a smaller base image (Alpine) and copies only the necessary files from the builder stage. It also sets the appropriate user permissions and runs the application. It’s also worth noting that the builder stage uses a specific version of the Golang image, while the runner stage uses a specific version of the Alpine image, both of which are considered to be lightweight and secure images.

The following code is an example of the Dockerfile:

# builder
FROM golang:1.18.2-alpine3.16 AS builder
RUN apk add build-base
WORKDIR /usr/src/app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o connector .

# runner
FROM alpine:3.16.0 AS runner
WORKDIR /usr/bin/app
RUN apk add --no-cache tzdata
RUN addgroup --system app && adduser --system --shell /bin/false --ingroup app app
COPY --from=builder /usr/src/app/connector .
RUN chown -R app:app /usr/bin/app
USER app
ENTRYPOINT ["/usr/bin/app/connector"]

Conclusion

In this post, we discussed SOCAR’s approach to building a consumer application that enables IoT real-time streaming from Amazon MSK to target locations such as ElastiCache for Redis. We hope you found this post informative and useful. Thank you for reading!


About the Authors

SangSu Park is the Head of Operation Group at SOCAR. His passion is to keep learning, embrace challenges, and strive for mutual growth through communication. He loves to travel in search of new cities and places.

jaehongJaeHong Ahn is a DevOps Engineer in SOCAR’s cloud infrastructure team. He is dedicated to promoting collaboration between developers and operators. He enjoys creating DevOps tools and is committed to using his coding abilities to help build a better world. He loves to cook delicious meals as a private chef for his wife.

bdb-2857-youngguYounggu Yun works at AWS Data Lab in Korea. His role involves helping customers across the APAC region meet their business objectives and overcome technical challenges by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together.

How the BMW Group analyses semiconductor demand with AWS Glue

Post Syndicated from Göksel SARIKAYA original https://aws.amazon.com/blogs/big-data/how-the-bmw-group-analyses-semiconductor-demand-with-aws-glue/

This is a guest post co-written by Maik Leuthold and Nick Harmening from BMW Group.

The BMW Group is headquartered in Munich, Germany, where the company oversees 149,000 employees and manufactures cars and motorcycles in over 30 production sites across 15 countries. This multinational production strategy follows an even more international and extensive supplier network.

Like many automobile companies across the world, the BMW Group has been facing challenges in its supply chain due to the worldwide semiconductor shortage. Creating transparency about BMW Group’s current and future demand of semiconductors is one key strategic aspect to resolve shortages together with suppliers and semiconductor manufacturers. The manufacturers need to know BMW Group’s exact current and future semiconductor volume information, which will effectively help steer the available worldwide supply.

The main requirement is to have an automated, transparent, and long-term semiconductor demand forecast. Additionally, this forecasting system needs to provide data enrichment steps including byproducts, serve as the master data around the semiconductor management, and enable further use cases at the BMW Group.

To enable this use case, we used the BMW Group’s cloud-native data platform called the Cloud Data Hub. In 2019, the BMW Group decided to re-architect and move its on-premises data lake to the AWS Cloud to enable data-driven innovation while scaling with the dynamic needs of the organization. The Cloud Data Hub processes and combines anonymized data from vehicle sensors and other sources across the enterprise to make it easily accessible for internal teams creating customer-facing and internal applications. To learn more about the Cloud Data Hub, refer to BMW Group Uses AWS-Based Data Lake to Unlock the Power of Data.

In this post, we share how the BMW Group analyzes semiconductor demand using AWS Glue.

Logic and systems behind the demand forecast

The first step towards the demand forecast is the identification of semiconductor-relevant components of a vehicle type. Each component is described by a unique part number, which serves as a key in all systems to identify this component. A component can be a headlight or a steering wheel, for example.

For historic reasons, the required data for this aggregation step is siloed and represented differently in diverse systems. Because each source system and data type has its own schema and format, it's particularly difficult to perform analytics based on this data. Some source systems are already available in the Cloud Data Hub (for example, part master data), therefore it's straightforward to consume from our AWS account. To access the remaining data sources, we need to build specific ingest jobs that read data from the respective system.

The following diagram illustrates the approach.

The data enrichment starts with an Oracle Database (Software Parts) that contains part numbers that are related to software. This can be the control unit of a headlight or a camera system for automated driving. Because semiconductors are the basis for running software, this database builds the foundation of our data processing.

In the next step, we use REST APIs (Part Relations) to enrich the data with further attributes. This includes how parts are related (for example, a specific control unit that will be installed into a headlight) and over which timespan a part number will be built into a vehicle. The knowledge about the part relations is essential to understand how a specific semiconductor, in this case the control unit, is relevant for a more general part, the headlight. The temporal information about the use of part numbers allows us to filter out outdated part numbers, which will not be used in the future and therefore have no relevance in the forecast.

The data (Part Master Data) can directly be consumed from the Cloud Data Hub. This database includes attributes about the status and material types of a part number. This information is required to filter out part numbers that we gathered in the previous steps but have no relevance for semiconductors. With the information that was gathered from the APIs, this data is also queried to extract further part numbers that weren’t ingested in the previous steps.

After data enrichment and filtering, a third-party system reads the filtered part data and enriches the semiconductor information. Subsequently, it adds the volume information of the components. Finally, it provides the overall semiconductor demand forecast centrally to the Cloud Data Hub.

Applied services

Our solution uses the serverless services AWS Glue and Amazon Simple Storage Service (Amazon S3) to run ETL (extract, transform, and load) workflows without managing an infrastructure. It also reduces the costs by paying only for the time jobs are running. The serverless approach fits our workflow’s schedule very well because we run the workload only once a week.

Because we’re using diverse data source systems as well as complex processing and aggregation, it’s important to decouple ETL jobs. This allows us to process each data source independently. We also split the data transformation into several modules (Data Aggregation, Data Filtering, and Data Preparation) to make the system more transparent and easier to maintain. This approach also helps in case of extending or modifying existing jobs.

Although each module is specific to a data source or a particular data transformation, we utilize reusable blocks inside of every job. This allows us to unify each type of operation and simplifies the procedure of adding new data sources and transformation steps in the future.
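
To illustrate the idea, the following PySpark sketch shows what a reusable read-and-write block inside one of the AWS Glue jobs could look like; the database, table, and path names are hypothetical:

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

def ingest_to_s3(catalog_db: str, catalog_table: str, target_path: str) -> None:
    """Reusable block: read a cataloged source table and write it as Parquet to S3."""
    df = glue_context.create_dynamic_frame.from_catalog(
        database=catalog_db, table_name=catalog_table
    ).toDF()
    df.write.mode("overwrite").parquet(target_path)

# Example call for a hypothetical source
ingest_to_s3("software_parts_db", "software_parts", "s3://<source-bucket>/software_parts/")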

In our setup, we follow the security best practice of the least privilege principle, to ensure the information is protected from accidental or unnecessary access. Therefore, each module has AWS Identity and Access Management (IAM) roles with only the necessary permissions, namely access to only data sources and buckets the job deals with. For more information regarding security best practices, refer to Security best practices in IAM.

Solution overview

The following diagram shows the overall workflow where several AWS Glue jobs are interacting with each other sequentially.

As we mentioned earlier, we used the Cloud Data Hub, Oracle DB, and other data sources that we communicate with via the REST API. The first step of the solution is the Data Source Ingest module, which ingests the data from different data sources. For that purpose, AWS Glue jobs read information from different data sources and writes into the S3 source buckets. Ingested data is stored in the encrypted buckets, and keys are managed by AWS Key Management Service (AWS KMS).

After the Data Source Ingest step, intermediate jobs aggregate and enrich the tables with other data sources like components version and categories, model manufacture dates, and so on. Then they write them into the intermediate buckets in the Data Aggregation module, creating comprehensive and abundant data representation. Additionally, according to the business logic workflow, the Data Filtering and Data Preparation modules create the final master data table with only actual and production-relevant information.

The AWS Glue workflow manages all these ingestion jobs and filtering jobs end to end. An AWS Glue workflow schedule is configured weekly to run the workflow on Wednesdays. While the workflow is running, each job writes execution logs (info or error) to Amazon Simple Notification Service (Amazon SNS) and Amazon CloudWatch for monitoring purposes. Amazon SNS forwards the execution results to monitoring tools such as mail, Teams, or Slack channels. In case of any error in the jobs, Amazon SNS also alerts subscribers about the job execution result so they can take action.
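
A job can publish such a notification with a few lines of boto3; the topic ARN and message below are hypothetical:

import boto3

sns = boto3.client("sns")

def notify(job_name: str, status: str, detail: str) -> None:
    """Publish a job execution result so subscribers (mail, Teams, Slack) are alerted."""
    sns.publish(
        TopicArn="arn:aws:sns:eu-central-1:123456789012:glue-job-notifications",  # hypothetical ARN
        Subject=f"AWS Glue job {job_name}: {status}",
        Message=detail,
    )

notify("data-aggregation", "FAILED", "See CloudWatch logs for details.")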

As the last step of the solution, the third-party system reads the master table from the prepared data bucket via Amazon Athena. After further data engineering steps like semiconductor information enrichment and volume information integration, the final master data asset is written into the Cloud Data Hub. With the data now provided in the Cloud Data Hub, other use cases can use this semiconductor master data without building several interfaces to different source systems.

Business outcome

The project results provide the BMW Group substantial transparency about their semiconductor demand for their entire vehicle portfolio in the present and in the future. The creation of a database of that magnitude enables the BMW Group to establish even further use cases to the benefit of more supply chain transparency and a clearer and deeper exchange with first-tier suppliers and semiconductor manufacturers. It helps not only to resolve the current demanding market situation, but also to be more resilient in the future. Therefore, it's one major step toward a digital, transparent supply chain.

Conclusion

This post describes how to analyze semiconductor demand from many data sources with big data jobs in an AWS Glue workflow. A serverless architecture with minimal diversity of services makes the code base and architecture simple to understand and maintain. To learn more about how to use AWS Glue workflows and jobs for serverless orchestration, visit the AWS Glue service page.


About the authors

Maik Leuthold is a Project Lead at the BMW Group for advanced analytics in the business field of supply chain and procurement, and leads the digitalization strategy for the semiconductor management.

Nick Harmening is an IT Project Lead at the BMW Group and an AWS certified Solutions Architect. He builds and operates cloud-native applications with a focus on data engineering and machine learning.

Göksel Sarikaya is a Senior Cloud Application Architect at AWS Professional Services. He enables customers to design scalable, cost-effective, and competitive applications through the innovative production of the AWS platform. He helps them to accelerate customer and partner business outcomes during their digital transformation journey.

Alexander Tselikov is a Data Architect at AWS Professional Services who is passionate about helping customers to build scalable data, analytics and ML solutions to enable timely insights and make critical business decisions.

Rahul Shaurya is a Senior Big Data Architect at Amazon Web Services. He helps and works closely with customers building data platforms and analytical applications on AWS. Outside of work, Rahul loves taking long walks with his dog Barney.

Amazon EMR on EKS widens the performance gap: Run Apache Spark workloads 5.37 times faster and at 4.3 times lower cost

Post Syndicated from Melody Yang original https://aws.amazon.com/blogs/big-data/amazon-emr-on-eks-widens-the-performance-gap-run-apache-spark-workloads-5-37-times-faster-and-at-4-3-times-lower-cost/

Amazon EMR on EKS provides a deployment option for Amazon EMR that allows organizations to run open-source big data frameworks on Amazon Elastic Kubernetes Service (Amazon EKS). With EMR on EKS, Spark applications run on the Amazon EMR runtime for Apache Spark. This performance-optimized runtime offered by Amazon EMR makes your Spark jobs run fast and cost-effectively. Also, you can run other types of business applications, such as web applications and machine learning (ML) TensorFlow workloads, on the same EKS cluster. EMR on EKS simplifies your infrastructure management, maximizes resource utilization, and reduces your cost.

We have been continually improving the Spark performance in each Amazon EMR release to further shorten job runtime and optimize users’ spending on their Amazon EMR big data workloads. As of the Amazon EMR 6.5 release in January 2022, the optimized Spark runtime was 3.5 times faster than OSS Spark v3.1.2 with up to 61% lower costs. Amazon EMR 6.10 is now 1.59 times faster than Amazon EMR 6.5, which has resulted in 5.37 times better performance than OSS Spark v3.3.1 with 76.8% cost savings.

In this post, we describe the benchmark setup and results on top of the EMR on EKS environment. We also share a Spark benchmark solution that suits all Amazon EMR deployment options, so you can replicate the process in your environment for your own performance test cases. The solution uses the TPC-DS dataset and unmodified data schema and table relationships, but derives queries from TPC-DS to support the SparkSQL test cases. It is not comparable to other published TPC-DS benchmark results.

Benchmark setup

To compare with the EMR on EKS 6.5 test result detailed in the post Amazon EMR on Amazon EKS provides up to 61% lower costs and up to 68% performance improvement for Spark workloads, this benchmark for the latest release (Amazon EMR 6.10) uses the same approach: a TPC-DS benchmark framework and the same size of TPC-DS input dataset from an Amazon Simple Storage Service (Amazon S3) location. For the source data, we chose the 3 TB scale factor, which contains 17.7 billion records, approximately 924 GB compressed data in Parquet file format. The setup instructions and technical details can be found in the aws-sample repository.

In summary, the entire performance test job includes 104 SparkSQL queries and was completed in approximately 24 minutes (1,397.55 seconds) with an estimated running cost of $5.08 USD. The input data and test result outputs were both stored on Amazon S3.

The job has been configured with the following parameters, which match the previous Amazon EMR 6.5 test (a sample job submission reflecting these settings is sketched after the list):

  • EMR release – EMR 6.10.0
  • Hardware:
    • Compute – 6 X c5d.9xlarge instances, 216 vCPU, 432 GiB memory in total
    • Storage – 6 x 900 GB NVMe SSD built-in storage
    • Amazon EBS root volume – 6 X 20GB gp2
  • Spark configuration:
    • Driver pod – 1 driver instance sharing an Amazon Elastic Compute Cloud (Amazon EC2) node with 7 executors:
      • spark.driver.cores=4
      • spark.driver.memory=5g
      • spark.kubernetes.driver.limit.cores=4.1
    • Executor pod – 47 instances distributed over 6 EC2 nodes
      • spark.executor.cores=4
      • spark.executor.memory=6g
      • spark.executor.memoryOverhead=2G
      • spark.kubernetes.executor.limit.cores=4.3
  • Metadata store – We use Spark’s in-memory data catalog to store metadata for TPC-DS databases and tables—spark.sql.catalogImplementation is set to the default value in-memory. The fact tables are partitioned by the date column, which consists of partitions ranging from 200–2,100. No statistics are pre-calculated for these tables.
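
For reference, the following boto3 sketch shows how a job with these settings could be submitted to EMR on EKS. The cluster ID, role ARN, and script location are placeholders, and the executor count reflects the 47 executor pods listed above:

import boto3

emr = boto3.client("emr-containers")

response = emr.start_job_run(
    name="tpcds-benchmark",
    virtualClusterId="<your virtual cluster id>",
    executionRoleArn="arn:aws:iam::123456789012:role/emr-on-eks-job-role",  # hypothetical role
    releaseLabel="emr-6.10.0-latest",
    jobDriver={
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://<your bucket>/scripts/tpcds-benchmark.py",  # hypothetical script
            "sparkSubmitParameters": (
                "--conf spark.driver.cores=4 "
                "--conf spark.driver.memory=5g "
                "--conf spark.kubernetes.driver.limit.cores=4.1 "
                "--conf spark.executor.instances=47 "
                "--conf spark.executor.cores=4 "
                "--conf spark.executor.memory=6g "
                "--conf spark.executor.memoryOverhead=2G "
                "--conf spark.kubernetes.executor.limit.cores=4.3"
            ),
        }
    },
)
print(response["id"])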

Results

A single test session consists of 104 Spark SQL queries that were run sequentially. We ran each Spark runtime session (EMR runtime for Apache Spark, OSS Apache Spark) three times. The Spark benchmark job produces a CSV file to Amazon S3 that summarizes the median, minimum, and maximum runtime for each individual query.

The way we calculate the final benchmark results (geomean and the total job runtime) is based on arithmetic means. We take the mean of the median, minimum, and maximum values per query using the formula AVERAGE(), for example AVERAGE(F2:H2). Then we take a geometric mean of the averages in column I with the formula GEOMEAN(I2:I105), and SUM(I2:I105) for the total runtime.
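
The same calculation can be scripted. The following sketch assumes the per-query CSV produced by the benchmark has query, median, min, and max columns:

import csv
from statistics import geometric_mean, mean

# Hypothetical CSV layout: query, median, min, max (one row per SparkSQL query)
with open("benchmark_summary.csv") as f:
    rows = list(csv.DictReader(f))

per_query_avg = [mean([float(r["median"]), float(r["min"]), float(r["max"])]) for r in rows]
print("Geomean (s):", round(geometric_mean(per_query_avg), 2))
print("Total runtime (s):", round(sum(per_query_avg), 2))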

Previously, we observed that EMR on EKS 6.5 is 3.5 times faster than OSS Spark on EKS and costs 2.6 times less. From this benchmark, we found that the gap has widened: EMR on EKS 6.10 now provides a 5.37 times performance improvement on average and up to 11.61 times improved performance for individual queries over OSS Spark 3.3.1 on Amazon EKS. From the running cost perspective, we see a significant reduction of 4.3 times.

The following graph shows the performance improvement of Amazon EMR 6.10 compared to OSS Spark 3.3.1 at the individual query level. The X-axis shows the name of each query, and the Y-axis shows the total runtime in seconds on a logarithmic scale. Eight queries (q14a, q14b, q23b, q24a, q24b, q4, q67, q72) showed the most significant gains, each running more than 10 times faster.

Job cost estimation

The cost estimate doesn’t account for Amazon S3 storage, or PUT and GET requests. The Amazon EMR on EKS uplift calculation is based on the hourly billing information provided by AWS Cost Explorer.

  • c5d.9xlarge hourly price – $1.728
  • Number of EC2 instances – 6
  • Amazon EBS storage per GB-month – $0.10
  • Amazon EBS gp2 root volume – 20GB
  • Job run time (hour)
    • OSS Spark 3.3.1 – 2.09
    • EMR on EKS 6.5.0 – 0.68
    • EMR on EKS 6.10.0 – 0.39
Cost component OSS Spark 3.3.1 on EKS EMR on EKS 6.5.0 EMR on EKS 6.10.0
Amazon EC2 $21.67 $7.05 $4.04
EMR on EKS $ – $1.57 $0.99
Amazon EKS $0.21 $0.07 $0.04
Amazon EBS root volume $0.03 $0.01 $0.01
Total $21.88 $8.70 $5.08
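As a rough cross-check, the Amazon EC2, Amazon EKS, and Amazon EBS components of the EMR on EKS 6.10.0 run can be reproduced from the inputs listed above. The following Scala sketch shows the arithmetic; the $0.10 per cluster-hour Amazon EKS fee and the 730 hours-per-month conversion are assumptions based on standard on-demand pricing, and the EMR on EKS uplift itself is taken from Cost Explorer billing data rather than recomputed.

// Sketch: back-of-the-envelope cost components for the EMR on EKS 6.10.0 run (pricing assumptions noted above)
val instanceHourly = 1.728        // c5d.9xlarge On-Demand price per hour (USD)
val instances      = 6
val runtimeHours   = 0.39         // EMR on EKS 6.10.0 job runtime

val ec2Cost = instances * instanceHourly * runtimeHours        // ~ $4.04
val eksCost = 0.10 * runtimeHours                              // EKS cluster fee, ~ $0.04
val ebsCost = instances * 20 * 0.10 * (runtimeHours / 730.0)   // 20 GB gp2 per node, ~ $0.01

println(f"EC2: $$$ec2Cost%.2f, EKS: $$$eksCost%.2f, EBS root volume: $$$ebsCost%.2f")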

Performance enhancements

We improve on Amazon EMR’s performance with each release, and Amazon EMR 6.10 contains many performance optimizations, making it 5.37 times faster than OSS Spark v3.3.1 and 1.59 times faster than our first release of 2022, Amazon EMR 6.5. This additional performance boost was achieved through the addition of multiple optimizations, including:

  • Enhancements to join performance, such as the following:
    • Shuffle-Hash Joins (SHJ) are more CPU and I/O efficient than Shuffle-Sort-Merge Joins (SMJ) when the costs of building and probing the hash table, including the availability of memory, are less than the cost of sorting and performing the merge join. However, SHJs have drawbacks, such as the risk of out-of-memory errors due to their inability to spill to disk, which prevents them from being used aggressively across Spark in place of SMJs by default. We have optimized our use of SHJs so that they can be applied to more queries by default than in OSS Spark (a sketch after this list shows how a shuffle-hash join can be requested manually in OSS Spark).
    • For some query shapes, we have eliminated redundant joins and enabled the use of more performant join types.
  • We have reduced the amount of data shuffled before joins and the potential for data explosions after joins by selectively pushing down aggregates through joins.
  • Bloom filters can improve performance by reducing the amount of data shuffled before the join. However, there are cases where bloom filters are not beneficial and can even regress performance. For example, the bloom filter introduces a dependency between stages that reduces query parallelism, but may end up filtering out relatively little data. Our enhancements allow bloom filters to be safely applied to more query plans than OSS Spark.
  • Aggregates with high-precision decimals are computationally intensive in OSS Spark. We optimized high-precision decimal computations to increase their performance.
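These optimizations are applied automatically by the EMR runtime. For comparison, in OSS Spark you can nudge the planner toward a shuffle-hash join manually, for example with a join hint. The following Spark Scala sketch uses two TPC-DS tables as an illustration; it is not part of the benchmark itself.

// Sketch: explicitly requesting a shuffle-hash join in OSS Spark SQL via a join hint
// (the EMR runtime makes such join selections automatically, without hints)
val joined = spark.sql("""
  SELECT /*+ SHUFFLE_HASH(d) */ s.ss_item_sk, d.d_date
  FROM store_sales s
  JOIN date_dim d ON s.ss_sold_date_sk = d.d_date_sk
""")
joined.explain()   // the physical plan should show a shuffled hash join instead of a sort merge join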

Summary

With version 6.10, Amazon EMR has further enhanced the EMR runtime for Apache Spark in comparison to our previous benchmark tests for Amazon EMR version 6.5. When running EMR workloads with the equivalent Apache Spark version 3.3.1, we observed 1.59 times better performance with 41.6% cheaper costs than Amazon EMR 6.5.

With our TPC-DS benchmark setup, we observed a significant performance increase of 5.37 times and a cost reduction of 4.3 times using EMR on EKS compared to OSS Spark.

To learn more and get started with EMR on EKS, try out the EMR on EKS Workshop and visit the EMR on EKS Best Practices Guide page.


About the Authors

Melody Yang is a Senior Big Data Solution Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interest are open-source frameworks and automation, data engineering, and DataOps.

Ashok Chintalapati is a software development engineer for Amazon EMR at Amazon Web Services.

Interact with Apache Iceberg tables using Amazon Athena and cross account fine-grained permissions using AWS Lake Formation

Post Syndicated from Kishore Dhamodaran original https://aws.amazon.com/blogs/big-data/interact-with-apache-iceberg-tables-using-amazon-athena-and-cross-account-fine-grained-permissions-using-aws-lake-formation/

We recently announced support for AWS Lake Formation fine-grained access control policies in Amazon Athena queries for data stored in any supported file format using table formats such as Apache Iceberg, Apache Hudi and Apache Hive. AWS Lake Formation allows you to define and enforce database, table, and column-level access policies to query Iceberg tables stored in Amazon S3. Lake Formation provides an authorization and governance layer on data stored in Amazon S3. This capability requires that you upgrade to Athena engine version 3.

Large organizations often have lines of business (LoBs) that operate with autonomy in managing their business data. This makes sharing data across LoBs non-trivial. These organizations have adopted a federated model, with each LoB having the autonomy to make decisions on their data. They use the publisher/consumer model with a centralized governance layer that is used to enforce access controls. If you are interested in learning more about data mesh architecture, visit Design a data mesh architecture using AWS Lake Formation and AWS Glue. With Athena engine version 3, customers can use the same fine-grained controls for open data frameworks such as Apache Iceberg, Apache Hudi, and Apache Hive.

In this post, we deep dive into a use case where you have a producer/consumer model with data sharing enabled to give restricted access to an Apache Iceberg table that the consumer can query. We discuss row filtering to restrict certain rows, column filtering to restrict column-level access, schema evolution, and time travel.

Solution overview

To illustrate the functionality of fine-grained permissions for Apache Iceberg tables with Athena and Lake Formation, we set up the following components:

  • In the producer account:
    • An AWS Glue Data Catalog to register the schema of a table in Apache Iceberg format
    • Lake Formation to provide fine-grained access to the consumer account
    • Athena to verify data from the producer account
  • In the consumer account:
    • AWS Resource Access Manager (AWS RAM) to create a handshake between the producer Data Catalog and consumer
    • Lake Formation to provide fine-grained access to the consumer account
    • Athena to verify data from producer account

The following diagram illustrates the architecture.

Cross-account fine-grained permissions architecture

Prerequisites

Before you get started, make sure you have the following:

Data producer setup

In this section, we present the steps to set up the data producer.

Create an S3 bucket to store the table data

We create a new S3 bucket to save the data for the table:

  1. On the Amazon S3 console, create an S3 bucket with a unique name (for this post, we use iceberg-athena-lakeformation-blog).
  2. Create the producer folder inside the bucket to use for the table.

Amazon S3 bucket and folder creation

Register the S3 path storing the table using Lake Formation

We register the S3 full path in Lake Formation:

  1. Navigate to the Lake Formation console.
  2. If you’re logging in for the first time, you’re prompted to create an admin user.
  3. In the navigation pane, under Register and ingest, choose Data lake locations.
  4. Choose Register location, and provide the S3 bucket path that you created earlier.
  5. Choose AWSServiceRoleForLakeFormationDataAccess for IAM role.

For additional information about roles, refer to Requirements for roles used to register locations.

If you enabled encryption of your S3 bucket, you have to provide permissions for Lake Formation to perform encryption and decryption operations. Refer to Registering an encrypted Amazon S3 location for guidance.

  1. Choose Register location.

Register Lake Formation location

Create an Iceberg table using Athena

Now let’s create the table using Athena backed by Apache Iceberg format:

  1. On the Athena console, choose Query editor in the navigation pane.
  2. If you’re using Athena for the first time, under Settings, choose Manage and enter the S3 bucket location that you created earlier (iceberg-athena-lakeformation-blog/producer).
  3. Choose Save.
  4. In the query editor, enter the following query (replace the location with the S3 bucket that you registered with Lake Formation). Note that we use the default database, but you can use any other database.
CREATE TABLE consumer_iceberg (
customerid bigint,
customername string,
email string,
city string,
country string,
territory string,
contactfirstname string,
contactlastname string)
LOCATION 's3://YOUR-BUCKET/producer/' -- *** Change bucket name to your bucket***
TBLPROPERTIES ('table_type'='ICEBERG')
  1. Choose Run.

Athena query editor to create Iceberg table

Share the table with the consumer account

To illustrate functionality, we implement the following scenarios:

  • Provide access to selected columns
  • Provide access to selected rows based on a filter

Complete the following steps:

  1. On the Lake Formation console, in the navigation pane under Data catalog, choose Data filters.
  2. Choose Create new filter.
  3. For Data filter name, enter blog_data_filter.
  4. For Target database, enter lf-demo-db.
  5. For Target table, enter consumer_iceberg.
  6. For Column-level access, select Include columns.
  7. Choose the columns to share with the consumer: country, address, contactfirstname, city, customerid, and customername.
  8. For Row filter expression, enter the filter country='France'.
  9. Choose Create filter.

create data filter

Now let’s grant access to the consumer account on the consumer_iceberg table.

  1. In the navigation pane, choose Tables.
  2. Select the consumer_iceberg table, and choose Grant on the Actions menu.
    Grant access to consumer account on consumer_iceberg table
  3. Select External accounts.
  4. Enter the external account ID.
    Grant data permissions
  5. Select Named data catalog resources.
  6. Choose your database and table.
  7. For Data filters, choose the data filter you created.
    Add data filter
  8. For Data filter permissions and Grantable permissions, select Select.
  9. Choose Grant.

Permissions for creating grant

Data consumer setup

To set up the data consumer, we accept the resource share and create a table using AWS RAM and Lake Formation. Complete the following steps:

  1. Log in to the consumer account and navigate to the AWS RAM console.
  2. Under Shared with me in the navigation pane, choose Resource shares.
  3. Choose your resource share.
    Resource share in consumer account
  4. Choose Accept resource share.
  5. Note the name of the resource share to use in the next steps.
    Accept resource share
  6. Navigate to the Lake Formation console.
  7. If you’re logging in for the first time, you’re prompted to create an admin user.
  8. Choose Databases in the navigation pane, then choose your database.
  9. On the Actions menu, choose Create resource link.
    Create a resource link
  10. For Resource link name, enter the name of your resource link (for example, consumer_iceberg).
  11. Choose your database and shared table.
  12. Choose Create.
    Create table with resource link

Validate the solution

Now we can run different operations on the tables to validate the fine-grained access controls.

Insert operation

Let’s insert data into the consumer_iceberg table in the producer account, and validate the data filtering works as expected in the consumer account.

  1. Log in to the producer account.
  2. On the Athena console, choose Query editor in the navigation pane.
  3. Use the following SQL to write and insert data into the Iceberg table. Use the query editor to run one query at a time; you can highlight one query at a time and choose Run or Run again:
INSERT INTO consumer_iceberg VALUES (1, 'Land of Toys Inc.', '[email protected]',
'NYC','USA', 'NA', 'James', 'xxxx 118th NE');

INSERT INTO consumer_iceberg VALUES (2, 'Reims Collectables', '[email protected]',
'Reims','France', 'EMEA', 'Josephine', 'Darakjy');

INSERT INTO consumer_iceberg VALUES (3, 'Lyon Souveniers', '[email protected]',
'Paris', 'France', 'EMEA','Art', 'Venere');

Insert data into consumer_iceberg table in the producer account

  1. Use the following SQL to read and select data in the Iceberg table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Run select query to validate rows were inserted

  1. Log in to the consumer account.
  2. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Run same query in consumer account

Based on the filters, the consumer has visibility to a subset of columns, and rows where the country is France.

Update/Delete operations

Now let’s update one of the rows and delete one from the dataset shared with the consumer.

  1. Log in to the producer account.
  2. Update the row where city='Reims' to set city='Paris', and delete the row where customerid = 3:
    UPDATE consumer_iceberg SET city= 'Paris' WHERE city= 'Reims' ;

    Run update query in producer account

DELETE FROM consumer_iceberg WHERE customerid =3;

Run delete query in producer account

  1. Verify the updated and deleted dataset:
SELECT * FROM consumer_iceberg;

Verify update and delete reflected in producer account

  1. Log in to the consumer account.
  2. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Verify update and delete in consumer account

We can observe that only one row is available and the city is updated to Paris.

Schema evolution: Add a new column

Let’s add a new column to the table shared with the consumer.

  1. Log in to the producer account.
  2. Add a new column called geo_loc to the Iceberg table. Use the query editor to run one query at a time; you can highlight one query at a time and choose Run or Run again:
ALTER TABLE consumer_iceberg ADD COLUMNS (geo_loc string);

INSERT INTO consumer_iceberg VALUES (5, 'Test_user', '[email protected]',
'Reims','France', 'EMEA', 'Test_user', 'Test_user', 'test_geo');

SELECT * FROM consumer_iceberg;

Add a new column in producer aacccount

To provide visibility to the newly added geo_loc column, we need to update the Lake Formation data filter.

  1. On the Lake Formation console, choose Data filters in the navigation pane.
  2. Select your data filter and choose Edit.
    Update data filter
  3. Under Column-level access, add the new column (geo_loc).
  4. Choose Save.
    Add new column to data filter
  5. Log in to the consumer account.
  6. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Validate new column appears in consumer account

The new column geo_loc is visible, along with the additional row that was inserted.

Schema evolution: Delete column

Let’s drop a column from the table shared with the consumer.

  1. Log in to the producer account.
  2. Alter the table to drop the address column from the Iceberg table. Use the query editor to run one query at a time; you can highlight one query at a time and choose Run or Run again:
ALTER TABLE consumer_iceberg DROP COLUMN address;

SELECT * FROM consumer_iceberg;

Delete a column in producer account

We can observe that the column address is not present in the table.

  1. Log in to the consumer account.
  2. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Validate column deletion in consumer account

The column address is not present in the table.

Time travel

We have now changed the Iceberg table multiple times. The Iceberg table keeps track of the snapshots. Complete the following steps to explore the time travel functionality:

  1. Log in to the producer account.
  2. Query the system table:
SELECT * FROM "lf-demo-db"."consumer_iceberg$snapshots" limit 10;

We can observe that we have generated multiple snapshots.

  1. Note down one of the committed_at values to use in the next steps (for this example, 2023-01-29 21:35:02.176 UTC).
    Time travel query in consumer account
  2. Use time travel to query the table snapshot. Use the query editor to run one query at a time; you can highlight one query at a time and choose Run or Run again:
SELECT * FROM consumer_iceberg FOR TIMESTAMP
AS OF TIMESTAMP '2023-01-29 21:35:02.176 UTC';

Find table snapshot using time travel

Clean up

Complete the following steps to avoid incurring future charges:

  1. On the Amazon S3 console, delete the table storage bucket (for this post, iceberg-athena-lakeformation-blog).
  2. In the producer account on the Athena console, run the following commands to delete the tables you created:
DROP TABLE "lf-demo-db"."consumer_iceberg";
DROP DATABASE lf-demo-db;
  1. In the producer account on the Lake Formation console, revoke permissions to the consumer account.
    Clean up - Revoke permissions to consumer account
  2. Delete the S3 bucket used for the Athena query result location from the consumer account.

Conclusion

With support for cross-account, fine-grained access control policies for formats such as Iceberg, you have the flexibility to work with any format supported by Athena. The ability to perform CRUD operations against the data in your S3 data lake, combined with Lake Formation fine-grained access controls for all tables and formats supported by Athena, provides opportunities to innovate and simplify your data strategy. We’d love to hear your feedback!


About the authors

Kishore Dhamodaran is a Senior Solutions Architect at AWS. Kishore helps strategic customers with their cloud enterprise strategy and migration journey, leveraging his years of industry and cloud experience.

Jack Ye is a software engineer of the Athena Data Lake and Storage team at AWS. He is an Apache Iceberg Committer and PMC member.

Chris Olson is a Software Development Engineer at AWS.

Xiaoxuan Li is a Software Development Engineer at AWS.

Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.

Use Apache Iceberg in a data lake to support incremental data processing

Post Syndicated from Flora Wu original https://aws.amazon.com/blogs/big-data/use-apache-iceberg-in-a-data-lake-to-support-incremental-data-processing/

Apache Iceberg is an open table format for very large analytic datasets, which captures metadata information on the state of datasets as they evolve and change over time. It adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table. Iceberg has become very popular for its support for ACID transactions in data lakes and features like schema and partition evolution, time travel, and rollback.

Apache Iceberg integration is supported by AWS analytics services including Amazon EMR, Amazon Athena, and AWS Glue. Amazon EMR can provision clusters with Spark, Hive, Trino, and Flink that can run Iceberg. Starting with Amazon EMR version 6.5.0, you can use Iceberg with your EMR cluster without requiring a bootstrap action. In early 2022, AWS announced general availability of Athena ACID transactions, powered by Apache Iceberg. The recently released Athena query engine version 3 provides better integration with the Iceberg table format. AWS Glue 3.0 and later supports the Apache Iceberg framework for data lakes.

In this post, we discuss what customers want in modern data lakes and how Apache Iceberg helps address customer needs. Then we walk through a solution to build a high-performance and evolving Iceberg data lake on Amazon Simple Storage Service (Amazon S3) and process incremental data by running insert, update, and delete SQL statements. Finally, we show you how to performance tune the process to improve read and write performance.

How Apache Iceberg addresses what customers want in modern data lakes

More and more customers are building data lakes, with structured and unstructured data, to support many users, applications, and analytics tools. There is an increased need for data lakes to support database like features such as ACID transactions, record-level updates and deletes, time travel, and rollback. Apache Iceberg is designed to support these features on cost-effective petabyte-scale data lakes on Amazon S3.

Apache Iceberg addresses customer needs by capturing rich metadata information about the dataset at the time the individual data files are created. There are three layers in the architecture of an Iceberg table: the Iceberg catalog, the metadata layer, and the data layer, as depicted in the following figure (source).

The Iceberg catalog stores the metadata pointer to the current table metadata file. When a select query is reading an Iceberg table, the query engine first goes to the Iceberg catalog, then retrieves the location of the current metadata file. Whenever there is an update to the Iceberg table, a new snapshot of the table is created, and the metadata pointer points to the current table metadata file.

The following is an example Iceberg catalog with AWS Glue implementation. You can see the database name, the location (S3 path) of the Iceberg table, and the metadata location.

The metadata layer has three types of files: the metadata file, manifest list, and manifest file in a hierarchy. At the top of the hierarchy is the metadata file, which stores information about the table’s schema, partition information, and snapshots. The snapshot points to the manifest list. The manifest list has the information about each manifest file that makes up the snapshot, such as location of the manifest file, the partitions it belongs to, and the lower and upper bounds for partition columns for the data files it tracks. The manifest file tracks data files as well as additional details about each file, such as the file format. All three files work in a hierarchy to track the snapshots, schema, partitioning, properties, and data files in an Iceberg table.

The data layer has the individual data files of the Iceberg table. Iceberg supports a wide range of file formats including Parquet, ORC, and Avro. Because the Iceberg table tracks the individual data files instead of only pointing to the partition location with data files, it isolates the writing operations from reading operations. You can write the data files at any time, but only commit the change explicitly, which creates a new version of the snapshot and metadata files.
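You can inspect these layers yourself: Iceberg exposes its metadata as queryable system tables. The following is a minimal Spark Scala sketch, assuming a Spark session configured with an Iceberg catalog named demo and using the table created later in this walkthrough; exact column sets can vary slightly between Iceberg versions.

// Sketch: query the Iceberg metadata layers as system tables (catalog and table names are from this walkthrough)
spark.sql("SELECT snapshot_id, committed_at, operation FROM demo.reviews.all_reviews.snapshots").show()
spark.sql("SELECT path, length, partition_spec_id FROM demo.reviews.all_reviews.manifests").show()
spark.sql("SELECT file_path, record_count, file_size_in_bytes FROM demo.reviews.all_reviews.files").show()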

Solution overview

In this post, we walk you through a solution to build a high-performing Apache Iceberg data lake on Amazon S3; process incremental data with insert, update, and delete SQL statements; and tune the Iceberg table to improve read and write performance. The following diagram illustrates the solution architecture.

To demonstrate this solution, we use the Amazon Customer Reviews dataset in an S3 bucket (s3://amazon-reviews-pds/parquet/). In a real use case, this would be raw data stored in your S3 bucket. We can check the data size with the following code in the AWS Command Line Interface (AWS CLI):

//Run this AWS CLI command to check the data size
aws s3 ls --summarize --human-readable --recursive s3://amazon-reviews-pds/parquet

The total object count is 430, and total size is 47.4 GiB.

To set up and test this solution, we complete the following high-level steps:

  1. Set up an S3 bucket in the curated zone to store converted data in Iceberg table format.
  2. Launch an EMR cluster with appropriate configurations for Apache Iceberg.
  3. Create a notebook in EMR Studio.
  4. Configure the Spark session for Apache Iceberg.
  5. Convert data to Iceberg table format and move data to the curated zone.
  6. Run insert, update, and delete queries in Athena to process incremental data.
  7. Carry out performance tuning.

Prerequisites

To follow along with this walkthrough, you must have an AWS account with an AWS Identity and Access Management (IAM) role that has sufficient access to provision the required resources.

Set up the S3 bucket for Iceberg data in the curated zone in your data lake

Choose the Region in which you want to create the S3 bucket and provide a unique name:

s3://iceberg-curated-blog-data

Launch an EMR cluster to run Iceberg jobs using Spark

You can create an EMR cluster from the AWS Management Console, Amazon EMR CLI, or AWS Cloud Development Kit (AWS CDK). For this post, we walk you through how to create an EMR cluster from the console.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose the latest Amazon EMR release. As of January 2023, the latest release is 6.9.0. Iceberg requires release 6.5.0 and above.
  4. Select JupyterEnterpriseGateway and Spark as the software to install.
  5. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":"true"}}].
  6. Leave other settings at their default and choose Next.
  7. For Hardware, use the default setting.
  8. Choose Next.
  9. For Cluster name, enter a name. We use iceberg-blog-cluster.
  10. Leave the remaining settings unchanged and choose Next.
  11. Choose Create cluster.

Create a notebook in EMR Studio

We now walk you through how to create a notebook in EMR Studio from the console.

  1. On the IAM console, create an EMR Studio service role.
  2. On the Amazon EMR console, choose EMR Studio.
  3. Choose Get started.

The Get started page appears in a new tab.

  1. Choose Create Studio in the new tab.
  2. Enter a name. We use iceberg-studio.
  3. Choose the same VPC and subnet as those for the EMR cluster, and the default security group.
  4. Choose AWS Identity and Access Management (IAM) for authentication, and choose the EMR Studio service role you just created.
  5. Choose an S3 path for Workspaces backup.
  6. Choose Create Studio.
  7. After the Studio is created, choose the Studio access URL.
  8. On the EMR Studio dashboard, choose Create workspace.
  9. Enter a name for your Workspace. We use iceberg-workspace.
  10. Expand Advanced configuration and choose Attach Workspace to an EMR cluster.
  11. Choose the EMR cluster you created earlier.
  12. Choose Create Workspace.
  13. Choose the Workspace name to open a new tab.

In the navigation pane, there is a notebook that has the same name as the Workspace. In our case, it is iceberg-workspace.

  1. Open the notebook.
  2. When prompted to choose a kernel, choose Spark.

Configure a Spark session for Apache Iceberg

Use the following code, providing your own S3 bucket name:

%%configure -f
{
"conf": {
"spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"spark.sql.catalog.demo.warehouse": "s3://iceberg-curated-blog-data",
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.demo.io-impl":"org.apache.iceberg.aws.s3.S3FileIO"
}
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin.
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information.
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path defined by this property: s3://iceberg-curated-blog-data.
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step).
  • spark.sql.catalog.demo.io-impl – Iceberg allows users to write data to Amazon S3 through S3FileIO. The AWS Glue Data Catalog by default uses this FileIO, and other catalogs can load this FileIO using the io-impl catalog property.

Convert data to Iceberg table format

You can use either Spark on Amazon EMR or Athena to load the Iceberg table. In the EMR Studio Workspace notebook Spark session, run the following commands to load the data:

// create a database in AWS Glue named reviews if not exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews - this load all the parquet files
val reviews_all_location = "s3://amazon-reviews-pds/parquet/"
val reviews_all = spark.read.parquet(reviews_all_location)

// write reviews data to an Iceberg v2 table
reviews_all.writeTo("demo.reviews.all_reviews").tableProperty("format-version", "2").createOrReplace()

After you run the code, you should find two prefixes created in your data warehouse S3 path (s3://iceberg-curated-blog-data/reviews.db/all_reviews): data and metadata.

Process incremental data using insert, update, and delete SQL statements in Athena

Athena is a serverless query engine that you can use to perform read, write, update, and optimization tasks against Iceberg tables. To demonstrate how the Apache Iceberg data lake format supports incremental data ingestion, we run insert, update, and delete SQL statements on the data lake.

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure the query result location to be the S3 bucket you created earlier. You should be able to see that the table reviews.all_reviews is available for querying. Run the following query to verify that you have loaded the Iceberg table successfully:

select * from reviews.all_reviews limit 5;

Process incremental data by running insert, update, and delete SQL statements:

//Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

//Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

Performance tuning

In this section, we walk through different ways to improve Apache Iceberg read and write performance.

Configure Apache Iceberg table properties

Apache Iceberg is a table format, and it supports table properties to configure table behavior such as read, write, and catalog. You can improve the read and write performance on Iceberg tables by adjusting the table properties.

For example, if you notice that you write too many small files for an Iceberg table, you can configure the target file size to write fewer but larger files, which helps improve query performance.

Property Default Description
write.target-file-size-bytes 536870912 (512 MB) Controls the size of files generated to target about this many bytes

Use the following code to alter the table format:

//Example code to alter table format in EMR Studio Workspace notebook
spark.sql("ALTER TABLE demo.reviews.all_reviews 
SET TBLPROPERTIES ('write_target_data_file_size_bytes'='536870912')")

Partitioning and sorting

To make a query run fast, the less data read the better. Iceberg takes advantage of the rich metadata it captures at write time and facilitates techniques such as scan planning, partitioning, pruning, and column-level stats such as min/max values to skip data files that don’t have matching records. We walk you through how query scan planning and partitioning work in Iceberg and how we use them to improve query performance.

Query scan planning

For a given query, the first step in a query engine is scan planning, which is the process of finding the files in a table needed for a query. Planning in an Iceberg table is very efficient, because Iceberg’s rich metadata can be used to prune metadata files that aren’t needed, in addition to filtering data files that don’t contain matching data. In our tests, we observed that Athena scanned 50% or less of the data for a given query on an Iceberg table compared to the original data before conversion to Iceberg format.

There are two types of filtering:

  • Metadata filtering – Iceberg uses two levels of metadata to track the files in a snapshot: the manifest list and manifest files. It first uses the manifest list, which acts as an index of the manifest files. During planning, Iceberg filters manifests using the partition value range in the manifest list without reading all the manifest files. Then it uses selected manifest files to get data files.
  • Data filtering – After selecting the list of manifest files, Iceberg uses the partition data and column-level stats for each data file stored in manifest files to filter data files. During planning, query predicates are converted to predicates on the partition data and applied first to filter data files. Then, the column stats like column-level value counts, null counts, lower bounds, and upper bounds are used to filter out data files that can’t match the query predicate. By using upper and lower bounds to filter data files at planning time, Iceberg greatly improves query performance.

Partitioning and sorting

Partitioning is a way to group records with the same key column values together at write time. The benefit of partitioning is faster queries that access only part of the data, as explained earlier in query scan planning: data filtering. Iceberg makes partitioning simple by supporting hidden partitioning: Iceberg produces partition values by taking a column value and optionally transforming it.
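For example, instead of partitioning directly on a timestamp column, you can partition on a transform of it, and Iceberg keeps the mapping between the column and its partition hidden from queries. The following Spark Scala sketch uses Spark’s partition transform functions; the target table name is illustrative, and the walkthrough below uses a simple identity partition on product_category instead.

// Sketch: hidden partitioning with a transform - partition the reviews by day of review_date
import spark.implicits._
import org.apache.spark.sql.functions.days

reviews_all.writeTo("demo.reviews.all_reviews_by_day")
  .tableProperty("format-version", "2")
  .partitionedBy(days($"review_date"))
  .createOrReplace()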

In our use case, we first run the following query on the non-partitioned Iceberg table. Then we partition the Iceberg table by the category of the reviews, which is used in the query WHERE condition to filter out records. With partitioning, the query scans much less data. See the following code:

//Example code in EMR Studio Workspace notebook to create an Iceberg table all_reviews_partitioned partitioned by product_category
reviews_all.writeTo("demo.reviews.all_reviews_partitioned").tableProperty("format-version", "2").partitionedBy($"product_category").createOrReplace()

Run the following select statement on the non-partitioned all_reviews table vs. the partitioned table to see the performance difference:

//Run this query on all_reviews table and the partitioned table for performance testing
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

//Run the same select query on partitioned dataset
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews_partitioned where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table shows the performance improvement of data partitioning, with about 50% performance improvement and 70% less data scanned.

Dataset Name Non-Partitioned Dataset Partitioned Dataset
Runtime (seconds) 8.20 4.25
Data Scanned (MB) 131.55 33.79

Note that the runtime is the average runtime with multiple runs in our test.

We saw good performance improvement after partitioning. However, this can be further improved by using column-level stats from Iceberg manifest files. In order to use the column-level stats effectively, you want to further sort your records based on the query patterns. Sorting the whole dataset using the columns that are often used in queries will reorder the data in such a way that each data file ends up with a unique range of values for the specific columns. If these columns are used in the query condition, it allows query engines to further skip data files, thereby enabling even faster queries.
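One way to apply this is to declare a table-level write sort order so that subsequent writes cluster rows by the columns that appear most often in filters. The following Spark Scala sketch uses the Iceberg Spark SQL extensions configured earlier; treat it as an illustration, since the exact DDL can vary across Iceberg versions, and existing files are only re-sorted when they are rewritten (for example, by a compaction job).

// Sketch: declare a write sort order so future writes cluster data by commonly filtered columns
spark.sql("ALTER TABLE demo.reviews.all_reviews_partitioned WRITE ORDERED BY review_date, star_rating")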

Copy-on-write vs. merge-on-read

When implementing update and delete on Iceberg tables in the data lake, there are two approaches defined by the Iceberg table properties:

  • Copy-on-write – With this approach, when there are changes to the Iceberg table (either updates or deletes), the data files associated with the impacted records are duplicated and updated. The records are updated or deleted in the duplicated data files, and a new snapshot of the Iceberg table is created, pointing to the newer version of data files. This makes the overall writes slower. There might also be situations where concurrent writes conflict and have to be retried, which increases the write time even more. On the other hand, when reading the data, there is no extra processing needed; the query retrieves data from the latest version of the data files.
  • Merge-on-read – With this approach, when there are updates or deletes on the Iceberg table, the existing data files will not be rewritten; instead new delete files will be created to track the changes. For deletes, a new delete file will be created with the deleted records. When reading the Iceberg table, the delete file will be applied to the retrieved data to filter out the delete records. For updates, a new delete file will be created to mark the updated records as deleted. Then a new file will be created for those records but with updated values. When reading the Iceberg table, both the delete and new files will be applied to the retrieved data to reflect the latest changes and produce the correct results. So, for any subsequent queries, an extra step to merge the data files with the delete and new files will happen, which will usually increase the query time. On the other hand, the writes might be faster because there is no need to rewrite the existing data files.

To test the impact of the two approaches, you can run the following code to set the Iceberg table properties:

//Run this code in the EMR Studio Workspace notebook to set the table to copy-on-write
spark.sql("ALTER TABLE demo.reviews.all_reviews SET TBLPROPERTIES ('write.delete.mode'='copy-on-write','write.update.mode'='copy-on-write')")
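To test the other approach, you can switch the same two properties to merge-on-read; the following mirrors the command above.

//Run this code in the EMR Studio Workspace notebook to set the table to merge-on-read
spark.sql("ALTER TABLE demo.reviews.all_reviews SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read')")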

Run the update, delete, and select SQL statements in Athena to show the runtime difference for copy-on-write vs. merge-on-read:

//Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

//Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

//Example select statement
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table summarizes the query runtimes.

Query type UPDATE / DELETE / SELECT
Copy-on-Write runtime (seconds) 66.251 / 116.174 / 97.75
Merge-on-Read runtime (seconds) 10.788 / 54.941 / 113.44
Data scanned (MB), both approaches 494.06 / 3.07 / 137.16

Note that the runtime is the average runtime with multiple runs in our test.

As our test results show, there are always trade-offs in the two approaches. Which approach to use depends on your use cases. In summary, the considerations come down to latency on the read vs. write. You can reference the following table and make the right choice.

Copy-on-Write – Pros: faster reads. Cons: expensive writes. When to use: good for frequent reads, infrequent updates and deletes, or large batch updates.
Merge-on-Read – Pros: faster writes. Cons: higher latency on reads. When to use: good for tables with frequent updates and deletes.

Data compaction

If your data file size is small, you might end up with thousands or millions of files in an Iceberg table. This dramatically increases I/O operations and slows down queries. Furthermore, Iceberg tracks each data file in a dataset, so more data files lead to more metadata, which in turn increases the overhead and I/O when reading metadata files. In order to improve query performance, it’s recommended to compact small data files into larger ones.

When updating and deleting records in an Iceberg table, if the merge-on-read approach is used, you might end up with many small delete or new data files. Running compaction combines all these files and creates a newer version of the data files, which eliminates the need to reconcile them during reads. It’s recommended to run regular compaction jobs so that reads are impacted as little as possible while still maintaining fast write speed.
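In this walkthrough we trigger compaction from Athena with the OPTIMIZE command shown next. For reference, a similar compaction can also be run from the Spark session using Iceberg’s rewrite_data_files procedure; the following is a hedged sketch, and the available procedure arguments can vary by Iceberg version.

// Sketch: compact small files from Spark using the Iceberg rewrite_data_files procedure
spark.sql("CALL demo.system.rewrite_data_files(table => 'reviews.all_reviews')").show()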

Run the following data compaction command, then run the select query from Athena:

//Data compaction 
optimize reviews.all_reviews REWRITE DATA USING BIN_PACK

//Run this query before and after data compaction
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table compares the runtime before vs. after data compaction. You can see about 40% performance improvement.

Query Before Data Compaction After Data Compaction
Runtime (seconds) 97.75 32.676
Data scanned (MB) 137.16 189.19

Note that the select queries ran on the all_reviews table after update and delete operations, before and after data compaction. The runtime is the average runtime with multiple runs in our test.

Clean up

After you follow the solution walkthrough to perform the use cases, complete the following steps to clean up your resources and avoid further costs:

  1. Drop the AWS Glue tables and database from Athena or run the following code in your notebook:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.all_reviews") 
spark.sql("DROP TABLE demo.reviews.all_reviews_partitioned") 

// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  1. On the EMR Studio console, choose Workspaces in the navigation pane.
  2. Select the Workspace you created and choose Delete.
  3. On the EMR console, navigate to the Studios page.
  4. Select the Studio you created and choose Delete.
  5. On the EMR console, choose Clusters in the navigation pane.
  6. Select the cluster and choose Terminate.
  7. Delete the S3 bucket and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we introduced the Apache Iceberg framework and how it helps resolve some of the challenges we have in a modern data lake. Then we walked you through a solution to process incremental data in a data lake using Apache Iceberg. Finally, we did a deep dive into performance tuning to improve read and write performance for our use cases.

We hope this post provides some useful information for you to decide whether you want to adopt Apache Iceberg in your data lake solution.


About the Authors

Flora Wu is a Sr. Resident Architect at AWS Data Lab. She helps enterprise customers create data analytics strategies and build solutions to accelerate their business outcomes. In her spare time, she enjoys playing tennis, dancing salsa, and traveling.

Daniel Li is a Sr. Solutions Architect at Amazon Web Services. He focuses on helping customers develop, adopt, and implement cloud services and strategy. When not working, he likes spending time outdoors with his family.

Patterns for enterprise data sharing at scale

Post Syndicated from Venkata Sistla original https://aws.amazon.com/blogs/big-data/patterns-for-enterprise-data-sharing-at-scale/

Data sharing is becoming an important element of an enterprise data strategy. AWS services like AWS Data Exchange provide an avenue for companies to share or monetize their value-added data with other companies. Some organizations would like to have a data sharing platform where they can establish a collaborative and strategic approach to exchange data with a restricted group of companies in a closed, secure, and exclusive environment. For example, financial services companies and their auditors, or manufacturing companies and their supply chain partners. This fosters development of new products and services and helps improve their operational efficiency.

Data sharing is a team effort; in addition to establishing the right infrastructure, successful data sharing also requires organizations to ensure that business owners sponsor data sharing initiatives. They also need to ensure that data is of high quality. Data platform owners and security teams should encourage proper data use and fix any privacy and confidentiality issues.

This blog discusses various data sharing options and common architecture patterns that organizations can adopt to set up their data sharing infrastructure based on AWS service availability and data compliance.

Data sharing options and data classification types

Organizations operate across a spectrum of security compliance constraints. For some organizations, it’s possible to use AWS services like AWS Data Exchange. However, organizations working in heavily regulated industries like federal agencies or financial services might be limited by the allow-listed AWS service options. For example, if an organization is required to operate in a FedRAMP Moderate or FedRAMP High environment, its options to share data may be limited by the AWS services that are available and have been allow listed. Service availability is based on platform certification by AWS, and allow listing is based on the organization defining its security compliance architecture and guidelines.

The kind of data that the organization wants to share with its partners may also have an impact on the method used for data sharing. Complying with data classification rules may further limit the data sharing options they can choose.

The following are some general data classification types:

  • Public data – Information that is often freely available for people to read, research, review, and store. It typically has the lowest level of data classification and security.
  • Private data – Information you might want to keep private like email inboxes, cell phone content, employee identification numbers, or employee addresses. If private data were shared, destroyed, or altered, it might pose a slight risk to an individual or the organization.
  • Confidential or restricted data – A limited group of individuals or parties can access sensitive information often requiring special clearance or special authorization. Confidential or restricted data access might involve aspects of identity and authorization management. Examples of confidential data include Social Security numbers and vehicle identification numbers.

The following is a sample decision tree that you can refer to when choosing your data sharing option based on service availability, classification type, and data format (structured or unstructured). Other factors like usability, multi-partner accessibility, data size, consumption patterns like bulk load/API access, and more may also affect the choice of data sharing pattern.

decisiontree

In the following sections, we discuss each pattern in more detail.

Pattern 1: Using AWS Data Exchange

AWS Data Exchange makes exchanging data easier, helping organizations lower costs, become more agile, and innovate faster. Organizations can choose to share data privately using AWS Data Exchange with their external partners. AWS Data Exchange offers perimeter controls that are applied at identity and resource levels. These controls decide which external identities have access to specific data resources. AWS Data Exchange provides multiple patterns for external parties to access data.

The following diagram illustrates an example architecture.

pattern1

With AWS Data Exchange, once the dataset to share (or sell) is configured, AWS Data Exchange automatically manages entitlements (and billing) between the producer and the consumer. The producer doesn’t have to manage policies, set up new access points, or create new Amazon Redshift data shares for each consumer, and access is automatically revoked if the subscription ends. This can significantly reduce the operational overhead in sharing data.

Pattern 2: Using AWS Lake Formation for centralized access management

You can use this pattern in cases where both the producer and consumer are on the AWS platform with an AWS account that is enabled to use AWS Lake Formation. This pattern provides a no-code approach to data sharing. The following diagram illustrates an example architecture.

pattern2

In this pattern, the central governance account has Lake Formation configured for managing access across the producer’s org accounts. Resource links from the production account Amazon Simple Storage Service (Amazon S3) bucket are created in Lake Formation. The producer grants Lake Formation permissions on an AWS Glue Data Catalog resource to an external account, or directly to an AWS Identity and Access Management (IAM) principal in another account. Lake Formation uses AWS Resource Access Manager (AWS RAM) to share the resource. If the grantee account is in the same organization as the grantor account, the shared resource is available immediately to the grantee. If the grantee account is not in the same organization, AWS RAM sends an invitation to the grantee account to accept or reject the resource grant. To make the shared resource available, the consumer administrator in the grantee account must use the AWS RAM console or AWS Command Line Interface (AWS CLI) to accept the invitation.

Authorized principals can share resources explicitly with an IAM principal in an external account. This feature is useful when the producer wants to have control over who in the external account can access the resources. The permissions the IAM principal receives are a union of direct grants and the account-level grants that are cascaded down to the principals. The data lake administrator of the recipient account can view the direct cross-account grants, but can’t revoke permissions.

Pattern 3: Using AWS Lake Formation from the producer external sharing account

The producer may have stringent security requirements where no external consumer should access their production account or their centralized governance account. They may also not have Lake Formation enabled on their production platform. In such cases, as shown in the following diagram, the producer production account (Account A) is dedicated to its internal organization users. The producer creates another account, the producer external sharing account (Account B), which is dedicated for external sharing. This gives the producer more latitude to create specific policies for specific organizations.

The following architecture diagram shows an overview of the pattern.

pattern3

The producer implements a process to create an asynchronous copy of data in Account B. The bucket can be configured for Same Region Replication (SRR) or Cross Region Replication (CRR) for objects that need to be shared. This facilitates automated refresh of data into the “External Published Datasets” S3 bucket in the external account without having to write any code.

Creating a copy of the data allows the producer to add another degree of separation between the external consumer and its production data. It also helps meet any compliance or data sovereignty requirements.

Lake Formation is set up on Account B, and the administrator creates resources links for the “External Published Datasets” S3 bucket in its account to grant access. The administrator follows the same process to grant access as described earlier.

Pattern 4: Using Amazon Redshift data sharing

This pattern is ideally suited for a producer who has most of their published data products on Amazon Redshift. This pattern also requires the producer’s external sharing account (Account B) and the consumer account (Account C) to have an encrypted Amazon Redshift cluster or Amazon Redshift Serverless endpoint that meets the prerequisites for Amazon Redshift data sharing.

The following architecture diagram shows an overview of the pattern.

pattern4

Two options are possible depending on the producer’s compliance constraints:

  • Option A – The producer enables data sharing directly on the production Amazon Redshift cluster.
  • Option B – The producer may have constraints with respect to sharing the production cluster. The producer creates a simple AWS Glue job that copies data from the Amazon Redshift cluster in the production Account A to the Amazon Redshift cluster in the external Account B. This AWS Glue job can be scheduled to refresh data as needed by the consumer. When the data is available in Account B, the producer can create multiple views and multiple data shares as needed.

In both options, the producer maintains complete control over what data is being shared, and the consumer admin maintains full control over who can access the data within their organization.

After both the producer and consumer admins approve the data sharing request, the consumer user can access this data as if it were part of their own account, without having to write any additional code.

Pattern 5: Sharing data securely and privately using APIs

You can adopt this pattern when the external partner doesn’t have a presence on AWS. You can also use this pattern when published data products are spread across various services like Amazon S3, Amazon Redshift, Amazon DynamoDB, and Amazon OpenSearch Service but the producer would like to maintain a single data sharing interface.

Here’s an example use case: Company A would like to share some of its log data in near-real time with its partner Company B, who uses this data to generate predictive insights for Company A. Company A stores this data in Amazon Redshift. The company wants to share this transactional information with its partner after masking the personally identifiable information (PII) in a cost-effective and secure way to generate insights. Company B doesn’t use the AWS platform.

Company A establishes a microbatch process using an AWS Lambda function or AWS Glue that queries Amazon Redshift to get incremental log data, applies the rules to redact the PII, and loads this data to the “Published Datasets” S3 bucket. This instantiates an SRR/CRR process that refreshes this data in the “External Sharing” S3 bucket.
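The following Spark Scala sketch illustrates what such a microbatch step could look like. The JDBC URL, credential handling, source query, PII columns, and S3 path are all placeholder assumptions; a production job would typically track an incremental watermark and retrieve credentials from AWS Secrets Manager instead of environment variables.

// Sketch: read incremental log data from Amazon Redshift over JDBC, redact PII, and publish to S3
import org.apache.spark.sql.functions.lit

val logs = spark.read.format("jdbc")
  .option("url", "jdbc:redshift://your-cluster.example.us-east-1.redshift.amazonaws.com:5439/dev") // placeholder
  .option("driver", "com.amazon.redshift.jdbc42.Driver")
  .option("dbtable", "(SELECT * FROM app_logs WHERE event_time > current_date - 1) AS incr")        // placeholder query
  .option("user", sys.env("REDSHIFT_USER"))
  .option("password", sys.env("REDSHIFT_PASSWORD"))
  .load()

// Redact PII columns before publishing (column names are assumptions)
val redacted = logs
  .withColumn("email", lit("REDACTED"))
  .withColumn("customer_name", lit("REDACTED"))

// Land the masked data in the "Published Datasets" bucket; SRR/CRR then copies it to the external sharing bucket
redacted.write.mode("append").parquet("s3://published-datasets/logs/")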

The following diagram shows how the consumer can then use an API-based approach to access this data.

pattern5

The workflow contains the following steps:

  1. An HTTPS API request is sent from the API consumer to the API proxy layer.
  2. The HTTPS API request is forwarded from the API proxy to Amazon API Gateway in the external sharing AWS account.
  3. Amazon API Gateway calls the request receiver Lambda function.
  4. The request receiver function writes the status to a DynamoDB control table.
  5. A second Lambda function, the poller, checks the status of the results in the DynamoDB table.
  6. The poller function fetches results from Amazon S3.
  7. The poller function sends a presigned URL to download the file from the S3 bucket to the requestor via Amazon Simple Email Service (Amazon SES).
  8. The requestor downloads the file using the URL.
  9. The network perimeter AWS account only allows egress internet connection.
  10. The API proxy layer enforces both the egress security controls and perimeter firewall before the traffic leaves the producer’s network perimeter.
  11. The AWS Transit Gateway security egress VPC routing table only allows connectivity from the required producer’s subnet, while preventing internet access.

Pattern 6: Using Amazon S3 access points

Data scientists may need to work collaboratively on images, videos, and text documents. Legal and audit groups may want to share reports and statements with the auditing agencies. This pattern discusses an approach to sharing such documents. The pattern assumes that the external partners are also on AWS. Amazon S3 access points allow the producer to share access with their consumer by setting up cross-account access without having to edit bucket policies.

Access points are named network endpoints that are attached to buckets that you can use to perform S3 object operations, such as GetObject and PutObject. Each access point has distinct permissions and network controls that Amazon S3 applies for any request that is made through that access point. Each access point enforces a customized access point policy that works in conjunction with the bucket policy attached to the underlying bucket.

The following architecture diagram shows an overview of the pattern.

pattern6

The producer creates an S3 bucket and enables the use of access points. As part of the configuration, the producer specifies the consumer account, IAM role, and privileges for the consumer IAM role.

The consumer users with the IAM role in the consumer account can access the S3 bucket via the internet or restricted to an Amazon VPC via VPC endpoints and AWS PrivateLink.
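
The following is a minimal sketch of how a producer could create an access point and grant a consumer role access, using the boto3 S3 Control API. The account IDs, names, Region, and role ARN are placeholder assumptions, and the underlying bucket policy must also delegate access control to access points for this to take effect:

import json
import boto3

s3control = boto3.client('s3control')

PRODUCER_ACCOUNT = '111122223333'   # placeholder
CONSUMER_ROLE = 'arn:aws:iam::444455556666:role/ConsumerAnalystRole'  # placeholder
ACCESS_POINT = 'shared-documents-ap'
BUCKET = 'producer-shared-documents'

# Create the access point on the shared bucket
s3control.create_access_point(
    AccountId=PRODUCER_ACCOUNT,
    Name=ACCESS_POINT,
    Bucket=BUCKET
)

# Attach an access point policy that grants the consumer role object-level access
policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'AWS': CONSUMER_ROLE},
        'Action': ['s3:GetObject', 's3:PutObject'],
        'Resource': f'arn:aws:s3:us-east-1:{PRODUCER_ACCOUNT}:accesspoint/{ACCESS_POINT}/object/*'
    }]
}
s3control.put_access_point_policy(
    AccountId=PRODUCER_ACCOUNT,
    Name=ACCESS_POINT,
    Policy=json.dumps(policy)
)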

Conclusion

Each organization has its own set of constraints and requirements that it needs to fulfill to set up an efficient data sharing solution. In this post, we demonstrated various options and best practices available to organizations. Your data platform owner and security team should work together to assess what works best for your specific situation. Your AWS account team is also available to help.

About the Authors


Venkata Sistla is a Cloud Architect – Data & Analytics at AWS. He specializes in building data processing capabilities and helping customers remove constraints that prevent them from leveraging their data to develop business insights.

Santosh Chiplunkar is a Principal Resident Architect at AWS. He has over 20 years of experience helping customers solve their data challenges. He helps customers develop their data and analytics strategy and provides them with guidance on how to make it a reality.

Automate replication of relational sources into a transactional data lake with Apache Iceberg and AWS Glue

Post Syndicated from Luis Gerardo Baeza original https://aws.amazon.com/blogs/big-data/automate-replication-of-relational-sources-into-a-transactional-data-lake-with-apache-iceberg-and-aws-glue/

Organizations have chosen to build data lakes on top of Amazon Simple Storage Service (Amazon S3) for many years. A data lake is a popular choice for organizations to store all the data generated by their different teams, across business domains, in all kinds of formats, and even over its full history. According to one study, the average company sees its data volume grow at a rate exceeding 50% per year and typically manages an average of 33 unique data sources for analysis.

Teams often end up replicating thousands of jobs from relational databases with the same extract, transform, and load (ETL) pattern, and there is a lot of effort in maintaining the job states and scheduling these individual jobs. The approach described in this post helps teams add tables with few changes and maintains the job status with minimum effort, which can lead to a huge improvement in the development timeline and make it easier to track the jobs.

In this post, we show you how to easily replicate all your relational data stores into a transactional data lake in an automated fashion with a single ETL job using Apache Iceberg and AWS Glue.

Solution architecture

Data lakes are usually organized using separate S3 buckets for three layers of data: the raw layer containing data in its original form, the stage layer containing intermediate processed data optimized for consumption, and the analytics layer containing aggregated data for specific use cases. In the raw layer, tables usually are organized based on their data sources, whereas tables in the stage layer are organized based on the business domains they belong to.

This post provides an AWS CloudFormation template that deploys an AWS Glue job that reads an Amazon S3 path for one data source of the data lake raw layer, and ingests the data into Apache Iceberg tables on the stage layer using AWS Glue support for data lake frameworks. The job expects tables in the raw layer to be structured in the way AWS Database Migration Service (AWS DMS) ingests them: schema, then table, then data files.

This solution uses AWS Systems Manager Parameter Store for table configuration. You should modify this parameter to specify the tables you want to process and how, including information such as the primary key, partition columns, and the associated business domain. The job uses this information to automatically create a database (if it doesn't already exist) for every business domain, create the Iceberg tables, and perform the data loading.

Finally, we can use Amazon Athena to query the data in the Iceberg tables.

The following diagram illustrates this architecture.

Solution architecture

This implementation has the following considerations:

  • All tables from the data source must have a primary key to be replicated using this solution. The primary key can be a single column or a composite key with more than one column.
  • If the data lake contains tables that don’t need upserts or don’t have a primary key, you can exclude them from the parameter configuration and implement traditional ETL processes to ingest them into the data lake. That’s outside of the scope of this post.
  • If there are additional data sources that need to be ingested, you can deploy multiple CloudFormation stacks, one to handle each data source.
  • The AWS Glue job is designed to process data in two phases: the initial load that runs after AWS DMS finishes the full load task, and the incremental load that runs on a schedule that applies change data capture (CDC) files captured by AWS DMS. Incremental processing is performed using an AWS Glue job bookmark.

There are nine steps to complete this tutorial:

  1. Set up a source endpoint for AWS DMS.
  2. Deploy the solution using AWS CloudFormation.
  3. Review the AWS DMS replication task.
  4. Optionally, add permissions for encryption and decryption or AWS Lake Formation.
  5. Review the table configuration on Parameter Store.
  6. Perform initial data loading.
  7. Perform incremental data loading.
  8. Monitor table ingestion.
  9. Schedule incremental batch data loading.

Prerequisites

Before starting this tutorial, you should already be familiar with Iceberg. If you’re not, you can get started by replicating a single table following the instructions in Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue. Additionally, set up the following:

Set up a source endpoint for AWS DMS

Before we create our AWS DMS task, we need to set up a source endpoint to connect to the source database:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. If your database is running on Amazon RDS, choose Select RDS DB instance, then choose the instance from the list. Otherwise, choose the source engine and provide the connection information either through AWS Secrets Manager or manually.
  4. For Endpoint identifier, enter a name for the endpoint; for example, source-postgresql.
  5. Choose Create endpoint.

Deploy the solution using AWS CloudFormation

Create a CloudFormation stack using the provided template. Complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. Provide a stack name, such as transactionaldl-postgresql.
  4. Enter the required parameters:
    1. DMSS3EndpointIAMRoleARN – The IAM role ARN for AWS DMS to write data into Amazon S3.
    2. ReplicationInstanceArn – The AWS DMS replication instance ARN.
    3. S3BucketStage – The name of the existing bucket used for the stage layer of the data lake.
    4. S3BucketGlue – The name of the existing S3 bucket for storing AWS Glue scripts.
    5. S3BucketRaw – The name of the existing bucket used for the raw layer of the data lake.
    6. SourceEndpointArn – The AWS DMS endpoint ARN that you created earlier.
    7. SourceName – The arbitrary identifier of the data source to replicate (for example, postgres). This is used to define the S3 path of the data lake (raw layer) where data will be stored.
  5. Do not modify the following parameters:
    1. SourceS3BucketBlog – The bucket name where the provided AWS Glue script is stored.
    2. SourceS3BucketPrefix – The bucket prefix name where the provided AWS Glue script is stored.
  6. Choose Next twice.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

After approximately 5 minutes, the CloudFormation stack is deployed.

Review the AWS DMS replication task

The AWS CloudFormation deployment created an AWS DMS target endpoint for you. Because of two specific endpoint settings, the data will be ingested as we need it on Amazon S3.

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Search for and choose the endpoint that begins with dmsIcebergs3endpoint.
  3. Review the endpoint settings:
    1. DataFormat is specified as parquet.
    2. TimestampColumnName will add the column last_update_time with the date of creation of the records on Amazon S3.

AWS DMS endpoint settings

The deployment also creates an AWS DMS replication task that begins with dmsicebergtask.

  1. Choose Replication tasks in the navigation pane and search for the task.

You will see that the Task Type is marked as Full load, ongoing replication. AWS DMS will perform an initial full load of existing data, and then create incremental files with changes performed to the source database.

On the Mapping Rules tab, there are two types of rules:

  • A selection rule with the name of the source schema and tables that will be ingested from the source database. By default, it uses the sample database provided in the prerequisites, dms_sample, and all tables, using the % wildcard.
  • Two transformation rules that include in the target files on Amazon S3 the schema name and table name as columns. This is used by our AWS Glue job to know to which tables the files in the data lake correspond.

To learn more about how to customize this for your own data sources, refer to Selection rules and actions.

AWS mapping rules

Let’s change some configurations to finish our task preparation.

  1. On the Actions menu, choose Modify.
  2. In the Task Settings section, under Stop task after full load completes, choose Stop after applying cached changes.

This way, we can control the initial load and incremental file generation as two different steps. We use this two-step approach to run the AWS Glue job once for each step.

  1. Under Task logs, choose Turn on CloudWatch logs.
  2. Choose Save.
  3. Wait about 1 minute for the database migration task status to show as Ready.

Add permissions for encryption and decryption or Lake Formation

Optionally, you can add permissions for encryption and decryption or Lake Formation.

Add encryption and decryption permissions

If your S3 buckets used for the raw and stage layers are encrypted using AWS Key Management Service (AWS KMS) customer managed keys, you need to add permissions to allow the AWS Glue job to access the data:

Add Lake Formation permissions

If you’re managing permissions using Lake Formation, you need to allow your AWS Glue job to create your domain’s databases and tables through the IAM role GlueJobRole.

  1. Grant permissions to create databases (for instructions, refer to Creating a Database).
  2. Grant SUPER permissions to the default database.
  3. Grant data location permissions.
  4. If you create databases manually, grant permissions on all databases to create tables. Refer to Granting table permissions using the Lake Formation console and the named resource method or Granting Data Catalog permissions using the LF-TBAC method according to your use case.

After you complete the later step of performing the initial data load, make sure to also add permissions for consumers to query the tables. The job role will become the owner of all the tables created, and the data lake admin can then perform grants to additional users.

Review table configuration in Parameter Store

The AWS Glue job that performs the data ingestion into Iceberg tables uses the table specification provided in Parameter Store. Complete the following steps to review the parameters that were configured automatically for you and modify them if needed.

  1. On the Parameter Store console, choose My parameters in the navigation pane.

The CloudFormation stack created two parameters:

  • iceberg-config for job configurations
  • iceberg-tables for table configuration
  1. Choose the parameter iceberg-tables.

The JSON structure contains information that AWS Glue uses to read data and write the Iceberg tables on the target domain:

  • One object per table – The name of the object is created using the schema name, a period, and the table name; for example, schema.table.
  • primaryKey – This should be specified for every source table. You can provide a single column or a comma-separated list of columns (without spaces).
  • partitionCols – This optionally specifies partition columns for the target tables. If you don’t want to create partitioned tables, provide an empty string. Otherwise, provide a single column or a comma-separated list of columns to be used (without spaces).
  1. If you want to use your own data source, use the following JSON code and replace the text in CAPS from the template provided. If you’re using the sample data source provided, keep the default settings:
{
    "SCHEMA_NAME.TABLE_NAME_1": {
        "primaryKey": "ONLY_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": ""
    },
    "SCHEMA_NAME.TABLE_NAME_2": {
        "primaryKey": "FIRST_PRIMARY_KEY,SECOND_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": "PARTITION_COLUMN_ONE,PARTITION_COLUMN_TWO"
    }
}
  1. Choose Save changes.

Perform initial data loading

Now that the required configuration is finished, we ingest the initial data. This step includes three parts: ingesting the data from the source relational database into the raw layer of the data lake, creating the Iceberg tables on the stage layer of the data lake, and verifying results using Athena.

Ingest data into the raw layer of the data lake

To ingest data from the relational data source (PostgreSQL if you are using the sample provided) to our transactional data lake using Iceberg, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Select the replication task you created and on the Actions menu, choose Restart/Resume.
  3. Wait about 5 minutes for the replication task to complete. You can monitor the tables ingested on the Statistics tab of the replication task.

AWS DMS full load statistics

After a few minutes, the task finishes with the message Full load complete.

  1. On the Amazon S3 console, choose the bucket you defined as the raw layer.

Under the S3 prefix defined on AWS DMS (for example, postgres), you should see a hierarchy of folders with the following structure:

  • Schema
    • Table name
      • LOAD00000001.parquet
      • LOAD0000000N.parquet

AWS DMS full load objects created on S3

If your S3 bucket is empty, review Troubleshooting migration tasks in AWS Database Migration Service before running the AWS Glue job.

Create and ingest data into Iceberg tables

Before running the job, let’s navigate the script of the AWS Glue job provided as part of the CloudFormation stack to understand its behavior.

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Search for the job that starts with IcebergJob- and a suffix of your CloudFormation stack name (for example, IcebergJob-transactionaldl-postgresql).
  3. Choose the job.

AWS Glue ETL job review

The job script gets the configuration it needs from Parameter Store. The function getConfigFromSSM() returns job-related configurations such as the source and target buckets from which the data needs to be read and written. The variable ssm_param_table_values contains table-related information such as the data domain, table name, partition columns, and primary key of the tables that need to be ingested. See the following Python code:

# Main application
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'stackName'])
SSM_PARAMETER_NAME = f"{args['stackName']}-iceberg-config"
SSM_TABLE_PARAMETER_NAME = f"{args['stackName']}-iceberg-tables"

# Parameters for job
rawS3BucketName, rawBucketPrefix, stageS3BucketName, warehouse_path = getConfigFromSSM(SSM_PARAMETER_NAME)
ssm_param_table_values = json.loads(ssmClient.get_parameter(Name = SSM_TABLE_PARAMETER_NAME)['Parameter']['Value'])
dropColumnList = ['db','table_name', 'schema_name','Op', 'last_update_time', 'max_op_date']
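
The implementation of getConfigFromSSM() isn't shown in this excerpt. A minimal sketch of how such a helper could read the configuration, assuming the parameter stores a JSON object with bucket and prefix keys (the key names below are illustrative and may differ from the deployed script), follows:

import json
import boto3

ssmClient = boto3.client('ssm')

def getConfigFromSSM(parameter_name):
    # Read the job configuration JSON stored in Parameter Store
    value = ssmClient.get_parameter(Name=parameter_name)['Parameter']['Value']
    config = json.loads(value)
    # Key names below are assumptions for illustration; the deployed script may differ
    return (
        config['rawS3BucketName'],
        config['rawBucketPrefix'],
        config['stageS3BucketName'],
        config['warehousePath'],
    )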

The script uses an arbitrary catalog name for Iceberg that is defined as my_catalog. This is implemented on the AWS Glue Data Catalog using Spark configurations, so a SQL operation pointing to my_catalog will be applied on the Data Catalog. See the following code:

catalog_name = 'my_catalog'
errored_table_list = []

# Iceberg configuration
spark = SparkSession.builder \
    .config('spark.sql.warehouse.dir', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.warehouse', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .getOrCreate()
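
For illustration, because my_catalog is backed by the Data Catalog, a Spark SQL statement such as the following (using the sample sports domain created by this solution) resolves the database and table through AWS Glue:

# Illustration only: resolves sports.sporting_event through the AWS Glue Data Catalog
spark.sql(f'SELECT count(*) FROM {catalog_name}.sports.sporting_event').show()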

The script iterates over the tables defined in Parameter Store and performs the logic for detecting if the table exists and if the incoming data is an initial load or an upsert:

# Iteration over tables stored on Parameter Store
for key in ssm_param_table_values:
    # Get table data
    isTableExists = False
    schemaName, tableName = key.split('.')
    logger.info(f'Processing table : {tableName}')

The initialLoadRecordsSparkSQL() function loads initial data when no operation column is present in the S3 files. AWS DMS adds this column only to Parquet data files produced by the continuous replication (CDC). The data loading is performed using the INSERT INTO command with SparkSQL. See the following code:

sqltemp = Template("""
    INSERT INTO $catalog_name.$dbName.$tableName  ($insertTableColumnList)
    SELECT $insertTableColumnList FROM insertTable $partitionStrSQL
""")
SQLQUERY = sqltemp.substitute(
    catalog_name = catalog_name, 
    dbName = dbName, 
    tableName = tableName,
    insertTableColumnList = insertTableColumnList[ : -1],
    partitionStrSQL = partitionStrSQL)

logger.info(f'****SQL QUERY IS : {SQLQUERY}')
spark.sql(SQLQUERY)

Now we run the AWS Glue job to ingest the initial data into the Iceberg tables. The CloudFormation stack adds the --datalake-formats parameter, adding the required Iceberg libraries to the job.

  1. Choose Run job.
  2. Choose Job Runs to monitor the status. Wait until the status is Run Succeeded.

Verify the data loaded

To confirm that the job processed the data as expected, complete the following steps:

  1. On the Athena console, choose Query Editor in the navigation pane.
  2. Verify AwsDataCatalog is selected as the data source.
  3. Under Database, choose the data domain that you want to explore, based on the configuration you defined in the parameter store. If using the sample database provided, use sports.

Under Tables and views, we can see the list of tables that were created by the AWS Glue job.

  1. Choose the options menu (three dots) next to the first table name, then choose Preview Data.

You can see the data loaded into Iceberg tables.

Amazon Athena review initial data loaded

Perform incremental data loading

Now we start capturing changes from our relational database and applying them to the transactional data lake. This step is also divided in three parts: capturing the changes, applying them to the Iceberg tables, and verifying the results.

Capture changes from the relational database

Due to the configuration we specified, the replication task stopped after running the full load phase. Now we restart the task to add incremental files with changes into the raw layer of the data lake.

  1. On the AWS DMS console, select the task we created and ran before.
  2. On the Actions menu, choose Resume.
  3. Choose Start task to start capturing changes.
  4. To trigger new file creation on the data lake, perform inserts, updates, or deletes on the tables of your source database using your preferred database administration tool. If using the sample database provided, you could run the following SQL commands:
UPDATE dms_sample.nfl_stadium_data_upd
SET seating_capacity=93703
WHERE team = 'Los Angeles Rams' and sport_location_id = '31';

update  dms_sample.mlb_data 
set bats = 'R'
where mlb_id=506560 and bats='L';

update dms_sample.sporting_event 
set start_date  = current_date 
where id=11 and sold_out=0;
  1. On the AWS DMS task details page, choose the Table statistics tab to see the changes captured.
    AWS DMS CDC statistics
  2. Open the raw layer of the data lake to find a new file holding the incremental changes inside every table’s prefix, for example under the sporting_event prefix.

The record with changes for the sporting_event table looks like the following screenshot.

AWS DMS objects migrated into S3 with CDC

Notice the Op column at the beginning, which identifies the record as an update (U). Also, the second date/time value is the control column added by AWS DMS with the time when the change was captured.

CDC file schema on Amazon S3

Apply changes on the Iceberg tables using AWS Glue

Now we run the AWS Glue job again, and it will automatically process only the new incremental files since the job bookmark is enabled. Let’s review how it works.

The dedupCDCRecords() function performs deduplication of data because multiple changes to a single record ID could be captured within the same data file on Amazon S3. Deduplication is performed based on the last_update_time column added by AWS DMS that indicates the timestamp of when the change was captured. See the following Python code:

import sys

from pyspark.sql.functions import max
from pyspark.sql.window import Window

def dedupCDCRecords(inputDf, keylist):
    # For each primary key, find the timestamp of the latest change captured by AWS DMS
    IDWindowDF = Window.partitionBy(*keylist).orderBy(inputDf.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)
    inputDFWithTS = inputDf.withColumn('max_op_date', max(inputDf.last_update_time).over(IDWindowDF))

    # Keep only the latest insert, update, or delete per key; older intermediate changes are discarded
    NewInsertsDF = inputDFWithTS.filter('last_update_time=max_op_date').filter("op='I'")
    UpdateDeleteDf = inputDFWithTS.filter('last_update_time=max_op_date').filter("op IN ('U','D')")
    finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

    return finalInputDF

On line 99, the upsertRecordsSparkSQL() function performs the upsert in a similar fashion to the initial load, but this time with a SQL MERGE command.
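
The generated MERGE statement isn't shown in this post, but a simplified sketch of the kind of statement the function could build for an Iceberg table looks like the following. It assumes a single-column primary key and an updateTable temporary view holding the deduplicated CDC records; the actual script assembles the join condition, column handling, and delete logic dynamically from the table configuration:

# Illustration only: a simplified upsert template; primaryKey and updateTable are assumptions
sqltemp = Template("""
    MERGE INTO $catalog_name.$dbName.$tableName AS target
    USING updateTable AS source
    ON target.$primaryKey = source.$primaryKey
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
SQLQUERY = sqltemp.substitute(
    catalog_name = catalog_name,
    dbName = dbName,
    tableName = tableName,
    primaryKey = primaryKey)
spark.sql(SQLQUERY)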

Review the applied changes

Open the Athena console and run a query that selects the changed records on the source database. If using the provided sample database, use the following SQL query:

SELECT * FROM "sports"."nfl_stadium_data_upd"
WHERE team = 'Los Angeles Rams' and sport_location_id = 31
LIMIT 1;

Amazon Athena review cdc data loaded

Monitor table ingestion

The AWS Glue job script includes simple Python exception handling to catch errors while processing a specific table. The job bookmark is saved after each table finishes processing successfully, to avoid reprocessing tables without errors if the job run is retried for the tables that failed.

The AWS Command Line Interface (AWS CLI) provides a get-job-bookmark command for AWS Glue that provides insight into the status of the bookmark for each table processed.

  1. On the AWS Glue Studio console, choose the ETL job.
  2. Choose the Job Runs tab and copy the job run ID.
  3. Run the following command on a terminal authenticated for the AWS CLI, replacing <GLUE_JOB_RUN_ID> on line 1 with the value you copied. If your CloudFormation stack is not named transactionaldl-postgresql, provide the name of your job on line 2 of the script:
jobrun=<GLUE_JOB_RUN_ID>
jobname=IcebergJob-transactionaldl-postgresql
aws glue get-job-bookmark --job-name $jobname --run-id $jobrun

In this solution, when processing a table raises an exception, the AWS Glue job does not fail immediately. Instead, the table is added to an array that is printed after the job completes, and the job is marked as failed only after it tries to process the rest of the tables detected in the raw data source. This way, tables without errors don't have to wait until the user identifies and solves the problem on the conflicting tables. The user can quickly detect job runs that had issues using the AWS Glue job run status, and identify which specific tables are causing the problem using the CloudWatch logs for the job run.

  1. The job script implements this feature with the following Python code:
# Performed for every table
for key in ssm_param_table_values:
    try:
        # Table processing logic goes here
        ...
    except Exception as e:
        logger.info(f'There is an issue with table: {tableName}')
        logger.info(f'The exception is : {e}')
        errored_table_list.append(tableName)
        continue
    job.commit()

if len(errored_table_list):
    logger.info(f'Total number of errored tables: {len(errored_table_list)}')
    logger.info(f"Tables that failed during processing: {', '.join(errored_table_list)}")
    raise Exception('***** Some tables failed to process.')

The following screenshot shows how the CloudWatch logs look for tables that cause errors on processing.

AWS Glue job monitoring with logs

Aligned with the AWS Well-Architected Framework Data Analytics Lens practices, you can adopt more sophisticated control mechanisms to identify and notify stakeholders when errors appear in the data pipelines. For example, you can use an Amazon DynamoDB control table to store all tables and job runs with errors, or use Amazon Simple Notification Service (Amazon SNS) to send alerts to operators when certain criteria are met.
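
For example, a minimal sketch of an SNS notification for the errored tables could look like the following. The topic ARN is a placeholder you would provide, and this snippet is not part of the deployed solution:

import boto3

# Hypothetical: publish the list of failed tables to an SNS topic provided by the operator
sns = boto3.client('sns')

def notify_errored_tables(errored_table_list, topic_arn):
    if errored_table_list:
        sns.publish(
            TopicArn=topic_arn,
            Subject='AWS Glue ingestion job: tables with errors',
            Message='The following tables failed to process: ' + ', '.join(errored_table_list)
        )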

Schedule incremental batch data loading

The CloudFormation stack deploys an Amazon EventBridge rule (disabled by default) that can trigger the AWS Glue job to run on a schedule. To provide your own schedule and enable the rule, complete the following steps:

  1. On the EventBridge console, choose Rules in the navigation pane.
  2. Search for the rule prefixed with the name of your CloudFormation stack followed by JobTrigger (for example, transactionaldl-postgresql-JobTrigger-randomvalue).
  3. Choose the rule.
  4. Under Event Schedule, choose Edit.

The default schedule is configured to trigger every hour.

  1. Provide the schedule you want to run the job.
  2. Additionally, you can use an EventBridge cron expression by selecting A fine-grained schedule.
    Amazon EventBridge schedule ETL job
  3. When you finish setting up the cron expression, choose Next three times, and finally choose Update Rule to save changes.

The rule is created disabled by default to allow you to run the initial data load first.

  1. Activate the rule by choosing Enable.

You can use the rule's Monitoring tab to view invocations, or check the status directly on the AWS Glue Job Run details page.

Conclusion

After deploying this solution, you have automated the ingestion of your tables from a single relational data source. Organizations using a data lake as their central data platform usually need to handle multiple, sometimes even dozens of, data sources. Also, more and more use cases require organizations to implement transactional capabilities in the data lake. You can use this solution to accelerate the adoption of such capabilities across all your relational data sources to enable new business use cases, automating the implementation process to derive more value from your data.


About the Authors

Luis Gerardo BaezaLuis Gerardo Baeza is a Big Data Architect in the Amazon Web Services (AWS) Data Lab. He has 12 years of experience helping organizations in the healthcare, financial and education sectors to adopt enterprise architecture programs, cloud computing, and data analytics capabilities. Luis currently helps organizations across Latin America to accelerate strategic data initiatives.

SaiKiran Reddy AenuguSaiKiran Reddy Aenugu is a Data Architect in the Amazon Web Services (AWS) Data Lab. He has 10 years of experience implementing data loading, transformation, and visualization processes. SaiKiran currently helps organizations in North America to adopt modern data architectures such as data lakes and data mesh. He has experience in the retail, airline, and finance sectors.

Narendra MerlaNarendra Merla is a Data Architect in the Amazon Web Services (AWS) Data Lab. He has 12 years of experience in designing and productionalizing both real-time and batch-oriented data pipelines and building data lakes on both cloud and on-premises environments. Narendra currently helps organizations in North America to build and design robust data architectures, and has experience in the telecom and finance sectors.