Tag Archives: AWS Big Data

Interact with Apache Iceberg tables using Amazon Athena and cross account fine-grained permissions using AWS Lake Formation

Post Syndicated from Kishore Dhamodaran original https://aws.amazon.com/blogs/big-data/interact-with-apache-iceberg-tables-using-amazon-athena-and-cross-account-fine-grained-permissions-using-aws-lake-formation/

We recently announced support for AWS Lake Formation fine-grained access control policies in Amazon Athena queries for data stored in any supported file format using table formats such as Apache Iceberg, Apache Hudi and Apache Hive. AWS Lake Formation allows you to define and enforce database, table, and column-level access policies to query Iceberg tables stored in Amazon S3. Lake Formation provides an authorization and governance layer on data stored in Amazon S3. This capability requires that you upgrade to Athena engine version 3.

Large organizations often have lines of business (LoBs) that operate with autonomy in managing their business data, which makes sharing data across LoBs non-trivial. These organizations have adopted a federated model, with each LoB having the autonomy to make decisions on their data. They use the publisher/consumer model with a centralized governance layer that is used to enforce access controls. If you are interested in learning more about data mesh architecture, visit Design a data mesh architecture using AWS Lake Formation and AWS Glue. With Athena engine version 3, customers can use the same fine-grained controls for open data frameworks such as Apache Iceberg, Apache Hudi, and Apache Hive.

In this post, we deep dive into a use case where you have a producer/consumer model with data sharing enabled to give restricted access to an Apache Iceberg table that the consumer can query. We’ll discuss row filtering to restrict certain rows, column filtering to restrict column-level access, schema evolution, and time travel.

Solution overview

To illustrate the functionality of fine-grained permissions for Apache Iceberg tables with Athena and Lake Formation, we set up the following components:

  • In the producer account:
    • An AWS Glue Data Catalog to register the schema of a table in Apache Iceberg format
    • Lake Formation to provide fine-grained access to the consumer account
    • Athena to verify data from the producer account
  • In the consumer account:
    • AWS Resource Access Manager (AWS RAM) to create a handshake between the producer Data Catalog and consumer
    • Lake Formation to provide fine-grained access to the consumer account
    • Athena to verify data from producer account

The following diagram illustrates the architecture.

Cross-account fine-grained permissions architecture

Prerequisites

Before you get started, make sure you have the following:

Data producer setup

In this section, we present the steps to set up the data producer.

Create an S3 bucket to store the table data

We create a new S3 bucket to save the data for the table:

  1. On the Amazon S3 console, create an S3 bucket with a unique name (for this post, we use iceberg-athena-lakeformation-blog).
  2. Create the producer folder inside the bucket to use for the table.

Amazon S3 bucket and folder creation

Register the S3 path storing the table using Lake Formation

We register the S3 full path in Lake Formation:

  1. Navigate to the Lake Formation console.
  2. If you’re logging in for the first time, you’re prompted to create an admin user.
  3. In the navigation pane, under Register and ingest, choose Data lake locations.
  4. Choose Register location, and provide the S3 bucket path that you created earlier.
  5. Choose AWSServiceRoleForLakeFormationDataAccess for IAM role.

For additional information about roles, refer to Requirements for roles used to register locations.

If you enabled encryption of your S3 bucket, you have to provide permissions for Lake Formation to perform encryption and decryption operations. Refer to Registering an encrypted Amazon S3 location for guidance.

  6. Choose Register location.

Register Lake Formation location

Create an Iceberg table using Athena

Now let’s create the table using Athena backed by Apache Iceberg format:

  1. On the Athena console, choose Query editor in the navigation pane.
  2. If you’re using Athena for the first time, under Settings, choose Manage and enter the S3 bucket location that you created earlier (iceberg-athena-lakeformation-blog/producer).
  3. Choose Save.
  4. In the query editor, enter the following query (replace the location with the S3 bucket that you registered with Lake Formation). Note that we use the default database, but you can use any other database.
CREATE TABLE consumer_iceberg (
customerid bigint,
customername string,
email string,
city string,
country string,
territory string,
contactfirstname string,
contactlastname string)
LOCATION 's3://YOUR-BUCKET/producer/' -- *** Change bucket name to your bucket***
TBLPROPERTIES ('table_type'='ICEBERG')
  5. Choose Run.

Athena query editor to create Iceberg table

Share the table with the consumer account

To illustrate functionality, we implement the following scenarios:

  • Provide access to selected columns
  • Provide access to selected rows based on a filter

Complete the following steps:

  1. On the Lake Formation console, in the navigation pane under Data catalog, choose Data filters.
  2. Choose Create new filter.
  3. For Data filter name, enter blog_data_filter.
  4. For Target database, enter lf-demo-db.
  5. For Target table, enter consumer_iceberg.
  6. For Column-level access, select Include columns.
  7. Choose the columns to share with the consumer: country, address, contactfirstname, city, customerid, and customername.
  8. For Row filter expression, enter the filter country='France'.
  9. Choose Create filter.

create data filter

Now let’s grant access to the consumer account on the consumer_iceberg table.

  1. In the navigation pane, choose Tables.
  2. Select the consumer_iceberg table, and choose Grant on the Actions menu.
    Grant access to consumer account on consumer_iceberg table
  3. Select External accounts.
  4. Enter the external account ID.
    Grant data permissions
  5. Select Named data catalog resources.
  6. Choose your database and table.
  7. For Data filters, choose the data filter you created.
    Add data filter
  8. For Data filter permissions and Grantable permissions, select Select.
  9. Choose Grant.

Permissions for creating grant

Data consumer setup

To set up the data consumer, we accept the resource share and create a table using AWS RAM and Lake Formation. Complete the following steps:

  1. Log in to the consumer account and navigate to the AWS RAM console.
  2. Under Shared with me in the navigation pane, choose Resource shares.
  3. Choose your resource share.
    Resource share in consumer account
  4. Choose Accept resource share.
  5. Note the name of the resource share to use in the next steps.
    Accept resource share
  6. Navigate to the Lake Formation console.
  7. If you’re logging in for the first time, you’re prompted to create an admin user.
  8. Choose Databases in the navigation pane, then choose your database.
  9. On the Actions menu, choose Create resource link.
    Create a resource link
  10. For Resource link name, enter the name of your resource link (for example, consumer_iceberg).
  11. Choose your database and shared table.
  12. Choose Create.
    Create table with resource link

Validate the solution

Now we can run different operations on the tables to validate the fine-grained access controls.

Insert operation

Let’s insert data into the consumer_iceberg table in the producer account, and validate the data filtering works as expected in the consumer account.

  1. Log in to the producer account.
  2. On the Athena console, choose Query editor in the navigation pane.
  3. Use the following SQL to insert data into the Iceberg table. Run one query at a time in the query editor: highlight a single query, then choose Run or Run again:
INSERT INTO consumer_iceberg VALUES (1, 'Land of Toys Inc.', '[email protected]',
'NYC','USA', 'NA', 'James', 'xxxx 118th NE');

INSERT INTO consumer_iceberg VALUES (2, 'Reims Collectables', '[email protected]',
'Reims','France', 'EMEA', 'Josephine', 'Darakjy');

INSERT INTO consumer_iceberg VALUES (3, 'Lyon Souveniers', '[email protected]',
'Paris', 'France', 'EMEA','Art', 'Venere');

Insert data into consumer_iceberg table in the producer account

  4. Use the following SQL to read data from the Iceberg table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Run select query to validate rows were inserted

  5. Log in to the consumer account.
  6. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Run same query in consumer account

Based on the filters, the consumer has visibility to a subset of columns, and rows where the country is France.
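
To make the effect of the data filter concrete, the following is a minimal Python sketch that mimics, in memory, what the consumer sees. It is not how Lake Formation enforces the filter. The helper name apply_data_filter and the placeholder email values are assumptions for illustration, and the include list only names columns that exist in the table as created earlier.

```python
# Illustrative only: mimics the effect of blog_data_filter on the sample rows.
COLUMNS = ["customerid", "customername", "email", "city", "country",
           "territory", "contactfirstname", "contactlastname"]

# Email values are placeholders (assumption), not the values used in the post.
ROWS = [
    (1, "Land of Toys Inc.", "user1@example.com", "NYC", "USA", "NA", "James", "xxxx 118th NE"),
    (2, "Reims Collectables", "user2@example.com", "Reims", "France", "EMEA", "Josephine", "Darakjy"),
    (3, "Lyon Souveniers", "user3@example.com", "Paris", "France", "EMEA", "Art", "Venere"),
]


def apply_data_filter(rows, columns, include_columns, row_predicate):
    """Return dicts holding only the included columns of rows that pass the predicate."""
    visible = []
    for row in rows:
        record = dict(zip(columns, row))
        if row_predicate(record):
            visible.append({c: record[c] for c in columns if c in include_columns})
    return visible


shared = apply_data_filter(
    ROWS,
    COLUMNS,
    include_columns={"country", "contactfirstname", "city", "customerid", "customername"},
    row_predicate=lambda r: r["country"] == "France",  # row filter expression country='France'
)

for record in shared:
    print(record)
```

Only the two rows for France survive the row filter, and columns outside the include list, such as email, are dropped from each record.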

Update/Delete operations

Now let’s update one of the rows and delete one from the dataset shared with the consumer.

  1. Log in to the producer account.
  2. Update the city from Reims to Paris, and delete the row where customerid = 3:
    UPDATE consumer_iceberg SET city= 'Paris' WHERE city= 'Reims' ;

    Run update query in producer account

DELETE FROM consumer_iceberg WHERE customerid =3;

Run delete query in producer account

  3. Verify the updated and deleted dataset:
SELECT * FROM consumer_iceberg;

Verify update and delete reflected in producer account

  4. Log in to the consumer account.
  5. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Verify update and delete in consumer account

We can observe that only one row is available and the city is updated to Paris.

Schema evolution: Add a new column

Let’s add a new column to the table and make it visible to the consumer.

  1. Log in to the producer account.
  2. Add a new column called geo_loc to the Iceberg table. Run one query at a time in the query editor: highlight a single query, then choose Run or Run again:
ALTER TABLE consumer_iceberg ADD COLUMNS (geo_loc string);

INSERT INTO consumer_iceberg VALUES (5, 'Test_user', '[email protected]',
'Reims','France', 'EMEA', 'Test_user', 'Test_user', 'test_geo');

SELECT * FROM consumer_iceberg;

Add a new column in producer account

To provide visibility to the newly added geo_loc column, we need to update the Lake Formation data filter.

  3. On the Lake Formation console, choose Data filters in the navigation pane.
  4. Select your data filter and choose Edit.
    Update data filter
  5. Under Column-level access, add the new column (geo_loc).
  6. Choose Save.
    Add new column to data filter
  7. Log in to the consumer account.
  8. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Validate new column appears in consumer account

The new column geo_loc is visible, along with an additional row.

Schema evolution: Delete column

Let’s drop a column from the table and verify the change in the consumer account.

  1. Log in to the producer account.
  2. Alter the table to drop the address column from the Iceberg table. Run one query at a time in the query editor: highlight a single query, then choose Run or Run again:
ALTER TABLE consumer_iceberg DROP COLUMN address;

SELECT * FROM consumer_iceberg;

Delete a column in producer account

We can observe that the column address is not present in the table.

  3. Log in to the consumer account.
  4. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Validate column deletion in consumer account

The column address is not present in the table.

Time travel

We have now changed the Iceberg table multiple times. The Iceberg table keeps track of the snapshots. Complete the following steps to explore the time travel functionality:

  1. Log in to the producer account.
  2. Query the system table:
SELECT * FROM "lf-demo-db"."consumer_iceberg$snapshots" limit 10;

We can observe that we have generated multiple snapshots.

  3. Note down one of the committed_at values to use in the next steps (for this example, 2023-01-29 21:35:02.176 UTC).
    Time travel query in consumer account
  4. Use time travel to find the table snapshot. Run one query at a time in the query editor: highlight a single query, then choose Run or Run again:
SELECT * FROM consumer_iceberg FOR TIMESTAMP
AS OF TIMESTAMP '2023-01-29 21:35:02.176 UTC';

Find table snapshot using time travel
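
Conceptually, a time travel query by timestamp resolves to the most recent snapshot committed at or before the requested time. The following Python sketch illustrates that lookup under simplifying assumptions: the snapshot IDs and two of the three commit times are made up for illustration (only 2023-01-29 21:35:02.176 comes from this post), and the helper names ts and snapshot_as_of are hypothetical.

```python
from bisect import bisect_right
from datetime import datetime, timezone


def ts(text):
    """Parse a 'YYYY-MM-DD HH:MM:SS.fff' string as a UTC timestamp."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc)


# Hypothetical (committed_at, snapshot_id) pairs, sorted by commit time.
SNAPSHOTS = [
    (ts("2023-01-29 21:30:10.000"), 101),
    (ts("2023-01-29 21:35:02.176"), 102),
    (ts("2023-01-29 21:40:45.500"), 103),
]


def snapshot_as_of(snapshots, as_of):
    """Return the id of the latest snapshot committed at or before as_of, or None."""
    commit_times = [committed_at for committed_at, _ in snapshots]
    index = bisect_right(commit_times, as_of)
    return snapshots[index - 1][1] if index else None


exact = snapshot_as_of(SNAPSHOTS, ts("2023-01-29 21:35:02.176"))
between = snapshot_as_of(SNAPSHOTS, ts("2023-01-29 21:39:00.000"))
too_early = snapshot_as_of(SNAPSHOTS, ts("2023-01-29 21:00:00.000"))
print(exact, between, too_early)
```

A timestamp that falls between two commits resolves to the earlier of the two snapshots, and a timestamp before the first commit has no snapshot to read.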

Clean up

Complete the following steps to avoid incurring future charges:

  1. On the Amazon S3 console, delete the table storage bucket (for this post, iceberg-athena-lakeformation-blog).
  2. In the producer account on the Athena console, run the following commands to delete the tables you created:
DROP TABLE "lf-demo-db"."consumer_iceberg";
DROP DATABASE `lf-demo-db`;
  3. In the producer account on the Lake Formation console, revoke permissions to the consumer account.
    Clean up - Revoke permissions to consumer account
  4. Delete the S3 bucket used for the Athena query result location from the consumer account.

Conclusion

With the support for cross-account, fine-grained access control policies for formats such as Iceberg, you have the flexibility to work with any format supported by Athena. The ability to perform CRUD operations against the data in your S3 data lake, combined with Lake Formation fine-grained access controls for all tables and formats supported by Athena, provides opportunities to innovate and simplify your data strategy. We’d love to hear your feedback!


About the authors

Kishore Dhamodaran is a Senior Solutions Architect at AWS. Kishore helps strategic customers with their cloud enterprise strategy and migration journey, leveraging his years of industry and cloud experience.

Jack Ye is a software engineer of the Athena Data Lake and Storage team at AWS. He is an Apache Iceberg Committer and PMC member.

Chris Olson is a Software Development Engineer at AWS.

Xiaoxuan Li is a Software Development Engineer at AWS.

Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.

Use Apache Iceberg in a data lake to support incremental data processing

Post Syndicated from Flora Wu original https://aws.amazon.com/blogs/big-data/use-apache-iceberg-in-a-data-lake-to-support-incremental-data-processing/

Apache Iceberg is an open table format for very large analytic datasets, which captures metadata information on the state of datasets as they evolve and change over time. It adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table. Iceberg has become very popular for its support for ACID transactions in data lakes and features like schema and partition evolution, time travel, and rollback.

Apache Iceberg integration is supported by AWS analytics services including Amazon EMR, Amazon Athena, and AWS Glue. Amazon EMR can provision clusters with Spark, Hive, Trino, and Flink that can run Iceberg. Starting with Amazon EMR version 6.5.0, you can use Iceberg with your EMR cluster without requiring a bootstrap action. In early 2022, AWS announced general availability of Athena ACID transactions, powered by Apache Iceberg. The recently released Athena query engine version 3 provides better integration with the Iceberg table format. AWS Glue 3.0 and later supports the Apache Iceberg framework for data lakes.

In this post, we discuss what customers want in modern data lakes and how Apache Iceberg helps address customer needs. Then we walk through a solution to build a high-performance and evolving Iceberg data lake on Amazon Simple Storage Service (Amazon S3) and process incremental data by running insert, update, and delete SQL statements. Finally, we show you how to performance tune the process to improve read and write performance.

How Apache Iceberg addresses what customers want in modern data lakes

More and more customers are building data lakes, with structured and unstructured data, to support many users, applications, and analytics tools. There is an increased need for data lakes to support database-like features such as ACID transactions, record-level updates and deletes, time travel, and rollback. Apache Iceberg is designed to support these features on cost-effective petabyte-scale data lakes on Amazon S3.

Apache Iceberg addresses customer needs by capturing rich metadata information about the dataset at the time the individual data files are created. There are three layers in the architecture of an Iceberg table: the Iceberg catalog, the metadata layer, and the data layer, as depicted in the following figure (source).

The Iceberg catalog stores the metadata pointer to the current table metadata file. When a select query is reading an Iceberg table, the query engine first goes to the Iceberg catalog, then retrieves the location of the current metadata file. Whenever there is an update to the Iceberg table, a new snapshot of the table is created, and the metadata pointer points to the current table metadata file.

The following is an example Iceberg catalog with AWS Glue implementation. You can see the database name, the location (S3 path) of the Iceberg table, and the metadata location.

The metadata layer has three types of files: the metadata file, manifest list, and manifest file in a hierarchy. At the top of the hierarchy is the metadata file, which stores information about the table’s schema, partition information, and snapshots. The snapshot points to the manifest list. The manifest list has the information about each manifest file that makes up the snapshot, such as location of the manifest file, the partitions it belongs to, and the lower and upper bounds for partition columns for the data files it tracks. The manifest file tracks data files as well as additional details about each file, such as the file format. All three files work in a hierarchy to track the snapshots, schema, partitioning, properties, and data files in an Iceberg table.

The data layer has the individual data files of the Iceberg table. Iceberg supports a wide range of file formats including Parquet, ORC, and Avro. Because the Iceberg table tracks the individual data files instead of only pointing to the partition location with data files, it isolates the writing operations from reading operations. You can write the data files at any time, but only commit the change explicitly, which creates a new version of the snapshot and metadata files.
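
The lookup path through the three layers can be sketched with plain dictionaries. This is a toy model under stated assumptions, not Iceberg's real file layout: every path, the snapshot IDs, and the function data_files_for are made up for illustration.

```python
# Iceberg catalog: table name -> pointer to the current table metadata file.
catalog = {"reviews.all_reviews": "metadata/v2.metadata.json"}

# Metadata layer, level 1: metadata file -> snapshots, each pointing to a manifest list.
metadata_files = {
    "metadata/v2.metadata.json": {
        "current_snapshot_id": 2,
        "snapshots": {1: "metadata/snap-1.avro", 2: "metadata/snap-2.avro"},
    },
}

# Metadata layer, level 2: manifest list -> manifest files that make up the snapshot.
manifest_lists = {
    "metadata/snap-1.avro": ["metadata/m0.avro"],
    "metadata/snap-2.avro": ["metadata/m0.avro", "metadata/m1.avro"],
}

# Metadata layer, level 3: manifest file -> data files it tracks (the data layer).
manifests = {
    "metadata/m0.avro": ["data/file-a.parquet", "data/file-b.parquet"],
    "metadata/m1.avro": ["data/file-c.parquet"],
}


def data_files_for(table, snapshot_id=None):
    """Walk catalog -> metadata file -> manifest list -> manifests -> data files."""
    metadata = metadata_files[catalog[table]]
    if snapshot_id is None:
        snapshot_id = metadata["current_snapshot_id"]
    files = []
    for manifest in manifest_lists[metadata["snapshots"][snapshot_id]]:
        files.extend(manifests[manifest])
    return files


current_files = data_files_for("reviews.all_reviews")
older_files = data_files_for("reviews.all_reviews", snapshot_id=1)
print(current_files)
print(older_files)
```

Because snapshot 2 reuses the manifest from snapshot 1 and only adds a new one, a commit does not need to rewrite the metadata of files that did not change.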

Solution overview

In this post, we walk you through a solution to build a high-performing Apache Iceberg data lake on Amazon S3; process incremental data with insert, update, and delete SQL statements; and tune the Iceberg table to improve read and write performance. The following diagram illustrates the solution architecture.

To demonstrate this solution, we use the Amazon Customer Reviews dataset in an S3 bucket (s3://amazon-reviews-pds/parquet/). In a real use case, it would be raw data stored in your S3 bucket. We can check the data size with the following code in the AWS Command Line Interface (AWS CLI):

# Run this AWS CLI command to check the data size
aws s3 ls --summarize --human-readable --recursive s3://amazon-reviews-pds/parquet

The total object count is 430, and total size is 47.4 GiB.

To set up and test this solution, we complete the following high-level steps:

  1. Set up an S3 bucket in the curated zone to store converted data in Iceberg table format.
  2. Launch an EMR cluster with appropriate configurations for Apache Iceberg.
  3. Create a notebook in EMR Studio.
  4. Configure the Spark session for Apache Iceberg.
  5. Convert data to Iceberg table format and move data to the curated zone.
  6. Run insert, update, and delete queries in Athena to process incremental data.
  7. Carry out performance tuning.

Prerequisites

To follow along with this walkthrough, you must have an AWS account with an AWS Identity and Access Management (IAM) role that has sufficient access to provision the required resources.

Set up the S3 bucket for Iceberg data in the curated zone in your data lake

Choose the Region in which you want to create the S3 bucket and provide a unique name:

s3://iceberg-curated-blog-data

Launch an EMR cluster to run Iceberg jobs using Spark

You can create an EMR cluster from the AWS Management Console, Amazon EMR CLI, or AWS Cloud Development Kit (AWS CDK). For this post, we walk you through how to create an EMR cluster from the console.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose the latest Amazon EMR release. As of January 2023, the latest release is 6.9.0. Iceberg requires release 6.5.0 and above.
  4. Select JupyterEnterpriseGateway and Spark as the software to install.
  5. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":"true"}}].
  6. Leave other settings at their default and choose Next.
  7. For Hardware, use the default setting.
  8. Choose Next.
  9. For Cluster name, enter a name. We use iceberg-blog-cluster.
  10. Leave the remaining settings unchanged and choose Next.
  11. Choose Create cluster.

Create a notebook in EMR Studio

We now walk you through how to create a notebook in EMR Studio from the console.

  1. On the IAM console, create an EMR Studio service role.
  2. On the Amazon EMR console, choose EMR Studio.
  3. Choose Get started.

The Get started page appears in a new tab.

  4. Choose Create Studio in the new tab.
  5. Enter a name. We use iceberg-studio.
  6. Choose the same VPC and subnet as those for the EMR cluster, and the default security group.
  7. Choose AWS Identity and Access Management (IAM) for authentication, and choose the EMR Studio service role you just created.
  8. Choose an S3 path for Workspaces backup.
  9. Choose Create Studio.
  10. After the Studio is created, choose the Studio access URL.
  11. On the EMR Studio dashboard, choose Create workspace.
  12. Enter a name for your Workspace. We use iceberg-workspace.
  13. Expand Advanced configuration and choose Attach Workspace to an EMR cluster.
  14. Choose the EMR cluster you created earlier.
  15. Choose Create Workspace.
  16. Choose the Workspace name to open a new tab.

In the navigation pane, there is a notebook that has the same name as the Workspace. In our case, it is iceberg-workspace.

  17. Open the notebook.
  18. When prompted to choose a kernel, choose Spark.

Configure a Spark session for Apache Iceberg

Use the following code, providing your own S3 bucket name:

%%configure -f
{
"conf": {
"spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"spark.sql.catalog.demo.warehouse": "s3://iceberg-curated-blog-data",
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.demo.io-impl":"org.apache.iceberg.aws.s3.S3FileIO"
}
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin.
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information.
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path defined by this property: s3://iceberg-curated-blog-data.
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step).
  • spark.sql.catalog.demo.io-impl – Iceberg allows users to write data to Amazon S3 through S3FileIO. The AWS Glue Data Catalog by default uses this FileIO, and other catalogs can load this FileIO using the io-impl catalog property.

Convert data to Iceberg table format

You can use either Spark on Amazon EMR or Athena to load the Iceberg table. In the EMR Studio Workspace notebook Spark session, run the following commands to load the data:

// create a database named reviews in AWS Glue if it does not exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews - this loads all the Parquet files
val reviews_all_location = "s3://amazon-reviews-pds/parquet/"
val reviews_all = spark.read.parquet(reviews_all_location)

// write reviews data to an Iceberg v2 table
reviews_all.writeTo("demo.reviews.all_reviews").tableProperty("format-version", "2").createOrReplace()

After you run the code, you should find two prefixes created in your data warehouse S3 path (s3://iceberg-curated-blog-data/reviews.db/all_reviews): data and metadata.

Process incremental data using insert, update, and delete SQL statements in Athena

Athena is a serverless query engine that you can use to perform read, write, update, and optimization tasks against Iceberg tables. To demonstrate how the Apache Iceberg data lake format supports incremental data ingestion, we run insert, update, and delete SQL statements on the data lake.

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure the query result location to be the S3 bucket you created earlier. You should be able to see that the table reviews.all_reviews is available for querying. Run the following query to verify that you have loaded the Iceberg table successfully:

select * from reviews.all_reviews limit 5;

Process incremental data by running insert, update, and delete SQL statements:

-- Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

-- Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

Performance tuning

In this section, we walk through different ways to improve Apache Iceberg read and write performance.

Configure Apache Iceberg table properties

Apache Iceberg is a table format, and it supports table properties to configure table behavior such as read, write, and catalog. You can improve the read and write performance on Iceberg tables by adjusting the table properties.

For example, if you notice that you write too many small files for an Iceberg table, you can configure the write file size to write fewer but larger files, which helps improve query performance.

Property | Default | Description
write.target-file-size-bytes | 536870912 (512 MB) | Controls the size of files generated to target about this many bytes
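
As a rough back-of-the-envelope illustration of how the target file size relates to the number of files, the following Python sketch divides the 47.4 GiB source dataset size reported earlier by a target size. This is only an estimate under a strong assumption: it treats the data as one stream of bytes and ignores partitioning, write parallelism, and compression differences, all of which change the real file count. The function name estimated_file_count and the 128 MB comparison value are hypothetical.

```python
import math


def estimated_file_count(total_bytes, target_file_size_bytes):
    """Lower-bound estimate: how many files of the target size hold total_bytes."""
    return math.ceil(total_bytes / target_file_size_bytes)


total_bytes = int(47.4 * 1024**3)          # dataset size reported by the AWS CLI
default_target = 536870912                 # 512 MB, the default shown in the table
smaller_target = 128 * 1024**2             # hypothetical smaller target for comparison

files_at_default = estimated_file_count(total_bytes, default_target)
files_at_smaller = estimated_file_count(total_bytes, smaller_target)
print(files_at_default, files_at_smaller)
```

A smaller target size produces proportionally more files for the same data, which is the small-files pattern that the property helps you avoid.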

Use the following code to alter the table property:

//Example code to alter the table property in EMR Studio Workspace notebook
spark.sql("""ALTER TABLE demo.reviews.all_reviews
SET TBLPROPERTIES ('write.target-file-size-bytes'='536870912')""")

Partitioning and sorting

To make a query run fast, the less data read the better. Iceberg takes advantage of the rich metadata it captures at write time and facilitates techniques such as scan planning, partitioning, pruning, and column-level stats such as min/max values to skip data files that don’t have matching records. We walk you through how query scan planning and partitioning work in Iceberg and how we use them to improve query performance.

Query scan planning

For a given query, the first step in a query engine is scan planning, which is the process to find the files in a table needed for a query. Planning in an Iceberg table is very efficient, because Iceberg’s rich metadata can be used to prune metadata files that aren’t needed, in addition to filtering data files that don’t contain matching data. In our tests, we observed Athena scanned 50% or less data for a given query on an Iceberg table compared to original data before conversion to Iceberg format.

There are two types of filtering:

  • Metadata filtering – Iceberg uses two levels of metadata to track the files in a snapshot: the manifest list and manifest files. It first uses the manifest list, which acts as an index of the manifest files. During planning, Iceberg filters manifests using the partition value range in the manifest list without reading all the manifest files. Then it uses selected manifest files to get data files.
  • Data filtering – After selecting the list of manifest files, Iceberg uses the partition data and column-level stats for each data file stored in manifest files to filter data files. During planning, query predicates are converted to predicates on the partition data and applied first to filter data files. Then, the column stats like column-level value counts, null counts, lower bounds, and upper bounds are used to filter out data files that can’t match the query predicate. By using upper and lower bounds to filter data files at planning time, Iceberg greatly improves query performance.
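
The two filtering levels can be sketched in a few lines of Python. This is a simplified model, not Iceberg's planner: the classes Manifest and DataFile, the function plan_scan, the file paths, and the bounds are all assumptions made up for illustration, with product_category standing in as the partition column and star_rating as the column with lower and upper bounds.

```python
from dataclasses import dataclass


@dataclass
class DataFile:
    path: str
    partition: str   # partition value, for example a product_category
    lower: int       # lower bound of star_rating in this file
    upper: int       # upper bound of star_rating in this file


@dataclass
class Manifest:
    path: str
    partition_min: str   # partition value range recorded in the manifest list
    partition_max: str
    files: list


def plan_scan(manifest_list, category, rating):
    """Return (matching data file paths, number of manifests that had to be read)."""
    # Metadata filtering: skip manifests whose partition range cannot contain the category.
    selected = [m for m in manifest_list if m.partition_min <= category <= m.partition_max]
    # Data filtering: apply the partition predicate first, then the column bounds.
    paths = [
        f.path
        for m in selected
        for f in m.files
        if f.partition == category and f.lower <= rating <= f.upper
    ]
    return paths, len(selected)


manifest_list = [
    Manifest("m0.avro", "Apparel", "Books", [
        DataFile("data/apparel-1.parquet", "Apparel", 1, 5),
        DataFile("data/books-1.parquet", "Books", 1, 5),
    ]),
    Manifest("m1.avro", "Toys", "Watches", [
        DataFile("data/toys-1.parquet", "Toys", 1, 5),
        DataFile("data/watches-1.parquet", "Watches", 1, 3),
        DataFile("data/watches-2.parquet", "Watches", 4, 5),
    ]),
]

files, manifests_read = plan_scan(manifest_list, "Watches", 5)
print(files, manifests_read)
```

In this toy example, one of the two manifests is never opened, and only one of the five data files remains after the partition and bounds checks.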

Partitioning and sorting

Partitioning is a way to group records with the same key column values together in writing. The benefit of partitioning is faster queries that access only part of the data, as explained earlier in query scan planning: data filtering. Iceberg makes partitioning simple by supporting hidden partitioning, in the way that Iceberg produces partition values by taking a column value and optionally transforming it.
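
The idea of deriving a partition value from a column value can be sketched as follows. This is a simplified illustration, not Iceberg's implementation: the function names are hypothetical, the sample records are made up (only the column names come from the queries in this post), and the month transform here returns a readable string rather than the internal representation Iceberg uses.

```python
from collections import defaultdict
from datetime import date


def identity_transform(value):
    """Use the column value itself as the partition value."""
    return value


def month_transform(value):
    """Map a date to a 'YYYY-MM' partition value (simplified representation)."""
    return f"{value.year:04d}-{value.month:02d}"


def partition_records(records, column, transform):
    """Group records by the transformed value of one column."""
    partitions = defaultdict(list)
    for record in records:
        partitions[transform(record[column])].append(record)
    return dict(partitions)


# Made-up sample records for illustration.
reviews = [
    {"review_id": "R1", "product_category": "Watches", "review_date": date(2005, 1, 15)},
    {"review_id": "R2", "product_category": "Watches", "review_date": date(2005, 3, 2)},
    {"review_id": "R3", "product_category": "Books", "review_date": date(2005, 1, 30)},
]

by_category = partition_records(reviews, "product_category", identity_transform)
by_month = partition_records(reviews, "review_date", month_transform)
print(sorted(by_category))
print(sorted(by_month))
```

The writer never has to supply a separate partition column: the partition value is computed from a column that is already in the record, which is what makes the partitioning hidden from the people writing queries.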

In our use case, we first run the following query on the non-partitioned Iceberg table. Then we partition the Iceberg table by the category of the reviews, which is used in the query WHERE condition to filter out records. With partitioning, the query can scan much less data. See the following code:

//Example code in EMR Studio Workspace notebook to create an Iceberg table all_reviews_partitioned partitioned by product_category
reviews_all.writeTo("demo.reviews.all_reviews_partitioned").tableProperty("format-version", "2").partitionedBy($"product_category").createOrReplace()

Run the following select statement on the non-partitioned all_reviews table vs. the partitioned table to see the performance difference:

-- Run this query on the all_reviews table and the partitioned table for performance testing
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

-- Run the same select query on the partitioned dataset
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews_partitioned where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table shows the performance improvement of data partitioning, with about 50% performance improvement and 70% less data scanned.

Dataset Name | Non-Partitioned Dataset | Partitioned Dataset
Runtime (seconds) | 8.20 | 4.25
Data Scanned (MB) | 131.55 | 33.79

Note that the runtime is the average runtime with multiple runs in our test.
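
The rounded percentages quoted above can be checked directly from the figures in the table. The following short Python sketch does the arithmetic; the function name percent_reduction is only an illustrative helper.

```python
def percent_reduction(before, after):
    """Relative reduction from before to after, as a percentage."""
    return (before - after) / before * 100


runtime_reduction = percent_reduction(8.20, 4.25)      # seconds
scanned_reduction = percent_reduction(131.55, 33.79)   # MB

print(f"Runtime reduced by {runtime_reduction:.1f}%")
print(f"Data scanned reduced by {scanned_reduction:.1f}%")
```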

We saw good performance improvement after partitioning. However, this can be further improved by using column-level stats from Iceberg manifest files. In order to use the column-level stats effectively, you want to further sort your records based on the query patterns. Sorting the whole dataset using the columns that are often used in queries will reorder the data in such a way that each data file ends up with a unique range of values for the specific columns. If these columns are used in the query condition, it allows query engines to further skip data files, thereby enabling even faster queries.
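
To see why sorting helps min/max statistics, consider the following Python sketch. It is a toy model with made-up integer values and hypothetical helper names: the same 100 values are written into four files, once in arrival order and once sorted, and we count how many files have a [min, max] range that could contain the value a query asks for.

```python
def write_files(values, file_count):
    """Split values into file_count equally sized chunks, keeping the given order."""
    size = len(values) // file_count
    return [values[i * size:(i + 1) * size] for i in range(file_count)]


def files_to_read(files, wanted):
    """Count files whose [min, max] range could contain the wanted value."""
    return sum(1 for f in files if min(f) <= wanted <= max(f))


# 100 distinct values (0..99) arriving in an interleaved, unsorted order.
arrival_order = [(i * 37) % 100 for i in range(100)]

unsorted_files = write_files(arrival_order, 4)
sorted_files = write_files(sorted(arrival_order), 4)

unsorted_reads = files_to_read(unsorted_files, 42)
sorted_reads = files_to_read(sorted_files, 42)
print(unsorted_reads, sorted_reads)
```

Without sorting, every file spans almost the whole value range, so none can be skipped. After sorting, each file covers a distinct range and only one file can match.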

Copy-on-write vs. merge-on-read

When implementing update and delete on Iceberg tables in the data lake, there are two approaches defined by the Iceberg table properties:

  • Copy-on-write – With this approach, when rows in the Iceberg table are updated or deleted, the data files that contain the affected records are copied, and the changes are applied to the copies. A new snapshot of the Iceberg table is then created that points to the new versions of the data files. This makes overall writes slower. If concurrent writes conflict, they have to be retried, which increases the write time even more. On the other hand, reads need no extra processing: the query retrieves data from the latest version of the data files.
  • Merge-on-read – With this approach, when there are updates or deletes on the Iceberg table, the existing data files are not rewritten; instead, new delete files are created to track the changes. For deletes, a new delete file is created that identifies the deleted records. When reading the Iceberg table, the delete file is applied to the retrieved data to filter out the deleted records. For updates, a new delete file is created to mark the updated records as deleted, and a new data file is created for those records with the updated values. When reading the Iceberg table, both the delete files and the new data files are applied to the retrieved data to reflect the latest changes and produce the correct results. As a result, every subsequent query has an extra step to merge the data files with the delete and new files, which usually increases the query time. On the other hand, writes can be faster because the existing data files don’t need to be rewritten.
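The difference between the two approaches can be sketched with a toy model. This is not Iceberg code: data files are Python lists, the delete file is a set of (file index, row index) positions, and all function names are hypothetical. The point is only to show where the work happens in each mode.

```python
def cow_delete(data_files, predicate):
    """Copy-on-write: rewrite every file that contains a matching row."""
    new_files, rewritten = [], 0
    for rows in data_files:
        if any(predicate(r) for r in rows):
            new_files.append([r for r in rows if not predicate(r)])
            rewritten += 1
        else:
            new_files.append(rows)  # untouched file is reused as-is
    return new_files, rewritten

def mor_delete(data_files, predicate):
    """Merge-on-read: leave data files alone and record deleted positions."""
    return {(fi, ri)
            for fi, rows in enumerate(data_files)
            for ri, r in enumerate(rows) if predicate(r)}

def mor_read(data_files, delete_file):
    """Apply the delete file while reading, which is the extra read-time step."""
    return [r for fi, rows in enumerate(data_files)
            for ri, r in enumerate(rows) if (fi, ri) not in delete_file]

files = [[{"id": 1, "stars": 1}, {"id": 2, "stars": 5}],
         [{"id": 3, "stars": 4}]]

def is_one_star(row):
    return row["stars"] == 1

cow_files, rewritten = cow_delete(files, is_one_star)
cow_result = [r for rows in cow_files for r in rows]
mor_result = mor_read(files, mor_delete(files, is_one_star))

print("files rewritten by copy-on-write:", rewritten)
print("same rows visible in both modes:", cow_result == mor_result)
```

Both modes return the same rows. Copy-on-write pays at write time by rewriting a file, whereas merge-on-read pays on every read by filtering against the delete file.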

To test the impact of the two approaches, you can run the following code to set the Iceberg table properties:

//Run code to alter Iceberg table property to set copy-on-write and merge-on-read in EMR Studio Workspace notebook
//To test merge-on-read, replace both values with 'merge-on-read'
spark.sql("""ALTER TABLE demo.reviews.all_reviews
SET TBLPROPERTIES ('write.delete.mode'='copy-on-write','write.update.mode'='copy-on-write')""")

Run the update, delete, and select SQL statements in Athena to show the runtime difference for copy-on-write vs. merge-on-read:

-- Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

-- Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

-- Example select statement
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table summarizes the query runtimes.

Approach Query Runtime (seconds) Data scanned (MB)
Copy-on-Write UPDATE 66.251 494.06
Copy-on-Write DELETE 116.174 3.07
Copy-on-Write SELECT 97.75 137.16
Merge-on-Read UPDATE 10.788 494.06
Merge-on-Read DELETE 54.941 3.07
Merge-on-Read SELECT 113.44 137.16

Note that the runtime is the average runtime with multiple runs in our test.

As our test results show, there are always trade-offs between the two approaches. Which approach to use depends on your use case. In summary, the choice comes down to read latency vs. write latency. You can use the following table to make the right choice.

Consideration Copy-on-Write Merge-on-Read
Pros Faster reads Faster writes
Cons Expensive writes Higher latency on reads
When to use Good for frequent reads, infrequent updates and deletes or large batch updates Good for tables with frequent updates and deletes
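As a rough back-of-the-envelope aid, the average runtimes measured in this post can be plugged into a simple workload model. This is only a sketch: it assumes every statement costs the average runtime from our test, which will not hold for other datasets, and the function names and workload mixes are hypothetical.

```python
# Average runtimes in seconds, copied from the results table in this post.
RUNTIME = {
    "copy-on-write": {"update": 66.251, "delete": 116.174, "select": 97.75},
    "merge-on-read": {"update": 10.788, "delete": 54.941, "select": 113.44},
}

def total_runtime(mode, updates, deletes, selects):
    """Estimated total seconds for a workload mix under one write mode."""
    t = RUNTIME[mode]
    return updates * t["update"] + deletes * t["delete"] + selects * t["select"]

def cheaper_mode(updates, deletes, selects):
    """Return the write mode with the lower estimated total runtime."""
    return min(RUNTIME, key=lambda m: total_runtime(m, updates, deletes, selects))

read_heavy = cheaper_mode(updates=1, deletes=1, selects=100)
write_heavy = cheaper_mode(updates=50, deletes=50, selects=5)

print("read-heavy workload:", read_heavy)
print("write-heavy workload:", write_heavy)
```

With these numbers, the read-heavy mix favors copy-on-write and the write-heavy mix favors merge-on-read, which matches the guidance in the table.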

Data compaction

If your data files are small, you might end up with thousands or millions of files in an Iceberg table. This dramatically increases the number of I/O operations and slows down queries. Furthermore, Iceberg tracks each data file in a dataset, so more data files lead to more metadata. This in turn increases the overhead and I/O operations for reading metadata files. To improve query performance, it’s recommended to compact small data files into larger data files.

When updating and deleting records in an Iceberg table, if the merge-on-read approach is used, you might end up with many small delete files or new data files. Running compaction combines all these files and creates a newer version of the data files. This eliminates the need to reconcile them during reads. It’s recommended to run regular compaction jobs so that reads are impacted as little as possible while still maintaining faster write speed.
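The idea behind bin-packing compaction can be illustrated with a small greedy sketch. This is not the algorithm that Athena or Iceberg actually runs; it is a generic first-fit-decreasing grouping, and the file sizes and the 128 MB target are hypothetical values chosen for the example.

```python
def bin_pack(file_sizes_mb, target_mb):
    """Greedy first-fit-decreasing grouping of small files into larger outputs."""
    bins = []
    for size in sorted(file_sizes_mb, reverse=True):
        for b in bins:
            if sum(b) + size <= target_mb:
                b.append(size)
                break
        else:
            # No existing group has room, so start a new output file.
            bins.append([size])
    return bins

small_files = [5, 12, 7, 30, 3, 64, 20, 9, 40, 10]  # hypothetical sizes in MB
groups = bin_pack(small_files, target_mb=128)

print(f"{len(small_files)} input files -> {len(groups)} output files")
for g in groups:
    print(f"  output of {sum(g)} MB from {len(g)} inputs")
```

Fewer, larger files mean fewer file opens and less metadata to read at query time.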

Run the following data compaction command, then run the select query from Athena:

-- Data compaction
optimize reviews.all_reviews REWRITE DATA USING BIN_PACK

-- Run this query before and after data compaction
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table compares the runtime before vs. after data compaction. In our test, the runtime dropped from 97.75 seconds to 32.676 seconds, a reduction of about 67%.

Query Before Data Compaction After Data Compaction
Runtime (seconds) 97.75 32.676
Data scanned (MB) 137.16 189.19

Note that the select queries ran on the all_reviews table after update and delete operations, before and after data compaction. The runtime is the average runtime with multiple runs in our test.

Clean up

After you follow the solution walkthrough to perform the use cases, complete the following steps to clean up your resources and avoid further costs:

  1. Drop the AWS Glue tables and database from Athena or run the following code in your notebook:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.all_reviews") 
spark.sql("DROP TABLE demo.reviews.all_reviews_partitioned") 

// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  2. On the EMR Studio console, choose Workspaces in the navigation pane.
  3. Select the Workspace you created and choose Delete.
  4. On the EMR console, navigate to the Studios page.
  5. Select the Studio you created and choose Delete.
  6. On the EMR console, choose Clusters in the navigation pane.
  7. Select the cluster and choose Terminate.
  8. Delete the S3 bucket and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we introduced the Apache Iceberg framework and how it helps resolve some of the challenges we have in a modern data lake. Then we walked you through a solution to process incremental data in a data lake using Apache Iceberg. Finally, we had a deep dive into performance tuning to improve read and write performance for our use cases.

We hope this post provides some useful information for you to decide whether you want to adopt Apache Iceberg in your data lake solution.


About the Authors

Flora Wu is a Sr. Resident Architect at AWS Data Lab. She helps enterprise customers create data analytics strategies and build solutions to accelerate their business outcomes. In her spare time, she enjoys playing tennis, dancing salsa, and traveling.

Daniel Li is a Sr. Solutions Architect at Amazon Web Services. He focuses on helping customers develop, adopt, and implement cloud services and strategy. When not working, he likes spending time outdoors with his family.

Patterns for enterprise data sharing at scale

Post Syndicated from Venkata Sistla original https://aws.amazon.com/blogs/big-data/patterns-for-enterprise-data-sharing-at-scale/

Data sharing is becoming an important element of an enterprise data strategy. AWS services like AWS Data Exchange provide an avenue for companies to share or monetize their value-added data with other companies. Some organizations would like to have a data sharing platform where they can establish a collaborative and strategic approach to exchange data with a restricted group of companies in a closed, secure, and exclusive environment. For example, financial services companies and their auditors, or manufacturing companies and their supply chain partners. This fosters development of new products and services and helps improve their operational efficiency.

Data sharing is a team effort. In addition to establishing the right infrastructure, successful data sharing also requires organizations to ensure that business owners sponsor data sharing initiatives. They also need to ensure that data is of high quality. Data platform owners and security teams should encourage proper data use and fix any privacy and confidentiality issues.

This blog discusses various data sharing options and common architecture patterns that organizations can adopt to set up their data sharing infrastructure based on AWS service availability and data compliance.

Data sharing options and data classification types

Organizations operate across a spectrum of security compliance constraints. For some organizations, it’s possible to use AWS services like AWS Data Exchange. However, organizations working in heavily regulated industries like federal agencies or financial services might be limited by the allow-listed AWS service options. For example, if an organization is required to operate in a FedRAMP Moderate or FedRAMP High environment, its options to share data may be limited by the AWS services that are available and have been allow-listed. Service availability is based on platform certification by AWS, and allow-listing is based on the organization defining its security compliance architecture and guidelines.

The kind of data that the organization wants to share with its partners may also have an impact on the method used for data sharing. Complying with data classification rules may further limit the available data sharing options.

The following are some general data classification types:

  • Public data – Important information, though often freely available for people to read, research, review and store. It typically has the lowest level of data classification and security.
  • Private data – Information you might want to keep private like email inboxes, cell phone content, employee identification numbers, or employee addresses. If private data were shared, destroyed, or altered, it might pose a slight risk to an individual or the organization.
  • Confidential or restricted data – Sensitive information that only a limited group of individuals or parties can access, often requiring special clearance or special authorization. Confidential or restricted data access might involve aspects of identity and authorization management. Examples of confidential data include Social Security numbers and vehicle identification numbers.

The following is a sample decision tree that you can refer to when choosing your data sharing option based on service availability, classification type, and data format (structured or unstructured). Other factors like usability, multi-partner accessibility, data size, consumption patterns like bulk load/API access, and more may also affect the choice of data sharing pattern.

decisiontree

In the following sections, we discuss each pattern in more detail.

Pattern 1: Using AWS Data Exchange

AWS Data Exchange makes exchanging data easier, helping organizations lower costs, become more agile, and innovate faster. Organizations can choose to share data privately using AWS Data Exchange with their external partners. AWS Data Exchange offers perimeter controls that are applied at identity and resource levels. These controls decide which external identities have access to specific data resources. AWS Data Exchange provides multiple patterns for external parties to access data.

The following diagram illustrates an example architecture.

pattern1

With AWS Data Exchange, once the dataset to share (or sell) is configured, AWS Data Exchange automatically manages entitlements (and billing) between the producer and the consumer. The producer doesn’t have to manage policies, set up new access points, or create new Amazon Redshift data shares for each consumer, and access is automatically revoked if the subscription ends. This can significantly reduce the operational overhead in sharing data.

Pattern 2: Using AWS Lake Formation for centralized access management

You can use this pattern in cases where both the producer and consumer are on the AWS platform with an AWS account that is enabled to use AWS Lake Formation. This pattern provides a no-code approach to data sharing. The following diagram illustrates an example architecture.

pattern2

In this pattern, the central governance account has Lake Formation configured for managing access across the producer’s org accounts. Resource links from the production account Amazon Simple Storage Service (Amazon S3) bucket are created in Lake Formation. The producer grants Lake Formation permissions on an AWS Glue Data Catalog resource to an external account, or directly to an AWS Identity and Access Management (IAM) principal in another account. Lake Formation uses AWS Resource Access Manager (AWS RAM) to share the resource. If the grantee account is in the same organization as the grantor account, the shared resource is available immediately to the grantee. If the grantee account is not in the same organization, AWS RAM sends an invitation to the grantee account to accept or reject the resource grant. To make the shared resource available, the consumer administrator in the grantee account must use the AWS RAM console or AWS Command Line Interface (AWS CLI) to accept the invitation.
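As an illustration of the grant described above, the following sketch builds the request for a column-level SELECT grant to an external account. It assumes the boto3 Lake Formation client and its `grant_permissions` operation; the account IDs, database, table, and column names are placeholders, and the helper name is hypothetical. The call itself is left commented out so that the snippet runs without AWS credentials.

```python
def build_cross_account_grant(producer_account_id, consumer_account_id,
                              database, table, columns):
    """Build keyword arguments for a column-level SELECT grant."""
    return {
        # An AWS account ID can be used as the principal for a cross-account grant.
        "Principal": {"DataLakePrincipalIdentifier": consumer_account_id},
        "Resource": {
            "TableWithColumns": {
                "CatalogId": producer_account_id,
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),
            }
        },
        "Permissions": ["SELECT"],
        "PermissionsWithGrantOption": [],
    }

grant = build_cross_account_grant(
    producer_account_id="111122223333",   # placeholder producer account
    consumer_account_id="444455556666",   # placeholder consumer account
    database="sales_db",                  # hypothetical database
    table="orders",                       # hypothetical table
    columns=["order_id", "order_date"],
)

# To apply the grant (requires boto3 and Lake Formation permissions):
# import boto3
# boto3.client("lakeformation").grant_permissions(**grant)
print(grant["Resource"]["TableWithColumns"]["ColumnNames"])
```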

Authorized principals can share resources explicitly with an IAM principal in an external account. This feature is useful when the producer wants to have control over who in the external account can access the resources. The permissions the IAM principal receives are a union of direct grants and the account-level grants that are cascaded down to the principals. The data lake administrator of the recipient account can view the direct cross-account grants, but can’t revoke permissions.

Pattern 3: Using AWS Lake Formation from the producer external sharing account

The producer may have stringent security requirements where no external consumer should access their production account or their centralized governance account. They may also not have Lake Formation enabled on their production platform. In such cases, as shown in the following diagram, the producer production account (Account A) is dedicated to its internal organization users. The producer creates another account, the producer external sharing account (Account B), which is dedicated for external sharing. This gives the producer more latitude to create specific policies for specific organizations.

The following architecture diagram shows an overview of the pattern.

pattern3

The producer implements a process to create an asynchronous copy of data in Account B. The bucket can be configured for Same-Region Replication (SRR) or Cross-Region Replication (CRR) for objects that need to be shared. This automatically refreshes the data in the “External Published Datasets” S3 bucket in the external account without having to write any code.

Creating a copy of the data allows the producer to add another degree of separation between the external consumer and its production data. It also helps meet any compliance or data sovereignty requirements.

Lake Formation is set up on Account B, and the administrator creates resource links for the “External Published Datasets” S3 bucket in its account to grant access. The administrator follows the same process to grant access as described earlier.

Pattern 4: Using Amazon Redshift data sharing

This pattern is ideally suited for a producer who has most of their published data products on Amazon Redshift. This pattern also requires the producer’s external sharing account (Account B) and the consumer account (Account C) to have an encrypted Amazon Redshift cluster or Amazon Redshift Serverless endpoint that meets the prerequisites for Amazon Redshift data sharing.

The following architecture diagram shows an overview of the pattern.

pattern4

Two options are possible depending on the producer’s compliance constraints:

  • Option A – The producer enables data sharing directly on the production Amazon Redshift cluster.
  • Option B – The producer may have constraints with respect to sharing the production cluster. The producer creates a simple AWS Glue job that copies data from the Amazon Redshift cluster in the production Account A to the Amazon Redshift cluster in the external Account B. This AWS Glue job can be scheduled to refresh data as needed by the consumer. When the data is available in Account B, the producer can create multiple views and multiple data shares as needed.

In both options, the producer maintains complete control over what data is being shared, and the consumer admin maintains full control over who can access the data within their organization.

After both the producer and consumer admins approve the data sharing request, the consumer user can access this data as if it were part of their own account without having to write any additional code.
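For orientation, the SQL involved in a cross-account Amazon Redshift data share looks roughly like the statements generated below. This is a sketch: the share, schema, table, and database names, the account IDs, and the namespace value are placeholders, and the Python helpers are hypothetical. The separate authorization and association steps that the administrators perform are not shown.

```python
def producer_statements(share, schema, table, consumer_account):
    """SQL the producer runs on its Amazon Redshift cluster."""
    return [
        f"CREATE DATASHARE {share};",
        f"ALTER DATASHARE {share} ADD SCHEMA {schema};",
        f"ALTER DATASHARE {share} ADD TABLE {schema}.{table};",
        f"GRANT USAGE ON DATASHARE {share} TO ACCOUNT '{consumer_account}';",
    ]

def consumer_statement(local_db, share, producer_account, producer_namespace):
    """SQL the consumer runs to expose the share as a local database."""
    return (f"CREATE DATABASE {local_db} FROM DATASHARE {share} "
            f"OF ACCOUNT '{producer_account}' NAMESPACE '{producer_namespace}';")

producer_sql = producer_statements("sales_share", "public", "orders", "444455556666")
consumer_sql = consumer_statement("shared_sales", "sales_share",
                                  "111122223333", "<producer-namespace-guid>")

for stmt in producer_sql + [consumer_sql]:
    print(stmt)
```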

Pattern 5: Sharing data securely and privately using APIs

You can adopt this pattern when the external partner doesn’t have a presence on AWS. You can also use this pattern when published data products are spread across various services like Amazon S3, Amazon Redshift, Amazon DynamoDB, and Amazon OpenSearch Service but the producer would like to maintain a single data sharing interface.

Here’s an example use case: Company A would like to share some of its log data in near-real time with its partner Company B, who uses this data to generate predictive insights for Company A. Company A stores this data in Amazon Redshift. The company wants to share this transactional information with its partner after masking the personally identifiable information (PII) in a cost-effective and secure way to generate insights. Company B doesn’t use the AWS platform.

Company A establishes a microbatch process using an AWS Lambda function or AWS Glue that queries Amazon Redshift to get incremental log data, applies the rules to redact the PII, and loads this data to the “Published Datasets” S3 bucket. This instantiates an SRR/CRR process that refreshes this data in the “External Sharing” S3 bucket.

The following diagram shows how the consumer can then use an API-based approach to access this data.

pattern5

The workflow contains the following steps:

  1. An HTTPS API request is sent from the API consumer to the API proxy layer.
  2. The HTTPS API request is forwarded from the API proxy to Amazon API Gateway in the external sharing AWS account.
  3. Amazon API Gateway calls the request receiver Lambda function.
  4. The request receiver function writes the status to a DynamoDB control table.
  5. A second Lambda function, the poller, checks the status of the results in the DynamoDB table.
  6. The poller function fetches results from Amazon S3.
  7. The poller function sends a presigned URL to download the file from the S3 bucket to the requestor via Amazon Simple Email Service (Amazon SES).
  8. The requestor downloads the file using the URL.
  9. The network perimeter AWS account only allows egress internet connection.
  10. The API proxy layer enforces both the egress security controls and perimeter firewall before the traffic leaves the producer’s network perimeter.
  11. The AWS Transit Gateway security egress VPC routing table only allows connectivity from the required producer’s subnet, while preventing internet access.
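The asynchronous request and poller portion of this workflow (steps 3 to 7) can be sketched with in-memory stand-ins. Nothing here calls AWS: the dictionaries replace the DynamoDB control table and the S3 bucket, the list replaces Amazon SES, and the download URL is a placeholder rather than a real presigned URL. All names are hypothetical.

```python
import uuid

control_table = {}   # stands in for the DynamoDB control table
result_bucket = {}   # stands in for the S3 bucket holding prepared results
outbox = []          # stands in for emails sent through Amazon SES

def request_receiver(dataset, requester_email):
    """Steps 3-4: accept the request and record its status."""
    request_id = str(uuid.uuid4())
    control_table[request_id] = {
        "dataset": dataset, "email": requester_email, "status": "PENDING",
    }
    return request_id

def poller():
    """Steps 5-7: find pending requests whose results exist and notify."""
    for request_id, item in control_table.items():
        if item["status"] == "PENDING" and item["dataset"] in result_bucket:
            # Placeholder link; a real implementation would presign an S3 URL.
            url = f"https://example.invalid/download/{request_id}"
            outbox.append((item["email"], url))
            item["status"] = "SENT"

result_bucket["logs-2023-01"] = b"masked log rows"
rid = request_receiver("logs-2023-01", "analyst@example.com")
poller()

print(control_table[rid]["status"], len(outbox))
```

Decoupling the receiver from the poller through a control table lets the API return quickly while the result is prepared and delivered out of band.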

Pattern 6: Using Amazon S3 access points

Data scientists may need to work collaboratively on images, videos, and text documents. Legal and audit groups may want to share reports and statements with the auditing agencies. This pattern discusses an approach to sharing such documents. The pattern assumes that the external partners are also on AWS. Amazon S3 access points allow the producer to share access with their consumer by setting up cross-account access without having to edit bucket policies.

Access points are named network endpoints that are attached to buckets that you can use to perform S3 object operations, such as GetObject and PutObject. Each access point has distinct permissions and network controls that Amazon S3 applies for any request that is made through that access point. Each access point enforces a customized access point policy that works in conjunction with the bucket policy attached to the underlying bucket.

The following architecture diagram shows an overview of the pattern.

pattern6

The producer creates an S3 bucket and enables the use of access points. As part of the configuration, the producer specifies the consumer account, IAM role, and privileges for the consumer IAM role.

Consumer users with the IAM role in the consumer account can access the S3 bucket over the internet, or access can be restricted to an Amazon VPC through VPC endpoints and AWS PrivateLink.
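An access point policy for this pattern might look like the document built below. This is a sketch under assumptions: the Region, account IDs, access point name, and role name are placeholders, and the helper is hypothetical. Because the access point policy works in conjunction with the bucket policy, the underlying bucket must still permit the request.

```python
import json

def access_point_policy(region, producer_account, access_point_name,
                        consumer_role_arn, actions=("s3:GetObject",)):
    """Build a policy that lets one consumer role read objects via the access point."""
    ap_arn = f"arn:aws:s3:{region}:{producer_account}:accesspoint/{access_point_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": consumer_role_arn},
            "Action": list(actions),
            # Objects reached through an access point use the /object/ prefix.
            "Resource": f"{ap_arn}/object/*",
        }],
    }

policy = access_point_policy(
    region="us-east-1",                       # placeholder Region
    producer_account="111122223333",          # placeholder producer account
    access_point_name="partner-reports-ap",   # hypothetical access point name
    consumer_role_arn="arn:aws:iam::444455556666:role/PartnerAnalystRole",
)

print(json.dumps(policy, indent=2))
```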

Conclusion

Each organization has its unique set of constraints and requirements that it needs to fulfill to set up an efficient data sharing solution. In this post, we demonstrated various options and best practices available to organizations. The data platform owner and security team should work together to assess what works best for your specific situation. Your AWS account team is also available to help.

Related resources

For more information on related topics, refer to the following:


About the Authors


Venkata Sistla is a Cloud Architect – Data & Analytics at AWS. He specializes in building data processing capabilities and helping customers remove constraints that prevent them from leveraging their data to develop business insights.

Santosh Chiplunkar is a Principal Resident Architect at AWS. He has over 20 years of experience helping customers solve their data challenges. He helps customers develop their data and analytics strategy and provides them with guidance on how to make it a reality.

Automate replication of relational sources into a transactional data lake with Apache Iceberg and AWS Glue

Post Syndicated from Luis Gerardo Baeza original https://aws.amazon.com/blogs/big-data/automate-replication-of-relational-sources-into-a-transactional-data-lake-with-apache-iceberg-and-aws-glue/

Organizations have chosen to build data lakes on top of Amazon Simple Storage Service (Amazon S3) for many years. A data lake is the most popular choice for organizations to store all their organizational data generated by different teams, across business domains, in many different formats, and even over history. According to a study, the average company is seeing the volume of their data growing at a rate that exceeds 50% per year, usually managing an average of 33 unique data sources for analysis.

Teams often try to replicate thousands of jobs from relational databases with the same extract, transform, and load (ETL) pattern. There is a lot of effort in maintaining the job states and scheduling these individual jobs. This approach helps the teams add tables with few changes and also maintains the job status with minimum effort. This can lead to a huge improvement in the development timeline and tracking the jobs with ease.

In this post, we show you how to easily replicate all your relational data stores into a transactional data lake in an automated fashion with a single ETL job using Apache Iceberg and AWS Glue.

Solution architecture

Data lakes are usually organized using separate S3 buckets for three layers of data: the raw layer containing data in its original form, the stage layer containing intermediate processed data optimized for consumption, and the analytics layer containing aggregated data for specific use cases. In the raw layer, tables usually are organized based on their data sources, whereas tables in the stage layer are organized based on the business domains they belong to.

This post provides an AWS CloudFormation template that deploys an AWS Glue job that reads an Amazon S3 path for one data source of the data lake raw layer, and ingests the data into Apache Iceberg tables on the stage layer using AWS Glue support for data lake frameworks. The job expects tables in the raw layer to be structured in the way AWS Database Migration Service (AWS DMS) ingests them: schema, then table, then data files.
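To illustrate the expected raw-layer layout (schema, then table, then data files), the following sketch splits an object key into its parts. The exact prefix layout is an assumption: we assume the data source name is the first path segment, and the example key and function name are hypothetical rather than taken from the provided AWS Glue script.

```python
from pathlib import PurePosixPath

def parse_raw_key(key, source_name):
    """Split '<source>/<schema>/<table>/<file>' into schema, table, and file name."""
    parts = PurePosixPath(key).parts
    if len(parts) < 4 or parts[0] != source_name:
        raise ValueError(f"unexpected key layout: {key}")
    schema, table, filename = parts[1], parts[2], parts[-1]
    return schema, table, filename

# Hypothetical object key for a full-load file in the raw layer.
example = parse_raw_key("postgres/dms_sample/player/LOAD00000001.parquet", "postgres")
print(example)
```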

This solution uses AWS Systems Manager Parameter Store for table configuration. You should modify this parameter specifying the tables you want to process and how, including information such as primary key, partitions, and the business domain associated. The job uses this information to automatically create a database (if it doesn’t already exist) for every business domain, create the Iceberg tables, and perform the data loading.

Finally, we can use Amazon Athena to query the data in the Iceberg tables.

The following diagram illustrates this architecture.

Solution architecture

This implementation has the following considerations:

  • All tables from the data source must have a primary key to be replicated using this solution. The primary key can be a single column or a composite key with more than one column.
  • If the data lake contains tables that don’t need upserts or don’t have a primary key, you can exclude them from the parameter configuration and implement traditional ETL processes to ingest them into the data lake. That’s outside of the scope of this post.
  • If there are additional data sources that need to be ingested, you can deploy multiple CloudFormation stacks, one to handle each data source.
  • The AWS Glue job is designed to process data in two phases: the initial load that runs after AWS DMS finishes the full load task, and the incremental load that runs on a schedule that applies change data capture (CDC) files captured by AWS DMS. Incremental processing is performed using an AWS Glue job bookmark.
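The incremental phase boils down to an upsert keyed on the primary key. The toy sketch below shows that logic on plain dictionaries; it is not the AWS Glue job from this post. It assumes each change row carries an `Op` column with I, U, or D (as AWS DMS writes for CDC files) and the `last_update_time` column described later in this post; the table contents and function name are hypothetical.

```python
def apply_cdc(table, changes, primary_key):
    """Apply CDC rows to `table`, a dict keyed by primary-key tuple."""
    latest = {}
    # Keep only the most recent change for each key.
    for row in sorted(changes, key=lambda r: r["last_update_time"]):
        latest[tuple(row[c] for c in primary_key)] = row
    for key, row in latest.items():
        if row["Op"] == "D":
            table.pop(key, None)
        else:
            # Inserts and updates both become an upsert of the latest values.
            table[key] = {k: v for k, v in row.items()
                          if k not in ("Op", "last_update_time")}
    return table

table = {(1,): {"id": 1, "name": "a"}, (2,): {"id": 2, "name": "b"}}
changes = [
    {"Op": "U", "id": 1, "name": "a2", "last_update_time": "2023-01-01T10:00:00"},
    {"Op": "D", "id": 2, "name": "b", "last_update_time": "2023-01-01T10:05:00"},
    {"Op": "I", "id": 3, "name": "c", "last_update_time": "2023-01-01T10:10:00"},
    {"Op": "U", "id": 3, "name": "c2", "last_update_time": "2023-01-01T10:15:00"},
]

result = apply_cdc(table, changes, primary_key=["id"])
print(result)
```

This also shows why every replicated table needs a primary key: without one, there is no way to match a change row to the record it modifies.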

There are nine steps to complete this tutorial:

  1. Set up a source endpoint for AWS DMS.
  2. Deploy the solution using AWS CloudFormation.
  3. Review the AWS DMS replication task.
  4. Optionally, add permissions for encryption and decryption or AWS Lake Formation.
  5. Review the table configuration on Parameter Store.
  6. Perform initial data loading.
  7. Perform incremental data loading.
  8. Monitor table ingestion.
  9. Schedule incremental batch data loading.

Prerequisites

Before starting this tutorial, you should already be familiar with Iceberg. If you’re not, you can get started by replicating a single table following the instructions in Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue. Additionally, set up the following:

Set up a source endpoint for AWS DMS

Before we create our AWS DMS task, we need to set up a source endpoint to connect to the source database:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. If your database is running on Amazon RDS, choose Select RDS DB instance, then choose the instance from the list. Otherwise, choose the source engine and provide the connection information either through AWS Secrets Manager or manually.
  4. For Endpoint identifier, enter a name for the endpoint; for example, source-postgresql.
  5. Choose Create endpoint.

Deploy the solution using AWS CloudFormation

Create a CloudFormation stack using the provided template. Complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. Provide a stack name, such as transactionaldl-postgresql.
  4. Enter the required parameters:
    1. DMSS3EndpointIAMRoleARN – The IAM role ARN for AWS DMS to write data into Amazon S3.
    2. ReplicationInstanceArn – The AWS DMS replication instance ARN.
    3. S3BucketStage – The name of the existing bucket used for the stage layer of the data lake.
    4. S3BucketGlue – The name of the existing S3 bucket for storing AWS Glue scripts.
    5. S3BucketRaw – The name of the existing bucket used for the raw layer of the data lake.
    6. SourceEndpointArn – The AWS DMS endpoint ARN that you created earlier.
    7. SourceName – The arbitrary identifier of the data source to replicate (for example, postgres). This is used to define the S3 path of the data lake (raw layer) where data will be stored.
  5. Do not modify the following parameters:
    1. SourceS3BucketBlog – The bucket name where the provided AWS Glue script is stored.
    2. SourceS3BucketPrefix – The bucket prefix name where the provided AWS Glue script is stored.
  6. Choose Next twice.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

After approximately 5 minutes, the CloudFormation stack is deployed.

Review the AWS DMS replication task

The AWS CloudFormation deployment created an AWS DMS target endpoint for you. Because of two specific endpoint settings, the data will be ingested as we need it on Amazon S3.

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Search for and choose the endpoint that begins with dmsIcebergs3endpoint.
  3. Review the endpoint settings:
    1. DataFormat is specified as parquet.
    2. TimestampColumnName will add the column last_update_time with the date of creation of the records on Amazon S3.

AWS DMS endpoint settings

The deployment also creates an AWS DMS replication task that begins with dmsicebergtask.

  4. Choose Replication tasks in the navigation pane and search for the task.

You will see that the Task Type is marked as Full load, ongoing replication. AWS DMS will perform an initial full load of existing data, and then create incremental files with changes performed to the source database.

On the Mapping Rules tab, there are two types of rules:

  • A selection rule with the name of the source schema and tables that will be ingested from the source database. By default, it uses the sample database provided in the prerequisites, dms_sample, and selects all tables with the wildcard %.
  • Two transformation rules that add the schema name and table name as columns in the target files on Amazon S3. Our AWS Glue job uses these columns to determine which table each file in the data lake belongs to.

To learn more about how to customize this for your own data sources, refer to Selection rules and actions.

AWS mapping rules

Let’s change some configurations to finish our task preparation.

  5. On the Actions menu, choose Modify.
  6. In the Task Settings section, under Stop task after full load completes, choose Stop after applying cached changes.

This way, we can control the initial load and incremental file generation as two different steps. We use this two-step approach to run the AWS Glue job once for each step.

  7. Under Task logs, choose Turn on CloudWatch logs.
  8. Choose Save.
  9. Wait about 1 minute for the database migration task status to show as Ready.

Add permissions for encryption and decryption or Lake Formation

Optionally, you can add permissions for encryption and decryption or Lake Formation.

Add encryption and decryption permissions

If your S3 buckets used for the raw and stage layers are encrypted using AWS Key Management Service (AWS KMS) customer managed keys, you need to add permissions to allow the AWS Glue job to access the data:

Add Lake Formation permissions

If you’re managing permissions using Lake Formation, you need to allow your AWS Glue job to create your domain’s databases and tables through the IAM role GlueJobRole.

  1. Grant permissions to create databases (for instructions, refer to Creating a Database).
  2. Grant SUPER permissions to the default database.
  3. Grant data location permissions.
  4. If you create databases manually, grant permissions on all databases to create tables. Refer to Granting table permissions using the Lake Formation console and the named resource method or Granting Data Catalog permissions using the LF-TBAC method according to your use case.

After you complete the later step of performing the initial data load, make sure to also add permissions for consumers to query the tables. The job role will become the owner of all the tables created, and the data lake admin can then perform grants to additional users.

Review table configuration in Parameter Store

The AWS Glue job that performs the data ingestion into Iceberg tables uses the table specification provided in Parameter Store. Complete the following steps to review the parameters that were configured automatically for you. If needed, modify them according to your own needs.

  1. On the Parameter Store console, choose My parameters in the navigation pane.

The CloudFormation stack created two parameters:

  • iceberg-config for job configurations
  • iceberg-tables for table configuration
  1. Choose the parameter iceberg-tables.

The JSON structure contains information that AWS Glue uses to read data and write the Iceberg tables on the target domain:

  • One object per table – The name of the object is created using the schema name, a period, and the table name; for example, schema.table.
  • primaryKey – This should be specified for every source table. You can provide a single column or a comma-separated list of columns (without spaces).
  • partitionCols – This optionally specifies the partition columns for target tables. If you don’t want to create partitioned tables, provide an empty string. Otherwise, provide a single column or a comma-separated list of columns to be used (without spaces).
  1. If you want to use your own data source, use the following JSON code and replace the text in CAPS from the template provided. If you’re using the sample data source provided, keep the default settings:
{
    "SCHEMA_NAME.TABLE_NAME_1": {
        "primaryKey": "ONLY_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": ""
    },
    "SCHEMA_NAME.TABLE_NAME_2": {
        "primaryKey": "FIRST_PRIMARY_KEY,SECOND_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": "PARTITION_COLUMN_ONE,PARTITION_COLUMN_TWO"
    }
}
  1. Choose Save changes.
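As a rough illustration of how a job could consume this structure, the following sketch parses a configuration shaped like the template above. The table names, column names, and the helper functions split_csv() and parse_table_config() are hypothetical and are not taken from the job script.

```python
import json

# Hypothetical configuration following the template above (placeholder names).
SAMPLE_CONFIG = """
{
    "dms_sample.example_table_1": {
        "primaryKey": "id",
        "domain": "sports",
        "partitionCols": ""
    },
    "dms_sample.example_table_2": {
        "primaryKey": "key_a,key_b",
        "domain": "sports",
        "partitionCols": "event_date"
    }
}
"""


def split_csv(value):
    """Split a comma-separated string (no spaces) into a list; '' gives []."""
    return value.split(',') if value else []


def parse_table_config(raw_json):
    """Return one normalized dict per 'schema.table' entry."""
    tables = []
    for key, spec in json.loads(raw_json).items():
        # The object name is the schema name, a period, and the table name.
        schema_name, table_name = key.split('.')
        tables.append({
            'schema': schema_name,
            'table': table_name,
            'domain': spec['domain'],
            'primary_keys': split_csv(spec['primaryKey']),
            'partition_cols': split_csv(spec['partitionCols']),
        })
    return tables


tables = parse_table_config(SAMPLE_CONFIG)
for table in tables:
    print(table)
```

An empty partitionCols string maps to an empty list, which matches the convention that an empty string means an unpartitioned table.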

Perform initial data loading

Now that the required configuration is finished, we ingest the initial data. This step includes three parts: ingesting the data from the source relational database into the raw layer of the data lake, creating the Iceberg tables on the stage layer of the data lake, and verifying results using Athena.

Ingest data into the raw layer of the data lake

To ingest data from the relational data source (PostgreSQL if you are using the sample provided) to our transactional data lake using Iceberg, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Select the replication task you created and on the Actions menu, choose Restart/Resume.
  3. Wait about 5 minutes for the replication task to complete. You can monitor the tables ingested on the Statistics tab of the replication task.

AWS DMS full load statistics

After some minutes, the task finishes with the message Full load complete.

  1. On the Amazon S3 console, choose the bucket you defined as the raw layer.

Under the S3 prefix defined on AWS DMS (for example, postgres), you should see a hierarchy of folders with the following structure:

  • Schema
    • Table name
      • LOAD00000001.parquet
      • LOAD0000000N.parquet

AWS DMS full load objects created on S3
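The folder hierarchy above can be read back into its parts with a few lines of Python. This is only a sketch: parse_raw_key() is a hypothetical helper, and it assumes a single-level prefix (postgres) followed by exactly schema, table, and file name.

```python
from pathlib import PurePosixPath


def parse_raw_key(key, prefix='postgres'):
    """Split '<prefix>/<schema>/<table>/<file>' into its parts."""
    parts = PurePosixPath(key).parts
    if len(parts) != 4 or parts[0] != prefix:
        raise ValueError(f'Unexpected raw-layer key: {key}')
    _, schema_name, table_name, file_name = parts
    return {
        'schema': schema_name,
        'table': table_name,
        'file': file_name,
        # Full-load files are named LOAD<number>.parquet in the listing above.
        'is_full_load': file_name.startswith('LOAD'),
    }


parsed = parse_raw_key('postgres/dms_sample/sporting_event/LOAD00000001.parquet')
print(parsed)
```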

If your S3 bucket is empty, review Troubleshooting migration tasks in AWS Database Migration Service before running the AWS Glue job.

Create and ingest data into Iceberg tables

Before running the job, let’s navigate the script of the AWS Glue job provided as part of the CloudFormation stack to understand its behavior.

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Search for the job that starts with IcebergJob- and a suffix of your CloudFormation stack name (for example, IcebergJob-transactionaldl-postgresql).
  3. Choose the job.

AWS Glue ETL job review

The job script gets the configuration it needs from Parameter Store. The function getConfigFromSSM() returns job-related configurations such as the source and target buckets where the data is read from and written to. The variable ssm_param_table_values contains table-related information like the data domain, table name, partition columns, and primary key of the tables that need to be ingested. See the following Python code:

# Main application
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'stackName'])
SSM_PARAMETER_NAME = f"{args['stackName']}-iceberg-config"
SSM_TABLE_PARAMETER_NAME = f"{args['stackName']}-iceberg-tables"

# Parameters for job
rawS3BucketName, rawBucketPrefix, stageS3BucketName, warehouse_path = getConfigFromSSM(SSM_PARAMETER_NAME)
ssm_param_table_values = json.loads(ssmClient.get_parameter(Name = SSM_TABLE_PARAMETER_NAME)['Parameter']['Value'])
dropColumnList = ['db','table_name', 'schema_name','Op', 'last_update_time', 'max_op_date']

The script uses an arbitrary catalog name for Iceberg that is defined as my_catalog. This is implemented on the AWS Glue Data Catalog using Spark configurations, so a SQL operation pointing to my_catalog will be applied on the Data Catalog. See the following code:

catalog_name = 'my_catalog'
errored_table_list = []

# Iceberg configuration
spark = SparkSession.builder \
    .config('spark.sql.warehouse.dir', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.warehouse', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .getOrCreate()

The script iterates over the tables defined in Parameter Store and performs the logic for detecting if the table exists and if the incoming data is an initial load or an upsert:

# Iteration over tables stored on Parameter Store
for key in ssm_param_table_values:
    # Get table data
    isTableExists = False
    schemaName, tableName = key.split('.')
    logger.info(f'Processing table : {tableName}')

The initialLoadRecordsSparkSQL() function loads initial data when no operation column is present in the S3 files. AWS DMS adds this column only to Parquet data files produced by the continuous replication (CDC). The data loading is performed using the INSERT INTO command with SparkSQL. See the following code:

sqltemp = Template("""
    INSERT INTO $catalog_name.$dbName.$tableName  ($insertTableColumnList)
    SELECT $insertTableColumnList FROM insertTable $partitionStrSQL
""")
SQLQUERY = sqltemp.substitute(
    catalog_name = catalog_name, 
    dbName = dbName, 
    tableName = tableName,
    insertTableColumnList = insertTableColumnList[ : -1],
    partitionStrSQL = partitionStrSQL)

logger.info(f'****SQL QUERY IS : {SQLQUERY}')
spark.sql(SQLQUERY)
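To see what the template produces, the following standalone sketch renders it with sample values and prints the resulting statement instead of running it on Spark. The column names and the ORDER BY form of partitionStrSQL are assumptions made for illustration; the job derives the real values from the source files and the table configuration.

```python
from string import Template

sqltemp = Template("""
    INSERT INTO $catalog_name.$dbName.$tableName ($insertTableColumnList)
    SELECT $insertTableColumnList FROM insertTable $partitionStrSQL
""")

# Hypothetical inputs for illustration only.
columns = ['id', 'sport_type_name', 'start_date']
# Built with a trailing comma, which is why the job slices with [:-1].
insertTableColumnList = ''.join(f'{c},' for c in columns)
# Assumed form; an empty string would be used for unpartitioned tables.
partitionStrSQL = 'ORDER BY start_date'

query = sqltemp.substitute(
    catalog_name='my_catalog',
    dbName='sports',
    tableName='sporting_event',
    insertTableColumnList=insertTableColumnList[:-1],  # drop trailing comma
    partitionStrSQL=partitionStrSQL)

print(query)
```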

Now we run the AWS Glue job to ingest the initial data into the Iceberg tables. The CloudFormation stack adds the --datalake-formats parameter, adding the required Iceberg libraries to the job.

  1. Choose Run job.
  2. Choose Job Runs to monitor the status. Wait until the status is Run Succeeded.

Verify the data loaded

To confirm that the job processed the data as expected, complete the following steps:

  1. On the Athena console, choose Query Editor in the navigation pane.
  2. Verify AwsDataCatalog is selected as the data source.
  3. Under Database, choose the data domain that you want to explore, based on the configuration you defined in the parameter store. If using the sample database provided, use sports.

Under Tables and views, we can see the list of tables that were created by the AWS Glue job.

  1. Choose the options menu (three dots) next to the first table name, then choose Preview Data.

You can see the data loaded into Iceberg tables.

Amazon Athena review initial data loaded

Perform incremental data loading

Now we start capturing changes from our relational database and applying them to the transactional data lake. This step is also divided into three parts: capturing the changes, applying them to the Iceberg tables, and verifying the results.

Capture changes from the relational database

Due to the configuration we specified, the replication task stopped after running the full load phase. Now we restart the task to add incremental files with changes into the raw layer of the data lake.

  1. On the AWS DMS console, select the task we created and ran before.
  2. On the Actions menu, choose Resume.
  3. Choose Start task to start capturing changes.
  4. To trigger new file creation on the data lake, perform inserts, updates, or deletes on the tables of your source database using your preferred database administration tool. If using the sample database provided, you could run the following SQL commands:
UPDATE dms_sample.nfl_stadium_data_upd
SET seatin_capacity=93703
WHERE team = 'Los Angeles Rams' and sport_location_id = '31';

update  dms_sample.mlb_data 
set bats = 'R'
where mlb_id=506560 and bats='L';

update dms_sample.sporting_event 
set start_date  = current_date 
where id=11 and sold_out=0;
  1. On the AWS DMS task details page, choose the Table statistics tab to see the changes captured.
    AWS DMS CDC statistics
  2. Open the raw layer of the data lake to find a new file holding the incremental changes inside every table’s prefix, for example under the sporting_event prefix.

The record with changes for the sporting_event table looks like the following screenshot.

AWS DMS objects migrated into S3 with CDC

Notice the Op column in the beginning identified with an update (U). Also, the second date/time value is the control column added by AWS DMS with the time the change was captured.

CDC file schema on Amazon S3

Apply changes on the Iceberg tables using AWS Glue

Now we run the AWS Glue job again, and it will automatically process only the new incremental files since the job bookmark is enabled. Let’s review how it works.

The dedupCDCRecords() function performs deduplication of data because multiple changes to a single record ID could be captured within the same data file on Amazon S3. Deduplication is performed based on the last_update_time column added by AWS DMS that indicates the timestamp of when the change was captured. See the following Python code:

def dedupCDCRecords(inputDf, keylist):
    IDWindowDF = Window.partitionBy(*keylist).orderBy(inputDf.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)
    inputDFWithTS = inputDf.withColumn('max_op_date', max(inputDf.last_update_time).over(IDWindowDF))
    
    NewInsertsDF = inputDFWithTS.filter('last_update_time=max_op_date').filter("op='I'")
    UpdateDeleteDf = inputDFWithTS.filter('last_update_time=max_op_date').filter("op IN ('U','D')")
    finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

    return finalInputDF
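The same deduplication idea can be illustrated without Spark. The following plain-Python sketch keeps, for each key, only the rows that carry the most recent last_update_time; dedup_latest() and the sample records are hypothetical and only mirror the logic of the function above.

```python
def dedup_latest(records, key_cols):
    """Keep, per key, only the rows with the latest last_update_time."""
    latest = {}
    for row in records:
        key = tuple(row[c] for c in key_cols)
        ts = row['last_update_time']
        if key not in latest or ts > latest[key]:
            latest[key] = ts
    return [
        row for row in records
        if row['last_update_time'] == latest[tuple(row[c] for c in key_cols)]
    ]


# Two updates to the same record ID within one file, plus one insert.
changes = [
    {'id': 11, 'Op': 'U', 'last_update_time': '2023-01-10 10:00:00'},
    {'id': 11, 'Op': 'U', 'last_update_time': '2023-01-10 10:05:00'},
    {'id': 12, 'Op': 'I', 'last_update_time': '2023-01-10 10:01:00'},
]

deduped = dedup_latest(changes, ['id'])
for row in deduped:
    print(row)
```

Only the later update for ID 11 survives, so the subsequent upsert applies one change per record.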

On line 99, the upsertRecordsSparkSQL() function performs the upsert in a similar fashion to the initial load, but this time with a SQL MERGE command.
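The following sketch shows one way such a MERGE statement could be assembled for an Iceberg table. It is not the script's actual code: the template, the build_merge_sql() helper, the updateTable view name, and the handling of deletes through the Op column are assumptions made for illustration.

```python
from string import Template

# Hypothetical MERGE template; 'updateTable' is an assumed temporary view
# holding the deduplicated CDC records.
merge_template = Template("""
    MERGE INTO $catalog_name.$dbName.$tableName t
    USING updateTable u
    ON $joinCondition
    WHEN MATCHED AND u.Op = 'D' THEN DELETE
    WHEN MATCHED THEN UPDATE SET $updateSet
    WHEN NOT MATCHED AND u.Op <> 'D' THEN INSERT ($columnList) VALUES ($valueList)
""")


def build_merge_sql(catalog_name, db_name, table_name, key_cols, columns):
    """Render the MERGE statement for the given keys and columns."""
    return merge_template.substitute(
        catalog_name=catalog_name,
        dbName=db_name,
        tableName=table_name,
        joinCondition=' AND '.join(f't.{c} = u.{c}' for c in key_cols),
        updateSet=', '.join(f't.{c} = u.{c}' for c in columns),
        columnList=', '.join(columns),
        valueList=', '.join(f'u.{c}' for c in columns))


sql = build_merge_sql('my_catalog', 'sports', 'sporting_event',
                      key_cols=['id'], columns=['id', 'start_date'])
print(sql)
```

Joining on the primary key columns from the table configuration lets a single statement cover inserts, updates, and deletes.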

Review the applied changes

Open the Athena console and run a query that selects the records changed on the source database. If using the provided sample database, use the following SQL query:

SELECT * FROM "sports"."nfl_stadium_data_upd"
WHERE team = 'Los Angeles Rams' and sport_location_id = 31
LIMIT 1;

Amazon Athena review cdc data loaded

Monitor table ingestion

The AWS Glue job script uses simple Python exception handling to catch errors while processing a specific table. The job bookmark is saved after each table finishes processing successfully, so that tables already processed are not reprocessed when the job run is retried for the tables with errors.

The AWS Command Line Interface (AWS CLI) provides a get-job-bookmark command for AWS Glue that provides insight into the status of the bookmark for each table processed.

  1. On the AWS Glue Studio console, choose the ETL job.
  2. Choose the Job Runs tab and copy the job run ID.
  3. Run the following command on a terminal authenticated for the AWS CLI, replacing <GLUE_JOB_RUN_ID> on line 1 with the value you copied. If your CloudFormation stack is not named transactionaldl-postgresql, provide the name of your job on line 2 of the script:
jobrun=<GLUE_JOB_RUN_ID>
jobname=IcebergJob-transactionaldl-postgresql
aws glue get-job-bookmark --job-name $jobname --run-id $jobrun

In this solution, when processing a table causes an exception, the AWS Glue job does not fail immediately. Instead, the table is added to an array that is printed after the job finishes. In such a scenario, the job is marked as failed only after it tries to process the rest of the tables detected on the raw data source. This way, tables without errors don’t have to wait until the user identifies and solves the problem on the conflicting tables. The user can quickly detect job runs that had issues using the AWS Glue job run status, and identify which specific tables are causing the problem using the CloudWatch logs for the job run.

  1. The job script implements this feature with the following Python code:
# Performed for every table
        try:
            # Table processing logic
        except Exception as e:
            logger.info(f'There is an issue with table: {tableName}')
            logger.info(f'The exception is : {e}')
            errored_table_list.append(tableName)
            continue
        # Save the bookmark only when the table was processed successfully
        job.commit()
# After all tables are processed, report the failures and fail the job run
if (len(errored_table_list)):
    logger.info(f'Total number of errored tables: {len(errored_table_list)}')
    logger.info(f"Tables that failed during processing: {', '.join(errored_table_list)}")
    raise Exception('***** Some tables failed to process.')

The following screenshot shows how the CloudWatch logs look for tables that cause errors on processing.

AWS Glue job monitoring with logs

Aligned with the AWS Well-Architected Framework Data Analytics Lens practices, you can adopt more sophisticated control mechanisms to identify and notify stakeholders when errors appear on the data pipelines. For example, you can use an Amazon DynamoDB control table to store all tables and job runs with errors, or use Amazon Simple Notification Service (Amazon SNS) to send alerts to operators when certain criteria are met.

Schedule incremental batch data loading

The CloudFormation stack deploys an Amazon EventBridge rule (disabled by default) that can trigger the AWS Glue job to run on a schedule. To provide your own schedule and enable the rule, complete the following steps:

  1. On the EventBridge console, choose Rules in the navigation pane.
  2. Search for the rule prefixed with the name of your CloudFormation stack followed by JobTrigger (for example, transactionaldl-postgresql-JobTrigger-randomvalue).
  3. Choose the rule.
  4. Under Event Schedule, choose Edit.

The default schedule is configured to trigger every hour.

  1. Provide the schedule you want to run the job.
  2. Additionally, you can use an EventBridge cron expression by selecting A fine-grained schedule.
    Amazon EventBridge schedule ETL job
  3. When you finish setting up the cron expression, choose Next three times, and finally choose Update Rule to save changes.
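EventBridge cron expressions have six fields (minutes, hours, day of month, month, day of week, and year), and either the day-of-month or the day-of-week field must be a question mark. The following sketch builds expressions in the cron(...) form; eventbridge_cron() is a hypothetical helper, and the schedules shown are examples rather than the values deployed by the stack.

```python
def eventbridge_cron(minutes='0', hours='*', day_of_month='*', month='*',
                     day_of_week='?', year='*'):
    """Build an EventBridge cron expression from its six fields."""
    # Exactly one of day-of-month and day-of-week must be '?'.
    if (day_of_month == '?') == (day_of_week == '?'):
        raise ValueError("Use '?' in exactly one of day_of_month and day_of_week")
    fields = [minutes, hours, day_of_month, month, day_of_week, year]
    return f"cron({' '.join(fields)})"


# At minute 0 of every hour.
hourly = eventbridge_cron()
# Every 15 minutes.
every_15_minutes = eventbridge_cron(minutes='0/15')
# At 02:00 UTC, Monday through Friday.
weekdays_2am = eventbridge_cron(minutes='0', hours='2', day_of_month='?',
                                day_of_week='MON-FRI')

print(hourly)
print(every_15_minutes)
print(weekdays_2am)
```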

The rule is created disabled by default to allow you to run the initial data load first.

  1. Activate the rule by choosing Enable.

You can view rule invocations on the Monitoring tab of the rule, or check the job runs directly on the AWS Glue Job Run details page.

Conclusion

After deploying this solution, you have automated the ingestion of your tables on a single relational data source. Organizations using a data lake as their central data platform usually need to handle multiple, sometimes even tens of data sources. Also, more and more use cases require organizations to implement transactional capabilities to the data lake. You can use this solution to accelerate the adoption of such capabilities across all your relational data sources to enable new business use cases, automating the implementation process to derive more value from your data.


About the Authors

Luis Gerardo Baeza is a Big Data Architect in the Amazon Web Services (AWS) Data Lab. He has 12 years of experience helping organizations in the healthcare, financial and education sectors to adopt enterprise architecture programs, cloud computing, and data analytics capabilities. Luis currently helps organizations across Latin America to accelerate strategic data initiatives.

SaiKiran Reddy Aenugu is a Data Architect in the Amazon Web Services (AWS) Data Lab. He has 10 years of experience implementing data loading, transformation, and visualization processes. SaiKiran currently helps organizations in North America to adopt modern data architectures such as data lakes and data mesh. He has experience in the retail, airline, and finance sectors.

Narendra Merla is a Data Architect in the Amazon Web Services (AWS) Data Lab. He has 12 years of experience in designing and productionalizing both real-time and batch-oriented data pipelines and building data lakes on both cloud and on-premises environments. Narendra currently helps organizations in North America to build and design robust data architectures, and has experience in the telecom and finance sectors.

Amazon EMR launches support for Amazon EC2 C7g (Graviton3) instances to improve cost performance for Spark workloads by 7–13%

Post Syndicated from Al MS original https://aws.amazon.com/blogs/big-data/amazon-emr-launches-support-for-amazon-ec2-c7g-graviton3-instances-to-improve-cost-performance-for-spark-workloads-by-7-13/

Amazon EMR provides a managed service to easily run analytics applications using open-source frameworks such as Apache Spark, Hive, Presto, Trino, HBase, and Flink. The Amazon EMR runtime for Spark and Presto includes optimizations that provide more than twice the performance of open-source Apache Spark and Presto.

With Amazon EMR release 6.7, you can now use Amazon Elastic Compute Cloud (Amazon EC2) C7g instances, which use the AWS Graviton3 processors. These instances improve price-performance of running Spark workloads on Amazon EMR by 7.93–13.35% over previous generation instances, depending on the instance size. In this post, we describe how we estimated the price-performance benefit.

Amazon EMR runtime performance with EC2 C7g instances

We ran TPC-DS 3 TB benchmark queries on Amazon EMR 6.9 using the Amazon EMR runtime for Apache Spark (compatible with Apache Spark 3.3) with C7g instances. Data was stored in Amazon Simple Storage Service (Amazon S3), and results were compared to equivalent C6g clusters from the previous generation instance family. We measured performance improvements using the total query runtime and geometric mean of the query runtime across TPC-DS 3 TB benchmark queries.
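The two metrics can be computed as in the following sketch. The per-query runtimes are made-up values chosen only to keep the arithmetic easy to follow; summarize() and improvement_pct() are hypothetical helpers, not part of the benchmark tooling.

```python
from statistics import geometric_mean


def summarize(runtimes):
    """Return (total runtime, geometric mean) for a list of query runtimes."""
    return sum(runtimes), geometric_mean(runtimes)


def improvement_pct(baseline, candidate):
    """Percentage reduction of the candidate relative to the baseline."""
    return (baseline - candidate) / baseline * 100


# Hypothetical per-query runtimes in seconds (not benchmark data).
c6g_runtimes = [10.0, 20.0, 40.0]
c7g_runtimes = [8.0, 16.0, 32.0]

c6g_total, c6g_geomean = summarize(c6g_runtimes)
c7g_total, c7g_geomean = summarize(c7g_runtimes)

total_gain = improvement_pct(c6g_total, c7g_total)
geomean_gain = improvement_pct(c6g_geomean, c7g_geomean)

print(f'Total runtime improvement: {total_gain:.2f}%')
print(f'Geometric mean improvement: {geomean_gain:.2f}%')
```

The total runtime is dominated by the longest queries, whereas the geometric mean weights every query's relative change equally, which is why both are reported.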

Our results showed 13.65–18.73% improvement in total query runtime performance and 16.98–20.28% improvement in geometric mean on EMR clusters with C7g compared to equivalent EMR clusters with C6g instances, depending on the instance size. In comparing costs, we observed 7.93–13.35% reduction in cost on the EMR cluster with C7g compared to the equivalent with C6g, depending on the instance size. We did not benchmark the C6g xlarge instance because it didn’t have sufficient memory to run the queries.

The following table shows the results from running the TPC-DS 3 TB benchmark queries on Amazon EMR 6.9, comparing equivalent C7g and C6g EMR clusters.

| Instance Size | 16 XL | 12 XL | 8 XL | 4 XL | 2 XL |
| --- | --- | --- | --- | --- | --- |
| Total size of the cluster (1 leader + 5 core nodes) | 6 | 6 | 6 | 6 | 6 |
| Total query runtime on C6g (seconds) | 2774.86205 | 2752.84429 | 3173.08086 | 5108.45489 | 8697.08117 |
| Total query runtime on C7g (seconds) | 2396.22799 | 2336.28224 | 2698.72928 | 4151.85869 | 7249.58148 |
| Total query runtime improvement with C7g | 13.65% | 15.13% | 14.95% | 18.73% | 16.64% |
| Geometric mean query runtime on C6g (seconds) | 22.2113 | 21.75459 | 23.38081 | 31.97192 | 45.41656 |
| Geometric mean query runtime on C7g (seconds) | 18.43905 | 17.65898 | 19.01684 | 25.48695 | 37.43737 |
| Geometric mean query runtime improvement with C7g | 16.98% | 18.83% | 18.66% | 20.28% | 17.57% |
| EC2 C6g instance price ($ per hour) | $2.1760 | $1.6320 | $1.0880 | $0.5440 | $0.2720 |
| EMR C6g instance price ($ per hour) | $0.5440 | $0.4080 | $0.2720 | $0.1360 | $0.0680 |
| (EC2 + EMR) C6g instance price ($ per hour) | $2.7200 | $2.0400 | $1.3600 | $0.6800 | $0.3400 |
| Cost of running on C6g ($ per instance) | $2.09656 | $1.55995 | $1.19872 | $0.96493 | $0.82139 |
| EC2 C7g instance price ($ per hour) | $2.3200 | $1.7400 | $1.1600 | $0.5800 | $0.2900 |
| EMR C7g instance price ($ per hour) | $0.5800 | $0.4350 | $0.2900 | $0.1450 | $0.0725 |
| (EC2 + EMR) C7g instance price ($ per hour) | $2.9000 | $2.1750 | $1.4500 | $0.7250 | $0.3625 |
| Cost of running on C7g ($ per instance) | $1.93029 | $1.41150 | $1.08699 | $0.83614 | $0.72999 |
| Total cost reduction with C7g including performance improvement | -7.93% | -9.52% | -9.32% | -13.35% | -11.13% |

The following graph shows per-query improvements observed on C7g 2xlarge instances compared to equivalent C6g instances.

Benchmarking methodology

The benchmark used in this post is derived from the industry-standard TPC-DS benchmark, and uses queries from the Spark SQL Performance Tests GitHub repo with the following fixes applied.

We calculated TCO by multiplying cost per hour by number of instances in the cluster and time taken to run the queries on the cluster. We used on-demand pricing in the US East (N. Virginia) Region for all instances.
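As a worked example of this calculation, the following sketch reproduces the per-instance cost figures for the 16xlarge size from the combined (EC2 + EMR) hourly prices and total query runtimes in the results table. The run_cost() helper is hypothetical; passing instances=6 would give the cost for the whole six-node cluster instead of a single instance.

```python
def run_cost(price_per_hour, runtime_seconds, instances=1):
    """Cost of a benchmark run: hourly price x instances x runtime in hours."""
    return price_per_hour * instances * runtime_seconds / 3600


# 16xlarge figures from the results table: (EC2 + EMR) price and total runtime.
c6g_cost = run_cost(2.7200, 2774.86205)
c7g_cost = run_cost(2.9000, 2396.22799)

# A negative value means C7g is cheaper for the same workload.
cost_change_pct = (c7g_cost - c6g_cost) / c6g_cost * 100

print(f'C6g cost per instance: ${c6g_cost:.5f}')
print(f'C7g cost per instance: ${c7g_cost:.5f}')
print(f'Cost change with C7g: {cost_change_pct:.2f}%')
```

Although the C7g hourly price is higher, the shorter runtime more than offsets it, which is how the table arrives at a net cost reduction.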

Conclusion

In this post, we described how we estimated the cost-performance benefit from using Amazon EMR with C7g instances compared to using equivalent previous generation instances. Using these new instances with Amazon EMR improves cost-performance by an additional 7–13%.


About the authors

Al MS is a product manager for Amazon EMR at Amazon Web Services.

Kyeonghyun Ryoo is a Software Development Engineer for EMR at Amazon Web Services. He primarily works on designing and building automation tools for internal teams and customers to maximize their productivity. Outside of work, he is a retired world champion in professional gaming who still enjoys playing video games.

Yuzhou Sun is a software development engineer for EMR at Amazon Web Services.

Steve Koonce is an Engineering Manager for EMR at Amazon Web Services.

How BookMyShow saved 80% in costs by migrating to an AWS modern data architecture

Post Syndicated from Mahesh Vandi Chalil original https://aws.amazon.com/blogs/big-data/how-bookmyshow-saved-80-in-costs-by-migrating-to-an-aws-modern-data-architecture/

This is a guest post co-authored by Mahesh Vandi Chalil, Chief Technology Officer of BookMyShow.

BookMyShow (BMS), a leading entertainment company in India, provides an online ticketing platform for movies, plays, concerts, and sporting events. Selling up to 200 million tickets on an annual run rate basis (pre-COVID) to customers in India, Sri Lanka, Singapore, Indonesia, and the Middle East, BookMyShow also offers an online media streaming service and end-to-end management for virtual and on-ground entertainment experiences across all genres.

The pandemic gave BMS the opportunity to migrate and modernize our 15-year-old analytics solution to a modern data architecture on AWS. This architecture is modern, secure, governed, and cost-optimized, with the ability to scale to petabytes. BMS migrated and modernized from on-premises and other cloud platforms to AWS in just four months. This project was run in parallel with our application migration project and achieved 90% cost savings in storage and 80% cost savings in analytics spend.

The BMS analytics platform caters to business needs for sales and marketing, finance, and business partners (e.g., cinemas and event owners), and provides application functionality for audience, personalization, pricing, and data science teams. The prior analytics solution had multiple copies of data, for a total of over 40 TB, with approximately 80 TB of data in other cloud storage. Data was stored on‑premises and in the cloud in various data stores. Growing organically, the teams had the freedom to choose their technology stack for individual projects, which led to the proliferation of various tools, technology, and practices. Individual teams for personalization, audience, data engineering, data science, and analytics used a variety of products for ingestion, data processing, and visualization.

This post discusses BMS’s migration and modernization journey, and how BMS, AWS, and the AWS Partner Minfy Technologies team worked together to successfully complete the migration in four months while saving costs. The migration tenets using the AWS modern data architecture made the project a huge success.

Challenges in the prior analytics platform

  • Varied Technology: Multiple teams used various products, languages, and versions of software.
  • Larger Migration Project: Because the analytics modernization was a parallel project with application migration, planning was crucial in order to consider the changes in core applications and project timelines.
  • Resources: Experienced resource churn from the application migration project, and had very little documentation of current systems.
  • Data: Had multiple copies of data and no single source of truth; each data store provided a view for the business unit.
  • Ingestion Pipelines: Complex data pipelines moved data across various data stores at varied frequencies. We had multiple approaches in place to ingest data to Cloudera, via over 100 Kafka consumers from transaction systems and MQTT (Message Queue Telemetry Transport messaging protocol) for clickstreams, stored procedures, and Spark jobs. We had approximately 100 jobs for data ingestion across Spark, Alteryx, Beam, NiFi, and more.
  • Hadoop Clusters: The Hadoop clusters were configured on large dedicated hardware, which incurred fixed costs. The on-premises Cloudera setup catered to most of the data engineering, audience, and personalization batch processing workloads. Teams had their own implementations of HBase and Hive for our audience and personalization applications.
  • Data warehouse: The data engineering team used TiDB as their on-premises data warehouse. However, each consumer team had their own perspective of data needed for analysis. As this siloed architecture evolved, it resulted in expensive storage and operational costs to maintain these separate environments.
  • Analytics Database: The analytics team used data sourced from other transactional systems and denormalized data. The team had their own extract, transform, and load (ETL) pipeline, using Alteryx with a visualization tool.

Migration tenets followed which led to project success:

  • Prioritize by business functionality.
  • Apply best practices when building a modern data architecture from Day 1.
  • Move only required data, canonicalize the data, and store it in the most optimal format in the target. Remove data redundancy as much as possible. Mark scope for optimization for the future when changes are intrusive.
  • Build the data architecture while keeping data formats, volumes, governance, and security in mind.
  • Simplify ELT and processing jobs by categorizing the jobs as rehosted, rewritten, and retired. Finalize canonical data format, transformation, enrichment, compression, and storage format as Parquet.
  • Rehost machine learning (ML) jobs that were critical for business.
  • Work backward to achieve our goals, and clear roadblocks and alter decisions to move forward.
  • Use serverless options as a first option and pay per use. Assess the cost and effort for rearchitecting to select the right approach. Execute a proof of concept to validate this for each component and service.

Strategies applied to succeed in this migration:

  • Team – We created a unified team with people from data engineering, analytics, and data science as part of the analytics migration project. Site reliability engineering (SRE) and application teams were involved when critical decisions were needed regarding data or timeline for alignment. The analytics, data engineering, and data science teams spent considerable time planning, understanding the code, and iteratively looking at the existing data sources, data pipelines, and processing jobs. The AWS team, together with the partner team from Minfy Technologies, helped BMS arrive at a migration plan after a proof of concept for each of the components in data ingestion, data processing, data warehouse, ML, and analytics dashboards.
  • Workshops – The AWS team conducted a series of workshops and immersion days, and coached the BMS team on the technology and best practices to deploy the analytics services. The AWS team helped BMS explore the configuration and benefits of the migration approach for each scenario (data migration, data pipeline, data processing, visualization, and machine learning) via proofs of concept (POCs). The team captured the changes required in the existing code for migration. The BMS team also got acquainted with the following AWS services:
  • Proof of concept – The BMS team, with help from the partner and AWS team, implemented multiple proofs of concept to validate the migration approach:
    • Performed batch processing of Spark jobs in Amazon EMR, in which we checked the runtime, required code changes, and cost.
    • Ran clickstream analysis jobs in Amazon EMR, testing the end-to-end pipeline. The team conducted proofs of concept on AWS IoT Core for the MQTT protocol and streaming to Amazon S3.
    • Migrated ML models to Amazon SageMaker and orchestrated with Amazon MWAA.
    • Created sample QuickSight reports and dashboards, in which features and time to build were assessed.
    • Configured Amazon Redshift for key scenarios, in which time for loading data, query performance, and cost were assessed.
  • Effort vs. cost analysis – The team performed the following assessments:
    • Compared the ingestion pipelines, the difference in data structure in each store, the basis of the current business need for the data source, the activity for preprocessing the data before migration, data migration to Amazon S3, and change data capture (CDC) from the migrated applications in AWS.
    • Assessed the effort to migrate approximately 200 jobs, determined which jobs were redundant or needed improvement from a functional perspective, and completed a migration list for the target state. Because modernizing the MQTT workflow code to serverless was time-consuming, we decided to rehost it on Amazon Elastic Compute Cloud (Amazon EC2) and defer the modernization to Amazon Kinesis to the next phase.
    • Reviewed over 400 reports and dashboards, prioritized development in phases, and reassessed business user needs.

AWS cloud services chosen for proposed architecture:

  • Data lake – We used Amazon S3 as the data lake to store the single source of truth for all raw and processed data, thereby reducing the number of data copies and the storage costs.
  • Ingestion – Because we had multiple sources of truth in the current architecture, we arrived at a common structure before migration to Amazon S3, and existing pipelines were modified to do preprocessing. These one-time preprocessing jobs were run in Cloudera, because the source data was on-premises, and on Amazon EMR for data in the cloud. We designed new data pipelines for ingestion from transactional systems on the AWS cloud using AWS Glue ETL.
  • Processing – Processing jobs were segregated based on runtime into two categories: batch and near-real time. Batch processes were further divided into transient Amazon EMR clusters with varying runtimes and Hadoop application requirements like HBase. Near-real-time jobs were provisioned in an Amazon EMR permanent cluster for clickstream analytics, and a data pipeline from transactional systems. We adopted a serverless approach using AWS Glue ETL for new data pipelines from transactional systems on the AWS cloud.
  • Data warehouse – We chose Amazon Redshift as our data warehouse, and planned on how the data would be distributed based on query patterns.
  • Visualization – We discussed current needs with business users, identified the reports they required immediately, and defined the phases of report and dashboard creation. We then built the reports in Amazon QuickSight in phases, prioritized by business demand. We plan to use embedded reports for external users in the future.
  • Machine learning – Custom ML models were deployed on Amazon SageMaker. Existing Airflow DAGs were migrated to Amazon MWAA.
  • Governance, security, and compliance – Governance with AWS Lake Formation was adopted from Day 1. We configured the AWS Glue Data Catalog to reference data used as sources and targets. We had to comply with Payment Card Industry (PCI) guidelines because payment information was in the data lake, so we put the necessary security policies in place.

Solution overview

BMS modern data architecture

The following diagram illustrates our modern data architecture.

The architecture includes the following components:

  1. Source systems – These include the following:
    • Data from transactional systems stored in MariaDB (booking and transactions).
    • User interaction clickstream data via Kafka consumers to DataOps MariaDB.
    • Members and seat allocation information from MongoDB.
    • SQL Server for specific offers and payment information.
  2. Data pipeline – Spark jobs on an Amazon EMR permanent cluster process the clickstream data from Kafka clusters.
  3. Data lake – Data from source systems was stored in their respective Amazon S3 buckets, with prefixes for optimized data querying. For Amazon S3, we followed a hierarchy to store raw, summarized, and team or service-related data in different parent folders as per the source and type of data. Lifecycle policies were added to logs and temp folders of different services as per teams’ requirements.
  4. Data processing – Transient Amazon EMR clusters are used for processing data into a curated format for the audience, personalization, and analytics teams. Small file merger jobs merge the clickstream data to a larger file size, which saved costs for one-time queries.
  5. Governance – AWS Lake Formation enables the usage of AWS Glue crawlers to capture the schema of data stored in the data lake and version changes in the schema. The Data Catalog and security policy in AWS Lake Formation enable access to data for roles and users in Amazon Redshift, Amazon Athena, Amazon QuickSight, and data science jobs. AWS Glue ETL jobs load the processed data to Amazon Redshift at scheduled intervals.
  6. Queries – The analytics team used Amazon Athena to perform one-time queries raised from business teams on the data lake. Because report development is in phases, Amazon Athena was used for exporting data.
  7. Data warehouse – Amazon Redshift was used as the data warehouse, where the reports for the sales teams, management, and third parties (i.e., theaters and events) are processed and stored for quick retrieval. Views to analyze the total sales, movie sale trends, member behavior, and payment modes are configured here. We use materialized views for denormalized tables, different schemas for metadata, and transactional and behavior data.
  8. Reports – We used Amazon QuickSight reports for various business, marketing, and product use cases.
  9. Machine learning – Some of the models deployed on Amazon SageMaker are as follows:
    • Content popularity – Decides the recommended content for users.
    • Live event popularity – Calculates the popularity of live entertainment events in different regions.
    • Trending searches – Identifies trending searches across regions.
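The small file merger jobs mentioned under data processing can be pictured with a short sketch. It only plans which files to merge together so that each output file approaches a target size; the file names, sizes, and the greedy grouping strategy are illustrative assumptions, not the logic of the actual BMS jobs.

```python
from typing import List, Tuple


def plan_merge_groups(files: List[Tuple[str, int]], target_bytes: int) -> List[List[str]]:
    """Greedily group (name, size_in_bytes) pairs so each group stays at or under target_bytes.

    A single file larger than the target ends up in a group of its own.
    """
    groups: List[List[str]] = []
    current: List[str] = []
    current_size = 0
    for name, size in files:
        # Close the current group if adding this file would exceed the target size.
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        groups.append(current)
    return groups


if __name__ == "__main__":
    # Hypothetical clickstream files with sizes in MB, merged toward 128 MB outputs.
    sample = [("part-0001", 40), ("part-0002", 60), ("part-0003", 50), ("part-0004", 10)]
    print(plan_merge_groups(sample, target_bytes=128))
```

Each returned group would then be read and rewritten as one larger file by the merge job, which reduces the number of objects a one-time query has to open.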

Walkthrough

Migration execution steps

We standardized tools, services, and processes for data engineering, analytics, and data science:

  • Data lake
    • Identified the source data to be migrated from Archival DB, BigQuery, TiDB, and the analytics database.
    • Built a canonical data model that catered to multiple business teams and reduced the copies of data, and therefore storage and operational costs. Modified existing jobs to facilitate migration to a canonical format.
    • Identified the source systems, capacity required, anticipated growth, owners, and access requirements.
    • Ran the bulk data migration to Amazon S3 from various sources.
  • Ingestion
    • Transaction systems – Retained the existing Kafka queues and consumers.
    • Clickstream data – Successfully conducted a proof of concept to use AWS IoT Core for MQTT protocol. But because we needed to make changes in the application to publish to AWS IoT Core, we decided to implement it as part of mobile application modernization at a later time. We decided to rehost the MQTT server on Amazon EC2.
  • Processing
    • Listed the data pipelines relevant to business and migrated them with minimal modification.
    • Categorized workloads into critical jobs, redundant jobs, or jobs that can be optimized:
      • Spark jobs were migrated to Amazon EMR.
      • HBase jobs were migrated to Amazon EMR with HBase.
      • Hive-based jobs were modified to use the AWS Glue Data Catalog for their metadata.
      • NiFi jobs were simplified and rewritten as Spark jobs that run on Amazon EMR.
    • Amazon EMR was configured with one persistent cluster for streaming the clickstream and personalization workloads. We used multiple transient clusters for running all other Spark ETL or processing jobs. We used Spot Instances for task nodes to save costs. We optimized data storage with dedicated jobs that merge small files and convert data to compressed file formats.
    • AWS Glue crawlers identified new data in Amazon S3. AWS Glue ETL jobs transformed and uploaded processed data to the Amazon Redshift data warehouse.
  • Data warehouse
    • Defined the data warehouse schema by categorizing the critical reports required by the business, keeping in mind the workload and reports required in future.
    • Defined the staging area for incremental data loaded into Amazon Redshift, materialized views, and tuning the queries based on usage. The transaction and primary metadata are stored in Amazon Redshift to cater to all data analysis and reporting requirements. We created materialized views and denormalized tables in Amazon Redshift to use as data sources for Amazon QuickSight dashboards and segmentation jobs, respectively.
    • Optimally used the Amazon Redshift cluster by loading only the last two years of data into Amazon Redshift, and used Amazon Redshift Spectrum to query historical data through external tables. This helped balance the usage and cost of the Amazon Redshift cluster.
  • Visualization
    • Amazon QuickSight dashboards were created for the sales and marketing team in Phase 1:
      • Sales summary report – An executive summary dashboard to get an overview of sales across the country by region, city, movie, theatre, genre, and more.
      • Live entertainment – A dedicated report for live entertainment vertical events.
      • Coupons – A report for coupons purchased and redeemed.
      • BookASmile – A dashboard to analyze the data for BookASmile, a charity initiative.
  • Machine learning
    • Listed the ML workloads to be migrated based on current business needs.
    • Priority ML processing jobs were deployed on Amazon EMR. Models were modified to use Amazon S3 as source and target, and new APIs were exposed to use the functionality. ML models were deployed on Amazon SageMaker for movies, live event clickstream analysis, and personalization.
    • Existing artifacts in Airflow orchestration were migrated to Amazon MWAA.
  • Security
    • AWS Lake Formation was the foundation of the data lake, with the AWS Glue Data Catalog as the foundation for the central catalog for the data stored in Amazon S3. This provided access to the data by various functionalities, including the audience, personalization, analytics, and data science teams.
    • Personally identifiable information (PII) and payment data were stored in the data lake and data warehouse, so we had to comply with PCI guidelines. Encryption of data at rest and in transit was considered and configured at each service level (Amazon S3, AWS Glue Data Catalog, Amazon EMR, AWS Glue, Amazon Redshift, and QuickSight). Clear roles, responsibilities, and access permissions for different user groups and privileges were listed and configured in AWS Identity and Access Management (IAM) and individual services.
    • Existing single sign-on (SSO) integration with Microsoft Active Directory was used for Amazon QuickSight user access.
  • Automation
    • We used AWS CloudFormation for the creation and modification of all the core and analytics services.
    • AWS Step Functions was used to orchestrate Spark jobs on Amazon EMR.
    • Scheduled jobs were configured in AWS Glue for uploading data in Amazon Redshift based on business needs.
    • Monitoring of the analytics services was done using Amazon CloudWatch metrics, and right-sizing of instances and configuration was achieved. Spark job performance on Amazon EMR was analyzed using the native Spark logs and Spark user interface (UI).
    • Lifecycle policies were applied to the data lake to optimize the data storage costs over time.
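As an illustration of the lifecycle policies mentioned above, the following sketch builds an S3 lifecycle configuration that expires objects under selected prefixes. The prefixes and retention periods are hypothetical, since the post does not state the actual values; the request shape follows the boto3 `put_bucket_lifecycle_configuration` call.

```python
from typing import Dict


def build_lifecycle_configuration(prefix_to_days: Dict[str, int]) -> dict:
    """Build a LifecycleConfiguration payload that expires objects per prefix."""
    rules = []
    for prefix, days in sorted(prefix_to_days.items()):
        rules.append(
            {
                "ID": "expire-" + prefix.strip("/").replace("/", "-"),
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": days},
            }
        )
    return {"Rules": rules}


def apply_lifecycle(bucket: str, prefix_to_days: Dict[str, int]) -> None:
    """Apply the configuration to a bucket (requires boto3 and AWS credentials)."""
    import boto3  # imported lazily so the builder above works without AWS access

    s3 = boto3.client("s3")
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=build_lifecycle_configuration(prefix_to_days),
    )


if __name__ == "__main__":
    # Hypothetical prefixes and retention periods for logs and temp folders.
    print(build_lifecycle_configuration({"logs/": 30, "temp/": 7}))
```

Keeping the payload builder separate from the API call makes the policy easy to review or to embed in a CloudFormation template instead.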

Benefits of a modern data architecture

A modern data architecture offered us the following benefits:

  • Scalability – We moved from a fixed infrastructure to the minimal infrastructure required, with configuration to scale on demand. Services like Amazon EMR and Amazon Redshift enable us to do this with just a few clicks.
  • Agility – We use purpose-built managed services instead of reinventing the wheel. Automation and monitoring were key considerations, which enable us to make changes quickly.
  • Serverless – Adoption of serverless services like Amazon S3, AWS Glue, Amazon Athena, AWS Step Functions, and AWS Lambda supports us when our business has sudden spikes with new movies or events launched.
  • Cost savings – Our storage size was reduced by 90%. Our overall spend on analytics and ML was reduced by 80%.

Conclusion

In this post, we showed you how a modern data architecture on AWS helped BMS to easily share data across organizational boundaries. This allowed BMS to make decisions with speed and agility at scale; ensure compliance via unified data access, security, and governance; and to scale systems at a low cost without compromising performance. Working with the AWS and Minfy Technologies teams helped BMS choose the correct technology services and complete the migration in four months. BMS achieved the scalability and cost-optimization goals with this updated architecture, which has set the stage for innovation using graph databases and enhanced our ML projects to improve customer experience.


About the Authors

Mahesh Vandi Chalil is Chief Technology Officer at BookMyShow, India’s leading entertainment destination. Mahesh has over two decades of global experience and is passionate about building scalable products that delight customers, while keeping innovation as the top goal that motivates his team. Mahesh invests his energies in creating and nurturing the next generation of technology leaders and entrepreneurs, both within the organization and outside of it. He is a proud husband and father of two daughters, and plays cricket during his leisure time.

Priya Jathar is a Solutions Architect working in the Digital Native Business segment at AWS. She has more than two decades of IT experience, with expertise in application development, databases, and analytics. She is a builder who enjoys innovating with new technologies to achieve business goals. She currently helps customers migrate, modernize, and innovate in the cloud. In her free time she likes to paint, and hone her gardening and cooking skills.

Vatsal Shah is a Senior Solutions Architect at AWS based out of Mumbai, India. He has more than nine years of industry experience, including leadership roles in product engineering, SRE, and cloud architecture. He currently focuses on enabling large startups to streamline their cloud operations and help them scale on the cloud. He also specializes in AI and Machine Learning use cases.

Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog

Post Syndicated from Aniket Jiddigoudar original https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-from-the-aws-glue-data-catalog/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores.

Hundreds of thousands of customers use data lakes for analytics and machine learning to make data-driven business decisions. Data consumers lose trust in data if it is not accurate and recent, making data quality essential for undertaking optimal and correct decisions.

Evaluation of the accuracy and freshness of data is a common task for engineers. Currently, there are various tools available to evaluate data quality. However, these tools often require manual processes of data discovery and expertise in data engineering and coding.

We are pleased to announce the public preview launch of AWS Glue Data Quality. You can access this feature today without requesting any additional access in the available Regions. AWS Glue Data Quality is a new preview feature of AWS Glue that measures and monitors the data quality of Amazon S3-based data lakes and in AWS Glue ETL jobs. It does not require any expertise in data engineering or coding. It simplifies your experience of monitoring and evaluating the quality of your data.

This is Part 1 of a four-part series of posts that explain how AWS Glue Data Quality works.

Getting started with AWS Glue Data Quality

In this post, we will go over the simplicity of using the AWS Glue Data Quality feature by:

  1. Starting data quality recommendations and runs on your data in AWS Glue Data Catalog.
  2. Creating an Amazon CloudWatch alarm for getting notifications when data quality results are below a certain threshold.
  3. Analyzing your AWS Glue Data Quality run results through Amazon Athena.

Set up resources with AWS CloudFormation

The provided CloudFormation script creates the following resources for you:

  1. The IAM role required to run AWS Glue Data Quality runs
  2. An Amazon Simple Storage Service (Amazon S3) bucket to store the NYC Taxi dataset
  3. An S3 bucket to store and analyze the results of AWS Glue Data Quality runs
  4. An AWS Glue database and table created from the NYC Taxi dataset

Steps:

  1. Open the AWS CloudFormation console.
  2. Choose Create stack and then select With new resources (standard).
  3. For Template source, choose Upload a template file, and provide the template file attached above. Then choose Next.
  4. For Stack name, DataQualityDatabase, and DataQualityTable, leave as default. For DataQualityS3BucketName, enter the name of your S3 bucket. Then choose Next.
  5. On the final screen, make sure to acknowledge that this stack would create IAM resources for you, and choose Submit.
  6. Once the stack is successfully created, navigate to the S3 bucket created by the stack and upload the yellow_tripdata_2022-01.parquet file.

Start an AWS Glue Data Quality run on your data in AWS Glue Data Catalog

In this first section, we will generate data quality rule recommendations from the AWS Glue Data Quality service. Using these recommendations, we will then run a data quality task against our dataset to obtain an analysis of our data.

To get started, complete the following steps:

  1. Open the AWS Glue console.
  2. Choose Tables under Data Catalog.
  3. Select the DataQualityTable table created via the CloudFormation stack.
  4. Select the Data quality tab.
  5. Choose Recommend ruleset.
  6. On the Recommend data quality rules page, check Save recommended rules as a ruleset. This will allow us to save the recommended rules in a ruleset automatically, for use in the next steps.
  7. For IAM Role, choose the IAM role that was created from the CloudFormation stack.
  8. For Additional configurations - optional, leave the default number of workers and timeout.
  9. Choose Recommend ruleset. This will start a data quality recommendation run, with the given number of workers.
  10. Wait for the ruleset to be completed.
  11. Once completed, navigate back to the Rulesets tab. You should see a successful recommendation run and a ruleset created.

Understand AWS Glue Data Quality recommendations

AWS Glue Data Quality recommendations are suggestions generated by the AWS Glue Data Quality service, based on the shape of your data. These recommendations automatically take into account aspects such as row counts, mean, and standard deviation of your data, and generate a set of rules for you to use as a starting point.

The dataset used here is the NYC Taxi dataset. Based on the columns in this dataset and the values of those columns, AWS Glue Data Quality recommends a set of rules. In total, the recommendation service took all the columns of the dataset into consideration and recommended 55 rules.

Some of these rules are:

  • RowCount between <> and <> → Expect the number of rows to fall within a range based on the data it saw
  • ColumnValues “VendorID” in [ ] → Expect the “VendorID” column to be within a specific set of values
  • IsComplete “VendorID” → Expect the “VendorID” column to be a non-null value
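To make the meaning of these three rule types concrete, here is a small plain-Python sketch of what each one checks. It is an illustration only, not the AWS Glue Data Quality engine: the sample rows are invented, and treating the RowCount bounds as exclusive is an assumption of this sketch that you should verify against the rule language reference.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def row_count_between(rows: List[Row], low: int, high: int) -> bool:
    """RowCount between low and high (bounds treated as exclusive in this sketch)."""
    return low < len(rows) < high


def is_complete(rows: List[Row], column: str) -> bool:
    """IsComplete: every row has a non-null value in the column."""
    return all(row.get(column) is not None for row in rows)


def column_values_in(rows: List[Row], column: str, allowed: Iterable[Any]) -> bool:
    """ColumnValues ... in [...]: every value in the column belongs to the allowed set."""
    allowed_set = set(allowed)
    return all(row.get(column) in allowed_set for row in rows)


if __name__ == "__main__":
    # Hypothetical rows; the last one simulates a malformed record.
    rows = [{"VendorID": 1}, {"VendorID": 2}, {"VendorID": None}]
    print(row_count_between(rows, 1, 10))
    print(is_complete(rows, "VendorID"))
    print(column_values_in(rows, "VendorID", [1, 2]))
```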

How do I use the recommended AWS Glue Data Quality rules?

  1. From the Rulesets section, you should see your generated ruleset. Select the generated ruleset, and choose Evaluate ruleset.
    • If you didn’t check the box to Save recommended rules as a ruleset when you ran the recommendation, you can still click on the recommendation task run and copy the rules to create a new ruleset.
  2. For Data quality actions under Data quality properties, select Publish metrics to Amazon CloudWatch. If this box isn’t checked, the data quality run will not publish metrics to Amazon CloudWatch.
  3. For IAM role, select the GlueDataQualityBlogRole created in the AWS CloudFormation stack.
  4. For Requested number of workers under Advanced properties, leave as default. 
  5. For Data quality results location, select the value of the GlueDataQualityResultsS3Bucket location that was created via the AWS CloudFormation stack.
  6. Choose Evaluate ruleset.
  7. Once the run begins, you can see the status of the run on the Data quality results tab.
  8. After the run reaches a successful stage, select the completed data quality task run, and view the data quality results shown in Run results.

Our recommendation service suggested that we enforce 55 rules, based on the column values and the data within our NYC Taxi dataset. We then converted the collection of 55 rules into a RuleSet and ran a Data Quality Evaluation task run using our RuleSet against our dataset. In the run results, we see the status of each rule within the RuleSet.

You can also utilize the AWS Glue Data Quality APIs to carry out these steps.
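As a rough sketch of the API route, the following builds request payloads for a recommendation run and an evaluation run and passes them to the boto3 Glue client. The operation and parameter names reflect our understanding of the boto3 Glue client and should be checked against the API documentation; the database name, ruleset name, account ID, and results prefix are hypothetical placeholders.

```python
from typing import Iterable


def recommendation_run_request(database: str, table: str, role_arn: str, ruleset_name: str) -> dict:
    """Payload for start_data_quality_rule_recommendation_run."""
    return {
        "DataSource": {"GlueTable": {"DatabaseName": database, "TableName": table}},
        "Role": role_arn,
        # Saves the recommended rules as a ruleset, like the console checkbox.
        "CreatedRulesetName": ruleset_name,
    }


def evaluation_run_request(
    database: str,
    table: str,
    role_arn: str,
    ruleset_names: Iterable[str],
    results_s3_prefix: str,
    publish_metrics: bool = True,
) -> dict:
    """Payload for start_data_quality_ruleset_evaluation_run."""
    return {
        "DataSource": {"GlueTable": {"DatabaseName": database, "TableName": table}},
        "Role": role_arn,
        "RulesetNames": list(ruleset_names),
        "AdditionalRunOptions": {
            "CloudWatchMetricsEnabled": publish_metrics,
            "ResultsS3Prefix": results_s3_prefix,
        },
    }


def start_evaluation(request: dict) -> str:
    """Start the evaluation run and return its run ID (requires boto3 and AWS credentials)."""
    import boto3  # imported lazily so the builders work without AWS access

    glue = boto3.client("glue")
    return glue.start_data_quality_ruleset_evaluation_run(**request)["RunId"]


if __name__ == "__main__":
    # All values below are placeholders for illustration.
    role = "arn:aws:iam::123456789012:role/GlueDataQualityBlogRole"
    print(evaluation_run_request(
        "my_database", "demo_nyc_taxi_data_input", role,
        ["my_recommended_ruleset"], "s3://my-results-bucket/",
    ))
```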

Get Amazon SNS notifications for my failing data quality runs through Amazon CloudWatch alarms

Each AWS Glue Data Quality evaluation run from the Data Catalog emits a pair of metrics per data quality run: glue.data.quality.rules.passed (the number of rules that passed) and glue.data.quality.rules.failed (the number of rules that failed). These metrics can be used to create alarms that alert users when the number of failed rules in a data quality run reaches a threshold.

To get started with setting up an alarm that would send an email via an Amazon SNS notification, follow the steps below:

  1. Open the Amazon CloudWatch console.
  2. Choose All metrics under Metrics. You will see an additional namespace under Custom namespaces titled Glue Data Quality.

Note: When starting an AWS Glue Data Quality run, make sure the Publish metrics to Amazon CloudWatch checkbox is enabled, as shown below. Otherwise, metrics for that particular run will not be published to Amazon CloudWatch.

  3. Under the Glue Data Quality namespace, you should be able to see metrics being emitted per table, per ruleset. For this post, we use the glue.data.quality.rules.failed metric and raise an alarm when its value is 1 or more (that is, we want to be notified as soon as at least one rule evaluation fails).
  4. To create the alarm, choose All alarms under Alarms.
  5. Choose Create alarm.
  6. Choose Select metric.
  7. Select the glue.data.quality.rules.failed metric corresponding to the table you’ve created, then choose Select metric.
  8. Under the Specify metric and conditions tab, under the Metrics section:
    1. For Statistic, select Sum.
    2. For Period, select 1 minute.
  9. Under the Conditions section:
    1. For Threshold type, choose Static.
    2. For Whenever glue.data.quality.rules.failed is…, select Greater/Equal.
    3. For than…, enter 1 as the threshold value.
    4. Expand the Additional configurations dropdown and select Treat missing data as good.

These selections imply that if the glue.data.quality.rules.failed metric emits a value greater than or equal to 1, we will trigger an alarm. However, if there is no data, we will treat it as acceptable.

  10. Choose Next.
  11. On Configure actions:
    1. For the Alarm state trigger section, select In alarm.
    2. For Send a notification to the following SNS topic, choose Create a new topic to send a notification via a new SNS topic.
    3. For Email endpoints that will receive the notification…, enter your email address. Choose Next.
  12. For Alarm name, enter myFirstDQAlarm, then choose Next.
  13. Finally, you should see a summary of all the selections on the Preview and create screen. Choose Create alarm at the bottom.
  14. You should now be able to see the alarm being created from the Amazon CloudWatch alarms dashboard.
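The console settings above map roughly onto a single CloudWatch `put_metric_alarm` call. The sketch below is a minimal version under a few assumptions: the SNS topic already exists, a single evaluation period is used, and the metric dimensions are passed in by the caller because this post does not list their exact names.

```python
from typing import Dict, Sequence


def failed_rules_alarm_params(
    alarm_name: str,
    sns_topic_arn: str,
    dimensions: Sequence[Dict[str, str]] = (),
) -> dict:
    """Build put_metric_alarm arguments that mirror the console walkthrough."""
    return {
        "AlarmName": alarm_name,
        "Namespace": "Glue Data Quality",
        "MetricName": "glue.data.quality.rules.failed",
        # Copy the dimensions shown for your table and ruleset in the CloudWatch console.
        "Dimensions": list(dimensions),
        "Statistic": "Sum",
        "Period": 60,  # 1 minute
        "EvaluationPeriods": 1,  # assumption: alarm on a single breaching period
        "Threshold": 1.0,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        "TreatMissingData": "notBreaching",  # "Treat missing data as good"
        "AlarmActions": [sns_topic_arn],
    }


def create_alarm(params: dict) -> None:
    """Create or update the alarm (requires boto3 and AWS credentials)."""
    import boto3  # imported lazily so the builder works without AWS access

    boto3.client("cloudwatch").put_metric_alarm(**params)


if __name__ == "__main__":
    # The topic ARN is a placeholder.
    print(failed_rules_alarm_params(
        "myFirstDQAlarm", "arn:aws:sns:us-east-1:123456789012:my-dq-topic"
    ))
```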

To demonstrate AWS Glue Data Quality alarms, we walk through a real-world scenario in which corrupted data is ingested, and show how the alarm we created in the previous steps notifies us. For this purpose, we will use the provided file malformed_yellow_taxi.parquet, which contains data that has been tweaked on purpose.

  1. Navigate to the S3 location DataQualityS3BucketName mentioned in the CloudFormation template supplied at the beginning of the blog post.
  2. Upload the malformed_yellow_tripdata.parquet file to this location. This will help us simulate a flow where we have a file with poor data quality coming into our data lakes via our ETL processes.
  3. Navigate to the AWS Glue Data Catalog console, select the demo_nyc_taxi_data_input table that was created via the provided AWS CloudFormation template, and then navigate to the Data quality tab.
  4. Select the RuleSet we had created in the first section. Then select Evaluate ruleset.
  5. From the Evaluate data quality screen:
    1. Check the box to Publish metrics to Amazon CloudWatch. This checkbox is needed to ensure that the failure metrics are emitted to Amazon CloudWatch.
    2. Select the IAM role created via the AWS CloudFormation template.
    3. Optionally, select an S3 location to publish your AWS Glue Data Quality results.
    4. Select Evaluate ruleset.
  6. Navigate to the Data quality results tab. You should now see two runs, one from the previous steps of this blog and one that we currently triggered. Wait for the current run to complete.
  7. As you see, we have a failed AWS Glue Data Quality run result, with only 52 of our original 55 rules passing. These failures are attributed to the new file we uploaded to S3.
  8. Navigate to the Amazon CloudWatch console and select the alarm we created at the beginning of this section.
  9. As you can see, we configured the alarm to fire every time the glue.data.quality.rules.failed metric reaches the threshold of 1. After the above AWS Glue Data Quality run, 3 rules failed, which triggered the alarm. You should also have received an email detailing the alarm’s firing.

We have thus demonstrated how malformed data coming into our data lakes can be identified via AWS Glue Data Quality rules, and how alerting mechanisms can then be created to notify the appropriate personas.

Analyze your AWS Glue Data Quality run results through Amazon Athena

When you have multiple AWS Glue Data Quality run results against a dataset, you might want to track the trends of the dataset’s quality over a period of time. To achieve this, we can export our AWS Glue Data Quality run results to S3, and use Amazon Athena to run analytical queries against the exported runs. The results can then be used in Amazon QuickSight to build dashboards that give a graphical representation of your data quality trends.

In the third part of this post, we will see the steps needed to start tracking data on your dataset’s quality:

  1. For our data quality runs that we set up in the previous sections, we set the Data quality results location parameter to the bucket location specified by the AWS CloudFormation stack.
  2. After each successful run, you should see a single JSONL file being exported to your selected S3 location, corresponding to that particular run.
  3. Open the Amazon Athena console.
  4. In the query editor, run the following CREATE TABLE statement (replace the <my_table_name> with a relevant value, and <GlueDataQualityResultsS3Bucket_from_cfn> section with the GlueDataQualityResultsS3Bucket value from the provided AWS CloudFormation template):
    CREATE EXTERNAL TABLE `<my_table_name>`(
    `catalogid` string,
    `databasename` string,
    `tablename` string,
    `dqrunid` string,
    `evaluationstartedon` timestamp,
    `evaluationcompletedon` timestamp,
    `rule` string,
    `outcome` string,
    `failurereason` string,
    `evaluatedmetrics` string)
    PARTITIONED BY (
    `year` string,
    `month` string,
    `day` string)
    ROW FORMAT SERDE
    'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
    'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName')
    STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
    's3://<GlueDataQualityResultsS3Bucket_from_cfn>/'
    TBLPROPERTIES (
    'classification'='json',
    'compressionType'='none',
    'typeOfData'='file');

    -- Run this as a separate statement to load the year/month/day partitions.
    MSCK REPAIR TABLE `<my_table_name>`;

  5. Once the above table is created, you should be able to run queries to analyze your data quality results.

For example, consider the following query that shows me the failed AWS Glue Data Quality runs against my table demo_nyc_taxi_data_input within a time window:

SELECT * FROM "<my_table_name>"
WHERE "outcome" = 'Failed'
AND "tablename" = '<my_source_table>'
AND "evaluationcompletedon" between
parse_datetime('2022-12-05 17:00:00:000', 'yyyy-MM-dd HH:mm:ss:SSS') AND parse_datetime('2022-12-05 20:00:00:000', 'yyyy-MM-dd HH:mm:ss:SSS');

The output of the above query shows me details about all the runs with “outcome” = ‘Failed’ that ran against my NYC Taxi dataset table ( “tablename” = ‘demo_nyc_taxi_data_input’ ). The output also gives me information about the failure reason ( failurereason ) and the values it was evaluated against ( evaluatedmetrics ).

As you can see, the run results uploaded to S3 give us detailed information about our AWS Glue Data Quality runs, which we can use to perform more detailed analysis and build dashboards on top of the data.
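For quick checks outside Athena, the exported results can also be inspected directly. The sketch below assumes each line of an exported JSONL file is one flat JSON object with the keys listed in the SerDe `paths` property above (for example `dqRunId`, `tableName`, `rule`, `outcome`, and `failureReason`); the sample records are invented for illustration.

```python
import json
from collections import Counter
from typing import Dict, List


def failed_results(jsonl_text: str, table_name: str) -> List[Dict]:
    """Return the records with outcome 'Failed' for the given source table."""
    failures = []
    for line in jsonl_text.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        record = json.loads(line)
        if record.get("outcome") == "Failed" and record.get("tableName") == table_name:
            failures.append(record)
    return failures


def failures_per_run(records: List[Dict]) -> Counter:
    """Count failed rules per data quality run ID."""
    return Counter(record["dqRunId"] for record in records)


if __name__ == "__main__":
    # Hypothetical exported records.
    sample = "\n".join(
        json.dumps(r)
        for r in [
            {"dqRunId": "dqrun-1", "tableName": "demo_nyc_taxi_data_input",
             "rule": 'IsComplete "VendorID"', "outcome": "Passed", "failureReason": None},
            {"dqRunId": "dqrun-2", "tableName": "demo_nyc_taxi_data_input",
             "rule": 'IsComplete "VendorID"', "outcome": "Failed",
             "failureReason": "example failure reason"},
        ]
    )
    failed = failed_results(sample, "demo_nyc_taxi_data_input")
    print(failures_per_run(failed))
```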

Clean up

  • Navigate to the Amazon Athena console and delete the table created for data quality analysis.
  • Navigate to the Amazon CloudWatch console and delete the alarms created.
  • If you deployed the sample CloudFormation stack, delete the CloudFormation stack via the AWS CloudFormation console. You will need to empty the S3 bucket before you delete the bucket.
  • If you have enabled your AWS Glue Data Quality runs to output to S3, empty those buckets as well.

Conclusion

In this post, we talked about the ease and speed of incorporating data quality rules into your AWS Glue Data Catalog tables using the AWS Glue Data Quality feature. We also talked about how to run recommendations and evaluate data quality against your tables. We then discussed analyzing the data quality results via Amazon Athena, and the process for setting up alarms via Amazon CloudWatch in order to notify users of failed data quality.

To dive into the AWS Glue Data Quality APIs, take a look at the AWS Glue Data Quality API documentation.
To learn more about AWS Glue Data Quality, check out the AWS Glue Data Quality Developer Guide.


About the authors

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team.

Joseph Barlan is a Frontend Engineer at AWS Glue. He has over 5 years of experience helping teams build reusable UI components and is passionate about frontend design systems. In his spare time, he enjoys pencil drawing and binge watching TV shows.

LaunchDarkly’s journey from ingesting 1 TB to 100 TB per day with Amazon Kinesis Data Streams

Post Syndicated from Mike Zorn original https://aws.amazon.com/blogs/big-data/launchdarklys-journey-from-ingesting-1-tb-to-100-tb-per-day-with-amazon-kinesis-data-streams/

This post was co-written with Mike Zorn, Software Architect at LaunchDarkly as the lead author.

LaunchDarkly’s feature management platform enables customers to release features and measure their impact. As part of this platform, SDKs gather event data, and the event ingestion platform consumes and analyzes this data to measure impact. As the platform launched and customer adoption increased, we had to scale the event data pipeline to meet the demands of the business for new use cases that required zero data loss. We will explain the challenges that we ran into with the initial architecture and the advantages achieved by using Amazon Kinesis Data Streams and additional AWS services in the new architecture. We will also go into the different factors that we considered in the Amazon Kinesis Data Streams implementation for cost efficiency and performance.

Problem statement

LaunchDarkly’s mission is to fundamentally change how companies deliver software by helping them innovate faster, deploy fearlessly, and make each release a masterpiece. With LaunchDarkly, we allow customers to deploy when they want, release when they are ready, and get total control of the code to ship fast, reduce risk, and reclaim their nights and weekends.

In 2017, the event ingestion platform consisted of a fleet of web servers that would write events to several databases, as shown below, that stored event data to power several features of the LaunchDarkly product. These features allow LaunchDarkly’s customers to gain full visibility into how a feature is performing over time, optimize features through experimentation, and quickly verify their implementation. Unfortunately, all of the database writes to power these features were performed within a single process on these web servers, so if any one of these databases had an availability issue, events would queue up in memory until that process ran out of memory and crashed. Since that same database would be used to write data from each web server, all of them would eventually run out of memory and crash. This cycle would repeat itself until the database availability issue was rectified. During that time, there was a permanent loss of all of the event data that was sent by the SDKs.

Original Architecture

The current system was tolerant of some data loss because the applications that used this data were limited. However, the new features and workflows had stricter requirements for data-loss prevention.

We decided to explore alternatives where all the consumers are built with isolated fault tolerance, so each consumer is independent of the others in case of any issues. We built an event-driven pipeline that would be highly durable and scalable, and that would also provide the ability to replay data. As a result, we scaled from ingesting about 1 TB of data a day to more than 100 TB of data a day now.

Solution

The following diagram illustrates our updated design. To support the new use cases, we added Amazon Kinesis Data Streams, AWS Lambda, and Amazon Kinesis Data Firehose to the architecture.

New Architecture

The design has the following key components:

  1. Mobile client using the LaunchDarkly SDK to evaluate feature flags
  2. Application Load Balancer distributes traffic to Amazon EC2 nodes
  3. Amazon EC2 nodes run a Go application that writes traffic to Amazon Kinesis Data Streams
  4. Amazon Kinesis Data Streams durably persists data
  5. AWS Lambda writes various types of data to databases
  6. Amazon OpenSearch Service records data about users
  7. Amazon ElastiCache records data about flag statuses
  8. Amazon Kinesis Data Firehose batches flag evaluation data and writes it to Amazon S3
  9. Amazon S3 records data about flag evaluations

Data flows from a LaunchDarkly SDK into the events API, which is backed by an Application Load Balancer (ALB). That ALB routes traffic to a fleet of Amazon EC2 servers. Those servers persist the data to Amazon Kinesis Data Streams. Data is then read out of Amazon Kinesis Data Streams by Lambda functions that transform and write the data in different formats to several databases. The design has a few properties that were very important to this use case.

  • Durability
  • Isolation
  • Data replay

Durability would prevent data loss when issues in data processing arose. Isolation would prevent other consumers of data from failing when one consumer had a failure. Data replay would allow us to debug data anomalies and fix them retroactively.

Amazon Kinesis Data Streams satisfies these three properties. Data written to Amazon Kinesis Data Streams is persisted durably until the data ages out of the stream. Amazon Kinesis Data Streams allows for consumer isolation: each consumer maintains its own iterator position, so consumers can process data from a stream independently of one another. Finally, Amazon Kinesis Data Streams makes data replay possible because consumers can set their shard iterator position to a point in the past. For example, a consumer can be configured to start reading from 1 hour in the past if the last hour of data needs to be replayed.
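As a rough illustration of the replay idea, the following Python sketch builds the parameters for the Kinesis GetShardIterator API with the AT_TIMESTAMP iterator type, starting one hour in the past. The stream name, shard ID, and helper function are hypothetical and chosen only for this example; this is not LaunchDarkly’s consumer code.

```python
from datetime import datetime, timedelta, timezone


def replay_iterator_request(stream_name, shard_id, lookback, now=None):
    """Build GetShardIterator parameters that start reading `lookback` in the past."""
    now = now or datetime.now(timezone.utc)
    return {
        "StreamName": stream_name,
        "ShardId": shard_id,
        "ShardIteratorType": "AT_TIMESTAMP",
        "Timestamp": now - lookback,
    }


# Hypothetical stream and shard names, used only for illustration.
request = replay_iterator_request(
    "events-stream",
    "shardId-000000000000",
    timedelta(hours=1),
    now=datetime(2022, 1, 1, 12, 0, tzinfo=timezone.utc),
)

# With boto3 installed and credentials configured, the request could be sent as:
#   boto3.client("kinesis").get_shard_iterator(**request)
print(request["Timestamp"].isoformat())
```

Because each consumer requests its own iterator, one consumer can replay from an earlier timestamp without affecting where other consumers are reading.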

A few additional technologies were considered that would allow us to achieve these design properties. Amazon Simple Notification Service (Amazon SNS) combined with Amazon Simple Queue Service (Amazon SQS) would allow for a system with durability and isolation. However, data replay is not available out of the box with that combination and would have required a custom implementation.

Apache Kafka was also considered, but in spite of the fact that it satisfies these design properties, it was not adopted because the team did not have prior experience with Apache Kafka. Amazon Kinesis Data Streams satisfies these design properties, and it is fully managed, which reduces the need to worry about a lack of operational expertise.

Amazon Kinesis Data Streams implementation deep dive

Before we started our Amazon Kinesis Data Streams implementation, during our initial proof-of-concept phase, we learned that although Amazon Kinesis Data Streams is fully managed, there are some aspects that need to be taken into consideration when implementing it at scale.

  • Costs
  • Client error handling

Amazon Kinesis Data Streams on-demand costs are proportional to the data volume put into the stream. However, if traffic is relatively even and predictable, provisioned throughput billing is more economical. Under provisioned throughput billing, customers also will be billed for put payloads, which is essentially an extra cost, especially if there are a number of small records. Since LaunchDarkly’s use case had predictable, even traffic, provisioned throughput was used. However, we had a small record size (about 100 bytes on average), so it was important to implement batching in order to control costs.

Kinesis Producer Library (KPL) supports a variety of languages, and if you use any of those, you can rely on that to efficiently batch records for you. However, since LaunchDarkly uses Go for backend applications, we had custom code because Go as a producer was not supported by KPL. Our solution was to batch data so that it was close to 25 kB (the size of a put payload). We did this by using protocol buffers and concatenating them together inside of a record.
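To make the batching idea concrete, here is a minimal Python sketch that packs many small records into batches close to 25 kB. It is an illustration under stated assumptions, not our production Go code: it uses a simple 4-byte length prefix to frame each record instead of protocol buffers, and it treats 25 kB as 25,000 bytes. The function names are hypothetical.

```python
import struct

PUT_PAYLOAD_UNIT = 25_000  # assumed target batch size in bytes ("close to 25 kB")


def batch_records(records, max_bytes=PUT_PAYLOAD_UNIT):
    """Pack small byte records into length-prefixed batches of at most max_bytes."""
    batches, current, size = [], [], 0
    for record in records:
        framed = struct.pack(">I", len(record)) + record  # 4-byte big-endian length prefix
        if len(framed) > max_bytes:
            raise ValueError("single record exceeds the batch size limit")
        if size + len(framed) > max_bytes:
            # The current batch is full, so close it and start a new one.
            batches.append(b"".join(current))
            current, size = [], 0
        current.append(framed)
        size += len(framed)
    if current:
        batches.append(b"".join(current))
    return batches


def unbatch(batch):
    """Split a batch produced by batch_records back into its original records."""
    records, offset = [], 0
    while offset < len(batch):
        (length,) = struct.unpack_from(">I", batch, offset)
        offset += 4
        records.append(batch[offset:offset + length])
        offset += length
    return records


events = [bytes([i % 256]) * 100 for i in range(1000)]  # 1,000 records of 100 bytes each
batches = batch_records(events)
print(len(batches), max(len(b) for b in batches))
```

In this example, 1,000 records of 100 bytes each collapse into a handful of stream records, which is what keeps the per-payload charges under control.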

Client errors occur when the application that writes to Amazon Kinesis Data Streams fails to write data successfully. These are important to minimize, and there are a few factors to consider to achieve that. First, design your application so that there are as few failure modes as possible in the final write path. In our application, we authenticate a request, check the value of some feature flags, and write the data to the Amazon Kinesis Data Stream. We optimized our code to not perform any database queries or network requests before the data is written to Amazon Kinesis Data Streams to avoid any query/call failures, which can cause data loss. Another step to implement is to increase the number of retries in the AWS SDK (we use 10). This way, if there’s a transient issue writing data to Amazon Kinesis Data Streams, the data will have a better likelihood of being persisted. Finally, having a coarse-grained rate limit is important if you’re using provisioned streams. Sometimes end producers will inadvertently configure an SDK to send incredible amounts of data to our system. In these scenarios, we have a rate limiter to prevent a single tenant from consuming too much of our provisioned capacity.
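One common way to implement a coarse-grained, per-tenant rate limit is a token bucket. The following Python sketch shows the idea; the algorithm choice, class names, and parameters are assumptions made for illustration, because this post does not describe how our rate limiter is built.

```python
import time


class TokenBucket:
    """Allows `rate` units per second on average, with bursts up to `capacity`."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.clock = clock
        self.updated = clock()

    def allow(self, cost=1):
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


class TenantRateLimiter:
    """Keeps one bucket per tenant so a single tenant cannot use all the capacity."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate, self.capacity, self.clock = rate, capacity, clock
        self.buckets = {}

    def allow(self, tenant_id, cost=1):
        bucket = self.buckets.get(tenant_id)
        if bucket is None:
            bucket = self.buckets[tenant_id] = TokenBucket(self.rate, self.capacity, self.clock)
        return bucket.allow(cost)


fake_now = [0.0]  # controllable clock so the demo is deterministic
limiter = TenantRateLimiter(rate=5, capacity=10, clock=lambda: fake_now[0])
burst = [limiter.allow("tenant-a") for _ in range(12)]
print(burst.count(True))
```

Requests beyond a tenant’s budget are rejected before they reach the stream, so one misconfigured SDK cannot exhaust the provisioned capacity shared by everyone else.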

After we figured out how to address all these issues, we proceeded to migrate to the new architecture in two phases.

The first was to send our data into Amazon Kinesis Data Streams. The second was to move our consumer workload from our Amazon EC2 servers to AWS Lambda. For both of these phases, we used a LaunchDarkly feature flag with a percentage rollout to gradually ramp up traffic to the new architecture.

The first phase, sending data to Amazon Kinesis Data Streams, went very smoothly. The batching mechanism worked as expected, and our throughput was also as expected. One thing that we had not expected was an increase in data transfer costs out of our virtual private cloud (VPC). By default, your Amazon Kinesis Data Streams traffic will go through your VPC’s network address translation (NAT) gateway, and the charges are based on the data volume that flows through the NAT gateways. To reduce these costs, we configured an AWS PrivateLink endpoint in each Availability Zone where the Amazon EC2 application is hosted, so that traffic to Amazon Kinesis Data Streams no longer passes through the NAT gateways.

The second phase, moving the workload to AWS Lambda, did not go quite as smoothly. It turns out that dramatically changing the concurrency of a workload from tens of servers to hundreds of Lambda execution contexts can have some unintended consequences. In Amazon EC2, we aggregated our flag evaluation data on each host and flushed a file to Amazon S3 once a minute. In AWS Lambda, this aggregation became about 20 times less effective because of the increased concurrency. To overcome the issue of too many files for our downstream data processing systems to handle, we used Amazon Kinesis Data Firehose. We used it to automatically batch data into files in Amazon S3. Once we integrated that service into the architecture, we were able to migrate our entire workload to AWS Lambda.

Based on LaunchDarkly’s experience, Amazon Kinesis Data Streams is a good option for event data processing use cases. Once events are durably persisted in Amazon Kinesis Data Streams, stream consumers are easy to create, and event retention is managed for you. If you’re considering using Amazon Kinesis Data Streams, there are a few things you should account for in your implementation.

  • Configure AWS PrivateLink endpoints to reduce data transfer costs.
  • Use the KPL or implement your own record batching so that payloads are close to 25 kB.
  • Use rate limits to ensure you do not exceed provisioned capacity (if you aren’t using on-demand streams).
  • Increase retry counts to ensure data is written.

Conclusion

This system has been in production for over 3 years now, and we are very happy with it. It’s scaled from ingesting about 1 TB per day in 2018 to more than 100 TB per day now. Through that growth, this system has proven to be reliable, performant, and cost-effective. The system has maintained 99.99 percent availability and 99.99999 percent durability of data. End-to-end processing times have been within 30 s. Costs have scaled with increased usage, but they are well within our budget for this workload.

We hope that this post can guide you to build your event processing and analytics pipeline on top of Amazon Kinesis Data Streams while leveraging the power of fully managed technologies to not only accelerate business goals but also have a flexible system that onboards new use cases and features with ease.


About the Authors

Mike Zorn is a Software Architect at LaunchDarkly. He’s helped LaunchDarkly’s infrastructure scale from a hundred million feature flag evaluations a day to the tens of trillions of evaluations that are served nowadays. He’s been in the software industry for over a decade, working at organizations ranging from the federal government to small startups.

Chinmayi Narasimhadevara is a Solutions Architect focused on Analytics and Machine Learning at Amazon Web Services. She has over 15 years of experience in information technology. She helps AWS customers build advanced, highly scalable, and performant solutions.

Amazon EMR launches support for Amazon EC2 M6A, R6A instances to improve cost performance for Spark workloads by 15–50% 

Post Syndicated from Al MS original https://aws.amazon.com/blogs/big-data/amazon-emr-launches-support-for-amazon-ec2-m6a-r6a-instances-to-improve-cost-performance-for-spark-workloads-by-15-50/

Amazon EMR provides a managed service to easily run analytics applications using open-source frameworks such as Apache Spark, Hive, Presto, Trino, HBase, and Flink. The Amazon EMR runtime for Spark and Presto includes optimizations that provide over 2x performance improvements over open-source Apache Spark and Presto.

With Amazon EMR release 6.8, you can now use Amazon Elastic Compute Cloud (Amazon EC2) instances such as M6A and R6A, which use the third generation AMD EPYC processors. These instances improve the price performance of running Spark workloads on Amazon EMR by 15–50 percent over previous generation instances. In this blog post, we describe how we estimated this price performance benefit.

Amazon EMR runtime performance with EC2 M6A instances

We ran TPC-DS 3 TB benchmark queries on Amazon EMR 6.8 using Amazon EMR runtime for Apache Spark (compatible with Apache Spark 3.3) with M6a instances. Data was stored in Amazon Simple Storage Service (Amazon S3), and results were compared to equivalent clusters with M5a, which is the previous generation instance family. We measured performance improvements using the total query runtime and the geometric mean of query runtime across TPC-DS 3 TB benchmark queries.
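The two metrics weigh queries differently: total runtime is dominated by the longest queries, whereas the geometric mean gives each query equal relative weight. The following Python sketch shows how both improvements can be computed. The per-query runtimes are hypothetical values chosen for illustration and are not taken from the benchmark.

```python
from statistics import geometric_mean


def improvement_pct(baseline, candidate):
    """Percentage reduction of `candidate` relative to `baseline`."""
    return (baseline - candidate) / baseline * 100


# Hypothetical per-query runtimes in seconds, not taken from the benchmark.
old_runtimes = [10.0, 40.0, 160.0]
new_runtimes = [5.0, 30.0, 80.0]

total_gain = improvement_pct(sum(old_runtimes), sum(new_runtimes))
geomean_gain = improvement_pct(geometric_mean(old_runtimes), geometric_mean(new_runtimes))
print(f"total: {total_gain:.2f}%  geomean: {geomean_gain:.2f}%")
```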

Our results showed a 23.6–50.3 percent improvement in total query runtime performance and 22.8–52.4 percent in geometric mean on an EMR cluster with M6a compared to an equivalent EMR cluster with M5a instances. In comparing costs, we observed a 23.2–41.4 percent reduction in cost on the EMR cluster with M6a compared to the equivalent with M5a. M6A 48 XL and 32 XL instances were not benchmarked because the M5A generation does not offer equivalent sizes.

The following table shows the results from running TPC-DS 3 TB benchmark queries using Amazon EMR 6.8 over equivalent M6a and M5a instance EMR clusters.

| Instance Size | 24 XL | 16 XL | 12 XL | 8 XL | 4 XL | 2 XL | XL |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Total size of the cluster (1 Leader + 5 core nodes) | 6 | 6 | 6 | 6 | 6 | 6 | 6 |
| Total query runtime on M5A (seconds) | 6624.1713838714 | 5466.7251180433 | 5269.0578151495 | 5366.1486275129 | 7753.6218015794 | 12118.0922180235 | 21070.6905510002 |
| Total query runtime on M6A (seconds) | 3295.2894058371 | 3063.7807673078 | 3399.1509249577 | 3482.8401591909 | 4906.2216891762 | 9184.4366036450 | 16107.9707619002 |
| Total query runtime improvement with M6A | 50.25% | 43.96% | 35.49% | 35.10% | 36.72% | 24.21% | 23.55% |
| Geometric mean query runtime M5A (sec) | 51.1422829354 | 40.9550798753 | 38.4890223194 | 35.3863834186 | 44.8454957416 | 61.0454658020 | 92.6414502105 |
| Geometric mean query runtime M6A (sec) | 24.3406154481 | 22.3484713891 | 22.9913163520 | 23.0351017440 | 28.2855683398 | 46.4363267349 | 71.5498816854 |
| Geometric mean query runtime improvement with M6A | 52.41% | 45.43% | 40.27% | 34.90% | 36.93% | 23.93% | 22.77% |
| EC2 M5A instance price ($ per hour) | $4.12800 | $2.75200 | $2.06400 | $1.37600 | $0.68800 | $0.34400 | $0.17200 |
| EMR M5A instance price ($ per hour) | $0.27000 | $0.27000 | $0.27000 | $0.27000 | $0.17200 | $0.08600 | $0.04300 |
| (EC2 + EMR) M5A instance price ($ per hour) | $4.39800 | $3.02200 | $2.33400 | $1.64600 | $0.86000 | $0.43000 | $0.21500 |
| Cost of running on M5A ($ per instance) | $8.09253 | $4.58901 | $3.41611 | $2.45352 | $1.85225 | $1.44744 | $1.25839 |
| EC2 M6A instance price ($ per hour) | $4.14720 | $2.76480 | $2.07360 | $1.38240 | $0.69120 | $0.34560 | $0.17280 |
| EMR M6A price ($ per hour per instance) | $1.03680 | $0.69120 | $0.51840 | $0.34560 | $0.17280 | $0.08640 | $0.04320 |
| (EC2 + EMR) M6A instance price ($ per hour) | $5.18400 | $3.45600 | $2.59200 | $1.72800 | $0.86400 | $0.43200 | $0.21600 |
| Cost of running on M6A ($ per instance) | $4.74522 | $2.94123 | $2.44739 | $1.67176 | $1.17749 | $1.10213 | $0.96648 |
| Total cost reduction with M6A including performance improvement | -41.36% | -35.91% | -28.36% | -31.86% | -36.43% | -23.86% | -23.20% |

The following graph shows per query improvements observed on M6a 2XL instances compared to equivalent M5a generation. We observed that two queries take longer to execute on M6a instance clusters compared to M5a instance clusters. Q91 regressed up to 6.64 percent and Q55 regressed up to 1.86 percent on 4 XL instance clusters.

Amazon EMR runtime performance with EC2 R6A instances

R6A instances showed a similar performance improvement while running Apache Spark workloads compared to equivalent R5A instances. R6A 32XL and 48XL instances were not benchmarked because R5A instances do not have 32XL and 48XL sizes available. Our results showed a 16–58.22 percent improvement in total query runtime for seven different instance sizes within the instance family and a 20.04–59.59 percent improvement in geometric mean. In comparing costs, we observed a 15.85–50.07 percent reduction in cost on R6A instance EMR clusters compared to R5A instance EMR clusters.

The following table shows the results from running TPC-DS 3 TB benchmark queries using Amazon EMR 6.8 over equivalent R6A and R5A instance EMR clusters.

| Instance Size | 24 XL | 16 XL | 12 XL | 8 XL | 4 XL | 2 XL | XL |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Total size of the cluster (1 Leader + 5 core nodes) | 6 | 6 | 6 | 6 | 6 | 6 | 6 |
| Total query runtime on R5A (seconds) | 6934.22936 | 5530.74672 | 5834.32344 | 5718.72582 | 7615.58392 | 11431.37368 | 20688.58642 |
| Total query runtime on R6A (seconds) | 2897.44817 | 2906.49952 | 3017.85315 | 3488.83875 | 4661.32856 | 7717.33575 | 17378.49043 |
| Total query runtime improvement with R6A | 58.22% | 47.45% | 48.27% | 38.99% | 38.79% | 32.49% | 16.00% |
| Geometric mean query runtime R5A (sec) | 53.27574 | 41.76973 | 42.50324 | 37.62155 | 44.58173 | 58.88182 | 91.72095 |
| Geometric mean query runtime R6A (sec) | 21.52803 | 21.36831 | 19.94607 | 21.59493 | 26.90097 | 36.57557 | 73.3405 |
| Geometric mean query runtime improvement with R6A | 59.59% | 48.84% | 53.07% | 42.60% | 39.66% | 37.88% | 20.04% |
| EC2 R5A instance price ($ per hour) | $5.42400 | $3.61600 | $2.71200 | $1.80800 | $0.90400 | $0.45200 | $0.22600 |
| EMR R5A instance price ($ per hour) | $0.27000 | $0.27000 | $0.27000 | $0.27000 | $0.22600 | $0.11300 | $0.05700 |
| (EC2 + EMR) R5A instance price ($ per hour) | $5.69400 | $3.88600 | $2.98200 | $2.07800 | $1.13000 | $0.56500 | $0.28300 |
| Cost of running on R5A ($ per instance) | $10.96764 | $5.97013 | $4.83276 | $3.30098 | $2.39045 | $1.79409 | $1.62635 |
| EC2 R6A instance price ($ per hour) | $5.44320 | $3.62880 | $2.72160 | $1.81440 | $0.90720 | $0.45360 | $0.22680 |
| EMR R6A price ($ per hour per instance) | $1.36080 | $0.90720 | $0.68040 | $0.45360 | $0.22680 | $0.11340 | $0.05670 |
| (EC2 + EMR) R6A instance price ($ per hour) | $6.80400 | $4.53600 | $3.40200 | $2.26800 | $1.13400 | $0.56700 | $0.28350 |
| Cost of running on R6A ($ per instance) | $5.47618 | $3.66219 | $2.85187 | $2.19797 | $1.46832 | $1.21548 | $1.36856 |
| Total cost reduction with R6A including performance improvement | -50.07% | -38.66% | -40.99% | -33.41% | -38.58% | -32.25% | -15.85% |

Benchmarking methodology

The benchmark used in this post is derived from the industry-standard TPC-DS benchmark and uses queries from the Spark SQL Performance Tests GitHub repo with the following fixes applied.

We calculated TCO by multiplying cost per hour by number of instances in the cluster and time taken to run the queries on the cluster. We used the on-demand pricing in the US East (N. Virginia) Region for all instances.
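As a worked example of this calculation, the following Python sketch recomputes the per-instance cost and the cost reduction for the 24 XL column of the M5A and M6A results table, using the prices and total runtimes listed there. The function and variable names are ours, chosen for illustration.

```python
def run_cost(price_per_hour, runtime_seconds):
    """Cost of one instance for the benchmark run: hourly price times runtime in hours."""
    return price_per_hour * runtime_seconds / 3600


# Values copied from the 24 XL column of the M5A/M6A results table.
m5a_cost = run_cost(4.39800, 6624.1713838714)  # (EC2 + EMR) price, total query runtime
m6a_cost = run_cost(5.18400, 3295.2894058371)

reduction_pct = (m6a_cost - m5a_cost) / m5a_cost * 100
cluster_size = 6  # 1 leader + 5 core nodes

print(f"M5A ${m5a_cost:.5f}  M6A ${m6a_cost:.5f}  change {reduction_pct:.2f}%")
print(f"M6A cluster total ${m6a_cost * cluster_size:.5f}")
```

The per-instance figures should match the "Cost of running" rows in the table. Multiplying by the six instances in the cluster gives the cluster-level cost, and because both clusters have the same size, the percentage reduction is the same either way.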

Conclusion

In this post, we described how we estimated the cost-performance benefit from using Amazon EMR with M6A and R6A instances compared to using equivalent previous-generation instances. Using these new instances with Amazon EMR improves price performance by 15–50%.


About the authors

Al MS is a product manager for Amazon EMR at Amazon Web Services.

Kyeonghyun Ryoo is a Software Development Engineer for EMR at Amazon Web Services. He primarily works on designing and building automation tools for internal teams and customers to maximize their productivity. Outside of work, he is a retired world champion in professional gaming who still enjoys playing video games.

Gain visibility into your Amazon MSK cluster by deploying the Conduktor Platform

Post Syndicated from Stéphane Maarek original https://aws.amazon.com/blogs/big-data/gain-visibility-into-your-amazon-msk-cluster-by-deploying-the-conduktor-platform/

This is a guest post by AWS Data Hero and co-founder of Conduktor, Stéphane Maarek.

Deploying Apache Kafka on AWS is now easier, thanks to Amazon Managed Streaming for Apache Kafka (Amazon MSK). In a few clicks, it provides you with a production-ready Kafka cluster on which you can run your applications and create data streams.

Apache Kafka is an open-source project, and no official user interfaces are available. The lack of visibility into Apache Kafka is a factor in the slow development of applications.

The recent announcement of the Conduktor Platform makes Amazon MSK operations simple, and you can solve Kafka issues end to end with solutions for testing, monitoring, data quality, governance, and security.

You can use the Conduktor Platform to monitor both types of MSK clusters, provisioned and serverless. In this post, we demonstrate how to use AWS Identity and Access Management (IAM) based security to administer our MSK cluster.

Solution overview

We look at how we can deploy the Conduktor Platform on Amazon MSK in a production-ready deployment so you can try it out today.

The solution is fully serverless and customizable. Everything is deployed using AWS CloudFormation templates.

The source code and CloudFormation templates used in this post are available in the GitHub repo.

To implement this solution, we complete the following high-level steps:

  1. Deploy a CloudFormation template to create our customized Docker image for the Conduktor Platform using AWS CodeBuild.
  2. Optionally, deploy an MSK cluster in provisioned or serverless mode using a CloudFormation template.
  3. Deploy the Conduktor Platform as an AWS Fargate container against our MSK cluster using a CloudFormation template.

Create a customized configuration for the Conduktor Platform

The Conduktor Platform uses a YAML configuration file to define the cluster connection endpoints. Therefore, we must create a customized Docker image of the Conduktor Platform that is able to connect to a cluster on Amazon MSK with a customized YAML file. For this, we use CodeBuild, and we store our configuration files in Amazon Simple Storage Service (Amazon S3). The final image is stored in Amazon Elastic Container Registry (Amazon ECR). The following diagram illustrates this workflow.

  1. Deploy the first CloudFormation template to create the following resources:
    • An S3 bucket to store our configuration files.
    • An ECR repository to store our final Docker image.
    • A CodeBuild project to build that Docker image.
    • An IAM role and policy to allow CodeBuild to perform the build.

Now we need to upload our files into Amazon S3.

  1. Upload the following files:
    • The file buildspec.yml, which is used by CodeBuild to build our primary Docker image.
    • The Dockerfile, which contains instructions on how to build our final Docker image.
    • The folder conduktor-platform-config (as is), which contains the configuration files to connect to Amazon MSK.

  1. At this stage, you can customize the conduktor-platform.yaml file, allowing you to connect to one MSK cluster:

Alternatively, you can connect to multiple MSK clusters or external ones by specifying multiple Kafka bootstrap servers, as shown in the following code. You can also use the same configuration file to specify the schema registry URL, Kafka Connect connection details, and SSO.

A single-Region Conduktor Platform deployment can work for multi-Region MSK clusters, although natural latency is expected. For latency-sensitive usage, you can deploy this solution in every Region in which you’re using Amazon MSK.

After uploading the files and configurations in your S3 bucket, let’s run CodeBuild to generate a new image.

  1. On the CodeBuild console, navigate to the project and choose Start build.

The build should complete in about 3 minutes.

The final image is pushed to Amazon ECR by the commands in our buildspec.yml file, which CodeBuild runs. We’re now done with our first step. Your Conduktor Platform setup can now fully connect to your MSK cluster.

Start the MSK cluster

If you already have an MSK cluster set up with IAM access control, you can skip this step. If not, you can create one using the provided CloudFormation template.

From the MSK cluster (the new one or existing one), retrieve two essential pieces of information:

We use IAM access control so that we only need to use IAM policies to connect to our cluster.

If you’re using another security mechanism (such as SASL/SCRAM), you need to modify the Conduktor configuration files with the right properties, upload them back into Amazon S3, and rebuild the Conduktor image using CodeBuild.

Conduktor supports every single Kafka authentication method, including the ones supported by Amazon MSK: IAM access control, mutual TLS authentication, and user name/password using SASL/SCRAM.
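For reference, Kafka clients that use IAM access control with Amazon MSK typically set the client properties shown in the following Python sketch, as documented for the aws-msk-iam-auth library. How these properties are expressed in the Conduktor YAML configuration isn’t shown here, and the helper function is hypothetical; treat this as an illustration of the underlying Kafka settings only.

```python
# Standard Kafka client properties for MSK IAM access control, as documented
# for the aws-msk-iam-auth library.
MSK_IAM_CLIENT_PROPERTIES = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}


def to_properties(props):
    """Render a dict as Java-style .properties text, one key=value per line."""
    return "\n".join(f"{key}={value}" for key, value in props.items())


print(to_properties(MSK_IAM_CLIENT_PROPERTIES))
```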

Deploy the Conduktor Platform on Amazon ECS with Fargate

The last step is to deploy the Conduktor Platform. For this, we prefer running serverless solutions using Amazon Elastic Container Service (Amazon ECS) with Fargate. This allows you to right-size your containers in the future in case your usage of Conduktor grows over time.

Conduktor stores persistent data in the /var/conduktor file system folder, to store configuration, cache computation results, store logs, and run an internal database (for example, if you start creating data masking rules). For the persistence layer, we use Amazon Elastic File System (Amazon EFS), an elastic network file system that can be mounted on Fargate to provide a persistence layer.

Finally, we expose our Fargate container through an Application Load Balancer, giving us a public static DNS endpoint to expose the Conduktor Platform and giving us complete control over the network security to access the Conduktor Platform. The following diagram illustrates our architecture.

We deploy our last CloudFormation file and specify some important parameters:

  • MSKBookstrapServersURL – This parameter is necessary to tell Conduktor which MSK cluster to connect to
  • MSKSecurityGroupID – The MSK security group is necessary to allow the template to add a security group ingress rule to it, thereby allowing our ECS task to connect to the cluster
  • PublicSubnetIDs – The public subnet IDs are for your Application Load Balancer
  • SubnetIDs – The subnet IDs are for your ECS task and can be the same subnets or private subnets (as long as they have access to the MSK cluster and the other public subnets)
  • VpcID – This is the VPC you’re deploying to

After deploying the template, on the Outputs tab of the stack, you can find the Application Load Balancer URL.

We use this URL and log in to the Conduktor Platform with the user name [email protected] and password password. These login credentials can be changed using the YAML configuration file, and you can even enable SSO and LDAP.

On the Conduktor console, you can start creating topics, producing data, consuming data, and much more! AWS Glue Schema Registry support is coming soon, and Confluent Schema Registry compatibility is already available.

Clean up

To clean up your AWS account, perform the following steps in order:

  1. Delete the third CloudFormation template (3 – create ECS Service.yaml).
  2. Delete the second CloudFormation template (2 – create MSK cluster.yaml).
  3. Empty the contents of your S3 bucket.
  4. Delete all your images in your ECR repository.
  5. Delete the first CloudFormation template (1 – base conduktor.yaml).

Conclusion

You can use the Conduktor Platform against as many MSK clusters as desired by editing the file conduktor-platform.yaml. You can even connect to your clusters running elsewhere, for example on Amazon Elastic Compute Cloud (Amazon EC2).

On our roadmap, we’re working on a complete integration with Amazon MSK, including AWS Glue Schema Registry support, Amazon MSK Connect support, and complete monitoring capabilities.

The Conduktor Platform offers a limited free tier with no time limit. Head to Conduktor’s Get Started page and create an account to start using the Platform alongside MSK clusters today.


About the Author

Stéphane Maarek is the co-founder of Conduktor. He is also the lead instructor on Udemy for learning Apache Kafka and AWS Certifications, having taught these technologies to over 1.5 million learners. Through Conduktor, he wants to democratize access to Apache Kafka and make its usage seamless and enterprise-ready.

Introducing the Cloud Shuffle Storage Plugin for Apache Spark

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-the-cloud-shuffle-storage-plugin-for-apache-spark/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. In AWS Glue, you can use Apache Spark, an open-source, distributed processing system for your data integration tasks and big data workloads.

Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple Spark partitions on different nodes so that you can process a large amount of data in parallel. In Apache Spark, shuffling happens when data needs to be redistributed across the cluster. During a shuffle, data is written to local disk and transferred across the network. The shuffle operation is often constrained by the available local disk capacity, or by data skew, which can cause straggling executors. Spark often throws a No space left on device or MetadataFetchFailedException error when there isn’t enough disk space left on the executor and there is no recovery. Such Spark jobs typically can’t succeed without adding more compute and attached storage, and because the extra compute often sits idle, this results in additional cost.

In 2021, we launched Amazon S3 shuffle for AWS Glue 2.0 with Spark 2.4. This feature disaggregated Spark compute and shuffle storage by utilizing Amazon Simple Storage Service (Amazon S3) to store Spark shuffle files. Using Amazon S3 for Spark shuffle storage enabled you to run data-intensive workloads more reliably. After the launch, we continued investing in this area, and collected customer feedback.

Today, we’re pleased to release Cloud Shuffle Storage Plugin for Apache Spark. It supports the latest Apache Spark 3.x distribution so you can take advantage of the plugin in AWS Glue or any other Spark environments. It’s now also natively available to use in AWS Glue Spark jobs on AWS Glue 3.0 and the latest AWS Glue version 4.0 without requiring any extra setup or bringing external libraries. Like the Amazon S3 shuffle for AWS Glue 2.0, the Cloud Shuffle Storage Plugin helps you solve constrained disk space errors during shuffle in serverless Spark environments.

We’re also excited to announce the release of software binaries for the Cloud Shuffle Storage Plugin for Apache Spark under the Apache 2.0 license. You can download the binaries and run them on any Spark environment. The new plugin is open-cloud, comes with out-of-the box support for Amazon S3, and can be easily configured to use other forms of cloud storage such as Google Cloud Storage and Microsoft Azure Blob Storage.

Understanding a shuffle operation in Apache Spark

In Apache Spark, there are two types of transformations:

  • Narrow transformation – This includes map, filter, union, and mapPartitions, where each input partition contributes to only one output partition.
  • Wide transformation – This includes join, groupByKey, reduceByKey, and repartition, where each input partition contributes to many output partitions. Spark SQL queries including JOIN, ORDER BY, and GROUP BY require wide transformations.

A wide transformation triggers a shuffle, which occurs whenever data is reorganized into new partitions with each key assigned to one of them. During a shuffle phase, all Spark map tasks write shuffle data to a local disk that is then transferred across the network and fetched by Spark reduce tasks. The volume of data shuffled is visible in the Spark UI. When shuffle writes take up more space than the local available disk capacity, it causes a No space left on device error.
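The following plain Python sketch simulates this redistribution without Spark, to show why a wide transformation has to move data: records with the same key start out scattered across input partitions and must be brought together before they can be aggregated. The CRC32-based partitioner and the function names are assumptions made for illustration; Spark’s own partitioner and on-disk shuffle format differ.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Deterministic hash partitioner: every occurrence of a key maps to one partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle(input_partitions, num_output_partitions):
    """Redistribute (key, value) pairs so each key lives in exactly one output partition."""
    output = [[] for _ in range(num_output_partitions)]
    for partition in input_partitions:  # each "map task" reads one input partition
        for key, value in partition:
            output[partition_for(key, num_output_partitions)].append((key, value))
    return output


def reduce_by_key(partition):
    """Sum values per key inside a single partition, with no cross-partition traffic."""
    totals = defaultdict(int)
    for key, value in partition:
        totals[key] += value
    return dict(totals)


input_partitions = [
    [("store_1", 10), ("store_2", 5)],
    [("store_1", 7), ("store_3", 1)],
    [("store_2", 3), ("store_3", 4)],
]
shuffled = shuffle(input_partitions, num_output_partitions=2)

totals = {}
for part in shuffled:
    totals.update(reduce_by_key(part))
print(totals)
```

In a real cluster, the intermediate output lists are the shuffle files that map tasks write to local disk, which is why a large shuffle can exhaust the disk on an executor.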

To illustrate one of the typical scenarios, let’s use the query q80.sql from the standard TPC-DS 3 TB dataset as an example. This query attempts to calculate the total sales, returns, and eventual profit realized during a specific time frame. It involves multiple wide transformations (shuffles) caused by left outer join and group by.

Let’s run the following query on an AWS Glue 3.0 job with 10 G.1X workers, where a total of 640 GB of local disk space is available:

with ssr as
 (select  s_store_id as store_id,
          sum(ss_ext_sales_price) as sales,
          sum(coalesce(sr_return_amt, 0)) as returns,
          sum(ss_net_profit - coalesce(sr_net_loss, 0)) as profit
  from store_sales left outer join store_returns on
         (ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number),
     date_dim, store, item, promotion
 where ss_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ss_store_sk = s_store_sk
       and ss_item_sk = i_item_sk
       and i_current_price > 50
       and ss_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by s_store_id),
 csr as
 (select  cp_catalog_page_id as catalog_page_id,
          sum(cs_ext_sales_price) as sales,
          sum(coalesce(cr_return_amount, 0)) as returns,
          sum(cs_net_profit - coalesce(cr_net_loss, 0)) as profit
  from catalog_sales left outer join catalog_returns on
         (cs_item_sk = cr_item_sk and cs_order_number = cr_order_number),
     date_dim, catalog_page, item, promotion
 where cs_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and cs_catalog_page_sk = cp_catalog_page_sk
       and cs_item_sk = i_item_sk
       and i_current_price > 50
       and cs_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by cp_catalog_page_id),
 wsr as
 (select  web_site_id,
          sum(ws_ext_sales_price) as sales,
          sum(coalesce(wr_return_amt, 0)) as returns,
          sum(ws_net_profit - coalesce(wr_net_loss, 0)) as profit
  from web_sales left outer join web_returns on
         (ws_item_sk = wr_item_sk and ws_order_number = wr_order_number),
     date_dim, web_site, item, promotion
 where ws_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ws_web_site_sk = web_site_sk
       and ws_item_sk = i_item_sk
       and i_current_price > 50
       and ws_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by web_site_id)
 select channel, id, sum(sales) as sales, sum(returns) as returns, sum(profit) as profit
 from (select
        'store channel' as channel, concat('store', store_id) as id, sales, returns, profit
      from ssr
      union all
      select
        'catalog channel' as channel, concat('catalog_page', catalog_page_id) as id,
        sales, returns, profit
      from csr
      union all
      select
        'web channel' as channel, concat('web_site', web_site_id) as id, sales, returns, profit
      from  wsr) x
 group by rollup (channel, id)
 order by channel, id

The following screenshot shows the Executor tab in the Spark UI.
Spark UI Executor Tab

The following screenshot shows the status of Spark jobs included in the AWS Glue job run.
Spark UI Jobs
In the failed Spark job (job ID=7), we can see the failed Spark stage in the Spark UI.
Spark UI Failed stage
There was 167.8 GiB of shuffle write during the stage, and 14 tasks failed with the error java.io.IOException: No space left on device because the host 172.34.97.212 ran out of local disk space.
Spark UI Tasks

Cloud Shuffle Storage for Apache Spark

Cloud Shuffle Storage for Apache Spark allows you to store Spark shuffle files on Amazon S3 or other cloud storage services. This gives complete elasticity to Spark jobs, thereby allowing you to run your most data intensive workloads reliably. The following figure illustrates how Spark map tasks write the shuffle files to the Cloud Shuffle Storage. Reducer tasks consider the shuffle blocks as remote blocks and read them from the same shuffle storage.

This architecture enables your serverless Spark jobs to use Amazon S3 without the overhead of running, operating, and maintaining additional storage or compute nodes.
Chopper diagram
The following Glue job parameters enable and tune Spark to use S3 buckets for storing shuffle data. You can also enable at-rest encryption when writing shuffle data to Amazon S3 by using security configuration settings.

Key | Value | Explanation
--write-shuffle-files-to-s3 | TRUE | This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data.
--conf | spark.shuffle.storage.path=s3://<shuffle-bucket> | This is optional, and specifies the S3 bucket where the plugin writes the shuffle files. By default, we use --TempDir/shuffle-data.

The shuffle files are written to this location with names such as the following:

s3://<shuffle-storage-path>/<Spark application ID>/[0-9]/<Shuffle ID>/shuffle_<Shuffle ID>_<Mapper ID>_0.data
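The job parameters in the preceding table can be assembled programmatically. The following is a minimal sketch, assuming you pass job parameters as a plain key-value dictionary; the function name build_shuffle_job_arguments and the bucket name are hypothetical and used only for illustration.

```python
def build_shuffle_job_arguments(shuffle_path=None):
    """Return AWS Glue job parameters that enable S3-backed shuffle.

    shuffle_path is optional, mirroring the table above: when it is
    omitted, only the main flag is set and the default location applies.
    """
    # Main flag from the table above.
    arguments = {"--write-shuffle-files-to-s3": "TRUE"}
    if shuffle_path is not None:
        if not shuffle_path.startswith("s3://"):
            raise ValueError("shuffle_path must be an s3:// URI")
        # Optional Spark setting that points the plugin at a specific bucket.
        arguments["--conf"] = f"spark.shuffle.storage.path={shuffle_path}"
    return arguments


# Hypothetical bucket name, for illustration only.
print(build_shuffle_job_arguments("s3://example-shuffle-bucket/shuffle-data"))
```

A dictionary like this can be supplied as the job’s default arguments or as the arguments of an individual job run.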

With the Cloud Shuffle Storage plugin enabled and using the same AWS Glue job setup, the TPC-DS query now succeeded without any job or stage failures.
Spark UI Jobs with Chopper plugin

Software binaries for the Cloud Shuffle Storage Plugin

You can now also download and use the plugin in your own Spark environments and with other cloud storage services. The plugin binaries are available for use under the Apache 2.0 license.

Bundle the plugin with your Spark applications

You can bundle the plugin with your Spark applications by adding it as a dependency in your Maven pom.xml as you develop your Spark applications, as shown in the following code. For more details on the plugin and Spark versions, refer to Plugin versions.

<repositories>
   ...
    <repository>
        <id>aws-glue-etl-artifacts</id>
        <url>https://aws-glue-etl-artifacts.s3.amazonaws.com/release/</url>
    </repository>
</repositories>
...
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>chopper-plugin</artifactId>
    <version>3.1-amzn-LATEST</version>
</dependency>

You can alternatively download the binaries from AWS Glue Maven artifacts directly and include them in your Spark application as follows:

#!/bin/bash
sudo wget -v https://aws-glue-etl-artifacts.s3.amazonaws.com/release/com/amazonaws/chopper-plugin/3.1-amzn-LATEST/chopper-plugin-3.1-amzn-LATEST.jar -P /usr/lib/spark/jars

Submit the Spark application by including the JAR files on the classpath and specifying the two Spark configs for the plugin:

spark-submit --deploy-mode cluster \
--conf spark.shuffle.sort.io.plugin.class=com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin \
--conf spark.shuffle.storage.path=s3://<s3 bucket>/<shuffle-dir> \
--class <your class> <your application jar>

The following Spark parameters enable and configure Spark to use an external storage URI such as Amazon S3 for storing shuffle files; the URI protocol determines which storage system to use.

Key | Value | Explanation
spark.shuffle.storage.path | s3://<shuffle-storage-path> | Specifies a URI where the shuffle files are stored. It must be a valid Hadoop FileSystem URI, configured as needed.
spark.shuffle.sort.io.plugin.class | com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin | The entry class in the plugin.

Other cloud storage integration

This plugin comes with out-of-the box support for Amazon S3 and can also be configured to use other forms of cloud storage such as Google Cloud Storage and Microsoft Azure Blob Storage. To enable other Hadoop FileSystem compatible cloud storage services, you can simply add a storage URI for the corresponding service scheme, such as gs:// for Google Cloud Storage instead of s3:// for Amazon S3, add the FileSystem JAR files for the service, and set the appropriate authentication configurations.

For more information about how to integrate the plugin with Google Cloud Storage and Microsoft Azure Blob Storage, refer to Using AWS Glue Cloud Shuffle Plugin for Apache Spark with Other Cloud Storage Services.
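To make the role of the URI scheme concrete, the following sketch builds the two Spark settings described earlier for an arbitrary storage URI and renders them as spark-submit flags. The helper names and the bucket name are hypothetical. The sketch only checks that the URI has a scheme and a bucket; it doesn’t add the FileSystem JAR files or authentication settings that a non-S3 service still requires.

```python
from urllib.parse import urlparse

PLUGIN_CLASS = "com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin"


def shuffle_plugin_conf(storage_uri):
    """Return the two plugin settings for a storage URI such as s3:// or gs://."""
    parsed = urlparse(storage_uri)
    if not parsed.scheme or not parsed.netloc:
        raise ValueError(f"expected <scheme>://<bucket>/..., got {storage_uri!r}")
    return {
        "spark.shuffle.sort.io.plugin.class": PLUGIN_CLASS,
        # The scheme of this URI decides which storage system is used.
        "spark.shuffle.storage.path": storage_uri,
    }


def to_spark_submit_flags(conf):
    """Flatten a settings dictionary into repeated --conf key=value flags."""
    flags = []
    for key, value in conf.items():
        flags += ["--conf", f"{key}={value}"]
    return flags


# Hypothetical Google Cloud Storage bucket, for illustration only.
print(to_spark_submit_flags(shuffle_plugin_conf("gs://example-bucket/shuffle")))
```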

Best practices and considerations

Note the following considerations:

  • This feature replaces local shuffle storage with Amazon S3. You can use it to address common failures with price/performance benefits for your serverless analytics jobs and pipelines. We recommend enabling this feature when you want to ensure reliable runs of data-intensive workloads that create a large amount of shuffle data, or when you’re getting a No space left on device error. You can also use this plugin if your job encounters fetch failures (org.apache.spark.shuffle.MetadataFetchFailedException) or if your data is skewed.
  • We recommend setting S3 bucket lifecycle policies on the shuffle bucket (spark.shuffle.storage.path) in order to clean up old shuffle data automatically.
  • The shuffle data on Amazon S3 is encrypted by default. You can also encrypt the data with your own AWS Key Management Service (AWS KMS) keys.
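The lifecycle recommendation above can be sketched as a rule that expires objects under the shuffle prefix. The following builds a lifecycle configuration in the shape accepted by the Amazon S3 API. The rule ID, prefix, and the 7-day retention are assumptions chosen for illustration; pick a retention longer than your longest-running job.

```python
def shuffle_lifecycle_configuration(prefix, expire_after_days=7):
    """Build an S3 lifecycle configuration that expires old shuffle objects."""
    if expire_after_days < 1:
        raise ValueError("expire_after_days must be at least 1")
    return {
        "Rules": [
            {
                "ID": "expire-old-shuffle-data",  # hypothetical rule name
                "Filter": {"Prefix": prefix},     # only objects under the shuffle prefix
                "Status": "Enabled",
                "Expiration": {"Days": expire_after_days},
            }
        ]
    }


# Hypothetical prefix, for illustration only.
print(shuffle_lifecycle_configuration("shuffle-data/"))
```

With boto3, a configuration like this can be applied through the S3 client’s put_bucket_lifecycle_configuration call, passing it as the LifecycleConfiguration argument.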

Conclusion

This post introduced the new Cloud Shuffle Storage Plugin for Apache Spark and described how it lets you scale storage independently in your Spark jobs without adding workers. With this plugin, you can expect jobs processing terabytes of data to run much more reliably.

The plugin is available in AWS Glue 3.0 and 4.0 Spark jobs in all AWS Glue supported Regions. We’re also releasing the plugin’s software binaries under the Apache 2.0 license. You can use the plugin in AWS Glue or other Spark environments. We look forward to hearing your feedback.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts that help customers build data lakes on the cloud.

Rajendra Gujja is a Senior Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything about the data.

Chuhan Liu is a Software Development Engineer on the AWS Glue team.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems that enable customers to integrate data and connect to a variety of sources, efficiently manage data lakes on Amazon S3, and optimize Apache Spark for fault tolerance with ETL workloads.

Build your Apache Hudi data lake on AWS using Amazon EMR – Part 1

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/part-1-build-your-apache-hudi-data-lake-on-aws-using-amazon-emr/

Apache Hudi is an open-source transactional data lake framework that greatly simplifies incremental data processing and data pipeline development. It does this by bringing core warehouse and database functionality directly to a data lake on Amazon Simple Storage Service (Amazon S3) or Apache HDFS. Hudi provides table management, instantaneous views, efficient upserts/deletes, advanced indexes, streaming ingestion services, data and file layout optimizations (through clustering and compaction), and concurrency control, all while keeping your data in open-source file formats such as Apache Parquet and Apache Avro. Furthermore, Apache Hudi is integrated with open-source big data analytics frameworks, such as Apache Spark, Apache Hive, Apache Flink, Presto, and Trino.

In this post, we cover best practices when building Hudi data lakes on AWS using Amazon EMR. This post assumes that you understand Hudi data layout, file layout, and table and query types. The configuration and features can change with new Hudi versions; the concepts in this post apply to Hudi versions 0.11.0 (Amazon EMR release 6.7), 0.11.1 (Amazon EMR release 6.8), and 0.12.1 (Amazon EMR release 6.9).

Specify the table type: Copy on Write vs. Merge on Read

When we write data into Hudi, we have the option to specify the table type: Copy on Write (CoW) or Merge on Read (MoR). This decision has to be made at the initial setup, and the table type can’t be changed after the table has been created. These two table types offer different trade-offs between ingest and query performance, and the data files are stored differently based on the chosen table type. If you don’t specify it, the default storage type CoW is used.

The following table summarizes the feature comparison of the two storage types.

CoW | MoR
Data is stored in base files (columnar Parquet format). | Data is stored as a combination of base files (columnar Parquet format) and log files with incremental changes (row-based Avro format).
COMMIT: Each new write creates a new version of the base files, which contain merged records from older base files and newer incoming records. Each write atomically adds a commit action to the timeline, guaranteeing that a write (and all its changes) either entirely succeeds or is entirely rolled back. | DELTA_COMMIT: Each new write creates incremental log files for updates, which are associated with the base Parquet files. For inserts, it creates a new version of the base file similar to CoW. Each write adds a delta commit action to the timeline.

Write
In case of updates, write latency is higher than MoR due to the merge cost because it needs to rewrite the entire affected Parquet files with the merged updates. Additionally, writing in the columnar Parquet format (for CoW updates) is slower than writing in the row-based Avro format (for MoR updates). | No merge cost for updates during write time, and the write operation is faster because it just appends the data changes to the new log file corresponding to the base file each time.
Compaction isn’t needed because all data is directly written to Parquet files. | Compaction is required to merge the base and log files to create a new version of the base file.
Higher write amplification because new versions of base files are created for every write. Write cost will be O(number of files in storage modified by the write). | Lower write amplification because updates go to log files. Write cost will be O(1) for update-only datasets and can get higher when there are new inserts.

Read
CoW tables support snapshot queries and incremental queries. | MoR offers two ways to query the same underlying storage: ReadOptimized tables and Near-Realtime tables (snapshot queries). ReadOptimized tables support read-optimized queries, and Near-Realtime tables support snapshot queries and incremental queries.
Read-optimized queries aren’t applicable for CoW because data is already merged to base files while writing. | Read-optimized queries show the latest compacted data, which doesn’t include the freshest updates in the not yet compacted log files.
Snapshot queries have no merge cost during read. | Snapshot queries merge data while reading if not compacted and therefore can be slower than CoW while querying the latest data.

CoW is the default storage type and is preferred for simple read-heavy use cases. Use cases with the following characteristics are recommended for CoW:

  • Tables with a lower ingestion rate and use cases without real-time ingestion
  • Use cases requiring the freshest data with minimal read latency because merging cost is taken care of at the write phase
  • Append-only workloads where existing data is immutable

MoR is recommended for tables with write-heavy and update-heavy use cases. Use cases with the following characteristics are recommended for MoR:

  • Faster ingestion requirements and real-time ingestion use cases
  • Varying or bursty write patterns (for example, ingesting bulk random deletes in an upstream database) due to the zero-merge cost for updates during write time
  • Streaming use cases
  • Mix of downstream consumers, where some are looking for fresher data by paying some additional read cost, and others need faster reads with some trade-off in data freshness

For streaming use cases demanding strict ingestion performance with MoR tables, we suggest running the table services (for example, compaction and cleaning) asynchronously, which is discussed in the upcoming Part 3 of this series.

For more details on table types and use cases, refer to How do I choose a storage type for my workload?
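The guidance in this section can be condensed into a small decision helper. The following is a deliberately simplified sketch: the two boolean inputs and the function names are hypothetical, and a real decision should weigh all of the characteristics listed above. It uses the Hudi write option hoodie.datasource.write.table.type with the values COPY_ON_WRITE and MERGE_ON_READ.

```python
TABLE_TYPE_KEY = "hoodie.datasource.write.table.type"
VALID_TABLE_TYPES = ("COPY_ON_WRITE", "MERGE_ON_READ")


def recommend_table_type(update_heavy=False, streaming_ingest=False):
    """Pick a table type from two coarse workload traits (a simplification)."""
    # Write-heavy, update-heavy, or streaming workloads favor MoR;
    # read-heavy or append-only workloads favor the default, CoW.
    if update_heavy or streaming_ingest:
        return "MERGE_ON_READ"
    return "COPY_ON_WRITE"


def table_type_option(table_type):
    """Return the write option for a table type; it can't be changed later."""
    if table_type not in VALID_TABLE_TYPES:
        raise ValueError(f"unknown table type: {table_type}")
    return {TABLE_TYPE_KEY: table_type}


print(table_type_option(recommend_table_type(streaming_ingest=True)))
```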

Select the record key, key generator, preCombine field, and record payload

This section discusses the basic configurations for the record key, key generator, preCombine field, and record payload.

Record key

Every record in Hudi is uniquely identified by a Hoodie key (similar to primary keys in databases), which is usually a pair of record key and partition path. With Hoodie keys, you can enable efficient updates and deletes on records, as well as avoid duplicate records. Hudi partitions have multiple file groups, and each file group is identified by a file ID. Hudi maps Hoodie keys to file IDs, using an indexing mechanism.

A record key that you select from your data can be unique within a partition or across partitions. If the selected record key is unique within a partition, it can be uniquely identified in the Hudi dataset using the combination of the record key and partition path. You can also combine multiple fields from your dataset into a compound record key. Record keys cannot be null.

Key generator

Key generators are different implementations to generate record keys and partition paths based on the values specified for these fields in the Hudi configuration. The right key generator has to be configured depending on the type of key (simple or composite key) and the column data type used in the record key and partition path columns (for example, TimestampBasedKeyGenerator is used for timestamp data type partition path). Hudi provides several key generators out of the box, which you can specify in your job using the following configuration.

Configuration Parameter | Description | Value
hoodie.datasource.write.keygenerator.class | Key generator class, which generates the record key and partition path | Default value is SimpleKeyGenerator

The following table describes the different types of key generators in Hudi.

Key Generator | Use Case
SimpleKeyGenerator | Use this key generator if your record key refers to a single column by name and similarly your partition path also refers to a single column by name.
ComplexKeyGenerator | Use this key generator when record key and partition paths comprise multiple columns. Columns are expected to be comma-separated in the config value (for example, "hoodie.datasource.write.recordkey.field" : "col1,col4").
GlobalDeleteKeyGenerator | Use this key generator when you can’t determine the partition of incoming records to be deleted and need to delete only based on record key. This key generator ignores the partition path while generating keys to uniquely identify Hudi records. When using this key generator, set the config hoodie.[bloom|simple|hbase].index.update.partition.path to false in order to avoid redundant data written to the storage.
NonPartitionedKeyGenerator | Use this key generator for non-partitioned datasets because it returns an empty partition for all records.
TimestampBasedKeyGenerator | Use this key generator for a timestamp data type partition path. With this key generator, the partition path column values are interpreted as timestamps. The record key is the same as before, which is a single column converted to string. If using TimestampBasedKeyGenerator, a few more configs need to be set.
CustomKeyGenerator | Use this key generator to take advantage of the benefits of SimpleKeyGenerator, ComplexKeyGenerator, and TimestampBasedKeyGenerator all at the same time. With this you can configure record key and partition paths as a single field or a combination of fields. This is helpful if you want to generate nested partitions with each partition key of different types (for example, field_3:simple,field_5:timestamp). For more information, refer to CustomKeyGenerator.

The key generator class can be automatically inferred by Hudi if the specified record key and partition path require a SimpleKeyGenerator or ComplexKeyGenerator, depending on whether there are single or multiple record key or partition path columns. For all other cases, you need to specify the key generator.
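The inference rule just described can be sketched in a few lines. This is an illustration of the rule as stated in this post, not Hudi’s actual implementation; the function names are hypothetical, and the short class names are used as they appear in the table above.

```python
def split_fields(config_value):
    """Split a comma-separated field list such as 'col1,col4'."""
    return [field.strip() for field in config_value.split(",") if field.strip()]


def infer_key_generator(record_key_fields, partition_path_fields):
    """Return the key generator this post says can be inferred, else None."""
    keys = split_fields(record_key_fields)
    partitions = split_fields(partition_path_fields)
    if not keys:
        raise ValueError("a record key is required; record keys cannot be null")
    if not partitions:
        # Non-partitioned data needs an explicitly configured key generator.
        return None
    if len(keys) == 1 and len(partitions) == 1:
        return "SimpleKeyGenerator"
    return "ComplexKeyGenerator"


print(infer_key_generator("order_id", "order_date"))
print(infer_key_generator("col1,col4", "region"))
```

A return value of None in this sketch stands for the remaining cases, where the key generator has to be specified explicitly.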

The following flow chart explains how to select the right key generator for your use case.

PreCombine field

This is a mandatory field that Hudi uses to deduplicate the records within the same batch before writing them. When two records have the same record key, they go through the preCombine process, and the record with the largest value for the preCombine key is picked by default. This behavior can be customized through custom implementation of the Hudi payload class, which we describe in the next section.

The following table summarizes the configurations related to preCombine.

Configuration Parameter | Description | Value
hoodie.datasource.write.precombine.field | The field used in preCombining before the actual write. It helps select the latest record whenever there are multiple updates to the same record in a single incoming data batch. | The default value is ts. You can configure it to any column in your dataset that you want Hudi to use to deduplicate the records whenever there are multiple records with the same record key in the same batch. Currently, you can only pick one field as the preCombine field. Select a column with the timestamp data type or any column that can determine which record holds the latest version, like a monotonically increasing number.
hoodie.combine.before.upsert | During upsert, this configuration controls whether deduplication should be done for the incoming batch before ingesting into Hudi. This is applicable only for upsert operations. | The default value is true. We recommend keeping it at the default to avoid duplicates.
hoodie.combine.before.delete | Same as the preceding config, but applicable only for delete operations. | The default value is true. We recommend keeping it at the default to avoid duplicates.
hoodie.combine.before.insert | When inserted records share the same key, the configuration controls whether they should be first combined (deduplicated) before writing to storage. | The default value is false. We recommend setting it to true if the incoming inserts or bulk inserts can have duplicates.
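The preCombine step is easy to picture with plain Python dictionaries standing in for records. The following sketch mimics the default behavior described above (keep the record with the largest preCombine value per record key within one batch). The field names id and ts and the sample values are assumptions, and on ties this sketch keeps the first record seen, which may differ from Hudi.

```python
def precombine(batch, record_key="id", precombine_field="ts"):
    """Deduplicate one incoming batch, keeping the largest preCombine value per key."""
    latest = {}
    for record in batch:
        key = record[record_key]
        current = latest.get(key)
        # Replace the kept record only if the new one has a strictly larger value.
        if current is None or record[precombine_field] > current[precombine_field]:
            latest[key] = record
    return list(latest.values())


batch = [
    {"id": 1, "ts": 10, "val": "a"},
    {"id": 1, "ts": 12, "val": "b"},  # same key, larger ts: this one is kept
    {"id": 2, "ts": 5, "val": "c"},
]
print(precombine(batch))
```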

Record payload

Record payload defines how to merge new incoming records against old stored records for upserts.

The default OverwriteWithLatestAvroPayload payload class always overwrites the stored record with the latest incoming record. This works fine for batch jobs and most use cases. But let’s say you have a streaming job and want to prevent the late-arriving data from overwriting the latest record in storage. You need to use a different payload class implementation (DefaultHoodieRecordPayload) to determine the latest record in storage based on an ordering field, which you provide.

In the following example, Commit 1 has HoodieKey 1, Val 1, preCombine 10, and in-flight Commit 2 has HoodieKey 1, Val 2, preCombine 5.

If using the default OverwriteWithLatestAvroPayload, the Val 2 version of the record will be the final version of the record in storage (Amazon S3) because it’s the latest version of the record.

If using DefaultHoodieRecordPayload, Hudi honors Val 1 when merging multiple versions of the record, because Val 2’s record version has a lower preCombine value (preCombine 5) than Val 1’s record version (preCombine 10).

You can select a payload class while writing to the Hudi table using the configuration hoodie.datasource.write.payload.class.
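The difference between the two payload classes in the example above can be simulated with two small merge functions. These are simplified stand-ins written for illustration, not the Hudi classes themselves; the dictionary field names are assumptions, and the tie-breaking rule (the incoming record wins when values are equal) is an assumption of this sketch.

```python
def merge_overwrite_with_latest(stored, incoming):
    """Stand-in for OverwriteWithLatestAvroPayload: the incoming record always wins."""
    return incoming


def merge_by_ordering_field(stored, incoming, ordering_field="precombine"):
    """Stand-in for DefaultHoodieRecordPayload: the larger ordering value wins."""
    if stored is None or incoming[ordering_field] >= stored[ordering_field]:
        return incoming
    return stored


# Commit 1 is already in storage; Commit 2 arrives late with a lower value.
stored = {"key": 1, "val": "Val 1", "precombine": 10}
incoming = {"key": 1, "val": "Val 2", "precombine": 5}

print(merge_overwrite_with_latest(stored, incoming)["val"])  # Val 2
print(merge_by_ordering_field(stored, incoming)["val"])      # Val 1
```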

Some useful built-in payload class implementations are described in the following table.

Payload Class | Description
OverwriteWithLatestAvroPayload (org.apache.hudi.common.model.OverwriteWithLatestAvroPayload) | Chooses the latest incoming record to overwrite any previous version of the records. Default payload class.
DefaultHoodieRecordPayload (org.apache.hudi.common.model.DefaultHoodieRecordPayload) | Uses hoodie.payload.ordering.field to determine the final record version while writing to storage.
EmptyHoodieRecordPayload (org.apache.hudi.common.model.EmptyHoodieRecordPayload) | Use this as payload class to delete all the records in the dataset.
AWSDmsAvroPayload (org.apache.hudi.common.model.AWSDmsAvroPayload) | Use this as payload class if AWS DMS is used as source. It provides support for seamlessly applying changes captured via AWS DMS. This payload implementation performs insert, delete, and update operations on the Hudi table based on the operation type for the CDC record obtained from AWS DMS.

Partitioning

Partitioning is the physical organization of files within a table. Partitions act as virtual columns and can affect the maximum parallelism available when writing.

Extremely fine-grained partitioning (for example, over 20,000 partitions) can create excessive overhead for the Spark engine managing all the small tasks, and can degrade query performance by reducing file sizes. Also, an overly coarse-grained partition strategy, without clustering and data skipping, can negatively impact both read and upsert performance with the need to scan more files in each partition.

The right partitioning helps improve read performance by reducing the amount of data scanned per query. It also improves upsert performance by limiting the number of files scanned to find the file group in which a specific record exists during ingest. A column frequently used in query filters would be a good candidate for partitioning.

For large-scale use cases with evolving query patterns, we suggest coarse-grained partitioning (such as date), while using fine-grained data layout optimization techniques (clustering) within each partition. This opens the possibility of data layout evolution.

By default, Hudi creates the partition folders with just the partition values. We recommend using Hive style partitioning, in which the name of the partition columns is prefixed to the partition values in the path (for example, year=2022/month=07 as opposed to 2022/07). This enables better integration with Hive metastores, such as using msck repair to fix partition paths.

To support Apache Hive style partitions in Hudi, we have to enable it in the config hoodie.datasource.write.hive_style_partitioning.

The following table summarizes the key configurations related to Hudi partitioning.

Configuration Parameter | Description | Value
hoodie.datasource.write.partitionpath.field | Partition path field. This is a required configuration that you need to pass while writing the Hudi dataset. | There is no default value set for this. Set it to the column that you have determined for partitioning the data. We recommend that it doesn’t cause extremely fine-grained partitions.
hoodie.datasource.write.hive_style_partitioning | Determines whether to use Hive style partitioning. If set to true, the names of partition folders follow <partition_column_name>=<partition_value> format. | Default value is false. Set it to true to use Hive style partitioning.
hoodie.datasource.write.partitionpath.urlencode | Indicates if we should URL encode the partition path value before creating the folder structure. | Default value is false. Set it to true if you want to URL encode the partition path value. For example, if you’re using the date format "yyyy-MM-dd HH:mm:ss", URL encoding needs to be set to true because the value would otherwise result in an invalid path due to the : character.

Note that if the data isn’t partitioned, you need to specifically use NonPartitionedKeyGenerator for the record key, which is explained in the previous section. Additionally, Hudi doesn’t allow partition columns to be changed or evolved.
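To illustrate the difference between the default layout and Hive style partitioning, the following sketch renders both folder paths for the same partition values and collects the three write options from the preceding table into a dictionary. The helper names and the column name order_date are hypothetical.

```python
def plain_partition_path(partition_values):
    """Default layout: folders are named with the partition values only."""
    return "/".join(str(value) for value in partition_values.values())


def hive_style_partition_path(partition_values):
    """Hive style layout: each folder is <partition_column_name>=<partition_value>."""
    return "/".join(f"{column}={value}" for column, value in partition_values.items())


def partitioning_options(partition_field, hive_style=True, url_encode=False):
    """Collect the partition-related write options as string values."""
    return {
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.datasource.write.hive_style_partitioning": str(hive_style).lower(),
        "hoodie.datasource.write.partitionpath.urlencode": str(url_encode).lower(),
    }


values = {"year": "2022", "month": "07"}
print(plain_partition_path(values))       # 2022/07
print(hive_style_partition_path(values))  # year=2022/month=07
print(partitioning_options("order_date"))
```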

Choose the right index

After we select the storage type in Hudi and determine the record key and partition path, we need to choose the right index for upsert performance. Apache Hudi employs an index to locate the file group that an update/delete belongs to. This enables efficient upsert and delete operations and enforces uniqueness based on the record keys.

Global index vs. non-global index

When picking the right indexing strategy, the first decision is whether to use a global (table level) or non-global (partition level) index. The main difference between global vs. non-global indexes is the scope of key uniqueness constraints. Global indexes enforce uniqueness of the keys across all partitions of a table. The non-global index implementations enforce this constraint only within a specific partition. Global indexes offer stronger uniqueness guarantees, but they come with a higher update/delete cost, for example global deletes with just the record key need to scan the entire dataset. HBase indexes are an exception here, but come with an operational overhead.

For large-scale global index use cases, use an HBase index or record-level index (available in Hudi 0.13) because for all other global indexes, the update/delete cost grows with the size of the table, O(size of the table).

When using a global index, be aware of the configuration hoodie.[bloom|simple|hbase].index.update.partition.path, which is already set to true by default. For existing records getting upserted to a new partition, enabling this configuration will help delete the old record in the old partition and insert it in the new partition.

Hudi index options

After picking the scope of the index, the next step is to decide which indexing option best fits your workload. The following table explains the indexing options available in Hudi as of 0.11.0.

Indexing Option | How It Works | Characteristic | Scope
Simple Index | Performs a join of the incoming upsert/delete records against keys extracted from the involved partition in case of non-global datasets and the entire dataset in case of global or non-partitioned datasets. | Easiest to configure. Suitable for basic use cases like small tables with evenly spread updates. Even for larger tables where updates are very random to all partitions, a simple index is the right choice because it directly joins with interested fields from every data file without any initial pruning, as compared to Bloom, which in the case of random upserts adds additional overhead and doesn’t give enough pruning benefits because the Bloom filters could indicate a true positive for most of the files and end up comparing ranges and filters against all these files. | Global/Non-global
Bloom Index (default index in EMR Hudi) | Employs Bloom filters built out of the record keys, optionally also pruning candidate files using record key ranges. The Bloom filter is stored in the data file footer while writing the data. | More efficient filter compared to simple index for use cases like late-arriving updates to fact tables and deduplication in event tables with ordered record keys such as timestamp. Hudi implements a dynamic Bloom filter mechanism to reduce false positives provided by Bloom filters. In general, the probability of false positives increases with the number of records in a given file. Check the Hudi FAQ for Bloom filter configuration best practices. | Global/Non-global
Bucket Index | Distributes records to buckets using a hash function based on the record keys or a subset of them. It uses the same hash function to determine which file group to match with incoming records. | New indexing option since Hudi 0.11.0. Simple to configure. It has better upsert throughput performance compared to the Bloom filter. As of Hudi 0.11.1, only a fixed bucket number is supported. This will no longer be an issue with the upcoming consistent hashing bucket index feature, which can dynamically change bucket numbers. | Non-global
HBase Index | The index mapping is managed in an external HBase table. | Best lookup time, especially for large numbers of partitions and files. It comes with additional operational overhead because you need to manage an external HBase table. | Global

Use cases suitable for simple index

Simple indexes are most suitable for workloads with evenly spread updates over partitions and files on small tables, and also for larger tables with dimension-style workloads, because updates are random across all partitions. A common example is a CDC pipeline for a dimension table. In this case, updates end up touching a large number of files and partitions. Therefore, a join with no other pruning is most efficient.

Use cases suitable for Bloom index

Bloom indexes are suitable for most production workloads with uneven update distribution across partitions. For workloads with most updates to recent data like fact tables, Bloom filter rightly fits the bill. It can be clickstream data collected from an ecommerce site, bank transactions in a FinTech application, or CDC logs for a fact table.

When using a Bloom index, be aware of the following configurations:

  • hoodie.bloom.index.use.metadata – By default, it is set to false. When this flag is on, the Hudi writer gets the index metadata information from the metadata table and doesn’t need to open Parquet file footers to get the Bloom filters and stats. You prune out the files by just using the metadata table and therefore have improved performance for larger tables.
  • hoodie.bloom.index.prune.by.ranges – Enable or disable range pruning based on use case. By default, it’s already set to true. When this flag is on, range information from files is used to speed up index lookups. This is helpful if the selected record key is monotonically increasing. You can make any record key monotonically increasing by adding a timestamp prefix. If the record key is completely random and has no natural ordering (such as UUIDs), it’s better to turn this off, because range pruning will only add extra overhead to the index lookup.
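The two Bloom index settings above, together with the timestamp-prefix idea, can be sketched as follows. The helper names are hypothetical. The key format (a zero-padded millisecond timestamp, an underscore, then the original ID) is one possible choice made for this illustration, so that string ordering follows time ordering; it is not a Hudi convention.

```python
def bloom_index_options(keys_have_natural_order, use_metadata_table=False):
    """Collect Bloom index write options as string values."""
    return {
        "hoodie.index.type": "BLOOM",
        # Range pruning helps ordered keys and only adds overhead for random ones.
        "hoodie.bloom.index.prune.by.ranges": str(keys_have_natural_order).lower(),
        "hoodie.bloom.index.use.metadata": str(use_metadata_table).lower(),
    }


def timestamp_prefixed_key(event_time_millis, unique_id):
    """Prefix a random ID with a fixed-width timestamp so keys sort by time."""
    return f"{event_time_millis:013d}_{unique_id}"


keys = [timestamp_prefixed_key(1_660_000_000_500, "b7f1"),
        timestamp_prefixed_key(1_660_000_000_100, "e2a9")]
print(sorted(keys))  # the key with the earlier timestamp sorts first
print(bloom_index_options(keys_have_natural_order=True))
```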

Use cases suitable for bucket index

Bucket indexes are suitable for upsert use cases on huge datasets with a large number of file groups within partitions and relatively even data distribution across partitions, where the bucket hash field also distributes data relatively evenly. They can offer better upsert performance in these cases because no index lookup is involved: file groups are located by a hashing mechanism, which is very fast. This is quite different from both simple and Bloom indexes, where an explicit index lookup step is involved during write. Each bucket maps one-to-one to a Hudi file group, and because the total number of buckets (defined by hoodie.bucket.index.num.buckets, default 4) is fixed, this can lead to skewed data (data distributed unevenly across buckets) and scalability issues (buckets can grow over time). These issues will be addressed in the upcoming consistent hashing bucket index, which is going to be a special type of bucket index.
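To make the hashing idea concrete, here is a small illustrative sketch in Python. It is not Hudi’s actual hash function: CRC32 and the sample record keys are assumptions chosen only to show how a fixed bucket count maps keys to buckets without any lookup, and how you might check the resulting distribution for skew.

```python
import zlib
from collections import Counter


def bucket_for(record_key: str, num_buckets: int = 4) -> int:
    """Map a record key to a bucket with a stable hash.

    CRC32 is used purely for illustration; Hudi uses its own hashing.
    """
    return zlib.crc32(record_key.encode("utf-8")) % num_buckets


def bucket_sizes(record_keys, num_buckets: int = 4) -> Counter:
    """Count how many keys land in each bucket, to eyeball skew."""
    return Counter(bucket_for(key, num_buckets) for key in record_keys)


# Hypothetical record keys; the bucket count matches the default of 4.
keys = [f"order-{i}" for i in range(1000)]
sizes = bucket_sizes(keys)
print(sorted(sizes.items()))
```

Because the bucket is computed directly from the key, the writer knows the target file group without consulting an index. The trade-off is that the bucket count is fixed up front.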

Use cases suitable for HBase index

HBase indexes are suitable for use cases where ingestion performance can’t be met using the other index types. These are mostly use cases with global indexes and large numbers of files and partitions. HBase indexes provide the best lookup time but come with large operational overhead, which is easier to justify if you’re already using HBase for other workloads.

For more information on choosing the right index and indexing strategies for common use cases, refer to Employing the right indexes for fast updates, deletes in Apache Hudi. As you have already seen, Hudi index performance depends heavily on the actual workload. We encourage you to evaluate different indexes for your workload and choose the one which is best suited for your use case.

Migration guidance

With Apache Hudi growing in popularity, one of the fundamental challenges is to efficiently migrate existing datasets to Apache Hudi. Apache Hudi maintains record-level metadata to perform core operations such as upserts and incremental pulls. To take advantage of Hudi’s upsert and incremental processing support, you need to add Hudi record-level metadata to your original dataset.

Using bulk_insert

The recommended way to migrate data to Hudi is to perform a full rewrite using bulk_insert. bulk_insert doesn’t look up existing records and skips writer optimizations such as small file handling. Performing a one-time full rewrite is a good opportunity to write your data in Hudi format with all the metadata and indexes generated, and also to potentially control file size and sort data by record keys.

You can set the sort mode in a bulk_insert operation using the configuration hoodie.bulkinsert.sort.mode. bulk_insert offers the following sort modes to configure.

Sort Mode | Description
NONE | No sorting is done to the records. You can get the fastest performance (comparable to writing Parquet files with Spark) for initial load with this mode.
GLOBAL_SORT | Use this to sort records globally across Spark partitions. It is less performant in initial load than other modes as it repartitions data by partition path and sorts it by record key within each partition. This helps in controlling the number of files generated in the target thereby controlling the target file size. Also, the generated target files will not have overlapping min-max values for record keys which will further help speed up index look-ups during upserts/deletes by pruning out files based on record key ranges in bloom index.
PARTITION_SORT | Use this to sort records within Spark partitions. It is more performant for initial load than GLOBAL_SORT and if your Spark partitions in the data frame are already fairly mapped to the Hudi partitions (dataframe is already repartitioned by partition column), using this mode would be preferred as you can obtain records sorted by record key within each partition.

We recommend using GLOBAL_SORT mode if you can handle the one-time cost. Starting with Amazon EMR 6.9 (Hudi 0.12.1), the default sort mode changed from GLOBAL_SORT to NONE. During bulk_insert with GLOBAL_SORT, two configurations control the sizes of target files generated by Hudi.

Configuration Parameter | Description | Value
hoodie.bulkinsert.shuffle.parallelism | The number of files generated from the bulk insert is determined by this configuration. The higher the parallelism, the more Spark tasks processing the data. | Default value is 200. To control file size and achieve maximum performance (more parallelism), we recommend setting this to a value such that the files generated are equal to the hoodie.parquet.max.file.size. If you make parallelism really high, the max file size can’t be honored because the Spark tasks are working on smaller amounts of data.
hoodie.parquet.max.file.size | Target size for Parquet files produced by Hudi write phases. | Default value is 120 MB. If the Spark partitions generated with hoodie.bulkinsert.shuffle.parallelism are larger than this size, it splits it and generates multiple files to not exceed the max file size.

Let’s say we have a 100 GB Parquet source dataset and we’re bulk inserting with GLOBAL_SORT into a partitioned Hudi table with 10 evenly distributed Hudi partitions. We want to have the preferred target file size of 120 MB (default value for hoodie.parquet.max.file.size). The Hudi bulk insert shuffle parallelism should be calculated as follows:

  • The total data size in MB is 100 * 1024 = 102400 MB
  • hoodie.bulkinsert.shuffle.parallelism should be set to 102400/120 = ~854

Note that in practice, even with GLOBAL_SORT, each Spark partition can map to more than one Hudi partition. Treat this calculation as a rough estimate; you can end up with more files than the parallelism specified.
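The estimate above can be sketched in a few lines of Python. The helper name is hypothetical, and the options dictionary is only one way you might carry the result into a write job. It assumes hoodie.parquet.max.file.size is expressed in bytes and that hoodie.datasource.write.operation selects bulk_insert.

```python
import math


def bulk_insert_parallelism(dataset_size_gb: float, target_file_size_mb: int = 120) -> int:
    """Estimate shuffle parallelism as total size divided by target file size."""
    total_mb = dataset_size_gb * 1024
    # Round up so the estimate errs toward files no larger than the target.
    return math.ceil(total_mb / target_file_size_mb)


# 100 GB source dataset with the default 120 MB target file size.
parallelism = bulk_insert_parallelism(100)

bulk_insert_options = {
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.bulkinsert.sort.mode": "GLOBAL_SORT",
    "hoodie.bulkinsert.shuffle.parallelism": str(parallelism),
    # Assumption: the max file size is given in bytes (120 MB here).
    "hoodie.parquet.max.file.size": str(120 * 1024 * 1024),
}
print(parallelism)
```

As with the manual calculation, this is a starting point to tune from rather than a guarantee of the final file count.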

Using bootstrapping

For customers operating at scale on hundreds of terabytes or petabytes of data, migrating datasets to Apache Hudi can be time-consuming. Apache Hudi provides a feature called bootstrap to help with this challenge.

The bootstrap operation contains two modes: METADATA_ONLY and FULL_RECORD.

FULL_RECORD is the same as full rewrite, where the original data is copied and rewritten with the metadata as Hudi files.

The METADATA_ONLY mode is the key to accelerating the migration progress. The conceptual idea is to decouple the record-level metadata from the actual data by writing only the metadata columns in the Hudi files generated, while the data isn’t copied over and stays in its original location. This significantly reduces the amount of data written, thereby improving the time to migrate and get started with Hudi. However, this comes at the expense of read performance, because reads incur the overhead of merging Hudi files and original data files to get the complete record. Therefore, you may not want to use it for frequently queried partitions.

You can pick and choose these modes at partition level. One common strategy is to tier your data. Use FULL_RECORD mode for a small set of hot partitions, which are accessed frequently, and METADATA_ONLY for a larger set of cold partitions.
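The tiering strategy can be sketched as a simple per-partition decision. The following Python example is illustrative only: the partition paths and the regular expression for hot partitions are hypothetical, and it doesn’t show how Hudi itself is configured to select a mode per partition.

```python
import re


def choose_bootstrap_mode(partition_path: str, hot_pattern: str) -> str:
    """Return FULL_RECORD for hot partitions and METADATA_ONLY for the rest."""
    if re.fullmatch(hot_pattern, partition_path):
        return "FULL_RECORD"
    return "METADATA_ONLY"


# Hypothetical date-partitioned table where only the latest month is hot.
partitions = ["dt=2022-12-30", "dt=2022-12-31", "dt=2021-06-01", "dt=2020-01-15"]
hot_pattern = r"dt=2022-12-.*"

plan = {p: choose_bootstrap_mode(p, hot_pattern) for p in partitions}
for partition, mode in plan.items():
    print(partition, mode)
```

A plan like this keeps the rewrite cost limited to the partitions where read performance matters most.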

Consider the following:

Catalog sync

Hudi supports syncing Hudi table partitions and columns to a catalog. On AWS, you can either use the AWS Glue Data Catalog or Hive metastore as the metadata store for your Hudi tables. To register and synchronize the metadata with your regular write pipeline, you need to either enable hive sync or run the hive_sync_tool or AwsGlueCatalogSyncTool command line utility.

We recommend enabling the hive sync feature with your regular write pipeline to make sure the catalog is up to date. If you don’t expect a new partition to be added or the schema changed as part of each batch, then we recommend enabling hoodie.datasource.meta_sync.condition.sync as well so that it allows Hudi to determine if hive sync is necessary for the job.

If you have frequent ingestion jobs and need to maximize ingestion performance, you can disable hive sync and run the hive_sync_tool asynchronously.

If you have the timestamp data type in your Hudi data, we recommend setting hoodie.datasource.hive_sync.support_timestamp to true to convert the int64 (timestamp_micros) to the hive type timestamp. Otherwise, you will see the values as bigint while querying data.

The following table summarizes the configurations related to hive_sync.

Configuration Parameter | Description | Value
hoodie.datasource.hive_sync.enable | To register or sync the table to a Hive metastore or the AWS Glue Data Catalog. | Default value is false. We recommend setting the value to true to make sure the catalog is up to date, and it needs to be enabled in every single write to avoid an out-of-sync metastore.
hoodie.datasource.hive_sync.mode | This configuration sets the mode for HiveSynctool to connect to the Hive metastore server. For more information, refer to Sync modes. | Valid values are hms, jdbc, and hiveql. If the mode isn’t specified, it defaults to jdbc. Hms and jdbc both talk to the underlying thrift server, but jdbc needs a separate jdbc driver. We recommend setting it to ‘hms’, which uses the Hive metastore client to sync Hudi tables using thrift APIs directly. This helps when using the AWS Glue Data Catalog because you don’t need to install Hive as an application on the EMR cluster (because it doesn’t need the server).
hoodie.datasource.hive_sync.database | Name of the destination database that we should sync the Hudi table to. | Default value is default. Set this to the database name of your catalog.
hoodie.datasource.hive_sync.table | Name of the destination table that we should sync the Hudi table to. | In Amazon EMR, the value is inferred from the Hudi table name. You can set this config if you need a different table name.
hoodie.datasource.hive_sync.support_timestamp | To convert logical type TIMESTAMP_MICROS as hive type timestamp. | Default value is false. Set it to true to convert to hive type timestamp.
hoodie.datasource.meta_sync.condition.sync | If true, only sync on conditions like schema change or partition change. | Default value is false.
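As a rough sketch, the recommendations in this section could be collected into a set of writer options like the following. The helper function, database name, and table name are hypothetical; the configuration keys and the hms mode come from the table above.

```python
def hive_sync_options(database: str, table: str,
                      has_timestamps: bool = False,
                      conditional_sync: bool = False) -> dict:
    """Build catalog sync options following the recommendations above (hypothetical helper)."""
    return {
        # Sync on every write so the catalog doesn't drift out of date.
        "hoodie.datasource.hive_sync.enable": "true",
        # hms uses the Hive metastore client directly, without a JDBC driver.
        "hoodie.datasource.hive_sync.mode": "hms",
        "hoodie.datasource.hive_sync.database": database,
        "hoodie.datasource.hive_sync.table": table,
        # Surface TIMESTAMP_MICROS columns as timestamp instead of bigint.
        "hoodie.datasource.hive_sync.support_timestamp": str(has_timestamps).lower(),
        # Only sync when the schema or partitions change.
        "hoodie.datasource.meta_sync.condition.sync": str(conditional_sync).lower(),
    }


# Hypothetical database and table names.
sync_opts = hive_sync_options("sales_db", "orders", has_timestamps=True, conditional_sync=True)
```

With PySpark, a dictionary like this would typically be merged with the other Hudi write options and passed to the DataFrame writer through its options method.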

Writing and reading Hudi datasets, and its integration with other AWS services

There are different ways you can write the data to Hudi using Amazon EMR, as explained in the following table.

Hudi Write Option | Description
Spark DataSource | You can use this option to do upsert, insert, or bulk insert for the write operation. Refer to Work with a Hudi dataset for an example of how to write data using DataSourceWrite.
Spark SQL | You can easily write data to Hudi with SQL statements. This eliminates the need to write Scala or PySpark code and lets you adopt a low-code paradigm.
Flink SQL, Flink DataStream API | If you’re using Flink for real-time streaming ingestion, you can use the high-level Flink SQL or Flink DataStream API to write the data to Hudi.
DeltaStreamer | DeltaStreamer is a self-managed tool that supports standard data sources like Apache Kafka, Amazon S3 events, DFS, AWS DMS, JDBC, and SQL sources, built-in checkpoint management, schema validations, as well as lightweight transformations. It can also operate in a continuous mode, in which a single self-contained Spark job can pull data from source, write it out to Hudi tables, and asynchronously perform cleaning, clustering, compactions, and catalog syncing, relying on Spark’s job pools for resource management. It’s easy to use and we recommend using it for all the streaming and ingestion use cases where a low-code approach is preferred. For more information, refer to Streaming Ingestion.
Spark structured streaming | For use cases that require complex data transformations of the source data frame written in Spark DataFrame APIs or advanced SQL, we recommend the structured streaming sink. The streaming source can be used to obtain change feeds out of Hudi tables for streaming or incremental processing use cases.
Kafka Connect Sink | If you standardize on the Apache Kafka Connect framework for your ingestion needs, you can also use the Hudi Connect Sink.

Refer to the following support matrix for query support on specific query engines. The following table explains the different options to read the Hudi dataset using Amazon EMR.

Hudi Read Option | Description
Spark DataSource | You can read Hudi datasets directly from Amazon S3 using this option. The tables don’t need to be registered with Hive metastore or the AWS Glue Data Catalog for this option. You can use this option if your use case doesn’t require a metadata catalog. Refer to Work with a Hudi dataset for an example of how to read data using DataSourceReadOptions.
Spark SQL | You can query Hudi tables with DML/DDL statements. The tables need to be registered with Hive metastore or the AWS Glue Data Catalog for this option.
Flink SQL | After the Flink Hudi tables have been registered to the Flink catalog, they can be queried using Flink SQL.
PrestoDB/Trino | The tables need to be registered with Hive metastore or the AWS Glue Data Catalog for this option. This engine is preferred for interactive queries. There is a new Trino connector in upcoming Hudi 0.13, and we recommend reading datasets through this connector when using Trino for performance benefits.
Hive | The tables need to be registered with Hive metastore or the AWS Glue Data Catalog for this option.

Apache Hudi is well integrated with AWS services, and these integrations work when AWS Glue Data Catalog is used, with the exception of Athena, where you can also use a data source connector to an external Hive metastore. The following table summarizes the service integrations.

AWS Service | Description
Amazon Athena | You can use Athena for a serverless option to query a Hudi dataset on Amazon S3. Currently, it supports snapshot queries and read-optimized queries, but not incremental queries. For more details, refer to Using Athena to query Apache Hudi datasets.
Amazon Redshift Spectrum | You can use Amazon Redshift Spectrum to run analytic queries against tables in your Amazon S3 data lake with Hudi format. Currently, it supports only CoW tables. For more details, refer to Creating external tables for data managed in Apache Hudi.
AWS Lake Formation | AWS Lake Formation is used to secure data lakes and define fine-grained access control on the database and table level. Hudi is not currently supported with Amazon EMR Lake Formation integration.
AWS DMS | You can use AWS DMS to ingest data from upstream relational databases to your S3 data lakes into a Hudi dataset. For more details, refer to Apply record level changes from relational databases to Amazon S3 data lake using Apache Hudi on Amazon EMR and AWS Database Migration Service.

Conclusion

This post covered best practices for configuring Apache Hudi data lakes using Amazon EMR. We discussed the key configurations in migrating your existing dataset to Hudi and shared guidance on how to determine the right options for different use cases when setting up Hudi tables.

The upcoming Part 2 of this series focuses on optimizations that can be done on this setup, along with monitoring using Amazon CloudWatch.


About the Authors

Suthan Phillips is a Big Data Architect for Amazon EMR at AWS. He works with customers to provide best practice and technical guidance and helps them achieve highly scalable, reliable and secure solutions for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on Data Analytics, AI/ML and DevOps.

How Etleap and Amazon Redshift Serverless optimize costs for ETL

Post Syndicated from Caius Brindescu original https://aws.amazon.com/blogs/big-data/how-etleap-and-amazon-redshift-serverless-optimize-costs-for-etl/

Amazon Redshift Serverless lets you avoid managing infrastructure while only paying for what you use. Etleap provides data integration software that is natively built on AWS. It’s an AWS Advanced Technology Partner with the AWS Data & Analytics Competency and Amazon Redshift Service Ready designation.

In this post, we share how you can minimize the usage of resources for some workload patterns and maximize savings while seamlessly managing data pipelines. We illustrate an example of how Redshift Serverless and Etleap’s load synchronization feature can reduce active Redshift Serverless time, further optimizing extract, transform, and load (ETL) costs.

Introduction to Redshift Serverless

Redshift Serverless makes it easy to run and scale analytics in seconds without the need to set up and manage data warehouse clusters. With Redshift Serverless, you pay for the compute only when the data warehouse is in use. This is ideal when it’s difficult to predict compute needs such as variable workloads, periodic workloads with idle time, and steady-state workloads with spikes. As your demand evolves with new workloads and more concurrent users, Redshift Serverless automatically provisions the right compute resources, and your data warehouse scales seamlessly and automatically.

You can create a Redshift Serverless data warehouse either using the default settings or custom settings. Redshift Serverless creates a default workgroup and associates that to the default namespace. You can also create multiple Redshift Serverless endpoints per AWS account and Region using namespaces and workgroups.

A namespace is a collection of database objects and users, with properties such as database name and password, permissions, and encryption and security. The following screenshot shows an example of a namespace configuration on the Redshift Serverless console.

Namespace-Amazon Redshift Serverless

A workgroup is a collection of compute resources, which includes network and security settings. Workgroup configuration allows you to create a private or public serverless endpoint that you can use to connect with your applications. The following screenshot shows an example workgroup on the Redshift Serverless console.

Workgroup - Amazon Redshift Serverless

When the Redshift Serverless endpoint is available, choose Query data to launch the Amazon Redshift Query Editor v2 to create database objects, load data, and analyze and visualize data. You can also connect to Redshift Serverless endpoints using your preferred SQL client tools via Amazon Redshift JDBC/ODBC drivers.

With Redshift Serverless, you pay separately for the compute and storage you use. Compute capacity is measured in Redshift Processing Units (RPUs), and you pay for the workloads in RPU-hours with a minimum charge of 60 seconds, metered on a per-second basis. Data lake queries are also part of the same RPU-hours, and Redshift Serverless doesn’t charge separately for the per-TB based pricing of Amazon Redshift Spectrum. The default base capacity is 128 RPUs, but you can adjust it from 32 RPUs to 512 RPUs in units of 8 using the Redshift Serverless console. For storage, you pay for data stored in Amazon Redshift-managed storage and storage used for manual snapshots, similar to what you would pay with Amazon Redshift provisioned RA3 instances.

To control your costs, you can specify usage limits and define actions that Amazon Redshift automatically takes if those limits are reached. You can specify usage limits in RPU-hours and associate them with a daily, weekly, or monthly duration. Setting higher usage limits can improve the overall throughput of the system, especially for workloads that need to handle high concurrency while maintaining consistently high performance.

Why Etleap customers need Redshift Serverless

Etleap gives customers robust and flexible pipelines without the hassle of coding and managing infrastructure. Redshift Serverless has a similar benefit, letting you run Amazon Redshift without worrying about provisioning and maintaining a data warehouse.

With the close Etleap-AWS integration, you can get started working with multiple data sources in Redshift Serverless in minutes.

Redshift Serverless can also reduce users’ costs because it automatically scales data warehouse capacity up and down to match usage and only charges when the serverless instance is active. ETL workloads are often batch-based and characterized by spikes, so the dynamic scaling of Redshift Serverless reduces unnecessary costs.

The following diagram illustrates this solution architecture.

Etleap Integration with Amazon Redshift Serverless

Etleap uses AWS Database Migration Service (AWS DMS), Amazon EMR, and Amazon Simple Storage Service (Amazon S3) to process data from databases, files, applications, and streams into Redshift Serverless.

Optimize costs for Redshift Serverless

One of the main sources of cost savings when using Redshift Serverless comes from its auto-pausing feature. When a Redshift Serverless instance is idle, it will auto-pause and you aren’t charged during this period of inactivity.

However, high frequency ETL pipelines (such as those from streams or CDC sources) can constantly resume the Redshift Serverless instance, negating the cost benefit. To maximize the advantages of the auto-pausing feature of Redshift Serverless, Etleap provides the option of load synchronization. As shown in the following figure, this reduces the number of load batches, thereby lowering active Redshift Serverless instance time and cost.

Etleap Load Synchronization

It sometimes makes sense to maximize the frequency of data ingestion, but not all use cases justify the higher cost of an always-on Amazon Redshift instance. Etleap users can set their load frequency at a cost-efficient once-per-hour or as frequently as every 5 minutes.

Amazon Redshift users typically run some SQL transformations after data is loaded in the warehouse. Etleap’s models feature lets you define the SQL transformations and their dependencies and control when these transformations are run. As with data loading, however, if these aren’t designed thoughtfully, there is a risk that models will trigger updates that unnecessarily wake up an idle Redshift Serverless instance, negating the cost savings of the Redshift Serverless auto-pausing feature.

To avoid this, Etleap schedules the models to update immediately after all the dependent tables have been updated. This maximizes the instance usage while it’s awake and allows it to pause when the loads and updates have completed.
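The dependency rule described here can be illustrated with a small Python sketch. This is not Etleap’s implementation: the function, model names, and table names are hypothetical, and the sketch only shows the idea of releasing a model for update once every table it depends on has been loaded in the current cycle.

```python
def models_ready_to_update(model_dependencies: dict, updated_tables: set) -> list:
    """Return models whose dependent tables have all been updated this cycle."""
    return sorted(
        model
        for model, tables in model_dependencies.items()
        if set(tables) <= updated_tables
    )


# Hypothetical models and the tables they read from.
dependencies = {
    "daily_revenue": ["orders", "payments"],
    "active_users": ["events"],
}

# Only "orders" and "events" have finished loading so far.
ready = models_ready_to_update(dependencies, {"orders", "events"})
print(ready)
```

Running the model updates right after their inputs land keeps the work inside a single active window, so the instance can pause afterward.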

Cost savings example

Let’s illustrate the cost savings benefits of Redshift Serverless by means of an example. A customer has set a 1-hour load synchronization schedule and has 100 pipelines and 10 models. Although by default Redshift Serverless has a provisioned base capacity of 128 RPUs, a provisioned base capacity of 32 RPUs is sufficient for the load requirements of this example. A typical average load time for Etleap customers into Amazon Redshift is 6 seconds. In Etleap, we perform a maximum of five loads at a time to avoid overloading the Redshift Serverless instance.

Here is an example of how the sequence would work for the pipelines:

  1. When the hourly schedule triggers, Etleap begins the extraction and transformation of source data for all pipelines with new data to process.
  2. After all the pipelines have finished extraction and transformation, Etleap begins to load the data into Amazon Redshift. This resumes the serverless instance. At an average of 6 seconds per load and five loads running in parallel, it takes 120 seconds to load all the pipelines (100 / 5 pipeline cycles * 6 seconds each).
  3. When the load is complete, Etleap triggers the model updates. A typical model in Etleap takes about 130 seconds to update. As with loads, Etleap limits models to five simultaneous updates to reduce the load on the Redshift Serverless instance. Therefore, updating all 10 models takes 260 seconds of total instance run time (130 seconds * 10/5 model cycles).
  4. At this point, you’re being charged for 380 seconds of active workload, and Redshift Serverless will become idle after some time.

Additionally, Etleap runs daily vacuum operations on applicable tables to minimize storage and improve query efficiency. The length of this process depends on the tables and the number of updates and deletes. For a customer with this amount of pipeline volume, 20 minutes is a typical length of time to vacuum the tables, adding that much daily runtime for the instance.

This results in a total daily runtime of 172 minutes ((380 seconds * 24 daily cycles / 60) + 20 minutes), which translates into a cost of $34.40 per day for a 32 RPU serverless instance. This is 88% lower cost than a comparable Amazon Redshift provisioned environment without the benefits of Etleap and Redshift Serverless: an always-on provisioned Amazon Redshift cluster with similar performance (1 year reserved instance pricing for 16 ra3.xlplus nodes running 24 hours/day).
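The arithmetic in this example can be reproduced with a short Python sketch. The per-RPU-hour price is an assumption back-derived from the $34.40 figure above rather than a quoted rate; check current Redshift Serverless pricing for your Region before relying on it.

```python
import math

LOAD_SECONDS = 6          # typical average load time
MODEL_SECONDS = 130       # typical model update time
MAX_CONCURRENT = 5        # loads or model updates running at a time
ASSUMED_USD_PER_RPU_HOUR = 0.375  # assumption inferred from the $34.40 result


def cycle_seconds(pipelines: int, models: int) -> int:
    """Active seconds for one synchronized load cycle plus model updates."""
    load = math.ceil(pipelines / MAX_CONCURRENT) * LOAD_SECONDS
    model = math.ceil(models / MAX_CONCURRENT) * MODEL_SECONDS
    return load + model


def daily_cost(pipelines: int, models: int, cycles_per_day: int,
               vacuum_minutes: float, rpus: int):
    """Return (active minutes per day, estimated cost in USD per day)."""
    active_minutes = cycle_seconds(pipelines, models) * cycles_per_day / 60 + vacuum_minutes
    cost = active_minutes / 60 * rpus * ASSUMED_USD_PER_RPU_HOUR
    return active_minutes, cost


minutes, cost = daily_cost(pipelines=100, models=10, cycles_per_day=24,
                           vacuum_minutes=20, rpus=32)
print(f"{minutes:.0f} active minutes/day, about ${cost:.2f}/day")
```

Changing cycles_per_day shows how quickly active time grows as the load frequency increases, which is the effect load synchronization is meant to limit.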

Other ETL optimizations on Etleap using Redshift Serverless

Etleap natively supports Redshift Serverless by updating its ETL solution to ensure you can continue to seamlessly ingest diverse data sources.

Redshift Serverless offers new system views that are used for tracking and managing ingestion, and Etleap uses these system views to natively handle tracking ingestion loads and vacuuming operations in its platform. For example, Etleap uses sys_query_history to determine which loads are in progress or complete, and thereby helps avoid double loading a batch.

Redshift Serverless automatically initiates optimizations such as sort and vacuum in the background and doesn’t charge for these automatic optimizations. As a best practice, after Etleap load synchronization, Etleap periodically runs the vacuum function on applicable tables, which reduces storage and improves query performance. Etleap uses the vacuum_sort_benefit column in svv_table_info, which provides the statistics for each table, informing which would benefit from vacuuming.

Summary

In this post, we described how Redshift Serverless frees you from managing data warehouse infrastructure and reduces costs. In particular, we illustrated a data integration pattern where Etleap can ensure further cost savings through its load synchronization feature by choosing a cost-efficient once-per-hour load frequency. Although this is an optimal solution for use cases where you prefer cost efficiency over real-time data insights, Etleap also allows you to load as frequently as every 5 minutes for use cases where near-real-time data insights are important.

Start using Redshift Serverless to run and scale analytics without having to manage data warehouse infrastructure and take advantage of further cost savings through Etleap’s load synchronization feature. To get started with Etleap, start a free trial or request a tailored demo.


About the Authors

Caius Brindescu is an engineer at Etleap with over 4 years of experience in developing ETL software. In addition to development work, he helps customers make the most out of Etleap and Amazon Redshift. He holds a PhD from Oregon State University and one AWS certification (Big Data – Specialty).

Maneesh Sharma is a Senior Database Engineer at AWS with more than a decade of experience designing and implementing large-scale data warehouse and analytics solutions. He collaborates with various Amazon Redshift Partners and customers to drive better integration.

Sathisan Vannadil is a Senior Partner Solutions Architect at Amazon Web Services (AWS). His primary focus is on helping independent software vendor (ISV) partners design and build solutions at scale on AWS. Prior to AWS, Sathisan held diverse technical positions and has over 20 years of experience in the field of data and analytics.

How GoDaddy built a data mesh to decentralize data ownership using AWS Lake Formation

Post Syndicated from Ankit Jhalaria original https://aws.amazon.com/blogs/big-data/how-godaddy-built-a-data-mesh-to-decentralize-data-ownership-using-aws-lake-formation/

This is a guest post co-written with Ankit Jhalaria from GoDaddy.

GoDaddy is empowering everyday entrepreneurs by providing all the help and tools to succeed online. With more than 20 million customers worldwide, GoDaddy is the place people come to name their idea, build a professional website, attract customers, and manage their work.

GoDaddy is a data-driven company, and getting meaningful insights from data helps them drive business decisions to delight their customers. In 2018, GoDaddy began a large infrastructure revamp and partnered with AWS to innovate faster than ever before to meet the needs of its customer growth around the world. As part of this revamp, the GoDaddy Data Platform team wanted to set the company up for long-term success by creating a well-defined data strategy and setting goals to decentralize the ownership and processing of data.

In this post, we discuss how GoDaddy uses AWS Lake Formation to simplify security management and data governance at scale, and enable data as a service (DaaS) supporting organization-wide data accessibility with cross-account data sharing using a data mesh architecture.

The challenge

In the vast ocean of data, deriving useful insights is an art. Prior to the AWS partnership, GoDaddy had a shared Hadoop cluster on premises that various teams used to create and share datasets with other analysts for collaboration. As the teams grew, copies of data started to grow in the Hadoop Distributed File System (HDFS). Several teams started to build tooling to manage this challenge independently, duplicating efforts. Managing permissions on these data assets became harder. Making data discoverable across a growing number of data catalogs and systems had started to become a big challenge. Although storage is relatively inexpensive these days, having several copies of the same data asset makes it harder for analysts to efficiently and reliably use the data available to them. Business analysts need robust pipelines on key datasets that they rely upon to make business decisions.

Solution overview

In GoDaddy’s data mesh hub and spoke model, a central data catalog contains information about all the data products that exist in the company. In AWS terminology, this is the AWS Glue Data Catalog. The data platform team provides APIs, SDKs, and Airflow Operators as components that different teams use to interact with the catalog. Activities such as updating the metastore to reflect a new partition for a given data product, and occasionally running MSCK repair operations, are all handled in the central governance account, and Lake Formation is used to secure access to the Data Catalog.

The data platform team introduced a layer of data governance that ensures best practices for building data products are followed throughout the company. We provide the tooling to support data engineers and business analysts while leaving the domain experts to run their data pipelines. With this approach, we have well-curated data products that are intuitive and easy to understand for our business analysts.

A data product refers to an entity that powers insights for analytical purposes. In simple terms, this could refer to an actual dataset pointing to a location in Amazon Simple Storage Service (Amazon S3). Data producers are responsible for the processing of data and creating new snapshots or partitions depending on the business needs. In some cases, data is refreshed every 24 hours, and in other cases, every hour. Data consumers come to the data mesh to consume data, and permissions are managed in the central governance account through Lake Formation. Lake Formation uses AWS Resource Access Manager (AWS RAM) to send resource shares to consumer accounts so that they can access the data from the central governance account. We go into details about this functionality later in the post.

The following diagram illustrates the solution architecture.

Solution architecture illustrated

Defining metadata with the central schema repository

Data is only useful if end-users can derive meaningful insights from it; otherwise, it’s just noise. As part of onboarding with the data platform, a data producer registers their schema with the data platform along with relevant metadata. The data governance team reviews this registration to ensure that best practices for creating datasets are followed. We have automated some of the most common data governance review items. This is also the place where producers define a contract about reliable data deliveries, often referred to as a Service Level Objective (SLO). After a contract is in place, the data platform team’s background processes monitor deliveries and send out alerts when data producers fail to meet their contract or SLO.

When managing permissions with Lake Formation, you register the Amazon S3 location of different S3 buckets. Lake Formation uses AWS RAM to share the named resource.

When managing resources with AWS RAM, the central governance account creates AWS RAM shares. The data platform provides a custom AWS Service Catalog product to accept AWS RAM shares in consumer accounts.

Having consistent schemas with meaningful names and descriptions makes the discovery of datasets easy. Every data producer who is a domain expert is responsible for creating well-defined schemas that business users use to generate insights to make key business decisions. Data producers register their schemas along with additional metadata with the data lake repository. Metadata includes information about the team responsible for the dataset, such as their SLO contract, description, and contact information. This information gets checked into a Git repository where automation kicks in and validates the request to make sure it conforms to standards and best practices. We use AWS CloudFormation templates to provision resources. The following code is a sample of what the registration metadata looks like.

Sample code of what the registration metadata looks like

As part of the registration process, automation steps run in the background to take care of the following on behalf of the data producer:

  • Register the producer’s Amazon S3 location of the data with Lake Formation – This allows us to use Lake Formation for fine-grained access control on the table in the AWS Glue Data Catalog that refers to this location, as well as on the underlying data.
  • Create the underlying AWS Glue database and table – Based on the schema specified by the data producer along with the metadata, we create the underlying AWS Glue database and table in the central governance account. As part of this, we also use table properties of AWS Glue to store additional metadata to use later for analysis.
  • Define the SLO contract – Any business-critical dataset needs to have a well-defined SLO contract. As part of dataset registration, the data producer defines a contract with a cron expression that gets used by the data platform to create an event rule in Amazon EventBridge. This rule triggers an AWS Lambda function to watch for deliveries of the data and triggers an alert to the data producer’s Slack channel if they breach the contract.
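The delivery check behind the SLO contract can be pictured with a small sketch. This is not GoDaddy's implementation: the function name `is_slo_breached`, its parameters, and the idea of comparing the latest delivery time against a delivery window are assumptions made purely for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_slo_breached(
    last_delivery: Optional[datetime],
    window_start: datetime,
    deadline: datetime,
) -> bool:
    """Return True when no delivery arrived inside [window_start, deadline].

    All names here are hypothetical; the real contract format is not
    shown in this post.
    """
    if last_delivery is None:
        return True
    return not (window_start <= last_delivery <= deadline)


# Example: a daily dataset that is expected to land by 06:00 UTC.
deadline = datetime(2022, 1, 18, 6, 0, tzinfo=timezone.utc)
window_start = deadline - timedelta(hours=24)

on_time = datetime(2022, 1, 18, 5, 30, tzinfo=timezone.utc)
stale = datetime(2022, 1, 16, 5, 30, tzinfo=timezone.utc)

print(is_slo_breached(on_time, window_start, deadline))  # delivered in window
print(is_slo_breached(stale, window_start, deadline))    # last delivery too old
print(is_slo_breached(None, window_start, deadline))     # never delivered
```

In a setup like the one described, a scheduled function would run a check of this kind after each deadline and post an alert when it returns True.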

Consuming data from the data mesh catalog

When a data consumer belonging to a given line of business (LOB) identifies the data product that they’re interested in, they submit a request to the central governance team containing their AWS account ID that they use to query the data. The data platform provides a portal to discover datasets across the company. After the request is approved, automation runs to create an AWS RAM share with the consumer account covering the AWS Glue database and tables mapped to the data product registered in the AWS Glue Data Catalog of the central governance account.

The following screenshot shows an example of a resource share.

Example of a resource share
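As a rough sketch of what the sharing automation might send, the following helper builds the arguments for the Lake Formation `grant_permissions` API call, which shares a named table with another account through AWS RAM. The helper name, account IDs, database name, and table name are placeholders, and including the grant option is an assumption about how such a grant could be configured, not a description of GoDaddy's automation.

```python
from typing import Any, Dict, List


def build_cross_account_grant(
    governance_account_id: str,
    consumer_account_id: str,
    database: str,
    table: str,
    permissions: List[str],
) -> Dict[str, Any]:
    """Build keyword arguments for lakeformation.grant_permissions.

    The principal is the consumer's AWS account ID, which makes this a
    cross-account grant on a named Data Catalog table.
    """
    return {
        "Principal": {"DataLakePrincipalIdentifier": consumer_account_id},
        "Resource": {
            "Table": {
                "CatalogId": governance_account_id,
                "DatabaseName": database,
                "Name": table,
            }
        },
        "Permissions": permissions,
        # Assumption: the grant option is included so that the consumer
        # data lake admin can grant onward to principals in their account.
        "PermissionsWithGrantOption": permissions,
    }


request = build_cross_account_grant(
    governance_account_id="111111111111",  # placeholder account IDs
    consumer_account_id="222222222222",
    database="example_db",                 # placeholder names
    table="example_table",
    permissions=["SELECT"],
)
print(request)
# With boto3 installed and credentials for the governance account:
# boto3.client("lakeformation").grant_permissions(**request)
```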

The consumer data lake admin needs to accept the AWS RAM share and create a resource link in Lake Formation to start querying the shared dataset within their account. We automated this process by building an AWS Service Catalog product that runs in the consumer’s account as a Lambda function that accepts shares on behalf of consumers.
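A minimal sketch of such a share-accepting function follows. It relies on the AWS RAM API operations `get_resource_share_invitations` and `accept_resource_share_invitation`; the function names and the choice to accept every pending invitation are assumptions for illustration. A production version would likely check the sender account before accepting.

```python
from typing import Any, List


def accept_pending_invitations(ram_client: Any) -> List[str]:
    """Accept every PENDING AWS RAM invitation and return the accepted ARNs.

    `ram_client` is expected to behave like boto3.client("ram").
    """
    accepted: List[str] = []
    kwargs = {}
    while True:
        page = ram_client.get_resource_share_invitations(**kwargs)
        for invitation in page.get("resourceShareInvitations", []):
            if invitation.get("status") == "PENDING":
                arn = invitation["resourceShareInvitationArn"]
                ram_client.accept_resource_share_invitation(
                    resourceShareInvitationArn=arn
                )
                accepted.append(arn)
        # Follow pagination until the service stops returning a token.
        token = page.get("nextToken")
        if not token:
            return accepted
        kwargs = {"nextToken": token}


def lambda_handler(event, context):
    # boto3 is imported here so the helper above can be tested without it.
    import boto3

    return {"accepted": accept_pending_invitations(boto3.client("ram"))}
```

Passing the client in as an argument keeps the logic testable with a stub and leaves credential handling to the Lambda runtime.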

When the resource linked datasets are available in the consumer account, the consumer data lake admin provides grants to IAM users and roles mapping to data consumers within the account. These consumers (application or user persona) can now query the datasets using AWS analytics services of their choice like Amazon Athena and Amazon EMR based on the access privileges granted by the consumer data lake admin.

Day-to-day operations and metrics

Managing permissions using Lake Formation is one part of the overall ecosystem. After permissions have been granted, data producers create new snapshots of the data at a certain cadence that can vary from every 15 minutes to a day. Data producers are integrated with the data platform APIs, which inform the platform about any new refreshes of the data. The data platform automatically writes a 0-byte _SUCCESS file for every dataset that gets refreshed, and notifies the subscribed consumer accounts via an Amazon Simple Notification Service (Amazon SNS) topic in the central governance account. Consumers use this as a signal to trigger their data pipelines and processes to start processing the newer version of the data, utilizing an event-driven approach.
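On the consumer side, the event-driven trigger could look like the following sketch of a Lambda function subscribed to the SNS topic. The outer event shape (`Records[].Sns.Message`) is the standard SNS-to-Lambda format; the message fields `dataset` and `s3_prefix` are hypothetical, because the post does not describe the notification schema.

```python
import json
from typing import Any, Dict, List


def refreshed_datasets(event: Dict[str, Any]) -> List[Dict[str, str]]:
    """Extract refresh notifications from an SNS-triggered Lambda event.

    The message body fields ("dataset", "s3_prefix") are assumptions.
    """
    refreshed = []
    for record in event.get("Records", []):
        message = json.loads(record["Sns"]["Message"])
        refreshed.append(
            {"dataset": message["dataset"], "s3_prefix": message["s3_prefix"]}
        )
    return refreshed


def lambda_handler(event, context):
    for item in refreshed_datasets(event):
        # Placeholder for starting the consumer's own pipeline.
        print(f"New data for {item['dataset']} at {item['s3_prefix']}")


sample_event = {
    "Records": [
        {
            "Sns": {
                "Message": json.dumps(
                    {
                        "dataset": "example_db.example_table",
                        "s3_prefix": "s3://example-bucket/example_table/dt=2022-01-18/",
                    }
                )
            }
        }
    ]
}
print(refreshed_datasets(sample_event))
```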

There are over 2,000 data products built on the GoDaddy data mesh on AWS. Every day, there are thousands of updates to the AWS Glue metastore in the central data governance account. There are hundreds of data producers generating data every hour in a wide array of S3 buckets, and thousands of data consumers consuming data across a wide array of tools, including Athena, Amazon EMR, and Tableau from different AWS accounts.

Business outcomes

With the move to AWS, GoDaddy’s Data Platform team laid the foundations to build a modern data platform that has increased our velocity of building data products and delighting our customers. The data platform has successfully transitioned from a monolithic platform to a model where ownership of data has been decentralized. We accelerated the data platform adoption to over 10 lines of business and over 300 teams globally, and are successfully managing multiple petabytes of data spread across hundreds of accounts to help our business derive insights faster.

Conclusion

GoDaddy’s hub and spoke data mesh architecture built using Lake Formation simplifies security management and data governance at scale, to deliver data as a service supporting company-wide data accessibility. Our data mesh manages multiple petabytes of data across hundreds of accounts, enabling decentralized ownership of well-defined datasets with automation in place, which helps the business discover data assets quicker and derive business insights faster.

This post illustrates the use of Lake Formation to build a data mesh architecture that enables a data as a service (DaaS) model for a modernized enterprise data platform. For more information, see Design a data mesh architecture using AWS Lake Formation and AWS Glue.


About the Authors

Ankit Jhalaria is the Director Of Engineering on the Data Platform at GoDaddy. He has over 10 years of experience working in big data technologies. Outside of work, Ankit loves hiking, playing board games, building IoT projects, and contributing to open-source projects.

Harsh Vardhan is an AWS Solutions Architect, specializing in Analytics. He has over 6 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Kyle Tedeschi is a Principal Solutions Architect at AWS. He enjoys helping customers innovate, transform, and become leaders in their respective domains. Outside of work, Kyle is an avid snowboarder, car enthusiast, and traveler.

Get started with data integration from Amazon S3 to Amazon Redshift using AWS Glue interactive sessions

Post Syndicated from Vikas Omer original https://aws.amazon.com/blogs/big-data/get-started-with-data-integration-from-amazon-s3-to-amazon-redshift-using-aws-glue-interactive-sessions/

Organizations are placing a high priority on data integration, especially to support analytics, machine learning (ML), business intelligence (BI), and application development initiatives. Data is growing exponentially and is generated by increasingly diverse data sources. Data integration becomes challenging when you process data at scale, because of the inherent heavy lifting associated with the infrastructure required to manage it. This is one of the key reasons why organizations are constantly looking for easy-to-use and low-maintenance data integration solutions to move data from one location to another, or to consolidate their business data from several sources into a centralized location to make strategic business decisions.

Most organizations use Spark for their big data processing needs. If you’re looking to simplify data integration, and don’t want the hassle of spinning up servers, managing resources, or setting up Spark clusters, we have the solution for you.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, ML, and application development. AWS Glue provides both visual and code-based interfaces to make data integration simple and accessible for everyone.

If you prefer a code-based experience and want to interactively author data integration jobs, we recommend interactive sessions. Interactive sessions is a recently launched AWS Glue feature that allows you to interactively develop AWS Glue processes, run and test each step, and view the results.

There are different options to use interactive sessions. You can create and work with interactive sessions through the AWS Command Line Interface (AWS CLI) and API. You can also use Jupyter-compatible notebooks to visually author and test your notebook scripts. Interactive sessions provide a Jupyter kernel that integrates almost anywhere that Jupyter does, including integrating with IDEs such as PyCharm, IntelliJ, and Visual Studio Code. This enables you to author code in your local environment and run it seamlessly on the interactive session backend. You can also start a notebook through AWS Glue Studio; all the configuration steps are done for you so that you can explore your data and start developing your job script after only a few seconds. When the code is ready, you can configure, schedule, and monitor job notebooks as AWS Glue jobs.

If you haven’t tried AWS Glue interactive sessions before, this post is highly recommended. We work through a simple scenario where you might need to incrementally load data from Amazon Simple Storage Service (Amazon S3) into Amazon Redshift or transform and enrich your data before loading into Amazon Redshift. In this post, we use interactive sessions within an AWS Glue Studio notebook to load the NYC Taxi dataset into an Amazon Redshift Serverless cluster, query the loaded dataset, save our Jupyter notebook as a job, and schedule it to run using a cron expression. Let’s get started.

Solution overview

We walk you through the following steps:

  1. Set up an AWS Glue Jupyter notebook with interactive sessions.
  2. Use the notebook’s magics, including the AWS Glue connection and bookmarks.
  3. Read data from Amazon S3, and transform and load it into Redshift Serverless.
  4. Save the notebook as an AWS Glue job and schedule it to run.

Prerequisites

For this walkthrough, we must complete the following prerequisites:

  1. Upload Yellow Taxi Trip Records data and the taxi zone lookup table datasets into Amazon S3. Steps to do that are listed in the next section.
  2. Prepare the necessary AWS Identity and Access Management (IAM) policies and roles to work with AWS Glue Studio Jupyter notebooks, interactive sessions, and AWS Glue.
  3. Create the AWS Glue connection for Redshift Serverless.

Upload datasets into Amazon S3

Download Yellow Taxi Trip Records data and taxi zone lookup table data to your local environment. For this post, we download the January 2022 data for yellow taxi trip records data in Parquet format. The taxi zone lookup data is in CSV format. You can also download the data dictionary for the trip record dataset.

  1. On the Amazon S3 console, create a bucket called my-first-aws-glue-is-project-<random number> in the us-east-1 Region to store the data. S3 bucket names must be unique across all AWS accounts in all the Regions.
  2. Create folders nyc_yellow_taxi and taxi_zone_lookup in the bucket you just created and upload the files you downloaded.
    Your folder structures should look like the following screenshots.
    s3 yellow taxi data
    s3 lookup data
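If you prefer to script the upload instead of using the console, the following sketch shows one way to lay out the keys. The local file names are assumptions about what was downloaded, and `plan_uploads` and `upload_all` are illustrative helpers rather than part of any AWS SDK; only `upload_file` is an actual boto3 S3 client method.

```python
from pathlib import Path
from typing import Any, Dict


def plan_uploads(trip_file: str, lookup_file: str) -> Dict[str, str]:
    """Map each local file to the S3 key layout used in this walkthrough."""
    return {
        trip_file: f"nyc_yellow_taxi/{Path(trip_file).name}",
        lookup_file: f"taxi_zone_lookup/{Path(lookup_file).name}",
    }


def upload_all(s3_client: Any, bucket: str, uploads: Dict[str, str]) -> None:
    """Upload every planned file; s3_client should act like boto3.client("s3")."""
    for local_path, key in uploads.items():
        s3_client.upload_file(local_path, bucket, key)


# The local file names below are assumptions, not taken from the post.
uploads = plan_uploads(
    "downloads/yellow_tripdata_2022-01.parquet",
    "downloads/taxi_zone_lookup.csv",
)
print(uploads)
# With boto3 installed and credentials configured:
# upload_all(boto3.client("s3"), "<your s3 bucket name>", uploads)
```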

Prepare IAM policies and role

Let’s prepare the necessary IAM policies and role to work with AWS Glue Studio Jupyter notebooks and interactive sessions. To get started with notebooks in AWS Glue Studio, refer to Getting started with notebooks in AWS Glue Studio.

Create IAM policies for the AWS Glue notebook role

Create the policy AWSGlueInteractiveSessionPassRolePolicy with the following permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": "iam:PassRole",
        "Resource":"arn:aws:iam::<AWS account ID>:role/AWSGlueServiceRole-GlueIS"
        }
    ]
}

This policy allows the AWS Glue notebook role to be passed to interactive sessions so that the same role can be used in both places. Note that AWSGlueServiceRole-GlueIS is the role that we create for the AWS Glue Studio Jupyter notebook in a later step. Next, create the policy AmazonS3Access-MyFirstGlueISProject with the following permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<your s3 bucket name>",
                "arn:aws:s3:::<your s3 bucket name>/*"
            ]
        }
    ]
}

This policy allows the AWS Glue notebook role to access data in the S3 bucket.
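The two policy documents differ only in the account ID, role name, and bucket name, so they can also be generated programmatically. The following is a sketch under that assumption; the helper names are illustrative, and the account ID and bucket name are placeholders.

```python
import json
from typing import Any, Dict


def pass_role_policy(account_id: str, role_name: str) -> Dict[str, Any]:
    """Policy document that allows passing the given role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": f"arn:aws:iam::{account_id}:role/{role_name}",
            }
        ],
    }


def s3_read_policy(bucket: str) -> Dict[str, Any]:
    """Policy document that allows listing the bucket and reading its objects."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [
                    f"arn:aws:s3:::{bucket}",
                    f"arn:aws:s3:::{bucket}/*",
                ],
            }
        ],
    }


# Placeholder account ID; substitute your own.
document = json.dumps(
    pass_role_policy("123456789012", "AWSGlueServiceRole-GlueIS"), indent=4
)
print(document)
# With boto3 installed and IAM permissions in place:
# boto3.client("iam").create_policy(
#     PolicyName="AWSGlueInteractiveSessionPassRolePolicy",
#     PolicyDocument=document,
# )
```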

Create an IAM role for the AWS Glue notebook

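Create a new AWS Glue role called AWSGlueServiceRole-GlueIS and attach the two policies you just created, AWSGlueInteractiveSessionPassRolePolicy and AmazonS3Access-MyFirstGlueISProject, along with the AWS managed policies that AWS Glue and AWS Glue Studio notebooks require.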

Create the AWS Glue connection for Redshift Serverless

Now we’re ready to configure a Redshift Serverless security group to connect with AWS Glue components.

  1. On the Redshift Serverless console, open the workgroup you’re using.
    You can find all the namespaces and workgroups on the Redshift Serverless dashboard.
  2. Under Data access, choose Network and security.
  3. Choose the link for the Redshift Serverless VPC security group.
    redshift serverless vpc security group
    You’re redirected to the Amazon Elastic Compute Cloud (Amazon EC2) console.
  4. In the Redshift Serverless security group details, under Inbound rules, choose Edit inbound rules.
  5. Add a self-referencing rule to allow AWS Glue components to communicate:
    1. For Type, choose All TCP.
    2. For Protocol, choose TCP.
    3. For Port range, include all ports.
    4. For Source, use the same security group as the group ID.
      redshift inbound security group
  6. Similarly, add the following outbound rules:
    1. A self-referencing rule with Type as All TCP, Protocol as TCP, Port range including all ports, and Destination as the same security group as the group ID.
    2. An HTTPS rule for Amazon S3 access. The s3-prefix-list-id value is required in the security group rule to allow traffic from the VPC to the Amazon S3 VPC endpoint.
      redshift outbound security group

If you don’t have an Amazon S3 VPC endpoint, you can create one on the Amazon Virtual Private Cloud (Amazon VPC) console.

s3 vpc endpoint

You can check the value for s3-prefix-list-id on the Managed prefix lists page on the Amazon VPC console.

s3 prefix list
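The same rules can be expressed as the `IpPermissions` structures accepted by the Amazon EC2 API. The following sketch only builds those structures; the helper names are illustrative, and the security group ID and prefix list ID are placeholders you would replace with the values from your own console.

```python
from typing import Any, Dict


def self_referencing_rule(security_group_id: str) -> Dict[str, Any]:
    """All-TCP rule whose peer is the security group itself."""
    return {
        "IpProtocol": "tcp",
        "FromPort": 0,
        "ToPort": 65535,
        "UserIdGroupPairs": [{"GroupId": security_group_id}],
    }


def s3_https_rule(prefix_list_id: str) -> Dict[str, Any]:
    """HTTPS rule that targets the Amazon S3 managed prefix list."""
    return {
        "IpProtocol": "tcp",
        "FromPort": 443,
        "ToPort": 443,
        "PrefixListIds": [{"PrefixListId": prefix_list_id}],
    }


sg_id = "sg-0123456789abcdef0"  # placeholder security group ID
s3_prefix_list_id = "pl-xxxxxxxx"  # placeholder; look up the real value

inbound = [self_referencing_rule(sg_id)]
outbound = [self_referencing_rule(sg_id), s3_https_rule(s3_prefix_list_id)]
print(inbound)
print(outbound)
# With boto3 installed and credentials configured:
# ec2 = boto3.client("ec2")
# ec2.authorize_security_group_ingress(GroupId=sg_id, IpPermissions=inbound)
# ec2.authorize_security_group_egress(GroupId=sg_id, IpPermissions=outbound)
```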

Next, go to the Connectors page on AWS Glue Studio and create a new JDBC connection called redshiftServerless to your Redshift Serverless cluster (unless one already exists). You can find the Redshift Serverless endpoint details under your workgroup’s General Information section. The connection setting looks like the following screenshot.

redshift serverless connection page
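The JDBC URL for the connection follows the usual Amazon Redshift pattern of host, port, and database. As a small sketch, the helper below assembles it from the endpoint details; the function name is illustrative, and the host and database name shown are placeholders rather than values from this walkthrough.

```python
def redshift_jdbc_url(endpoint_host: str, database: str, port: int = 5439) -> str:
    """Assemble a Redshift JDBC URL from endpoint details."""
    return f"jdbc:redshift://{endpoint_host}:{port}/{database}"


# Placeholder endpoint; copy the real host from your workgroup's details.
url = redshift_jdbc_url(
    "default.123456789012.us-east-1.redshift-serverless.amazonaws.com",
    "dev",
)
print(url)
```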

Write interactive code on an AWS Glue Studio Jupyter notebook powered by interactive sessions

Now you can get started with writing interactive code using AWS Glue Studio Jupyter notebook powered by interactive sessions. Note that it’s a good practice to keep saving the notebook at regular intervals while you work through it.

  1. On the AWS Glue Studio console, create a new job.
  2. Select Jupyter Notebook and select Create a new notebook from scratch.
  3. Choose Create.
    glue interactive session create notebook
  4. For Job name, enter a name (for example, myFirstGlueISProject).
  5. For IAM Role, choose the role you created (AWSGlueServiceRole-GlueIS).
  6. Choose Start notebook job.
    glue interactive session notebook setup
    After the notebook is initialized, you can see some of the available magics and a cell with boilerplate code. To view all the magics of interactive sessions, run %help in a cell to print a full list. With the exception of %%sql, running a cell of only magics doesn’t start a session, but sets the configuration for the session that starts when you run your first cell of code.
    glue interactive session jupyter notebook initialization
    For this post, we configure AWS Glue with version 3.0, three G.1X workers, idle timeout, and an Amazon Redshift connection with the help of available magics.
  7. Let’s enter the following magics into our first cell and run it:
    %glue_version 3.0
    %number_of_workers 3
    %worker_type G.1X
    %idle_timeout 60
    %connections redshiftServerless

    We get the following response:

    Welcome to the Glue Interactive Sessions Kernel
    For more information on available magic commands, please type %help in any new cell.
    
    Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
    Installed kernel version: 0.35 
    Setting Glue version to: 3.0
    Previous number of workers: 5
    Setting new number of workers to: 3
    Previous worker type: G.1X
    Setting new worker type to: G.1X
    Current idle_timeout is 2880 minutes.
    idle_timeout has been set to 60 minutes.
    Connections to be included:
    redshiftServerless

  8. Let’s run our first code cell (boilerplate code) to start an interactive notebook session within a few seconds:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
      
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

    We get the following response:

    Authenticating with environment variables and user-defined glue_role_arn:arn:aws:iam::xxxxxxxxxxxx:role/AWSGlueServiceRole-GlueIS
    Attempting to use existing AssumeRole session credentials.
    Trying to create a Glue session for the kernel.
    Worker Type: G.1X
    Number of Workers: 3
    Session ID: 7c9eadb1-9f9b-424f-9fba-d0abc57e610d
    Applying the following default arguments:
    --glue_kernel_version 0.35
    --enable-glue-datacatalog true
    --job-bookmark-option job-bookmark-enable
    Waiting for session 7c9eadb1-9f9b-424f-9fba-d0abc57e610d to get into ready status...
    Session 7c9eadb1-9f9b-424f-9fba-d0abc57e610d has been created

  9. Next, read the NYC yellow taxi data from the S3 bucket into an AWS Glue dynamic frame:
    nyc_taxi_trip_input_dyf = glueContext.create_dynamic_frame.from_options(
        connection_type = "s3", 
        connection_options = {
            "paths": ["s3://<your-s3-bucket-name>/nyc_yellow_taxi/"]
        }, 
        format = "parquet",
        transformation_ctx = "nyc_taxi_trip_input_dyf"
    )

    Let’s count the number of rows, look at the schema and a few rows of the dataset.

  10. Count the rows with the following code:
    nyc_taxi_trip_input_df = nyc_taxi_trip_input_dyf.toDF()
    nyc_taxi_trip_input_df.count()

    We get the following response:

    2463931

  11. View the schema with the following code:
    nyc_taxi_trip_input_df.printSchema()

    We get the following response:

    root
     |-- VendorID: long (nullable = true)
     |-- tpep_pickup_datetime: timestamp (nullable = true)
     |-- tpep_dropoff_datetime: timestamp (nullable = true)
     |-- passenger_count: double (nullable = true)
     |-- trip_distance: double (nullable = true)
     |-- RatecodeID: double (nullable = true)
     |-- store_and_fwd_flag: string (nullable = true)
     |-- PULocationID: long (nullable = true)
     |-- DOLocationID: long (nullable = true)
     |-- payment_type: long (nullable = true)
     |-- fare_amount: double (nullable = true)
     |-- extra: double (nullable = true)
     |-- mta_tax: double (nullable = true)
     |-- tip_amount: double (nullable = true)
     |-- tolls_amount: double (nullable = true)
     |-- improvement_surcharge: double (nullable = true)
     |-- total_amount: double (nullable = true)
     |-- congestion_surcharge: double (nullable = true)
     |-- airport_fee: double (nullable = true)

  12. View a few rows of the dataset with the following code:
    nyc_taxi_trip_input_df.show(5)

    We get the following response:

    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    |VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    |       2| 2022-01-18 15:04:43|  2022-01-18 15:12:51|            1.0|         1.13|       1.0|                 N|         141|         229|           2|        7.0|  0.0|    0.5|       0.0|         0.0|                  0.3|        10.3|                 2.5|        0.0|
    |       2| 2022-01-18 15:03:28|  2022-01-18 15:15:52|            2.0|         1.36|       1.0|                 N|         237|         142|           1|        9.5|  0.0|    0.5|      2.56|         0.0|                  0.3|       15.36|                 2.5|        0.0|
    |       1| 2022-01-06 17:49:22|  2022-01-06 17:57:03|            1.0|          1.1|       1.0|                 N|         161|         229|           2|        7.0|  3.5|    0.5|       0.0|         0.0|                  0.3|        11.3|                 2.5|        0.0|
    |       2| 2022-01-09 20:00:55|  2022-01-09 20:04:14|            1.0|         0.56|       1.0|                 N|         230|         230|           1|        4.5|  0.5|    0.5|      1.66|         0.0|                  0.3|        9.96|                 2.5|        0.0|
    |       2| 2022-01-24 16:16:53|  2022-01-24 16:31:36|            1.0|         2.02|       1.0|                 N|         163|         234|           1|       10.5|  1.0|    0.5|       3.7|         0.0|                  0.3|        18.5|                 2.5|        0.0|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    only showing top 5 rows

  13. Now, read the taxi zone lookup data from the S3 bucket into an AWS Glue dynamic frame:
    nyc_taxi_zone_lookup_dyf = glueContext.create_dynamic_frame.from_options(
        connection_type = "s3", 
        connection_options = {
            "paths": ["s3://<your-s3-bucket-name>/taxi_zone_lookup/"]
        }, 
        format = "csv",
        format_options= {
            'withHeader': True
        },
        transformation_ctx = "nyc_taxi_zone_lookup_dyf"
    )

    Let’s count the number of rows, look at the schema and a few rows of the dataset.

  14. Count the rows with the following code:
    nyc_taxi_zone_lookup_df = nyc_taxi_zone_lookup_dyf.toDF()
    nyc_taxi_zone_lookup_df.count()

    We get the following response:

    265

  15. View the schema with the following code:
    nyc_taxi_zone_lookup_df.printSchema()

    We get the following response:

    root
     |-- LocationID: string (nullable = true)
     |-- Borough: string (nullable = true)
     |-- Zone: string (nullable = true)
     |-- service_zone: string (nullable = true)

  16. View a few rows with the following code:
    nyc_taxi_zone_lookup_df.show(5)

    We get the following response:

    +----------+-------------+--------------------+------------+
    |LocationID|      Borough|                Zone|service_zone|
    +----------+-------------+--------------------+------------+
    |         1|          EWR|      Newark Airport|         EWR|
    |         2|       Queens|         Jamaica Bay|   Boro Zone|
    |         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
    |         4|    Manhattan|       Alphabet City| Yellow Zone|
    |         5|Staten Island|       Arden Heights|   Boro Zone|
    +----------+-------------+--------------------+------------+
    only showing top 5 rows

  17. Based on the data dictionary, let’s recalibrate the data types of the attributes in both dynamic frames:
    nyc_taxi_trip_apply_mapping_dyf = ApplyMapping.apply(
        frame = nyc_taxi_trip_input_dyf, 
        mappings = [
            ("VendorID","Long","VendorID","Integer"), 
            ("tpep_pickup_datetime","Timestamp","tpep_pickup_datetime","Timestamp"), 
            ("tpep_dropoff_datetime","Timestamp","tpep_dropoff_datetime","Timestamp"), 
            ("passenger_count","Double","passenger_count","Integer"), 
            ("trip_distance","Double","trip_distance","Double"),
            ("RatecodeID","Double","RatecodeID","Integer"), 
            ("store_and_fwd_flag","String","store_and_fwd_flag","String"), 
            ("PULocationID","Long","PULocationID","Integer"), 
            ("DOLocationID","Long","DOLocationID","Integer"),
            ("payment_type","Long","payment_type","Integer"), 
            ("fare_amount","Double","fare_amount","Double"),
            ("extra","Double","extra","Double"), 
            ("mta_tax","Double","mta_tax","Double"),
            ("tip_amount","Double","tip_amount","Double"), 
            ("tolls_amount","Double","tolls_amount","Double"), 
            ("improvement_surcharge","Double","improvement_surcharge","Double"), 
            ("total_amount","Double","total_amount","Double"), 
            ("congestion_surcharge","Double","congestion_surcharge","Double"), 
            ("airport_fee","Double","airport_fee","Double")
        ],
        transformation_ctx = "nyc_taxi_trip_apply_mapping_dyf"
    )

    nyc_taxi_zone_lookup_apply_mapping_dyf = ApplyMapping.apply(
        frame = nyc_taxi_zone_lookup_dyf, 
        mappings = [ 
            ("LocationID","String","LocationID","Integer"), 
            ("Borough","String","Borough","String"), 
            ("Zone","String","Zone","String"), 
            ("service_zone","String", "service_zone","String")
        ],
        transformation_ctx = "nyc_taxi_zone_lookup_apply_mapping_dyf"
    )

  18. Now let’s check their schema:
    nyc_taxi_trip_apply_mapping_dyf.toDF().printSchema()

    We get the following response:

    root
     |-- VendorID: integer (nullable = true)
     |-- tpep_pickup_datetime: timestamp (nullable = true)
     |-- tpep_dropoff_datetime: timestamp (nullable = true)
     |-- passenger_count: integer (nullable = true)
     |-- trip_distance: double (nullable = true)
     |-- RatecodeID: integer (nullable = true)
     |-- store_and_fwd_flag: string (nullable = true)
     |-- PULocationID: integer (nullable = true)
     |-- DOLocationID: integer (nullable = true)
     |-- payment_type: integer (nullable = true)
     |-- fare_amount: double (nullable = true)
     |-- extra: double (nullable = true)
     |-- mta_tax: double (nullable = true)
     |-- tip_amount: double (nullable = true)
     |-- tolls_amount: double (nullable = true)
     |-- improvement_surcharge: double (nullable = true)
     |-- total_amount: double (nullable = true)
     |-- congestion_surcharge: double (nullable = true)
     |-- airport_fee: double (nullable = true)

    nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()

    We get the following response:

    root
     |-- LocationID: integer (nullable = true)
     |-- Borough: string (nullable = true)
     |-- Zone: string (nullable = true)
     |-- service_zone: string (nullable = true)

  19. Let’s add the column trip_duration to calculate the duration of each trip in minutes to the taxi trip dynamic frame:
    # Function to calculate trip duration in minutes
    def trip_duration(start_timestamp, end_timestamp):
        minutes_diff = (end_timestamp - start_timestamp).total_seconds() / 60.0
        return minutes_diff

    # Transformation function for each record
    def transformRecord(rec):
        rec["trip_duration"] = trip_duration(rec["tpep_pickup_datetime"], rec["tpep_dropoff_datetime"])
        return rec

    nyc_taxi_trip_final_dyf = Map.apply(
        frame = nyc_taxi_trip_apply_mapping_dyf, 
        f = transformRecord, 
        transformation_ctx = "nyc_taxi_trip_final_dyf"
    )

    Let’s count the number of rows, look at the schema and a few rows of the dataset after applying the above transformation.

  20. Get a record count with the following code:
    nyc_taxi_trip_final_df = nyc_taxi_trip_final_dyf.toDF()
    nyc_taxi_trip_final_df.count()

    We get the following response:

    2463931

  21. View the schema with the following code:
    nyc_taxi_trip_final_df.printSchema()

    We get the following response:

    root
     |-- extra: double (nullable = true)
     |-- tpep_dropoff_datetime: timestamp (nullable = true)
     |-- trip_duration: double (nullable = true)
     |-- trip_distance: double (nullable = true)
     |-- mta_tax: double (nullable = true)
     |-- improvement_surcharge: double (nullable = true)
     |-- DOLocationID: integer (nullable = true)
     |-- congestion_surcharge: double (nullable = true)
     |-- total_amount: double (nullable = true)
     |-- airport_fee: double (nullable = true)
     |-- payment_type: integer (nullable = true)
     |-- fare_amount: double (nullable = true)
     |-- RatecodeID: integer (nullable = true)
     |-- tpep_pickup_datetime: timestamp (nullable = true)
     |-- VendorID: integer (nullable = true)
     |-- PULocationID: integer (nullable = true)
     |-- tip_amount: double (nullable = true)
     |-- tolls_amount: double (nullable = true)
     |-- store_and_fwd_flag: string (nullable = true)
     |-- passenger_count: integer (nullable = true)

  22. View a few rows with the following code:
    nyc_taxi_trip_final_df.show(5)

    We get the following response:

    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    |extra|tpep_dropoff_datetime|     trip_duration|trip_distance|mta_tax|improvement_surcharge|DOLocationID|congestion_surcharge|total_amount|airport_fee|payment_type|fare_amount|RatecodeID|tpep_pickup_datetime|VendorID|PULocationID|tip_amount|tolls_amount|store_and_fwd_flag|passenger_count|
    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    |  0.0|  2022-01-18 15:12:51| 8.133333333333333|         1.13|    0.5|                  0.3|         229|                 2.5|        10.3|        0.0|           2|        7.0|         1| 2022-01-18 15:04:43|       2|         141|       0.0|         0.0|                 N|              1|
    |  0.0|  2022-01-18 15:15:52|              12.4|         1.36|    0.5|                  0.3|         142|                 2.5|       15.36|        0.0|           1|        9.5|         1| 2022-01-18 15:03:28|       2|         237|      2.56|         0.0|                 N|              2|
    |  3.5|  2022-01-06 17:57:03| 7.683333333333334|          1.1|    0.5|                  0.3|         229|                 2.5|        11.3|        0.0|           2|        7.0|         1| 2022-01-06 17:49:22|       1|         161|       0.0|         0.0|                 N|              1|
    |  0.5|  2022-01-09 20:04:14| 3.316666666666667|         0.56|    0.5|                  0.3|         230|                 2.5|        9.96|        0.0|           1|        4.5|         1| 2022-01-09 20:00:55|       2|         230|      1.66|         0.0|                 N|              1|
    |  1.0|  2022-01-24 16:31:36|14.716666666666667|         2.02|    0.5|                  0.3|         234|                 2.5|        18.5|        0.0|           1|       10.5|         1| 2022-01-24 16:16:53|       2|         163|       3.7|         0.0|                 N|              1|
    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    only showing top 5 rows

  23. Next, load both the dynamic frames into our Amazon Redshift Serverless cluster:
    nyc_taxi_trip_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame = nyc_taxi_trip_final_dyf, 
        catalog_connection = "redshiftServerless", 
        connection_options =  {"dbtable": "public.f_nyc_yellow_taxi_trip","database": "dev"}, 
        redshift_tmp_dir = "s3://aws-glue-assets-<AWS-account-ID>-us-east-1/temporary/", 
        transformation_ctx = "nyc_taxi_trip_sink_dyf"
    )

    nyc_taxi_zone_lookup_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame = nyc_taxi_zone_lookup_apply_mapping_dyf, 
        catalog_connection = "redshiftServerless", 
        connection_options = {"dbtable": "public.d_nyc_taxi_zone_lookup", "database": "dev"}, 
        redshift_tmp_dir = "s3://aws-glue-assets-<AWS-account-ID>-us-east-1/temporary/", 
        transformation_ctx = "nyc_taxi_zone_lookup_sink_dyf"
    )

    Now let’s validate the data loaded into the Amazon Redshift Serverless cluster by running a few queries in Amazon Redshift query editor v2. You can also use your preferred query editor.

  24. First, we count the number of records and select a few rows in both the target tables (f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup):
    SELECT 'f_nyc_yellow_taxi_trip' AS table_name, COUNT(1) FROM "public"."f_nyc_yellow_taxi_trip"
    UNION ALL
    SELECT 'd_nyc_taxi_zone_lookup' AS table_name, COUNT(1) FROM "public"."d_nyc_taxi_zone_lookup";

    redshift table record count query output

    The number of records in f_nyc_yellow_taxi_trip (2,463,931) and d_nyc_taxi_zone_lookup (265) match the number of records in our input dynamic frame. This validates that all records from files in Amazon S3 have been successfully loaded into Amazon Redshift.

    You can view some of the records for each table with the following commands:

    SELECT * FROM public.f_nyc_yellow_taxi_trip LIMIT 10;

    redshift fact data select query

    SELECT * FROM public.d_nyc_taxi_zone_lookup LIMIT 10;

    redshift lookup data select query

  25. One of the insights that we want to generate from the datasets is to get the top five routes with their trip duration. Let’s run the SQL for that on Amazon Redshift:
    SELECT 
        CASE WHEN putzl.zone >= dotzl.zone 
            THEN putzl.zone || ' - ' || dotzl.zone 
            ELSE  dotzl.zone || ' - ' || putzl.zone 
        END AS "Route",
        COUNT(1) AS "Frequency",
        ROUND(SUM(trip_duration),1) AS "Total Trip Duration (mins)"
    FROM 
        public.f_nyc_yellow_taxi_trip ytt
    INNER JOIN 
        public.d_nyc_taxi_zone_lookup putzl ON ytt.pulocationid = putzl.locationid
    INNER JOIN 
        public.d_nyc_taxi_zone_lookup dotzl ON ytt.dolocationid = dotzl.locationid
    GROUP BY 
        "Route"
    ORDER BY 
        "Frequency" DESC, "Total Trip Duration (mins)" DESC
    LIMIT 5;

    redshift top 5 route query
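
To make the route logic in the preceding query easier to follow, here is a minimal pure-Python sketch of the same idea. The sample trips and zone names are made up for illustration; only the ordering rule from the CASE expression and the aggregation mirror the SQL. Python string comparison may also order some values differently from the Amazon Redshift collation, so treat this as a sketch rather than an exact equivalent.

```python
from collections import defaultdict


def route_label(pickup_zone: str, dropoff_zone: str) -> str:
    # Mirror the CASE expression: the larger zone name goes first, so that
    # A -> B and B -> A collapse into a single route label.
    if pickup_zone >= dropoff_zone:
        return f"{pickup_zone} - {dropoff_zone}"
    return f"{dropoff_zone} - {pickup_zone}"


def top_routes(trips, limit=5):
    # trips: iterable of (pickup_zone, dropoff_zone, trip_duration_minutes)
    stats = defaultdict(lambda: [0, 0.0])  # route -> [frequency, total duration]
    for pickup, dropoff, duration in trips:
        entry = stats[route_label(pickup, dropoff)]
        entry[0] += 1
        entry[1] += duration
    rows = [(route, freq, round(total, 1)) for route, (freq, total) in stats.items()]
    # ORDER BY "Frequency" DESC, "Total Trip Duration (mins)" DESC
    rows.sort(key=lambda r: (-r[1], -r[2]))
    return rows[:limit]


# Hypothetical sample data, not taken from the NYC taxi dataset.
sample_trips = [
    ("Midtown", "Airport", 30.0),
    ("Airport", "Midtown", 35.5),
    ("Harbor", "Midtown", 12.0),
]
result = top_routes(sample_trips)
print(result)
```

Both trips between Midtown and Airport are counted under one label, which is why the query reports routes regardless of direction.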

Transform the notebook into an AWS Glue job and schedule it

Now that we have authored the code and tested its functionality, let’s save it as a job and schedule it.

Let’s first enable job bookmarks. Job bookmarks help AWS Glue maintain state information and prevent the reprocessing of old data. With job bookmarks, you can process new data when rerunning on a scheduled interval.
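
As a conceptual illustration only, the following Python sketch shows the idea behind a bookmark: remember what earlier runs processed, and skip it on the next run. The function, the in-memory set, and the file names are hypothetical; AWS Glue persists and tracks bookmark state in its own way, which this sketch does not attempt to reproduce.

```python
def run_job(available_files, bookmark):
    # bookmark: set of file keys recorded as processed by earlier runs
    new_files = [f for f in available_files if f not in bookmark]
    # ... transform and load only new_files here ...
    bookmark.update(new_files)  # analogous to committing state at the end of a run
    return new_files


bookmark_state = set()
files = ["yellow_tripdata_2022-01.parquet"]  # hypothetical file name

first_run = run_job(files, bookmark_state)    # processes the file
second_run = run_job(files, bookmark_state)   # nothing new to process
third_run = run_job(files + ["yellow_tripdata_2022-02.parquet"], bookmark_state)
print(first_run, second_run, third_run)
```

A rerun with no new files has nothing to process, while a later run picks up only the file that arrived in between.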

  1. Add the following magic command after the first cell that contains other magic commands initialized during authoring the code:
    %%configure
    {
        "--job-bookmark-option": "job-bookmark-enable"
    }

    To initialize job bookmarks, we run the following code with the name of the job as the default argument (myFirstGlueISProject for this post). Job bookmarks store the states for a job. You should always have job.init() at the beginning of the script and job.commit() at the end of the script. These two functions initialize the bookmark service and update the state change to the service. Bookmarks won’t work without calling them.

  2. Add the following piece of code after the boilerplate code:
    params = []
    if '--JOB_NAME' in sys.argv:
        params.append('JOB_NAME')
    args = getResolvedOptions(sys.argv, params)
    if 'JOB_NAME' in args:
        jobname = args['JOB_NAME']
    else:
        jobname = "myFirstGlueISProject"
    job.init(jobname, args)

  3. Then comment out all the lines of code that were authored to verify the desired outcome and aren’t necessary for the job to deliver its purpose:
    #nyc_taxi_trip_input_df = nyc_taxi_trip_input_dyf.toDF()
    #nyc_taxi_trip_input_df.count()
    #nyc_taxi_trip_input_df.printSchema()
    #nyc_taxi_trip_input_df.show(5)
    
    #nyc_taxi_zone_lookup_df = nyc_taxi_zone_lookup_dyf.toDF()
    #nyc_taxi_zone_lookup_df.count()
    #nyc_taxi_zone_lookup_df.printSchema()
    #nyc_taxi_zone_lookup_df.show(5)
    
    #nyc_taxi_trip_apply_mapping_dyf.toDF().printSchema()
    #nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()
    
    #nyc_taxi_trip_final_df = nyc_taxi_trip_final_dyf.toDF()
    #nyc_taxi_trip_final_df.count()
    #nyc_taxi_trip_final_df.printSchema()
    #nyc_taxi_trip_final_df.show(5)

  4. Save the notebook.
    glue interactive session save job
    You can check the corresponding script on the Script tab.
    glue interactive session script tab
    Note that job.commit() is automatically added at the end of the script.
    Let’s run the notebook as a job.
  5. First, truncate f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup tables in Amazon Redshift using the query editor v2 so that we don’t have duplicates in both the tables:
    truncate "public"."f_nyc_yellow_taxi_trip";
    truncate "public"."d_nyc_taxi_zone_lookup";

  6. Choose Run to run the job.
    glue interactive session run job
    You can check its status on the Runs tab.
    glue interactive session job run status
    The job completed in less than 5 minutes with G1.x 3 DPUs.
  7. Let’s check the count of records in f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup tables in Amazon Redshift:
    SELECT 'f_nyc_yellow_taxi_trip' AS table_name, COUNT(1) FROM "public"."f_nyc_yellow_taxi_trip"
    UNION ALL
    SELECT 'd_nyc_taxi_zone_lookup' AS table_name, COUNT(1) FROM "public"."d_nyc_taxi_zone_lookup";

    redshift count query output

    With job bookmarks enabled, even if you run the job again with no new files in corresponding folders in the S3 bucket, it doesn’t process the same files again. The following screenshot shows a subsequent job run in my environment, which completed in less than 2 minutes because there were no new files to process.

    glue interactive session job re-run

    Now let’s schedule the job.

  8. On the Schedules tab, choose Create schedule.
    glue interactive session create schedule
  9. For Name, enter a name (for example, myFirstGlueISProject-testSchedule).
  10. For Frequency, choose Custom.
  11. Enter a cron expression so the job runs every Monday at 6:00 AM.
  12. Add an optional description.
  13. Choose Create schedule.
    glue interactive session add schedule
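
The post does not show the cron expression itself, so here is a small Python sketch that builds one, assuming the six-field AWS cron layout (minutes, hours, day of month, month, day of week, year) and UTC time. The helper name is hypothetical, and whether the console field expects the surrounding cron(...) wrapper or only the six fields is something to confirm in your environment.

```python
def weekly_cron(day_of_week: str, hour: int, minute: int = 0) -> str:
    # Build a six-field expression: minutes hours day-of-month month day-of-week year.
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute must be 0-59")
    # Day-of-month is "?" because the day-of-week field is the one being specified.
    fields = [str(minute), str(hour), "?", "*", day_of_week.upper(), "*"]
    return "cron(" + " ".join(fields) + ")"


expression = weekly_cron("MON", 6)
print(expression)  # cron(0 6 ? * MON *)
```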

The schedule has been saved and activated. You can edit, pause, resume, or delete the schedule from the Actions menu.

glue interactive session schedule action

Clean up

To avoid incurring future charges, delete the AWS resources you created.

  • Delete the AWS Glue job (myFirstGlueISProject for this post).
  • Delete the Amazon S3 objects and bucket (my-first-aws-glue-is-project-<random number> for this post).
  • Delete the AWS IAM policies and roles (AWSGlueInteractiveSessionPassRolePolicy, AmazonS3Access-MyFirstGlueISProject and AWSGlueServiceRole-GlueIS).
  • Delete the Amazon Redshift tables (f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup).
  • Delete the AWS Glue JDBC Connection (redshiftServerless).
  • Also delete the self-referencing Redshift Serverless security group, and Amazon S3 endpoint (if you created it while following the steps for this post).

Conclusion

In this post, we demonstrated how to do the following:

  • Set up an AWS Glue Jupyter notebook with interactive sessions
  • Use the notebook’s magics, including the AWS Glue connection onboarding and bookmarks
  • Read the data from Amazon S3, and transform and load it into Amazon Redshift Serverless
  • Configure magics to enable job bookmarks, save the notebook as an AWS Glue job, and schedule it using a cron expression

The goal of this post is to give you step-by-step fundamentals to get you going with AWS Glue Studio Jupyter notebooks and interactive sessions. You can set up an AWS Glue Jupyter notebook in minutes, start an interactive session in seconds, and greatly improve the development experience with AWS Glue jobs. Interactive sessions have a 1-minute billing minimum with cost control features that reduce the cost of developing data preparation applications. You can build and test applications from the environment of your choice, even on your local environment, using the interactive sessions backend.

Interactive sessions provide a faster, cheaper, and more flexible way to build and run data preparation and analytics applications. To learn more about interactive sessions, refer to Job development (interactive sessions), and start exploring a whole new development experience with AWS Glue. Additionally, check out the following posts to walk through more examples of using interactive sessions with different options:


About the Authors

Vikas Omer is a principal analytics specialist solutions architect at Amazon Web Services. Vikas has a strong background in analytics, customer experience management (CEM), and data monetization, with over 13 years of experience in the industry globally. With six AWS Certifications, including Analytics Specialty, he is a trusted analytics advocate to AWS customers and partners. He loves traveling, meeting customers, and helping them become successful in what they do.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys collaborating with different teams to deliver results like this post. In his spare time, he enjoys playing video games with his family.

Gal Heyne is a Product Manager for AWS Glue and has over 15 years of experience as a product manager, data engineer and data architect. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design elegant, powerful and easy to use data products. Gal has a Master’s degree in Data Science from UC Berkeley and she enjoys traveling, playing board games and going to music concerts.

Share and publish your Snowflake data to AWS Data Exchange using Amazon Redshift data sharing

Post Syndicated from Raks Khare original https://aws.amazon.com/blogs/big-data/share-and-publish-your-snowflake-data-to-aws-data-exchange-using-amazon-redshift-data-sharing/

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes of data and scale to a petabyte or more. Today, tens of thousands of AWS customers—from Fortune 500 companies, startups, and everything in between—use Amazon Redshift to run mission-critical business intelligence (BI) dashboards, analyze real-time streaming data, and run predictive analytics. With the constant increase in generated data, Amazon Redshift customers continue to achieve successes in delivering better service to their end-users, improving their products, and running an efficient and effective business.

In this post, we discuss a customer who is currently using Snowflake to store analytics data. The customer needs to offer this data to clients who are using Amazon Redshift via AWS Data Exchange, the world’s most comprehensive service for third-party datasets. We explain in detail how to implement a fully integrated process that will automatically ingest data from Snowflake into Amazon Redshift and offer it to clients via AWS Data Exchange.

Overview of the solution

The solution consists of four high-level steps:

  1. Configure Snowflake to push the changed data for identified tables into an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Use a custom-built Redshift Auto Loader to load this Amazon S3 landed data to Amazon Redshift.
  3. Merge the data from the change data capture (CDC) S3 staging tables to Amazon Redshift tables.
  4. Use Amazon Redshift data sharing to license the data to customers via AWS Data Exchange as a public or private offering.

The following diagram illustrates this workflow.

Solution Architecture Diagram

Prerequisites

To get started, you need the following prerequisites:

Configure Snowflake to track the changed data and unload it to Amazon S3

In Snowflake, identify the tables that you need to replicate to Amazon Redshift. For the purpose of this demo, we use the data in the TPCH_SF1 schema’s Customer, LineItem, and Orders tables of the SNOWFLAKE_SAMPLE_DATA database, which comes out of the box with your Snowflake account.

  1. Make sure that the Snowflake external stage named unload_to_s3, created in the prerequisites, is pointing to the S3 prefix s3-redshift-loader-source created in the previous step.
  2. Create a new schema BLOG_DEMO in the DEMO_DB database:
    CREATE SCHEMA demo_db.blog_demo;
  3. Duplicate the Customer, LineItem, and Orders tables in the TPCH_SF1 schema to the BLOG_DEMO schema:
    CREATE TABLE CUSTOMER AS 
    SELECT * FROM snowflake_sample_data.tpch_sf1.CUSTOMER;
    CREATE TABLE ORDERS AS
    SELECT * FROM snowflake_sample_data.tpch_sf1.ORDERS;
    CREATE TABLE LINEITEM AS 
    SELECT * FROM snowflake_sample_data.tpch_sf1.LINEITEM;

  4. Verify that the tables have been duplicated successfully:
    SELECT table_catalog, table_schema, table_name, row_count, bytes
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = 'BLOG_DEMO'
    ORDER BY ROW_COUNT;

    unload-step-4

  5. Create table streams to track data manipulation language (DML) changes made to the tables, including inserts, updates, and deletes:
    CREATE OR REPLACE STREAM CUSTOMER_CHECK ON TABLE CUSTOMER;
    CREATE OR REPLACE STREAM ORDERS_CHECK ON TABLE ORDERS;
    CREATE OR REPLACE STREAM LINEITEM_CHECK ON TABLE LINEITEM;

  6. Perform DML changes to the tables (for this post, we run UPDATE on all tables and MERGE on the customer table):
    UPDATE customer 
    SET c_comment = 'Sample comment for blog demo' 
    WHERE c_custkey between 0 and 10; 
    UPDATE orders 
    SET o_comment = 'Sample comment for blog demo' 
    WHERE o_orderkey between 1800001 and 1800010; 
    UPDATE lineitem 
    SET l_comment = 'Sample comment for blog demo' 
    WHERE l_orderkey between 3600001 and 3600010;
    MERGE INTO customer c 
    USING 
    ( 
    SELECT n_nationkey 
    FROM snowflake_sample_data.tpch_sf1.nation s 
    WHERE n_name = 'UNITED STATES') n 
    ON n.n_nationkey = c.c_nationkey 
    WHEN MATCHED THEN UPDATE SET c.c_comment = 'This is US based customer1';

  7. Validate that the stream tables have recorded all changes:
    SELECT * FROM CUSTOMER_CHECK; 
    SELECT * FROM ORDERS_CHECK; 
    SELECT * FROM LINEITEM_CHECK;

    For example, we can query the following customer key value to verify how the events were recorded for the MERGE statement on the customer table:

    SELECT * FROM CUSTOMER_CHECK where c_custkey = 60027;

    We can see the METADATA$ISUPDATE column as TRUE, and we see DELETE followed by INSERT in the METADATA$ACTION column.
    unload-val-step-7

  8. Run the COPY command to offload the CDC from the stream tables to the S3 bucket using the external stage name unload_to_s3. In the following code, we’re also copying the data to S3 folders ending with _stg to ensure that when Redshift Auto Loader automatically creates these tables in Amazon Redshift, they get created and marked as staging tables:
    COPY INTO @unload_to_s3/customer_stg/
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.customer_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    COPY INTO @unload_to_s3/orders_stg/
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.orders_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    COPY INTO @unload_to_s3/lineitem_stg/ 
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.lineitem_check) 
    FILE_FORMAT = (TYPE = PARQUET) 
    OVERWRITE = TRUE HEADER = TRUE;

  9. Verify the data in the S3 bucket. There will be three sub-folders created in the s3-redshift-loader-source folder of the S3 bucket, and each will have .parquet data files.
    unload-step-9-val
    unload-step-9-val
    You can also automate the preceding COPY commands using tasks, which can be scheduled to run at a set frequency for automatic copy of CDC data from Snowflake to Amazon S3.
  10. Use the ACCOUNTADMIN role to assign the EXECUTE TASK privilege. In this scenario, we’re assigning the privileges to the SYSADMIN role:
    USE ROLE accountadmin;
    GRANT EXECUTE TASK, EXECUTE MANAGED TASK ON ACCOUNT TO ROLE sysadmin;

  11. Use the SYSADMIN role to create three separate tasks to run three COPY commands every 5 minutes:
    USE ROLE sysadmin;
    /* Task to offload Customer CDC table */ 
    CREATE TASK sf_rs_customer_cdc 
    WAREHOUSE = SMALL 
    SCHEDULE = 'USING CRON */5 * * * * UTC' 
    AS 
    COPY INTO @unload_to_s3/customer_stg/ 
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.customer_check) 
    FILE_FORMAT = (TYPE = PARQUET) 
    OVERWRITE = TRUE 
    HEADER = TRUE;
    /*Task to offload Orders CDC table */ 
    CREATE TASK sf_rs_orders_cdc 
    WAREHOUSE = SMALL 
    SCHEDULE = 'USING CRON */5 * * * * UTC' 
    AS 
    COPY INTO @unload_to_s3/orders_stg/ 
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.orders_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    /* Task to offload Lineitem CDC table */ 
    CREATE TASK sf_rs_lineitem_cdc 
    WAREHOUSE = SMALL 
    SCHEDULE = 'USING CRON */5 * * * * UTC' 
    AS 
    COPY INTO @unload_to_s3/lineitem_stg/ 
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.lineitem_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    When the tasks are first created, they’re in a SUSPENDED state.

  12. Alter the three tasks and set them to RESUME state:
    ALTER TASK sf_rs_customer_cdc RESUME;
    ALTER TASK sf_rs_orders_cdc RESUME;
    ALTER TASK sf_rs_lineitem_cdc RESUME;

  13. Validate that all three tasks have been resumed successfully:
    SHOW TASKS;
    unload-setp-13-val
    Now the tasks will run every 5 minutes and look for new data in the stream tables to offload to Amazon S3.
    As soon as data is migrated from Snowflake to Amazon S3, Redshift Auto Loader automatically infers the schema and instantly creates corresponding tables in Amazon Redshift. Then, by default, it starts loading data from Amazon S3 to Amazon Redshift every 5 minutes. You can also change the default setting of 5 minutes.
  14. On the Amazon Redshift console, launch the query editor v2 and connect to your Amazon Redshift cluster.
  15. Browse to the dev database, public schema, and expand Tables.
    You can see three staging tables created with the same name as the corresponding folders in Amazon S3.
  16. Validate the data in one of the tables by running the following query:
    SELECT * FROM "dev"."public"."customer_stg";
    unload-step-16-val
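
Because the tasks are meant to run every 5 minutes, it helps to be clear about how the minute field of a cron expression is read. The following Python sketch is a simplified matcher that handles only three forms of the minute field ("*", "*/N", and a single number). It is not a full cron parser, and the function names are hypothetical.

```python
def minute_field_matches(field: str, minute: int) -> bool:
    # Supports only "*", "*/N", and a single number such as "5".
    if field == "*":
        return True
    if field.startswith("*/"):
        return minute % int(field[2:]) == 0
    return minute == int(field)


def firing_minutes(field: str):
    # List the minutes of an hour at which a schedule with this minute field fires.
    return [m for m in range(60) if minute_field_matches(field, m)]


every_five = firing_minutes("*/5")  # fires 12 times per hour
only_five = firing_minutes("5")     # fires once per hour, at minute 5
print(every_five, only_five)
```

A minute field of */5 fires every fifth minute, whereas a bare 5 fires only once an hour, at five minutes past.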

Configure the Redshift Auto Loader utility

The Redshift Auto Loader makes data ingestion to Amazon Redshift significantly easier because it automatically loads data files from Amazon S3 to Amazon Redshift. The files are mapped to the respective tables by simply dropping files into preconfigured locations on Amazon S3. For more details about the architecture and internal workflow, refer to the GitHub repo.
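
The folder-to-table convention used in this post (files under s3-redshift-loader-source/customer_stg/ end up in a table named customer_stg) can be sketched in a few lines of Python. This is only an illustration of the naming convention, not the utility’s actual code. The function name and the sample object key are hypothetical.

```python
from pathlib import PurePosixPath

LOADER_PREFIX = "s3-redshift-loader-source"


def staging_table_for_key(object_key: str) -> str:
    # Expect keys shaped like <prefix>/<table folder>/<data file>.
    parts = PurePosixPath(object_key).parts
    if len(parts) < 3 or parts[0] != LOADER_PREFIX:
        raise ValueError(f"unexpected object key: {object_key}")
    # The folder directly under the prefix gives the table name.
    return parts[1]


# Hypothetical object key for a file unloaded from Snowflake.
table = staging_table_for_key(
    "s3-redshift-loader-source/customer_stg/data_0_0_0.snappy.parquet"
)
print(table)  # customer_stg
```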

We use an AWS CloudFormation template to set up Redshift Auto Loader. Complete the following steps:

  1. Launch the CloudFormation template.
  2. Choose Next.
    autoloader-step-2
  3. For Stack name, enter a name.
  4. Provide the parameters listed in the following table.

    CloudFormation template parameters:

    RedshiftClusterIdentifier
        Allowed values: Amazon Redshift cluster identifier
        Description: Enter the Amazon Redshift cluster identifier.

    DatabaseUserName
        Allowed values: Database user name in the Amazon Redshift cluster
        Description: The Amazon Redshift database user name that has access to run the SQL script.

    DatabaseName
        Allowed values: Database name in Amazon Redshift
        Description: The name of the Amazon Redshift primary database where the SQL script is run.

    DatabaseSchemaName
        Allowed values: Schema name in Amazon Redshift
        Description: The Amazon Redshift schema name where the tables are created.

    RedshiftIAMRoleARN
        Allowed values: Default or the valid IAM role ARN attached to the Amazon Redshift cluster
        Description: The IAM role ARN associated with the Amazon Redshift cluster. If your default IAM role is set for the cluster and has access to your S3 bucket, leave it at the default.

    CopyCommandOptions
        Allowed values: Copy option; default is delimiter '|' gzip
        Description: Provide the additional COPY command data format parameters. If InitiateSchemaDetection = Yes, then the process attempts to detect the schema and automatically set the suitable copy command options. In the event of failure on schema detection or when InitiateSchemaDetection = No, then this value is used as the default COPY command options to load data.

    SourceS3Bucket
        Allowed values: S3 bucket name
        Description: The S3 bucket where the data is stored. Make sure the IAM role that is associated to the Amazon Redshift cluster has access to this bucket.

    InitiateSchemaDetection
        Allowed values: Yes/No
        Description: Set to Yes to dynamically detect the schema prior to file load and create a table in Amazon Redshift if it doesn’t exist already. If a table already exists, then it won’t drop or recreate the table in Amazon Redshift. If schema detection fails, the process uses the default COPY options as specified in CopyCommandOptions.

    The Redshift Auto Loader uses the COPY command to load data into Amazon Redshift. For this post, set CopyCommandOptions as follows, and configure any supported COPY command options:

    delimiter '|' dateformat 'auto' TIMEFORMAT 'auto'

    autoloader-input-parameters

  5. Choose Next.
  6. Accept the default values on the next page and choose Next.
  7. Select the acknowledgement check box and choose Create stack.
    autoloader-step-7
  8. Monitor the progress of the Stack creation and wait until it is complete.
  9. To verify the Redshift Auto Loader configuration, sign in to the Amazon S3 console and navigate to the S3 bucket you provided.
    You should see that a new directory, s3-redshift-loader-source, has been created.
    autoloader-step-9

Copy all the data files exported from Snowflake under s3-redshift-loader-source.

Merge the data from the CDC S3 staging tables to Amazon Redshift tables

To merge your data from Amazon S3 to Amazon Redshift, complete the following steps:

  1. Create a temporary staging table merge_stg and insert all the rows from the S3 staging table that have metadata$action as INSERT, using the following code. This includes all the new inserts as well as the updates.
    CREATE TEMP TABLE merge_stg 
    AS
    SELECT * FROM
    (
    SELECT *, DENSE_RANK() OVER (PARTITION BY c_custkey ORDER BY last_updated_ts DESC
    ) AS rnk
    FROM customer_stg
    )
    WHERE rnk = 1 AND metadata$action = 'INSERT';

    The preceding code uses the window function DENSE_RANK() to select the latest entries for a given c_custkey: it ranks the rows for each c_custkey in descending order of last_updated_ts. We then select the rows with rnk = 1 and metadata$action = 'INSERT' to capture all the inserts.

  2. Use the S3 staging table customer_stg to delete the records from the base table customer, which are marked as deletes or updates:
    DELETE FROM customer 
    USING customer_stg 
    WHERE customer.c_custkey = customer_stg.c_custkey;

    This deletes all the rows that are present in the CDC S3 staging table, which takes care of rows marked for deletion and updates.

  3. Use the temporary staging table merge_stg to insert the records marked for updates or inserts:
    INSERT INTO customer 
    SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment 
    FROM merge_stg;

  4. Truncate the staging table, because we have already updated the target table:
    truncate customer_stg;
  5. You can also run the preceding steps as a stored procedure:
    CREATE OR REPLACE PROCEDURE merge_customer()
    AS $$
    BEGIN
    /*CREATING TEMP TABLE TO GET THE MOST LATEST RECORDS FOR UPDATES/NEW INSERTS*/
    CREATE TEMP TABLE merge_stg AS
    SELECT * FROM
    (
    SELECT *, DENSE_RANK() OVER (PARTITION BY c_custkey ORDER BY last_updated_ts DESC ) AS rnk
    FROM customer_stg
    )
    WHERE rnk = 1 AND metadata$action = 'INSERT';
    /* DELETING FROM THE BASE TABLE USING THE CDC STAGING TABLE ALL THE RECORDS MARKED AS DELETES OR UPDATES*/
    DELETE FROM customer
    USING customer_stg
    WHERE customer.c_custkey = customer_stg.c_custkey;
    /*INSERTING NEW/UPDATED RECORDS IN THE BASE TABLE*/ 
    INSERT INTO customer
    SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment
    FROM merge_stg;
    truncate customer_stg;
    END;
    $$ LANGUAGE plpgsql;

    For example, let’s look at the before and after states of the customer table when there’s been a change in data for a particular customer.

    The following screenshot shows the new changes recorded in the customer_stg table for c_custkey = 74360.
    merge-process-new-changes
    We can see two records for the customer with c_custkey = 74360: one with metadata$action as DELETE and one with metadata$action as INSERT. That means the record with this c_custkey was updated at the source, and these changes need to be applied to the target customer table in Amazon Redshift.

    The following screenshot shows the current state of the customer table before these changes have been merged using the preceding stored procedure:
    merge-process-current-state

  6. Now, to update the target table, we can run the stored procedure as follows:
    CALL merge_customer()
    The following screenshot shows the final state of the target table after the stored procedure is complete.
    merge-process-after-sp
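
The merge steps above can also be traced in plain Python, which makes it easier to see what happens to updated, deleted, and new keys. This is a minimal in-memory sketch, not a replacement for the stored procedure. The sample rows, the integer timestamps, and the reduced column set are made up for illustration.

```python
CDC_COLUMNS = {"metadata$action", "metadata$isupdate", "last_updated_ts"}


def merge_cdc(base_rows, staging_rows, key="c_custkey"):
    # Step 1: per key, keep rows with the latest last_updated_ts (rnk = 1),
    # then keep only those whose action is INSERT.
    latest_ts = {}
    for row in staging_rows:
        k, ts = row[key], row["last_updated_ts"]
        if k not in latest_ts or ts > latest_ts[k]:
            latest_ts[k] = ts
    merge_stg = [
        row for row in staging_rows
        if row["last_updated_ts"] == latest_ts[row[key]]
        and row["metadata$action"] == "INSERT"
    ]
    # Step 2: delete every base row whose key appears in the staging table.
    staged_keys = {row[key] for row in staging_rows}
    merged = [row for row in base_rows if row[key] not in staged_keys]
    # Step 3: insert the new and updated rows without the CDC metadata columns.
    for row in merge_stg:
        merged.append({c: v for c, v in row.items() if c not in CDC_COLUMNS})
    return sorted(merged, key=lambda r: r[key])


customer = [
    {"c_custkey": 1, "c_comment": "old comment"},
    {"c_custkey": 2, "c_comment": "untouched"},
    {"c_custkey": 3, "c_comment": "deleted at source"},
]
customer_stg = [
    # An update arrives as a DELETE plus an INSERT for the same key.
    {"c_custkey": 1, "c_comment": "old comment", "metadata$action": "DELETE",
     "metadata$isupdate": True, "last_updated_ts": 100},
    {"c_custkey": 1, "c_comment": "new comment", "metadata$action": "INSERT",
     "metadata$isupdate": True, "last_updated_ts": 100},
    {"c_custkey": 3, "c_comment": "deleted at source", "metadata$action": "DELETE",
     "metadata$isupdate": False, "last_updated_ts": 100},
    {"c_custkey": 4, "c_comment": "new customer", "metadata$action": "INSERT",
     "metadata$isupdate": False, "last_updated_ts": 100},
]
merged_customer = merge_cdc(customer, customer_stg)
print(merged_customer)
```

Key 1 ends up with the new comment, key 2 is untouched, key 3 is removed, and key 4 is added, which is the outcome the delete-then-insert pattern is designed to produce.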

Run the stored procedure on a schedule

You can also run the stored procedure on a schedule via Amazon EventBridge. The scheduling steps are as follows:

  1. On the EventBridge console, choose Create rule.
    sp-schedule-1
  2. For Name, enter a meaningful name, for example, Trigger-Snowflake-Redshift-CDC-Merge.
  3. For Event bus, choose default.
  4. For Rule Type, select Schedule.
  5. Choose Next.
    sp-schedule-step-5
  6. For Schedule pattern, select A schedule that runs at a regular rate, such as every 10 minutes.
  7. For Rate expression, enter Value as 5 and choose Unit as Minutes.
  8. Choose Next.
    sp-schedule-step-8
  9. For Target types, choose AWS service.
  10. For Select a Target, choose Redshift cluster.
  11. For Cluster, choose the Amazon Redshift cluster identifier.
  12. For Database name, choose dev.
  13. For Database user, enter a user name with access to run the stored procedure. It uses temporary credentials to authenticate.
  14. Optionally, you can also use AWS Secrets Manager for authentication.
  15. For SQL statement, enter CALL merge_customer().
  16. For Execution role, select Create a new role for this specific resource.
  17. Choose Next.
    sp-schedule-step-17
  18. Review the rule parameters and choose Create rule.

After the rule has been created, it automatically triggers the stored procedure in Amazon Redshift every 5 minutes to merge the CDC data into the target table.
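
The Value and Unit entered in the console correspond to an EventBridge rate expression. As a small sketch, the following hypothetical Python helper builds that string and applies the singular-versus-plural unit rule. It is offered only to show the expected format.

```python
def rate_expression(value: int, unit: str) -> str:
    # unit is given in singular form: "minute", "hour", or "day".
    if value < 1:
        raise ValueError("value must be a positive integer")
    if unit not in {"minute", "hour", "day"}:
        raise ValueError("unit must be minute, hour, or day")
    # A value of 1 takes the singular unit; larger values take the plural.
    suffix = "" if value == 1 else "s"
    return f"rate({value} {unit}{suffix})"


schedule = rate_expression(5, "minute")
print(schedule)  # rate(5 minutes)
```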

Configure Amazon Redshift to share the identified data with AWS Data Exchange

Now that you have the data stored inside Amazon Redshift, you can publish it to customers using AWS Data Exchange.

  1. In Amazon Redshift, using any query editor, create the data share and add the tables to be shared:
    CREATE DATASHARE salesshare MANAGEDBY ADX;
    ALTER DATASHARE salesshare ADD SCHEMA tpch_sf1;
    ALTER DATASHARE salesshare ADD TABLE tpch_sf1.customer;

    ADX-step1

  2. On the AWS Data Exchange console, create your dataset.
  3. Select Amazon Redshift datashare.
    ADX-step3-create-datashare
  4. Create a revision in the dataset.
    ADX-step4-create-revision
  5. Add assets to the revision (in this case, the Amazon Redshift data share).
    ADX-addassets
  6. Finalize the revision.
    ADX-step-6-finalizerevision

After you create the dataset, you can publish it to the public catalog or directly to customers as a private product. For instructions on how to create and publish products, refer to NEW – AWS Data Exchange for Amazon Redshift.

Clean up

To avoid incurring future charges, complete the following steps:

  1. Delete the CloudFormation stack used to create the Redshift Auto Loader.
  2. Delete the Amazon Redshift cluster created for this demonstration.
  3. If you were using an existing cluster, drop the created external table and external schema.
  4. Delete the S3 bucket you created.
  5. Delete the Snowflake objects you created.

Conclusion

In this post, we demonstrated how you can set up a fully integrated process that continuously replicates data from Snowflake to Amazon Redshift and then uses Amazon Redshift to offer data to downstream clients over AWS Data Exchange. You can use the same architecture for other purposes, such as sharing data with other Amazon Redshift clusters within the same account, cross-accounts, or even cross-Regions if needed.


About the Authors

Raks Khare is an Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers architect data analytics solutions at scale on the AWS platform.

Ekta Ahuja is a Senior Analytics Specialist Solutions Architect at AWS. She is passionate about helping customers build scalable and robust data and analytics solutions. Before AWS, she worked in several different data engineering and analytics roles. Outside of work, she enjoys baking, traveling, and board games.

Tahir Aziz is an Analytics Solution Architect at AWS. He has worked on building data warehouses and big data solutions for over 13 years. He loves to help customers design end-to-end analytics solutions on AWS. Outside of work, he enjoys traveling and cooking.

Ahmed Shehata is a Senior Analytics Specialist Solutions Architect at AWS based in Toronto. He has more than two decades of experience helping customers modernize their data platforms. Ahmed is passionate about helping customers build efficient, performant, and scalable analytics solutions.

Use an event-driven architecture to build a data mesh on AWS

Post Syndicated from Jan Michael Go Tan original https://aws.amazon.com/blogs/big-data/use-an-event-driven-architecture-to-build-a-data-mesh-on-aws/

In this post, we take the data mesh design discussed in Design a data mesh architecture using AWS Lake Formation and AWS Glue, and demonstrate how to initialize data domain accounts to enable managed sharing; we also go through how we can use an event-driven approach to automate processes between the central governance account and data domain accounts (producers and consumers). We build a data mesh pattern from scratch as Infrastructure as Code (IaC) using AWS CDK and use an open-source self-service data platform UI to share and discover data between business units.

The key advantage of this approach is being able to add actions in response to data mesh events such as permission management, tag propagation, search index management, and to automate different processes.

Before we dive into it, let’s look at AWS Analytics Reference Architecture, an open-source library that we use to build our solution.

AWS Analytics Reference Architecture

AWS Analytics Reference Architecture (ARA) is a set of analytics solutions put together as end-to-end examples. It regroups AWS best practices for designing, implementing, and operating analytics platforms through different purpose-built patterns, handling common requirements, and solving customers’ challenges.

ARA exposes reusable core components in an AWS CDK library, currently available in TypeScript and Python. This library contains AWS CDK constructs (L3) that can be used to quickly provision analytics solutions in demos, prototypes, proofs of concept, and end-to-end reference architectures.

The following table lists data mesh specific constructs in the AWS Analytics Reference Architecture library.

Construct Name | Purpose
CentralGovernance | Creates an Amazon EventBridge event bus for the central governance account that is used to communicate with data domain accounts (producer/consumer). Creates workflows to automate data product registration and sharing.
DataDomain | Creates an Amazon EventBridge event bus for the data domain account (producer/consumer) to communicate with the central governance account. It creates data lake storage (Amazon S3) and a workflow to automate data product registration. It also creates a workflow to populate AWS Glue Catalog metadata for a newly registered data product.

You can find AWS CDK constructs for the AWS Analytics Reference Architecture on Construct Hub.

In addition to ARA constructs, we also use an open-source self-service data platform (user interface). It is built using AWS Amplify, Amazon DynamoDB, AWS Step Functions, AWS Lambda, Amazon API Gateway, Amazon EventBridge, Amazon Cognito, and Amazon OpenSearch Service. The frontend is built with React. Through the self-service data platform you can: 1) manage data domains and data products, and 2) discover and request access to data products.

Central Governance and data sharing

For the governance of our data mesh, we will use AWS Lake Formation. AWS Lake Formation is a fully managed service that simplifies data lake setup, supports centralized security management, and provides transactional access on top of your data lake. Moreover, it enables data sharing across accounts and organizations. This centralized approach has a number of key benefits, such as: centralized audit; centralized permission management; and centralized data discovery. More importantly, this allows organizations to gain the benefits of centralized governance while taking advantage of the inherent scaling characteristics of decentralized data product management.

There are two ways to share data resources in Lake Formation: 1) Named Resource Access Control (NRAC), and 2) Tag-Based Access Control (LF-TBAC). NRAC uses AWS Resource Access Manager (AWS RAM) to share data resources across accounts. These resources are consumed via resource links that are based on the created resource shares. LF-TBAC is another approach to sharing data resources in AWS Lake Formation; it defines permissions based on attributes called LF-tags. You can read this blog to learn about LF-TBAC in the context of data mesh.
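To make the attribute-based idea concrete, here is a minimal sketch in plain Python of how a tag expression could be matched against tagged resources. It is not the Lake Formation API: the data shapes, the function names (`matches`, `visible_resources`), and the sample catalog are all hypothetical, and the sketch assumes a resource matches when it carries an allowed value for every key in the expression.

```python
from typing import Dict, List

# Hypothetical data shapes for illustration only; not the Lake Formation API.
ResourceTags = Dict[str, str]          # for example {"LOB": "sales"}
TagExpression = Dict[str, List[str]]   # for example {"LOB": ["sales", "marketing"]}


def matches(resource_tags: ResourceTags, expression: TagExpression) -> bool:
    """Return True when the resource has an allowed value for every key in the expression."""
    return all(
        resource_tags.get(key) in allowed_values
        for key, allowed_values in expression.items()
    )


def visible_resources(catalog: Dict[str, ResourceTags], expression: TagExpression) -> List[str]:
    """List the catalog resources that a grant on `expression` would cover."""
    return sorted(name for name, tags in catalog.items() if matches(tags, expression))


# Hypothetical catalog: two tagged databases and one without tags.
catalog = {
    "sales_db": {"LOB": "sales"},
    "marketing_db": {"LOB": "marketing"},
    "untagged_db": {},
}
grant = {"LOB": ["sales"]}
print(visible_resources(catalog, grant))
```

The point of the sketch is that permissions follow the tag rather than the resource name, so a newly tagged database becomes visible without a new grant.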

The following diagram shows how NRAC and LF-TBAC data sharing works. In this example, a data domain is registered as a node on the mesh, and therefore we create two databases in the central governance account. The NRAC database is shared with the data domain via AWS RAM. Access to data products that we register in this database is handled through NRAC. The LF-TBAC database is tagged with the data domain N line of business (LOB) LF-tag: <LOB:N>. The LOB tag is automatically shared with the data domain N account, and therefore the database is available in that account. Access to data products in this database is handled through LF-TBAC.

BDB-2279-ram-tag-share

In our solution, we demonstrate both the NRAC and LF-TBAC approaches. With the NRAC approach, we build an event-based workflow that automatically accepts the RAM share in the data domain accounts and automates the creation of the necessary metadata objects (for example, the local database and resource links). With the LF-TBAC approach, we rely on permissions associated with the shared LF-tags to allow producer data domains to manage their data products, and to give consumer data domains read access to the relevant data products associated with the LF-tags that they requested access to.

We use the CentralGovernance construct from the ARA library to build a central governance account. It creates an EventBridge event bus to enable communication with data domain accounts that register as nodes on the mesh. For each registered data domain, specific event bus rules are created that route events towards that account. The central governance account has a central metadata catalog that allows for data to be stored in different data domains, as opposed to a single central lake. For each registered data domain, we create two separate databases in the central governance catalog to demonstrate both NRAC and LF-TBAC data sharing. The CentralGovernance construct creates workflows for data product registration and data product sharing. We also deploy a self-service data platform UI that makes it easier to manage data domains and data products, and that simplifies data discovery and sharing.

BDB-2279-central-gov

A data domain: producer and consumer

We use DataDomain construct from ARA library to build a data domain account that can be either producer, consumer, or both. Producers manage the lifecycle of their respective data products in their own AWS accounts. Typically, this data is stored in Amazon Simple Storage Service (Amazon S3). DataDomain construct creates a data lake storage with cross-account bucket policy that enables central governance account to access the data. Data is encrypted using AWS KMS, and central governance account has a permission to use the key. Config secret in AWS Secrets Manager contains all the necessary information to register data domain as a node on mesh in central governance. It includes: 1) data domain name, 2) S3 location that holds data products, and 3) encryption key ARN. DataDomain construct also creates data domain and crawler workflows to automate data product registration.

BDB-2279-data-domain

Creating an event-driven data mesh

Data mesh architectures typically require some level of communication and trust policy management to maintain least privileges of the relevant principals between the different accounts (for example, central governance to producer, central governance to consumer). We use an event-driven approach via EventBridge to securely forward events from one event bus to an event bus in another account while maintaining least-privilege access. When we register a data domain with the central governance account through the self-service data platform UI, we establish bi-directional communication between the accounts via EventBridge. The domain registration process also creates a database in the central governance catalog to hold data products for that particular domain. The registered data domain is now a node on the mesh, and we can register new data products.
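As a rough illustration of event forwarding between buses, the following Python sketch builds an event entry in the shape accepted by the EventBridge PutEvents API (`EventBusName`, `Source`, `DetailType`, `Detail`) and an event pattern that a forwarding rule could use. The source name, detail type, detail fields, bus name, and account ID are hypothetical and are not taken from the ARA library. The `pattern_matches` helper is only a simplified local check for exact-value patterns, not the EventBridge matching engine.

```python
import json
from typing import Any, Dict

SOURCE = "com.example.central-governance"  # hypothetical event source name


def build_event_entry(bus_name: str, domain_account_id: str, product_name: str) -> Dict[str, Any]:
    """Build one entry in the shape accepted by the EventBridge PutEvents API."""
    return {
        "EventBusName": bus_name,
        "Source": SOURCE,
        "DetailType": "data-product-registered",  # hypothetical detail type
        "Detail": json.dumps({"domainAccountId": domain_account_id, "product": product_name}),
    }


def build_forwarding_pattern(domain_account_id: str) -> Dict[str, Any]:
    """Event pattern for a rule that forwards only events addressed to one data domain."""
    return {"source": [SOURCE], "detail": {"domainAccountId": [domain_account_id]}}


def pattern_matches(pattern: Dict[str, Any], entry: Dict[str, Any]) -> bool:
    """Simplified local check for exact-value patterns, for illustration only."""
    detail = json.loads(entry["Detail"])
    return entry["Source"] in pattern["source"] and all(
        detail.get(key) in allowed for key, allowed in pattern["detail"].items()
    )


entry = build_event_entry("central-mesh-bus", "111122223333", "customer_stats")
print(pattern_matches(build_forwarding_pattern("111122223333"), entry))
```

With boto3, an entry like this would be passed as `Entries=[entry]` to the EventBridge client's `put_events` call; the rule target would then be the event bus in the data domain account.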

The following diagram shows data product registration process:

BDB-2279-register-dd-small

  1. Registering a data product starts the Register Data Product workflow, which creates an empty table (the schema is managed by the producers in their respective producer account). This workflow also grants a cross-account permission to the producer account that allows the producer to manage the schema of the table.
  2. When complete, this emits an event into the central event bus.
  3. The central event bus contains a rule that forwards the event to the producer’s event bus. This rule was created during the data domain registration process.
  4. When the producer’s event bus receives the event, it triggers the Data Domain workflow, which creates resource-links and grants permissions.
  5. Still in the producer account, the Crawler workflow is triggered when the Data Domain workflow state changes to Successful. This workflow creates the crawler, runs it, waits and checks whether the crawler is done, and deletes the crawler when it’s complete. It is responsible for populating the tables’ schemas.
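The crawler lifecycle in step 5 (create, run, wait, check, delete) can be sketched as a small polling function. This is not the ARA workflow itself, which the post describes as a workflow rather than a script. The sketch assumes a boto3 Glue client is passed in as `glue` and uses its `create_crawler`, `start_crawler`, `get_crawler`, and `delete_crawler` calls; the function name, parameters, and polling defaults are hypothetical.

```python
import time
from typing import Any


def run_crawler_once(glue: Any, name: str, role_arn: str, database: str, s3_path: str,
                     poll_seconds: float = 30.0, max_polls: int = 120) -> bool:
    """Create a crawler, run it, wait until it is idle again, then delete it."""
    glue.create_crawler(
        Name=name,
        Role=role_arn,
        DatabaseName=database,
        Targets={"S3Targets": [{"Path": s3_path}]},
    )
    glue.start_crawler(Name=name)

    finished = False
    for _ in range(max_polls):
        state = glue.get_crawler(Name=name)["Crawler"]["State"]
        if state == "READY":  # the crawler is idle again, so the run has ended
            finished = True
            break
        time.sleep(poll_seconds)

    if finished:
        # Delete only once the crawler has stopped; a crawler that is still
        # running is left in place here for a later cleanup step.
        glue.delete_crawler(Name=name)
    return finished
```

Passing the client in as an argument keeps the function easy to exercise with a stub; in a real job you would likely create it with `boto3.client("glue")` and add error handling for failed crawls.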

Now other data domains can find newly registered data products using the self-service data platform UI and request access. The sharing process works in the same way as product registration by sending events from the central governance account to consumer data domain, and triggering specific workflows.

Solution Overview

The following high-level solution diagram shows how everything fits together and how event-driven architecture enables multiple accounts to form a data mesh. You can follow the workshop that we released to deploy the solution that we covered in this blog post. You can deploy multiple data domains and test both data registration and data sharing. You can also use self-service data platform UI to search through data products and request access using both LF-TBAC and NRAC approaches.

BDB-2279-arch-diagram

Conclusion

Implementing a data mesh on top of an event-driven architecture provides both flexibility and extensibility. A data mesh by itself has several moving parts to support various functionalities, such as onboarding, search, access management and sharing, and more. With an event-driven architecture, we can implement these functionalities in smaller components to make them easier to test, operate, and maintain. Future requirements and applications can use the event stream to provide their own functionality, making the entire mesh much more valuable to your organization.

To learn more about how to design and build applications based on event-driven architecture, see the AWS Event-Driven Architecture page. To dive deeper into data mesh concepts, see the Design a Data Mesh Architecture using AWS Lake Formation and AWS Glue blog.

If you’d like our team to run a data mesh workshop with you, please reach out to your AWS team.


About the authors


Jan Michael Go Tan is a Principal Solutions Architect for Amazon Web Services. He helps customers design scalable and innovative solutions with the AWS Cloud.

Dzenan Softic is a Senior Solutions Architect at AWS. He works with startups to help them define and execute their ideas. His main focus is in data engineering and infrastructure.

David Greenshtein is a Specialist Solutions Architect for Analytics at AWS with a passion for ETL and automation. He works with AWS customers to design and build analytics solutions enabling business to make data-driven decisions. In his free time, he likes jogging and riding bikes with his son.

Vincent Gromakowski is an Analytics Specialist Solutions Architect at AWS where he enjoys solving customers’ analytics, NoSQL, and streaming challenges. He has strong expertise in distributed data processing engines and resource orchestration platforms.

How Hudl built a cost-optimized AWS Glue pipeline with Apache Hudi datasets

Post Syndicated from Indira Balakrishnan original https://aws.amazon.com/blogs/big-data/how-hudl-built-a-cost-optimized-aws-glue-pipeline-with-apache-hudi-datasets/

This is a guest blog post co-written with Addison Higley and Ramzi Yassine from Hudl.

Hudl Agile Sports Technologies, Inc. is a Lincoln, Nebraska based company that provides tools for coaches and athletes to review game footage and improve individual and team play. Its initial product line served college and professional American football teams. Today, the company provides video services to youth, amateur, and professional teams in American football as well as other sports, including soccer, basketball, volleyball, and lacrosse. It now serves 170,000 teams in 50 different sports around the world. Hudl’s overall goal is to capture and bring value to every moment in sports.

Hudl’s mission is to make every moment in sports count. Hudl does this by expanding access to more moments through video and data and putting those moments in context. Our goal is to increase access by different people and increase context with more data points for every customer we serve. Using data to generate analytics, Hudl is able to turn data into actionable insights, telling powerful stories with video and data.

To best serve our customers and provide the most powerful insights possible, we need to be able to compare large sets of data between different sources. For example, enriching our MongoDB and Amazon DocumentDB (with MongoDB compatibility) data with our application logging data leads to new insights. This requires resilient data pipelines.

In this post, we discuss how Hudl has iterated on one such data pipeline using AWS Glue to improve performance and scalability. We talk about the initial architecture of this pipeline, and some of the limitations associated with this approach. We also discuss how we iterated on that design using Apache Hudi to dramatically improve performance.

Problem statement

A data pipeline that ensures high-quality MongoDB and Amazon DocumentDB statistics data is available in our central data lake is a requirement for Hudl to deliver sports analytics. It’s important to maintain the integrity of the data between the MongoDB and Amazon DocumentDB transactional data and the data lake, capturing changes in near-real time along with upserts to records in the data lake. Because Hudl statistics are backed by MongoDB and Amazon DocumentDB databases, in addition to a broad range of other data sources, it’s important that relevant MongoDB and Amazon DocumentDB data is available in a central data lake where we can run analytics queries to compare statistics data between sources.

Initial design

The following diagram demonstrates the architecture of our initial design.

Initial Ingestion Pipeline Design

Let’s discuss the key AWS services of this architecture:

  • AWS Database Migration Service (AWS DMS) allowed our team to move quickly in delivering this pipeline. AWS DMS gives our team a full snapshot of the data, and also offers ongoing change data capture (CDC). By combining these two datasets, we can ensure our pipeline delivers the latest data.
  • Amazon Simple Storage Service (Amazon S3) is the backbone of Hudl’s data lake because of its durability, scalability, and industry-leading performance.
  • AWS Glue allows us to run our Spark workloads in a serverless fashion, with minimal setup. We chose AWS Glue for its ease of use and speed of development. Additionally, features such as AWS Glue bookmarking simplified our file management logic.
  • Amazon Redshift offers petabyte-scale data warehousing. Amazon Redshift provides consistently fast performance, and easy integrations with our S3 data lake.

The data processing flow includes the following steps:

  1. Amazon DocumentDB holds the Hudl statistics data.
  2. AWS DMS gives us a full export of statistics data from Amazon DocumentDB, and ongoing changes in the same data.
  3. In the S3 Raw Zone, the data is stored in JSON format.
  4. An AWS Glue job merges the initial load of statistics data with the changed statistics data to give a snapshot of statistics data in JSON format for reference, eliminating duplicates.
  5. In the S3 Cleansed Zone, the JSON data is normalized and converted to Parquet format.
  6. AWS Glue uses a COPY command to insert Parquet data into Amazon Redshift consumption base tables.
  7. Amazon Redshift stores the final table for consumption.
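For step 6, the following sketch shows what a Redshift COPY statement for Parquet data generally looks like, assembled in Python. The table name, S3 prefix, and IAM role ARN are hypothetical placeholders, and the post does not show how Hudl's job actually issued the command, so treat this only as an illustration of the statement's shape.

```python
def build_copy_statement(table: str, s3_prefix: str, iam_role_arn: str) -> str:
    """Assemble a Redshift COPY statement that loads Parquet files from Amazon S3."""
    # Values are interpolated directly, so pass only trusted, validated names.
    return (
        f"COPY {table}\n"
        f"FROM '{s3_prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Hypothetical names for illustration only.
statement = build_copy_statement(
    table="analytics.statistics_base",
    s3_prefix="s3://bucket/cleansed/statistics/",
    iam_role_arn="arn:aws:iam::111122223333:role/example-redshift-copy-role",
)
print(statement)
```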

The following is a sample code snippet from the AWS Glue job in the initial data pipeline:

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import coalesce
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_context = spark.sparkContext
gc = GlueContext(spark_context)

# read_full_data(), read_cdc_data(), and output_path are defined elsewhere in the job
full_df = read_full_data()  # Load entire dataset from S3 Cleansed Zone
cdc_df = read_cdc_data()  # Read new CDC data which represents delta in the source MongoDB/DocumentDB

# Calculate final snapshot by joining the existing data with delta
joined_df = full_df.join(cdc_df, '_id', 'full_outer')

# Drop records deleted at the source (Op == 'D') and prefer the CDC version of each document
result = joined_df.filter((joined_df.Op != 'D') | (joined_df.Op.isNull())).select(
    coalesce(cdc_df._doc, full_df._doc).alias('_doc')
)

gc.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(result, gc, "result"),
    connection_type="s3",
    connection_options={"path": output_path},
    format="parquet",
    transformation_ctx="ctx4",
)

Challenges

Although this initial solution met our need for data quality, we felt there was room for improvement:

  • The pipeline was slow – The pipeline ran slowly (over 2 hours) because for each batch, the whole dataset was compared. Every record had to be compared, flattened, and converted to Parquet, even when only a few records were changed from the previous daily run.
  • The pipeline was expensive – As the data size grew daily, the job duration also grew significantly (especially in step 4). To mitigate the impact, we needed to allocate more AWS Glue DPUs (Data Processing Units) to scale the job, which led to higher cost.
  • The pipeline limited our ability to scale – Hudl’s data has a long history of rapid growth with increasing customers and sporting events. Given this trend, our pipeline needed to run as efficiently as possible to handle only changing datasets to have predictable performance.

New design

The following diagram illustrates our updated pipeline architecture.

Although the overall architecture looks roughly the same, the internal logic in AWS Glue was significantly changed, along with the addition of Apache Hudi datasets.

In step 4, AWS Glue now interacts with Apache Hudi datasets in the S3 Cleansed Zone to upsert or delete changed records as identified by AWS DMS CDC. The AWS Glue to Apache Hudi connector helps convert JSON data to Parquet format and upserts into the Apache Hudi dataset. Retaining the full documents in our Apache Hudi dataset allows us to easily make schema changes to our final Amazon Redshift tables without needing to re-export data from our source systems.

The following is a sample code snippet from the new AWS Glue pipeline:

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_context = spark.sparkContext
gc = GlueContext(spark_context)

# Hudi write options: upsert by record key ('_id'), using 'write_ts' as the precombine field
upsert_conf = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'write_ts',
    'hoodie.datasource.write.recordkey.field': '_id',
    'hoodie.table.name': 'glue_table',
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': 'glue_database',
    'hoodie.datasource.hive_sync.table': 'glue_table',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'path': 's3://bucket/prefix/',
    'hoodie.compact.inline': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.upsert.shuffle.parallelism': 200,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10,
}

# cdc_upserts_df holds the changed records from the CDC data and is built earlier in the job
gc.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(cdc_upserts_df, gc, "cdc_upserts_df"),
    connection_type="marketplace.spark",
    connection_options=upsert_conf,
)
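To illustrate why record-level upserts avoid the full outer join, here is a simplified plain-Python model of applying a CDC batch to a keyed table. It borrows the `_id` record key and `write_ts` precombine field from the configuration above and the `Op` column from the initial pipeline, but it is only a conceptual sketch under those assumptions. It is not how Apache Hudi is implemented, and the sample records are made up.

```python
from typing import Any, Dict, Iterable

Record = Dict[str, Any]


def apply_cdc(table: Dict[str, Record], changes: Iterable[Record]) -> Dict[str, Record]:
    """Apply a CDC batch to a keyed table: newest write_ts wins, Op == 'D' deletes."""
    # Precombine step: keep only the newest change per record key within the batch.
    latest: Dict[str, Record] = {}
    for change in changes:
        key = change["_id"]
        if key not in latest or change["write_ts"] >= latest[key]["write_ts"]:
            latest[key] = change

    # Upsert step: only the keys present in the batch are touched.
    result = dict(table)
    for key, change in latest.items():
        if change.get("Op") == "D":
            result.pop(key, None)
        else:
            result[key] = change
    return result


# Made-up sample data for illustration.
table = {
    "a": {"_id": "a", "write_ts": 1, "score": 10},
    "b": {"_id": "b", "write_ts": 1, "score": 20},
}
changes = [
    {"_id": "a", "write_ts": 2, "Op": "U", "score": 11},
    {"_id": "a", "write_ts": 3, "Op": "U", "score": 12},
    {"_id": "b", "write_ts": 2, "Op": "D"},
    {"_id": "c", "write_ts": 1, "Op": "I", "score": 7},
]
updated = apply_cdc(table, changes)
print(sorted(updated))
```

The work done is proportional to the size of the change batch rather than the size of the whole table, which is the property the new pipeline relies on.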

Results

With this new approach using Apache Hudi datasets with AWS Glue deployed after May 2022, the pipeline runtime was predictable and less expensive than the initial approach. Because we only handled new or modified records by eliminating the full outer join over the entire dataset, we saw an 80–90% reduction in runtime for this pipeline, thereby reducing costs by 80–90% compared to the initial approach. The following diagram illustrates our processing time before and after implementing the new pipeline.

Conclusion

With Apache Hudi’s open-source data management framework, we simplified incremental data processing in our AWS Glue data pipeline to manage data changes at the record level in our S3 data lake with CDC from Amazon DocumentDB.

We hope that this post will inspire your organization to build AWS Glue pipelines with Apache Hudi datasets that reduce cost and bring performance improvements using serverless technologies to achieve your business goals.


About the authors

Addison Higley is a Senior Data Engineer at Hudl. He manages over 20 data pipelines to help ensure data is available for analytics so Hudl can deliver insights to customers.

Ramzi Yassine is a Lead Data Engineer at Hudl. He leads the architecture and implementation of Hudl’s data pipelines and data applications, and ensures that our data empowers internal and external analytics.

Swagat Kulkarni is a Senior Solutions Architect at AWS and an AI/ML enthusiast. He is passionate about solving real-world problems for customers with cloud-native services and machine learning. Swagat has over 15 years of experience delivering several digital transformation initiatives for customers across multiple domains, including retail, travel and hospitality, and healthcare. Outside of work, Swagat enjoys travel, reading, and meditating.

Indira Balakrishnan is a Principal Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.

How SOCAR built a streaming data pipeline to process IoT data for real-time analytics and control

Post Syndicated from DoYeun Kim original https://aws.amazon.com/blogs/big-data/how-socar-built-a-streaming-data-pipeline-to-process-iot-data-for-real-time-analytics-and-control/

SOCAR is the leading Korean mobility company with strong competitiveness in car-sharing. SOCAR has become a comprehensive mobility platform in collaboration with Nine2One, an e-bike sharing service, and Modu Company, an online parking platform. Backed by advanced technology and data, SOCAR solves mobility-related social problems, such as parking difficulties and traffic congestion, and changes the car ownership-oriented mobility habits in Korea.

SOCAR is building a new fleet management system to manage the many actions and processes that must occur in order for fleet vehicles to run on time, within budget, and at maximum efficiency. To achieve this, SOCAR is looking to build a highly scalable data platform using AWS services to collect, process, store, and analyze internet of things (IoT) streaming data from various vehicle devices and historical operational data.

This in-car device data, combined with operational data such as car details and reservation details, will provide a foundation for analytics use cases. For example, SOCAR will be able to notify customers if they have forgotten to turn their headlights off or to schedule a service if a battery is running low. Unfortunately, the previous architecture didn’t enable the enrichment of IoT data with operational data and couldn’t support streaming analytics use cases.

AWS Data Lab offers accelerated, joint-engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics modernization initiatives. The Build Lab is a 2–5-day intensive build with a technical customer team.

In this post, we share how SOCAR engaged the Data Lab program to assist them in building a prototype solution to overcome these challenges, and to build the basis for accelerating their data project.

Use case 1: Streaming data analytics and real-time control

SOCAR wanted to utilize IoT data for a new business initiative. A fleet management system, where data comes from IoT devices in the vehicles, is a key input to drive business decisions and derive insights. This data is captured by AWS IoT and sent to Amazon Managed Streaming for Apache Kafka (Amazon MSK). By joining the IoT data to other operational datasets, including reservations, car information, device information, and others, the solution can support a number of functions across SOCAR’s business.

An example of real-time monitoring is when a customer turns off the car engine and closes the car door, but the headlights are still on. By using IoT data related to the car light, door, and engine, a notification is sent to the customer to inform them that the car headlights should be turned off.
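The headlight rule described above can be written as a small predicate. The following Python sketch is illustrative only: the `CarState` fields and the function name are hypothetical and do not reflect SOCAR's actual IoT payload.

```python
from dataclasses import dataclass


@dataclass
class CarState:
    """Hypothetical snapshot of the IoT signals relevant to the headlight rule."""
    car_id: str
    engine_on: bool
    door_closed: bool
    headlights_on: bool


def needs_headlight_alert(state: CarState) -> bool:
    """Alert when the engine is off and the door is closed, but the headlights are still on."""
    return (not state.engine_on) and state.door_closed and state.headlights_on


parked = CarState(car_id="car-001", engine_on=False, door_closed=True, headlights_on=True)
driving = CarState(car_id="car-002", engine_on=True, door_closed=True, headlights_on=True)
print(needs_headlight_alert(parked), needs_headlight_alert(driving))
```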

Although this real-time control is important, they also want to collect historical data—both raw and curated data—in Amazon Simple Storage Service (Amazon S3) to support historical analytics and visualizations by using Amazon QuickSight.

Use case 2: Detect table schema change

The first challenge SOCAR faced was existing batch ingestion pipelines that were prone to breaking when schema changes occurred in the source systems. Additionally, these pipelines didn’t deliver data in a way that was easy for business analysts to consume. In order to meet the future data volumes and business requirements, they needed a pattern for the automated monitoring of batch pipelines with notification of schema changes and the ability to continue processing.

The second challenge was related to the complexity of the JSON files being ingested. The existing batch pipelines weren’t flattening the five-level nested structure, which made it difficult for business users and analysts to gain business insights without extra effort on their end.

Overview of solution

In this solution, we followed the serverless data architecture to establish a data platform for SOCAR. This serverless architecture allowed SOCAR to run data pipelines continuously and scale automatically with no setup cost and without managing servers.

AWS Glue is used for both the streaming and batch data pipelines. Amazon Kinesis Data Analytics is used to deliver streaming data with subsecond latencies. In terms of storage, data is stored in Amazon S3 for historical data analysis, auditing, and backup. However, when frequent reading of the latest snapshot data is required by multiple users and applications concurrently, the data is stored and read from Amazon DynamoDB tables. DynamoDB is a key-value and document database that can support tables of virtually any size with horizontal scaling.

Let’s discuss the components of the solution in detail before walking through the steps of the entire data flow.

Component 1: Processing IoT streaming data with business data

The first data pipeline (see the following diagram) processes IoT streaming data with business data from an Amazon Aurora MySQL-Compatible Edition database.

Whenever a transaction occurs in two tables in the Aurora MySQL database, this transaction is captured as data and then loaded into two MSK topics via AWS Database Migration Service (AWS DMS) tasks. One topic conveys the car information table, and the other topic is for the device information table. This data is loaded into a single DynamoDB table that contains all the attributes (or columns) that exist in the two tables in the Aurora MySQL database, along with a primary key. This single DynamoDB table contains the latest snapshot data from the two DB tables, and is important because it contains the latest information of all the cars and devices for the lookup against the streaming IoT data. If the lookup were done on the database directly with the streaming data, it would impact the production database performance.

When the snapshot is available in DynamoDB, an AWS Glue streaming job runs continuously to collect the IoT data and join it with the latest snapshot data in the DynamoDB table to produce the up-to-date output, which is written into another DynamoDB table.
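Conceptually, the streaming job performs a lookup join between each IoT record and the latest snapshot. The sketch below models that in plain Python, with an in-memory dictionary standing in for the DynamoDB snapshot table. The key name `device_id`, the field names, and the choice to skip unmatched records are assumptions made for illustration, not details from SOCAR's job.

```python
from typing import Any, Dict, Iterable, Iterator

Record = Dict[str, Any]


def enrich(iot_records: Iterable[Record], snapshot: Dict[str, Record],
           key: str = "device_id") -> Iterator[Record]:
    """Join each IoT record with the latest car and device attributes for its key."""
    for record in iot_records:
        reference = snapshot.get(record.get(key))
        if reference is None:
            continue  # no matching snapshot row; a real job might route this elsewhere
        # On a field name clash, the streaming value takes precedence.
        yield {**reference, **record}


# Made-up sample data for illustration.
snapshot = {"dev-1": {"device_id": "dev-1", "car_id": "car-001", "model": "example-model"}}
stream = [
    {"device_id": "dev-1", "battery_voltage": 11.8},
    {"device_id": "dev-9", "battery_voltage": 12.4},
]
enriched = list(enrich(stream, snapshot))
print(enriched)
```

Keeping the reference data in a separate snapshot store is what lets the lookup run per record without querying the production database.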

The up-to-date data in DynamoDB is used for real-time monitoring and control that SOCAR’s Data Analytics team performs for safety maintenance and fleet management. This data is ultimately consumed by a number of apps to perform various business activities, including route optimization, real-time monitoring of oil consumption and temperature, identification of a driver’s driving pattern, tire wear and defect detection, and real-time car crash notifications.

Component 2: Processing IoT data and visualizing the data in dashboards

The second data pipeline (see the following diagram) batch processes the IoT data and visualizes it in QuickSight dashboards.

There are two data sources. The first is the Aurora MySQL database. The two database tables are exported into Amazon S3 from the Aurora MySQL cluster and registered in the AWS Glue Data Catalog as tables. The second data source is Amazon MSK, which receives streaming data from AWS IoT Core. This requires you to create a secure AWS Glue connection for an Apache Kafka data stream. SOCAR’s MSK cluster requires SASL_SSL as a security protocol (for more information, refer to Authentication and authorization for Apache Kafka APIs). To create an MSK connection in AWS Glue and set up connectivity, we use the following CLI command:

# To use a custom certificate, add "KAFKA_CUSTOM_CERT": "s3://bucket/prefix/cert.pem" to ConnectionProperties.
aws glue create-connection --connection-input \
'{"Name":"kafka-connection","Description":"kafka connection example",
"ConnectionType":"KAFKA",
"ConnectionProperties":{
"KAFKA_BOOTSTRAP_SERVERS":"<server-ip-addresses>",
"KAFKA_SSL_ENABLED":"true",
"KAFKA_SECURITY_PROTOCOL":"SASL_SSL",
"KAFKA_SKIP_CUSTOM_CERT_VALIDATION":"false",
"KAFKA_SASL_MECHANISM":"SCRAM-SHA-512",
"KAFKA_SASL_SCRAM_USERNAME":"<username>",
"KAFKA_SASL_SCRAM_PASSWORD":"<password>"
},
"PhysicalConnectionRequirements":
{"SubnetId":"subnet-xxx","SecurityGroupIdList":["sg-xxx"],"AvailabilityZone":"us-east-1a"}}'

Component 3: Real-time control

The third data pipeline processes the streaming IoT data from Amazon MSK with millisecond latency to produce the output in DynamoDB, and sends a notification in real time if any records are identified as outliers based on business rules.

AWS IoT Core provides integrations with Amazon MSK to set up real-time streaming data pipelines. To do so, complete the following steps:

  1. On the AWS IoT Core console, choose Act in the navigation pane.
  2. Choose Rules, and create a new rule.
  3. For Actions, choose Add action and choose Kafka.
  4. Choose the VPC destination if required.
  5. Specify the Kafka topic.
  6. Specify the TLS bootstrap servers of your Amazon MSK cluster.

You can view the bootstrap server URLs in the client information of your MSK cluster details. The AWS IoT rule was created with the Kafka topic as an action to provide data from AWS IoT Core to Kafka topics.

SOCAR used Amazon Kinesis Data Analytics Studio to analyze streaming data in real time and build stream-processing applications using standard SQL and Python. We created one table from the Kafka topic using the following code:

CREATE TABLE table_name (
column_name1 VARCHAR,
column_name2 VARCHAR(100),
column_name3 VARCHAR,
column_name4 AS TO_TIMESTAMP(`time_column`, 'EEE MMM dd HH:mm:ss z yyyy'),
WATERMARK FOR column_name4 AS column_name4 - INTERVAL '5' SECOND
)
PARTITIONED BY (column_name5)
WITH (
'connector'= 'kafka',
'topic' = 'topic_name',
'properties.bootstrap.servers' = '<bootstrap servers shown in the MSK client info dialog>',
'format' = 'json',
'properties.group.id' = 'testGroup1',
'scan.startup.mode'= 'earliest-offset'
);

Then we applied a query with business logic to identify a particular set of records that need to be alerted. When this data is loaded back into another Kafka topic, AWS Lambda functions trigger the downstream action: either load the data into a DynamoDB table or send an email notification.
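
To make the downstream step more concrete, here is a minimal sketch of how a Lambda function triggered by Amazon MSK might decode its input and decide between the two actions. It relies on the documented shape of MSK events, where records are grouped under keys of the form topic-partition and each value is base64 encoded. The topic names outlier and output, the function names, and the sample payloads are assumptions for illustration; the actual Amazon SNS and DynamoDB calls are deliberately left out.

```python
import base64
import json


def decode_msk_records(event):
    """Yield (topic, payload) for every record in an MSK-triggered Lambda event."""
    for records in event.get("records", {}).values():
        for record in records:
            # Kafka record values arrive base64 encoded in the Lambda event.
            payload = json.loads(base64.b64decode(record["value"]))
            yield record["topic"], payload


def route(event, outlier_topic="outlier"):
    """Split payloads into those to notify on and those to store.

    The topic name is an assumption; a real handler would publish the first
    list to Amazon SNS and write the second list to DynamoDB.
    """
    to_notify, to_store = [], []
    for topic, payload in decode_msk_records(event):
        if topic == outlier_topic:
            to_notify.append(payload)
        else:
            to_store.append(payload)
    return to_notify, to_store


def _encode(payload):
    """Encode a dict the way a Kafka record value appears in the event."""
    return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")


# Hypothetical event with one record per topic.
sample_event = {
    "eventSource": "aws:kafka",
    "records": {
        "outlier-0": [{"topic": "outlier",
                       "value": _encode({"car_id": "car-1", "headlights": "on"})}],
        "output-0": [{"topic": "output",
                      "value": _encode({"car_id": "car-2", "speed": 40})}],
    },
}

to_notify, to_store = route(sample_event)
print(to_notify, to_store)
```

Keeping the decoding and routing separate from the AWS calls makes the decision logic easy to test locally before it is deployed.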

Component 4: Flattening the nested structure JSON and monitoring schema changes

The final data pipeline (see the following diagram) processes complex, semi-structured, and nested JSON files.

This step uses an AWS Glue DynamicFrame to flatten the nested structure and then land the output in Amazon S3. After the data is loaded, it’s scanned by an AWS Glue crawler to update the Data Catalog table and detect any changes in the schema.
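
For readers unfamiliar with flattening, the following plain-Python sketch shows the idea on a single record: nested objects are collapsed into top-level fields with dotted names. It is not the AWS Glue DynamicFrame API, and the function name flatten and the sample field names are hypothetical. Arrays are left untouched here, because pivoting them out is a separate step.

```python
def flatten(record, parent_key="", sep="."):
    """Collapse nested dicts into one level, joining key names with `sep`."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested objects and merge their flattened fields.
            flat.update(flatten(value, full_key, sep))
        else:
            # Scalars and arrays are kept as-is under the dotted name.
            flat[full_key] = value
    return flat


# Hypothetical nested IoT record.
nested = {
    "car": {"id": "c1", "status": {"engine": "off"}},
    "tires": [30, 31],
}

flat_record = flatten(nested)
print(flat_record)
```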

Data flow: Putting it all together

The following diagram illustrates our complete data flow with each component.

Let’s walk through the steps of each pipeline.

The first data pipeline (in red) processes the IoT streaming data with the Aurora MySQL business data:

  1. AWS DMS is used for ongoing replication to continuously apply source changes to the target with minimal latency. The source includes two tables in the Aurora MySQL database (carinfo and deviceinfo), and each is linked to two MSK topics via AWS DMS tasks.
  2. Amazon MSK triggers a Lambda function, so whenever a topic receives data, the function runs to load the data into a DynamoDB table.
  3. A single DynamoDB table holds the columns from both the carinfo table and the deviceinfo table of the Aurora MySQL database. It contains all the data from the two tables and stores the latest data by performing an upsert operation.
  4. An AWS Glue job continuously receives the IoT data and joins it with data in the DynamoDB table to produce the output into another DynamoDB target table.
  5. This target table contains the final data, which includes all the device and car status information from the IoT devices as well as metadata from the Aurora MySQL table.
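
The upsert and join in steps 3 and 4 can be sketched with ordinary dictionaries. This is only an in-memory illustration of the logic, not the Lambda or AWS Glue code used in the pipeline; the shared key car_id, the function names, and the sample attributes are assumptions, since the source does not name the join key.

```python
def upsert(table, key, item):
    """Merge `item` into the row stored under `key`, creating it if missing."""
    table.setdefault(key, {}).update(item)


def enrich(iot_event, table, key_field="car_id"):
    """Join one IoT event with its metadata row; IoT fields win on conflict."""
    metadata = table.get(iot_event[key_field], {})
    return {**metadata, **iot_event}


# Stand-in for the single DynamoDB table that holds the latest metadata.
latest = {}
upsert(latest, "car-1", {"model": "A"})         # row from carinfo
upsert(latest, "car-1", {"device_id": "d-9"})   # row from deviceinfo
upsert(latest, "car-1", {"model": "B"})         # later change replaces the old value

# Stand-in for one streaming IoT event joined with the metadata.
enriched = enrich({"car_id": "car-1", "speed": 42}, latest)
print(enriched)
```

Because each upsert overwrites only the attributes it carries, the row always reflects the most recent value from either source table.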

The second data pipeline (in green) batch processes IoT data to use in dashboards and for visualization:

  1. The car and reservation data (in two DB tables) is exported via a SQL command from the Aurora MySQL database with the output data available in an S3 bucket. The folders that contain data are registered as an S3 location for the AWS Glue crawler and become available via the AWS Glue Data Catalog.
  2. The MSK input topic continuously receives data from AWS IoT. Each car has a number of IoT devices, and each device captures data and sends it to an MSK input topic. The Amazon MSK S3 sink connector is configured to export data from Kafka topics to Amazon S3 in JSON format. In addition, the S3 connector guarantees exactly-once delivery semantics to consumers of the S3 objects it produces.
  3. The AWS Glue job runs in a daily batch to load the historical IoT data into Amazon S3 and into two tables (refer to step 1) to produce the output data in an Enriched folder in Amazon S3.
  4. Amazon Athena is used to query data from Amazon S3 and make it available as a dataset in QuickSight for visualizing historical data.

The third data pipeline (in blue) processes streaming IoT data from Amazon MSK with millisecond latency to produce the output in DynamoDB and send a notification:

  1. An Amazon Kinesis Data Analytics Studio notebook powered by Apache Zeppelin and Apache Flink is used to build and deploy its output as a Kinesis Data Analytics application. This application loads data from Amazon MSK in real time, and users can apply business logic to select particular events coming from the IoT real-time data, for example, the car engine is off and the doors are closed, but the headlights are still on. The particular event that users want to capture can be sent to another MSK topic (Outlier) via the Kinesis Data Analytics application.
  2. Amazon MSK triggers a Lambda function, so whenever a topic receives data, a Lambda function runs to send an email notification to users that are subscribed to an Amazon Simple Notification Service (Amazon SNS) topic. An email is published using an SNS notification.
  3. The Kinesis Data Analytics application loads data from AWS IoT, applies business logic, and then loads it into another MSK topic (output). Amazon MSK triggers a Lambda function when data is received, which loads data into a DynamoDB Append table.
  4. Amazon Kinesis Data Analytics Studio is used to run SQL commands for ad hoc interactive analysis on streaming data.
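
The business rule mentioned in step 1 (engine off, doors closed, headlights still on) can be expressed as a simple predicate. The sketch below is plain Python rather than the SQL that runs in the Kinesis Data Analytics application, and the field names engine, doors, and headlights are hypothetical.

```python
def is_outlier(status):
    """Return True when the engine is off and doors are closed but headlights are on."""
    return (
        status.get("engine") == "off"
        and status.get("doors") == "closed"
        and status.get("headlights") == "on"
    )


# Hypothetical status events from three cars.
events = [
    {"car_id": "car-1", "engine": "off", "doors": "closed", "headlights": "on"},
    {"car_id": "car-2", "engine": "on", "doors": "closed", "headlights": "on"},
    {"car_id": "car-3", "engine": "off", "doors": "closed", "headlights": "off"},
]

# Only events matching the rule would be forwarded to the outlier topic.
outliers = [event["car_id"] for event in events if is_outlier(event)]
print(outliers)
```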

The final data pipeline (in yellow) processes complex, semi-structured, and nested JSON files, and sends a notification when a schema evolves:

  1. An AWS Glue job runs and reads the JSON data from Amazon S3 (as a source), applies logic to flatten the nested schema using a DynamicFrame, and pivots out array columns from the flattened frame.
  2. The output is stored in Amazon S3 and is automatically registered to the AWS Glue Data Catalog table.
  3. Whenever there is a new attribute or change in the JSON input data at any level in the nested structure, the new attribute and change are captured in Amazon EventBridge as an event from the AWS Glue Data Catalog. An email notification is published using Amazon SNS.
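
In the pipeline, schema changes are detected by the AWS Glue Data Catalog and surfaced through Amazon EventBridge. To illustrate what a change at any level of the nested structure means, the following sketch compares the attribute paths of two records. The function names and sample records are hypothetical, and this is not how the Data Catalog is implemented.

```python
def schema_paths(record, prefix=""):
    """Return the set of dotted attribute paths present in a nested record."""
    paths = set()
    for key, value in record.items():
        path = f"{prefix}.{key}" if prefix else key
        paths.add(path)
        if isinstance(value, dict):
            # Descend so that changes deep in the structure are also seen.
            paths |= schema_paths(value, path)
    return paths


def schema_changes(old_record, new_record):
    """List attribute paths added to or removed from the newer record."""
    old, new = schema_paths(old_record), schema_paths(new_record)
    return {"added": sorted(new - old), "removed": sorted(old - new)}


# Hypothetical records before and after a new nested attribute appears.
previous = {"car": {"id": "c1"}}
current = {"car": {"id": "c1", "battery": {"level": 80}}}

changes = schema_changes(previous, current)
print(changes)
```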

Conclusion

As a result of the four-day Build Lab, the SOCAR team left with a working prototype that is custom fit to their needs, gaining a clear path to production. The Data Lab allowed the SOCAR team to build a new streaming data pipeline, enrich IoT data with operational data, and enhance the existing data pipeline to process complex nested JSON data. This establishes a baseline architecture to support the new fleet management system beyond the car-sharing business.


About the Authors

DoYeun Kim is the Head of Data Engineering at SOCAR. He is a passionate software engineering professional with 19+ years of experience. He leads a team of 10+ engineers who are responsible for the data platform, data warehouse and MLOps engineering, as well as building in-house data products.

SangSu Park is a Lead Data Architect in SOCAR’s cloud DB team. His passion is to keep learning, embrace challenges, and strive for mutual growth through communication. He loves to travel in search of new cities and places.

YoungMin Park is a Lead Architect in SOCAR’s cloud infrastructure team. His philosophy in life is, whatever it may be, to challenge, fail, learn, and share such experiences to build a better tomorrow for the world. He enjoys basketball and building expertise in various fields.

Younggu Yun is a Senior Data Lab Architect at AWS. He works with customers around the APAC region to help them achieve business goals and solve technical problems by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together. In his free time, he and his son are obsessed with building creative models out of Lego blocks.

Vicky Falconer leads the AWS Data Lab program across APAC, offering accelerated joint engineering engagements between teams of customer builders and AWS technical resources to create tangible deliverables that accelerate data analytics modernization and machine learning initiatives.

California State University Chancellor’s Office reduces cost and improves efficiency using Amazon QuickSight for streamlined HR reporting in higher education

Post Syndicated from Madi Hsieh original https://aws.amazon.com/blogs/big-data/california-state-university-chancellors-office-reduces-cost-and-improves-efficiency-using-amazon-quicksight-for-streamlined-hr-reporting-in-higher-education/

The California State University Chancellor’s Office (CSUCO) sits at the center of America’s most significant and diverse 4-year universities. The California State University (CSU) serves approximately 477,000 students and employs more than 55,000 staff and faculty members across 23 universities and 7 off-campus centers. The CSU provides students with opportunities to develop intellectually and personally, and to contribute back to the communities throughout California. For this large organization, managing a wide system of campuses while maintaining the decentralized autonomy of each is crucial. In 2019, they needed a highly secure tool to streamline the process of pulling HR data. The CSU had been using a legacy central data warehouse based on data from their financial system, but it lacked the robustness to keep up with modern technology. This wasn’t going to work for their HR reporting needs.

Looking for a tool to match the cloud-based infrastructure of their other operations, the Business Intelligence and Data Operations (BI/DO) team within the Chancellor’s Office chose Amazon QuickSight, a fast, easy-to-use, cloud-powered business analytics service that makes it easy for all employees within an organization to build visualizations, perform ad hoc analysis, and quickly get business insights from their data, any time, on any device. The team uses QuickSight to organize HR information across the CSU, implementing a centralized security system.

“It’s easy to use, very straightforward, and relatively intuitive. When you couple the experience of using QuickSight, with a huge cost difference to [the BI platform we had been using], to me, it’s a simple choice,”

– Andy Sydnor, Director Business Intelligence and Data Operations at the CSUCO.

With QuickSight, the team has the capability to harness security measures and deliver data insights efficiently across their campuses.

In this post, we share how the CSUCO uses QuickSight to reduce cost and improve efficiency in their HR reporting.

Delivering BI insights across the CSU’s 23 universities

The CSUCO serves the university system’s faculty, students, and staff by overseeing operations in several areas, including finance, HR, student information, and space and facilities. Since migrating to QuickSight in 2019, the team has built dashboards to support these operations. Dashboards include COVID-related leaves of absence, historical financial reports, and employee training data, along with a large selection of dashboards to track employee data at an individual campus level or from a system-wide perspective.

The team created a process for reading security roles from the ERP system and then translating them using QuickSight groups for internal HR reporting. QuickSight allowed them to match security measures with the benefits of low maintenance and familiarity to their end-users.

With QuickSight, the CSUCO is able to run a decentralized security process where campus security teams can provision access directly and users can get to their data faster. Before transitioning to QuickSight, the BI/DO team spent hours trying to get to specific individual-level data, but with QuickSight, the retrieval time was shortened to just minutes. For the first time, Sydnor and his team were able to pinpoint a specific employee’s work history without having to take additional actions to find the exact data they needed.

Cost savings compared to other BI tools

Sydnor shares that, for a public organization, one of the most attractive qualities of QuickSight is the immense cost savings. The BI/DO team at the Chancellor’s Office estimates that they’re saving roughly 40% on costs since switching from their previous BI platform, which is a huge benefit for a public organization of this scale. Their previous BI tool was costing them extensive amounts of money on licensing for features they didn’t require; the CSUCO felt they weren’t getting the best use of their investment.

The functionality of QuickSight to meet their reporting needs at an affordable price point is what makes QuickSight the CSUCO’s preferred BI reporting tool. Sydnor likes that with QuickSight, “we don’t have to go out and buy a subscription or a license for somebody, we can just provision access. It’s much easier to distribute the product.” QuickSight allows the CSUCO to focus their budget in other areas rather than having to pay for charges by infrequent users.

Simple and intuitive interface

Getting started in QuickSight was a no-brainer for Sydnor and his team. For a public organization, the procurement process can be cumbersome, delaying the point at which data can be put to use. As an existing AWS customer, the CSUCO could seamlessly integrate QuickSight into their package of AWS services. With other BI tools, they ran into roadblocks when setting up the system, which wasn’t an issue with QuickSight, because it’s a fully managed service that doesn’t require deploying any servers.

The following screenshot shows an example of the CSUCO security audit dashboard.

Sydnor tells us, “Our previous BI tool had a huge library of visualization, but we don’t need 95% of those. Our presentations look great with the breadth of visuals QuickSight provides. Most people just want the data and ultimately, need a robust vehicle to get data out of a database and onto a table or visualization.”

Converting from their original BI tool to QuickSight was painless for his team. Sydnor tells us that he has “yet to see something we can’t do with QuickSight.” One of Sydnor’s employees who was a user of the previous tool learned QuickSight in just 30 minutes. Now, they conduct QuickSight demos all the time.

Looking to the future: Expanding BI integration and adopting Amazon QuickSight Q

With QuickSight, the Chancellor’s Office aims to roll out more HR dashboards across its campuses and extend the tool for faculty use in the classroom. In the upcoming year, two campuses are joining CSUCO in building their own HR reporting dashboards through QuickSight. The organization is also making plans to use QuickSight to report on student data and implement external-facing dashboards. Some of the data points they’re excited to explore are insights into at-risk students and classroom scheduling on campus.

Thinking ahead, CSUCO is considering Amazon QuickSight Q, a machine learning-powered natural language capability that gives anyone in an organization the ability to ask business questions in natural language and receive accurate answers with relevant visualizations. Sydnor says, “How cool would that be if professors could go in and ask simple, straightforward questions like, ‘How many of my department’s students are taking full course loads this semester?’ It has a lot of potential.”

Summary

The CSUCO is excited to be a champion of QuickSight in the CSU, and is looking for ways to increase its implementation across the organization in the future.

To learn more, visit the website for the California State University Chancellor’s Office. For more on QuickSight, visit the Amazon QuickSight product page, or browse other Big Data Blog posts featuring QuickSight.


About the authors

Madi Hsieh, AWS 2022 Summer Intern, UCLA.

Tina Kelleher, Program Manager at AWS.