Tag Archives: AWS Big Data

How HPE Aruba Supply Chain optimized cost and performance by migrating to an AWS modern data architecture

Post Syndicated from Hardeep Randhawa original https://aws.amazon.com/blogs/big-data/how-hpe-aruba-supply-chain-optimized-cost-and-performance-by-migrating-to-an-aws-modern-data-architecture/

This blog post is co-written with Hardeep Randhawa and Abhay Kumar from HPE.

HPE Aruba Networking, formerly known as Aruba Networks, is a Santa Clara, California-based security and networking subsidiary of Hewlett Packard Enterprise (HPE). HPE Aruba Networking is the industry leader in wired, wireless, and network security solutions. Hewlett-Packard acquired Aruba Networks in 2015, making it a wireless networking subsidiary with a wide range of next-generation network access solutions.

Aruba offers networking hardware like access points, switches, and routers, along with software, security devices, and Internet of Things (IoT) products. Their large inventory requires extensive supply chain management to source parts, make products, and distribute them globally. This complex process involves suppliers, logistics, quality control, and delivery.

This post describes how HPE Aruba automated their supply chain management pipeline, and re-architected and deployed their data solution by adopting a modern data architecture on AWS.

Challenges with the on-premises solution

As the demand surged with time, it was imperative that Aruba build a sophisticated and powerful supply chain solution that could help them scale operations, enhance visibility, improve predictability, elevate customer experience, and drive sustainability. To achieve their vision of a modern, scalable, resilient, secure, and cost-efficient architecture, they chose AWS as their trusted partner due to the range of low-cost, scalable, and reliable cloud services they offer.

Through a commitment to cutting-edge technologies and a relentless pursuit of quality, HPE Aruba designed this next-generation solution as a cloud-based, cross-functional supply chain workflow and analytics tool. The application supports custom workflows that allow demand and supply planning teams to collaborate, plan, source, and fulfill customer orders, then track fulfillment metrics via persona-based operational and management reports and dashboards. This also includes building an industry-standard integrated data repository as a single source of truth, operational reporting through real-time metrics, data quality monitoring, a 24/7 helpdesk, and revenue forecasting through financial projections and supply availability projections. Overall, this new solution has empowered HPE teams with persona-based access to 10 full-scale business intelligence (BI) dashboards and over 350 report views across demand and supply planning, inventory and order management, SKU dashboards, deal management, case management, backlog views, and big deal trackers.

Overview of the solution

This post describes how HPE Aruba automated their supply chain management pipeline, starting with data migration from varied data sources into centralized Amazon Simple Storage Service (Amazon S3) based storage, to building their data warehouse on Amazon Redshift, with the publication layer built on a third-party BI tool and a user interface built using ReactJS.

The following diagram illustrates the solution architecture.


In the following sections, we go through the key components in the diagram in more detail:

  1. Source systems
  2. Data migration
  3. Regional distribution
  4. Orchestration
  5. File processing
  6. Data quality checks
  7. Archiving processed files
  8. Copying to Amazon Redshift
  9. Running stored procedures
  10. UI integration
  11. Code Deployment
  12. Security & Encryption
  13. Data Consumption
  14. Final Steps

1. Source systems

Aruba’s source repository includes data from three different operating regions in AMER, EMEA, and APJ, along with one worldwide (WW) data pipeline from varied sources like SAP S/4 HANA, Salesforce, Enterprise Data Warehouse (EDW), Enterprise Analytics Platform (EAP), SharePoint, and more. The data sources include 150+ files, including 10–15 mandatory files per region, ingested in various formats like xlsx, csv, and dat. Aruba’s data governance guidelines required that they use a single centralized tool that could securely and cost-effectively review all source files with multiple formats, sizes, and ingestion times for compliance before exporting them out of the HPE environment. To achieve this, Aruba first copied the respective files to a centralized on-premises staging layer.

2. Data migration

Aruba chose AWS Transfer Family for SFTP for secure and efficient file transfers from the on-premises staging layer to an Amazon S3 based landing zone. AWS Transfer Family seamlessly integrates with other AWS services, automates transfers, and makes sure data is protected with encryption and access controls. To prevent duplication issues and maintain data integrity, Aruba customized these data transfer jobs to make sure previous transfers are complete before copying the next set of files.

3. Regional distribution

On average, Aruba transfers approximately 100 files, with total size ranging from 1.5–2 GB into the landing zone daily. The data volume increases each Monday with the weekly file loads and at the beginning of each month with the monthly file loads. These files follow the same naming pattern, with a daily system-generated timestamp appended to each file name. Each file arrives as a pair with a tail metadata file in CSV format containing the size and name of the file. This metadata file is later used to read source file names during processing into the staging layer.

The source data contains files from three different operating Regions and one worldwide pipeline that needs to be processed per local time zones. Therefore, separating the files and running a distinct pipeline for each was necessary to decouple and enhance failure tolerance. To achieve this, Aruba used Amazon S3 Event Notifications. With each file uploaded to Amazon S3, an Amazon S3 PUT event invokes an AWS Lambda function that distributes the source and the metadata files Region-wise and loads them into the respective Regional landing zone S3 bucket. To map the file with the respective Region, this Lambda function uses Region-to-file mapping stored in a configuration table in Amazon Aurora PostgreSQL-Compatible Edition.
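
The following is a minimal sketch of how such a distribution function might look, assuming a hypothetical Region-to-file mapping table (region_file_mapping), placeholder bucket and connection details, and a psycopg2-compatible Lambda layer; it is illustrative only, not Aruba's actual implementation.

import urllib.parse

import boto3
import psycopg2  # assumes a psycopg2-compatible Lambda layer for Aurora PostgreSQL access

s3 = boto3.client("s3")


def lambda_handler(event, context):
    """Copy each newly uploaded file to its Regional landing zone bucket."""
    for record in event["Records"]:
        source_bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        file_prefix = key.split("/")[-1].split("_")[0]  # hypothetical file naming convention

        # Look up the target Regional bucket in the Aurora PostgreSQL configuration table
        # (connection details would normally come from Secrets Manager or environment variables).
        conn = psycopg2.connect(host="aurora-endpoint", dbname="config", user="app", password="***")
        with conn, conn.cursor() as cur:
            cur.execute(
                "SELECT target_bucket FROM region_file_mapping WHERE file_prefix = %s",
                (file_prefix,),
            )
            row = cur.fetchone()

        if row:  # AMER, EMEA, APJ, or WW landing zone bucket
            s3.copy_object(
                Bucket=row[0],
                Key=key,
                CopySource={"Bucket": source_bucket, "Key": key},
            )
    return {"statusCode": 200}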

4. Orchestration

The next requirement was to set up orchestration for the data pipeline to seamlessly implement the required logic on the source files to extract meaningful data. Aruba chose AWS Step Functions for orchestrating and automating their extract, transform, and load (ETL) processes to run on a fixed schedule. In addition, they use AWS Glue jobs for orchestrating validation jobs and moving data through the data warehouse.

They used Step Functions with Lambda and AWS Glue for automated orchestration to minimize the cloud solution deployment timeline by reusing the on-premises code base, where possible. The prior on-premises data pipeline was orchestrated using Python scripts. Therefore, integrating the existing scripts with Lambda within Step Functions and AWS Glue helped accelerate their deployment timeline on AWS.
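
As an illustration of this pattern, the following hedged sketch shows a Lambda task that a Step Functions state could invoke to start an AWS Glue validation job with boto3; the job name and arguments are placeholders, not Aruba's actual names.

import boto3

glue = boto3.client("glue")


def lambda_handler(event, context):
    """Start the Regional validation Glue job and hand the run ID back to Step Functions."""
    response = glue.start_job_run(
        JobName="aruba-dq-validation-job",  # hypothetical job name
        Arguments={
            "--region_code": event["region_code"],  # for example AMER, EMEA, APJ, or WW
            "--run_date": event["run_date"],
        },
    )
    # A subsequent state can poll glue.get_job_run() with this ID to wait for completion.
    return {"JobRunId": response["JobRunId"]}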

5. File processing

With each pipeline running at 5:00 AM local time, the data is further validated, processed, and then moved to the processing zone folder in the same S3 bucket. Unsuccessful file validation results in the source files being moved to the reject zone S3 bucket directory. The following file validations are run by the Lambda functions invoked by the Step Functions workflow:

  • The Lambda function validates whether the tail metadata file is available alongside the corresponding source data file. Only when the complete file pair lands in the Regional landing zone does the Step Functions workflow consider the source file transfer complete.
  • By reading the metadata file, the file validation function confirms that the names and sizes of the files landing in the Regional landing zone S3 bucket match the files on the HPE on-premises server. A simplified sketch of these checks follows this list.
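
The following is a simplified sketch of these two checks, assuming a hypothetical tail file layout of a single file_name,file_size_bytes row; bucket and key names are placeholders.

import csv

import boto3

s3 = boto3.client("s3")


def validate_file_pair(bucket: str, data_key: str, tail_key: str) -> bool:
    """Return True only if the tail file exists and the data file's name and size match it."""
    try:
        tail_obj = s3.get_object(Bucket=bucket, Key=tail_key)
    except s3.exceptions.NoSuchKey:
        return False  # tail file not yet available, so the transfer is not complete

    # Hypothetical tail file layout: a single CSV row of "file_name,file_size_bytes"
    expected_name, expected_size = next(
        csv.reader(tail_obj["Body"].read().decode("utf-8").splitlines())
    )

    head = s3.head_object(Bucket=bucket, Key=data_key)
    return data_key.split("/")[-1] == expected_name and head["ContentLength"] == int(expected_size)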

6. Data quality checks

When the files land in the processing zone, the Step Functions workflow invokes another Lambda function that converts the raw files to CSV format followed by stringent data quality checks. The final validated CSV files are loaded into the temp raw zone S3 folder.

The data quality (DQ) checks are managed using DQ configurations stored in Aurora PostgreSQL tables. Some examples of DQ checks include duplicate data check, null value check, and date format check. The DQ processing is managed through AWS Glue jobs, which are invoked by Lambda functions from within the Step Functions workflow. A number of data processing logics are also integrated in the DQ flow, such as the following:

  • Flag-based deduplication – For specific files, when a flag managed in the Aurora configuration table is enabled, the process removes duplicates before processing the data
  • Preset values replacing nulls – Similarly, NULLs in the source data are replaced with a preset value, such as 1 or 0, based on the value set in the configuration table. A simplified sketch of both behaviors follows this list.
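
The following PySpark sketch illustrates how such config-driven checks could be applied inside an AWS Glue job; the configuration values, column names, and S3 path are hypothetical.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("dq-checks-sketch").getOrCreate()

# Hypothetical DQ configuration values read from the Aurora PostgreSQL configuration table
dq_config = {"dedup_enabled": True, "null_default": 0, "date_format": "yyyy-MM-dd"}

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("s3://temp-raw-zone/region=AMER/orders.csv")  # placeholder path
)

# Flag-based deduplication: only applied when the configuration flag is enabled
if dq_config["dedup_enabled"]:
    df = df.dropDuplicates()

# Preset values replacing nulls: fill numeric NULLs with the configured default
df = df.fillna(dq_config["null_default"])

# Date format check: flag rows whose ship_date column does not parse with the expected format
df = df.withColumn(
    "valid_ship_date", F.to_date(F.col("ship_date"), dq_config["date_format"]).isNotNull()
)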

7. Archiving processed files

When the CSV conversion is complete, the original raw files in the processing zone S3 folder are archived for 6 months in the archive zone S3 bucket folder. After 6 months, the files on AWS are deleted, with the original raw files retained in the HPE source system.
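
A 6-month retention like this can be implemented with an S3 lifecycle rule; the following boto3 sketch shows one way to configure it, with placeholder bucket and prefix names.

import boto3

s3 = boto3.client("s3")

# Expire archived raw files roughly 6 months (180 days) after creation.
s3.put_bucket_lifecycle_configuration(
    Bucket="aruba-archive-zone-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-archived-raw-files",
                "Filter": {"Prefix": "archive-zone/"},
                "Status": "Enabled",
                "Expiration": {"Days": 180},
            }
        ]
    },
)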

8. Copying to Amazon Redshift

When the data quality checks and data processing are complete, the data is loaded from the S3 temp raw zone into the curated zone on an Amazon Redshift provisioned cluster, using the COPY command feature.
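
The following sketch shows how such a COPY statement could be issued through the Amazon Redshift Data API; the cluster, database, table, IAM role, and S3 path are placeholders rather than Aruba's actual objects.

import boto3

redshift_data = boto3.client("redshift-data")

# Load validated CSV files from the temp raw zone into a curated-zone table.
copy_sql = """
    COPY curated.supply_orders
    FROM 's3://aruba-temp-raw-zone/region=AMER/orders/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy-role'
    FORMAT AS CSV IGNOREHEADER 1;
"""

response = redshift_data.execute_statement(
    ClusterIdentifier="aruba-sc360-cluster",
    Database="sc360",
    DbUser="etl_user",
    Sql=copy_sql,
)
print(response["Id"])  # statement ID; poll describe_statement() to check for completion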

9. Running stored procedures

From the curated zone, AWS Glue jobs orchestrate Redshift stored procedures to load the data from the curated zone into the Redshift publish zone. The Redshift publish zone is a different set of tables in the same Redshift provisioned cluster. The stored procedures process and load the data into fact and dimension tables in a star schema.

10. UI integration

Amazon OpenSearch Service is also integrated with the flow for publishing mass notifications to end users through the user interface (UI). Users can also send messages and post updates via the UI with the OpenSearch Service integration.

11. Code Deployment

Aruba uses AWS CodeCommit and AWS CodePipeline to deploy and manage a bi-monthly code release cycle, the frequency of which can be increased on demand as deployment needs dictate. The release happens across four environments (Development, Testing, UAT, and Production) deployed through a DevOps discipline, enabling shorter turnaround times for ever-changing user requirements and upstream data source changes.

12. Security & Encryption

User access to the Aruba SC360 portal is managed via single sign-on (SSO) with multi-factor authentication (MFA), and data security is managed via direct integration of the AWS solution with HPE IT’s unified access management API. All data pipelines between HPE on-premises sources and Amazon S3 are encrypted for enhanced security.

13. Data Consumption

The Aruba SC360 application provides a ‘Private Space’ feature that lets other BI and analytics teams within HPE run and manage their own data ingestion pipelines. This was built using the Amazon Redshift data sharing feature, which enables Aruba to securely share access to live data in their Amazon Redshift cluster without manually moving or copying the data. HPE internal teams can therefore build their own data workloads on core Aruba SC360 data while maintaining data security and code isolation.

14. Final Steps

The data is finally fetched into the publication layer, which consists of a ReactJS-based user interface that accesses the data in the Amazon Redshift publish zone using Spring Boot REST APIs. Along with data from the Redshift data warehouse, notifications stored in OpenSearch Service indexes are also fetched and loaded into the UI. Amazon Aurora PostgreSQL is used to maintain the configuration values for populating the UI. To build BI dashboards, Aruba opted to continue using their existing third-party BI tool due to its familiarity among internal teams.

Conclusion

In this post, we showed you how HPE Aruba Supply Chain successfully re-architected and deployed their data solution by adopting a modern data architecture on AWS.

The new solution has helped Aruba integrate data from multiple sources, along with optimizing their cost, performance, and scalability. This has also allowed the Aruba Supply Chain leadership to receive in-depth and timely insights for better decision-making, thereby elevating the customer experience.

To learn more about the AWS services used to build modern data solutions on AWS, refer to the AWS public documentation and stay up to date through the AWS Big Data Blog.


About the authors

Hardeep Randhawa is a Senior Manager – Big Data & Analytics, Solution Architecture at HPE, recognized for stewarding enterprise-scale programs and deployments. He has led a recent Big Data EAP (Enterprise Analytics Platform) build with one of the largest global SAP HANA/S4 implementations at HPE.

Abhay Kumar is a Lead Data Engineer in Aruba Supply Chain Analytics and manages the cloud infrastructure for the application at HPE. With 11+ years of experience in IT industry domains like banking and supply chain, Abhay has a strong background in cloud technologies, data analytics, data management, and big data systems. In his spare time, he likes reading, exploring new places, and watching movies.

Ritesh Chaman is a Senior Technical Account Manager at Amazon Web Services. With 14 years of experience in the IT industry, Ritesh has a strong background in Data Analytics, Data Management, Big Data systems and Machine Learning. In his spare time, he loves cooking, watching sci-fi movies, and playing sports.

Sushmita Barthakur is a Senior Solutions Architect at Amazon Web Services, supporting Enterprise customers architect their workloads on AWS. With a strong background in Data Analytics and Data Management, she has extensive experience helping customers architect and build Business Intelligence and Analytics Solutions, both on-premises and the cloud. Sushmita is based out of Tampa, FL and enjoys traveling, reading and playing tennis.

Amazon EMR 7.1 runtime for Apache Spark and Iceberg can run Spark workloads 2.7 times faster than Apache Spark 3.5.1 and Iceberg 1.5.2

Post Syndicated from Hari Kishore Chaparala original https://aws.amazon.com/blogs/big-data/amazon-emr-7-1-runtime-for-apache-spark-and-iceberg-can-run-spark-workloads-2-7-times-faster-than-apache-spark-3-5-1-and-iceberg-1-5-2/

In this post, we explore the performance benefits of using the Amazon EMR runtime for Apache Spark and Apache Iceberg compared to running the same workloads with open source Spark 3.5.1 on Iceberg tables. Iceberg is a popular open source high-performance format for large analytic tables. Our benchmarks demonstrate that Amazon EMR can run TPC-DS 3 TB workloads 2.7 times faster, reducing the runtime from 1.548 hours to 0.564 hours. Additionally, the cost efficiency improves by 2.2 times, with the total cost decreasing from $16.09 to $7.23 when using Amazon Elastic Compute Cloud (Amazon EC2) On-Demand r5d.4xlarge instances, providing observable gains for data processing tasks.

The Amazon EMR runtime for Apache Spark offers a high-performance runtime environment while maintaining 100% API compatibility with open source Spark and Iceberg table format. In Run Apache Spark 3.5.1 workloads 4.5 times faster with Amazon EMR runtime for Apache Spark, we detailed some of the optimizations, showing a runtime improvement of 4.5 times faster and 2.8 times better price-performance compared to open source Spark 3.5.1 on the TPC-DS 3 TB benchmark. However, many of the optimizations are geared towards DataSource V1, whereas Iceberg uses Spark DataSource V2. Recognizing this, we have focused on migrating some of the existing optimizations in the EMR runtime for Spark to DataSource V2 and introducing Iceberg-specific enhancements. These improvements are built on top of the Spark runtime enhancements on query planning, physical plan operator improvements, and optimizations with Amazon Simple Storage Service (Amazon S3) and the Java runtime. We have added eight new optimizations incrementally since the Amazon EMR 6.15 release in 2023, which are present in Amazon EMR 7.1 and turned on by default. Some of the improvements include the following:

  • Optimizing DataSource V2 in Spark:
    • Dynamic filtering on non-partitioned columns
    • Removing redundant broadcast hash joins
    • Partial hash aggregate pushdowns
    • Bloom filter-based joins
  • Iceberg-specific enhancements:
    • Data prefetch
    • Support for file size-based estimations

Amazon EMR on EC2, Amazon EMR Serverless, Amazon EMR on Amazon EKS, and Amazon EMR on AWS Outposts all use the optimized runtimes. Refer to Working with Apache Iceberg in Amazon EMR and Best practices for optimizing Apache Iceberg workloads for more details.

Benchmark results for Amazon EMR 7.1 vs. open source Spark 3.5.1 and Iceberg 1.5.2

To assess the Spark engine’s performance with the Iceberg table format, we performed benchmark tests using the 3 TB TPC-DS dataset, version 2.13 (our results derived from the TPC-DS dataset are not directly comparable to the official TPC-DS results due to setup differences). Benchmark tests for the EMR runtime for Spark and Iceberg were conducted on Amazon EMR 7.1 clusters with Spark 3.5.0 and Iceberg 1.4.3-amzn-0 versions, and open source Spark 3.5.1 and Iceberg 1.5.2 were deployed on EC2 clusters designated for open source runs.

The setup instructions and technical details are available in our GitHub repository. To minimize the influence of external catalogs like AWS Glue and Hive, we used the Hadoop catalog for the Iceberg tables. This uses the underlying file system, specifically Amazon S3, as the catalog. We can define this setup by configuring the property spark.sql.catalog.<catalog_name>.type. The fact tables used the default partitioning by the date column, with partition counts varying from 200 to 2,100. No precalculated statistics were used for these tables.
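
For reference, the following PySpark sketch shows a minimal Hadoop catalog configuration of this kind; the catalog name and warehouse path mirror the properties used in the benchmark commands later in this post, and the database and table names are placeholders.

from pyspark.sql import SparkSession

# Minimal Iceberg Hadoop catalog configuration backed by Amazon S3 (no Glue or Hive catalog).
spark = (
    SparkSession.builder.appName("iceberg-hadoop-catalog")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "s3://<bucket>/<warehouse_path>/")
    .config("spark.sql.defaultCatalog", "local")
    .getOrCreate()
)

# Placeholder database and table names under the Hadoop catalog warehouse
spark.sql("SELECT count(*) FROM local.tpcds_3tb.store_sales").show()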

We ran a total of 104 SparkSQL queries in three sequential rounds, and the average runtime of each query across these rounds was taken for comparison. The average runtime for the three rounds on Amazon EMR 7.1 with Iceberg enabled was 0.56 hours, demonstrating a 2.7-fold speed increase compared to open source Spark 3.5.1 and Iceberg 1.5.2. The following figure presents the total runtimes in seconds.

The following table summarizes the metrics.

Metric | Amazon EMR 7.1 on EC2 | Open Source Spark 3.5.1 and Iceberg 1.5.2
Average runtime in seconds | 2033.17 | 5575.19
Geometric mean over queries in seconds | 10.13153 | 20.34651
Cost* | $7.23 | $16.09

*Detailed cost estimates are discussed later in this post.

The following chart demonstrates the per-query performance improvement of Amazon EMR 7.1 relative to open source Spark 3.5.1 and Iceberg 1.5.2. The extent of the speedup varies from one query to another, ranging from 9.6 times faster for q93 to 1.04 times faster for q34, with Amazon EMR outperforming the open source Spark with Iceberg tables. The horizontal axis arranges the TPC-DS 3 TB benchmark queries in descending order based on the performance improvement seen with Amazon EMR, and the vertical axis depicts the magnitude of this speedup in seconds.

Cost comparison

Our benchmark provides the total runtime and geometric mean data to assess the performance of Spark and Iceberg in a complex, real-world decision support scenario. For additional insights, we also examine the cost aspect. We calculate cost estimates using formulas that account for EC2 On-Demand instances, Amazon Elastic Block Store (Amazon EBS), and Amazon EMR expenses.

  • Amazon EC2 cost (includes SSD cost) = number of instances * r5d.4xlarge hourly rate * job runtime in hours
    • r5d.4xlarge hourly rate = $1.152 per hour
  • Root Amazon EBS cost = number of instances * Amazon EBS per GB-hourly rate * root EBS volume size * job runtime in hours
  • Amazon EMR cost = number of instances * r5d.4xlarge Amazon EMR cost * job runtime in hours
    • r5d.4xlarge Amazon EMR cost = $0.27 per hour
  • Total cost = Amazon EC2 cost + root Amazon EBS cost + Amazon EMR cost
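
The following snippet applies these formulas to the runtimes in the table below; the EBS rate used here is an assumption of roughly $0.10 per GB-month converted to GB-hours.

INSTANCES = 9
EC2_RATE = 1.152        # r5d.4xlarge On-Demand, USD per hour
EMR_RATE = 0.27         # r5d.4xlarge Amazon EMR uplift, USD per hour
EBS_RATE = 0.10 / 730   # assumed ~USD per GB-hour, from ~$0.10 per GB-month
EBS_GB = 20


def total_cost(runtime_hours: float, include_emr: bool) -> float:
    ec2 = INSTANCES * EC2_RATE * runtime_hours
    ebs = INSTANCES * EBS_RATE * EBS_GB * runtime_hours
    emr = INSTANCES * EMR_RATE * runtime_hours if include_emr else 0.0
    return round(ec2 + ebs + emr, 2)


print(total_cost(0.564, include_emr=True))    # ~7.23 for Amazon EMR 7.1
print(total_cost(1.548, include_emr=False))   # ~16.09 for open source Spark and Iceberg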

The calculations reveal that the Amazon EMR 7.1 benchmark yields a 2.2-fold cost efficiency improvement over open source Spark 3.5.1 and Iceberg 1.5.2 in running the benchmark job.

Metric | Amazon EMR 7.1 | Open Source Spark 3.5.1 and Iceberg 1.5.2
Runtime in hours | 0.564 | 1.548
Number of EC2 instances | 9 | 9
Amazon EBS size | 20 GB | 20 GB
Amazon EC2 cost | $5.85 | $16.05
Amazon EBS cost | $0.01 | $0.04
Amazon EMR cost | $1.37 | $0
Total cost | $7.23 | $16.09
Cost savings | Amazon EMR 7.1 is 2.2 times better | Baseline

In addition to the time-based metrics discussed so far, data from Spark event logs shows that Amazon EMR 7.1 scanned approximately 3.4 times less data from Amazon S3 and 4.1 times fewer records than the open source version in the TPC-DS 3 TB benchmark. This reduction in Amazon S3 data scanning contributes directly to cost savings for Amazon EMR workloads.

Run open source Spark benchmarks on Iceberg tables

We used separate EC2 clusters, each equipped with nine r5d.4xlarge instances, for testing both open source Spark 3.5.1 and Iceberg 1.5.2 and Amazon EMR 7.1. The primary node was equipped with 16 vCPU and 128 GB of memory, and the eight worker nodes together had 128 vCPU and 1024 GB of memory. We conducted tests using the Amazon EMR default settings to showcase the typical user experience and minimally adjusted the settings of Spark and Iceberg to maintain a balanced comparison.

The following table summarizes the Amazon EC2 configurations for the primary node and eight worker nodes of type r5d.4xlarge.

EC2 Instance | vCPU | Memory (GiB) | Instance Storage (GB) | EBS Root Volume (GB)
r5d.4xlarge | 16 | 128 | 2 x 300 NVMe SSD | 20

Prerequisites

The following prerequisites are required to run the benchmarking:

  1. Using the instructions in the emr-spark-benchmark GitHub repo, set up the TPC-DS source data in your S3 bucket and on your local computer.
  2. Build the benchmark application following the steps provided in Steps to build spark-benchmark-assembly application and copy the benchmark application to your S3 bucket. Alternatively, copy spark-benchmark-assembly-3.5.1.jar to your S3 bucket.
  3. Create Iceberg tables from the TPC-DS source data. Follow the instructions on GitHub to create Iceberg tables using the Hadoop catalog. For example, the following code uses an EMR 7.1 cluster with Iceberg enabled to create the tables:
aws emr add-steps --cluster-id <cluster-id> --steps Type=Spark,Name="Create Iceberg Tables",
Args=[--class,com.amazonaws.eks.tpcds.CreateIcebergTables,
--conf,spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,
--conf,spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog,
--conf,spark.sql.catalog.hadoop_catalog.type=hadoop,
--conf,spark.sql.catalog.hadoop_catalog.warehouse=s3://<bucket>/<warehouse_path>/,
--conf,spark.sql.catalog.hadoop_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO,
s3://<bucket>/<jar_location>/spark-benchmark-assembly-3.5.1.jar,
s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/,
/home/hadoop/tpcds-kit/tools,parquet,3000,true,<database_name>,true,true],ActionOnFailure=CONTINUE 
--region <AWS region>

Note the Hadoop catalog warehouse location and database name from the preceding step. We use the same tables to run benchmarks with Amazon EMR 7.1 and open source Spark and Iceberg.

This benchmark application is built from the branch tpcds-v2.13_iceberg. If you’re building a new benchmark application, switch to the correct branch after downloading the source code from the GitHub repo.

Create and configure a YARN cluster on Amazon EC2

To compare Iceberg performance between Amazon EMR on Amazon EC2 and open source Spark on Amazon EC2, follow the instructions in the emr-spark-benchmark GitHub repo to create an open source Spark cluster on Amazon EC2 using Flintrock with eight worker nodes.

Based on the cluster selection for this test, the Spark and Iceberg configurations used are reflected in the spark-submit command shown in the following section.

Run the TPC-DS benchmark with Apache Spark 3.5.1 and Iceberg 1.5.2

Complete the following steps to run the TPC-DS benchmark:

  1. Log in to the open source cluster primary using flintrock login $CLUSTER_NAME.
  2. Submit your Spark job:
    1. Choose the correct Iceberg catalog warehouse location and database that has the created Iceberg tables.
    2. The results are created in s3://<YOUR_S3_BUCKET>/benchmark_run.
    3. You can track progress in /media/ephemeral0/spark_run.log.
spark-submit \
--master yarn \
--deploy-mode client \
--class com.amazonaws.eks.tpcds.BenchmarkSQL \
--conf spark.driver.cores=4 \
--conf spark.driver.memory=10g \
--conf spark.executor.cores=16 \
--conf spark.executor.memory=100g \
--conf spark.executor.instances=8 \
--conf spark.network.timeout=2000 \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.shuffle.service.enabled=false \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.jars.packages=org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions   \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog    \
--conf spark.sql.catalog.local.type=hadoop  \
--conf spark.sql.catalog.local.warehouse=s3a://<YOUR_S3_BUCKET>/<warehouse_path>/ \
--conf spark.sql.defaultCatalog=local   \
--conf spark.sql.catalog.local.io-impl=org.apache.iceberg.aws.s3.S3FileIO   \
spark-benchmark-assembly-3.5.1.jar   \
s3://<YOUR_S3_BUCKET>/benchmark_run 3000 1 false  \
q1-v2.13,q10-v2.13,q11-v2.13,q12-v2.13,q13-v2.13,q14a-v2.13,q14b-v2.13,q15-v2.13,q16-v2.13,\
q17-v2.13,q18-v2.13,q19-v2.13,q2-v2.13,q20-v2.13,q21-v2.13,q22-v2.13,q23a-v2.13,q23b-v2.13,\
q24a-v2.13,q24b-v2.13,q25-v2.13,q26-v2.13,q27-v2.13,q28-v2.13,q29-v2.13,q3-v2.13,q30-v2.13,\
q31-v2.13,q32-v2.13,q33-v2.13,q34-v2.13,q35-v2.13,q36-v2.13,q37-v2.13,q38-v2.13,q39a-v2.13,\
q39b-v2.13,q4-v2.13,q40-v2.13,q41-v2.13,q42-v2.13,q43-v2.13,q44-v2.13,q45-v2.13,q46-v2.13,\
q47-v2.13,q48-v2.13,q49-v2.13,q5-v2.13,q50-v2.13,q51-v2.13,q52-v2.13,q53-v2.13,q54-v2.13,\
q55-v2.13,q56-v2.13,q57-v2.13,q58-v2.13,q59-v2.13,q6-v2.13,q60-v2.13,q61-v2.13,q62-v2.13,\
q63-v2.13,q64-v2.13,q65-v2.13,q66-v2.13,q67-v2.13,q68-v2.13,q69-v2.13,q7-v2.13,q70-v2.13,\
q71-v2.13,q72-v2.13,q73-v2.13,q74-v2.13,q75-v2.13,q76-v2.13,q77-v2.13,q78-v2.13,q79-v2.13,\
q8-v2.13,q80-v2.13,q81-v2.13,q82-v2.13,q83-v2.13,q84-v2.13,q85-v2.13,q86-v2.13,q87-v2.13,\
q88-v2.13,q89-v2.13,q9-v2.13,q90-v2.13,q91-v2.13,q92-v2.13,q93-v2.13,q94-v2.13,q95-v2.13,\
q96-v2.13,q97-v2.13,q98-v2.13,q99-v2.13,ss_max-v2.13    \
true <database> > /media/ephemeral0/spark_run.log 2>&1 &!

Summarize the results

After the Spark job finishes, retrieve the test result file from the output S3 bucket at s3://<YOUR_S3_BUCKET>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv. This can be done either through the Amazon S3 console by navigating to the specified bucket location or by using the AWS Command Line Interface (AWS CLI). The Spark benchmark application organizes the data by creating a timestamp folder and placing a summary file within a folder labeled summary.csv. The output CSV files contain four columns without headers:

  • Query name
  • Median time
  • Minimum time
  • Maximum time

With data from three separate test runs (one iteration each), we can calculate the average and geometric mean of the benchmark runtimes, as in the following sketch.
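
This minimal sketch assumes the three summary.csv files have been downloaded locally into results/round*/ directories (a layout chosen for this example only); it averages each query across rounds and then aggregates.

import csv
import glob
from statistics import geometric_mean, mean

# Each summary.csv row: query name, median, minimum, maximum runtime in seconds (no header).
per_query_times = {}
for path in glob.glob("results/round*/summary.csv"):
    with open(path) as f:
        for name, median, minimum, maximum in csv.reader(f):
            per_query_times.setdefault(name, []).append(float(median))

# Average each query across the three rounds, then aggregate.
query_averages = [mean(times) for times in per_query_times.values()]
print("Average total runtime (s):", round(sum(query_averages), 2))
print("Geometric mean (s):", round(geometric_mean(query_averages), 5))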

Run the TPC-DS benchmark with the EMR runtime for Spark

Most of the instructions are similar to Steps to run Spark Benchmarking with a few Iceberg-specific details.

Prerequisites

Complete the following prerequisite steps:

  1. Run aws configure to configure the AWS CLI shell to point to the benchmarking AWS account. Refer to Configure the AWS CLI for instructions.
  2. Upload the benchmark application JAR file to Amazon S3.

Deploy the EMR cluster and run the benchmark job

Complete the following steps to run the benchmark job:

  1. Use the AWS CLI command as shown in Deploy EMR on EC2 Cluster and run benchmark job to spin up an EMR on EC2 cluster. Make sure to enable Iceberg. See Create an Iceberg cluster for more details. Choose the correct Amazon EMR version, root volume size, and same resource configuration as the open source Flintrock setup. Refer to create-cluster for a detailed description of the AWS CLI options.
  2. Store the cluster ID from the response. We need this for the next step.
  3. Submit the benchmark job in Amazon EMR using add-steps from the AWS CLI:
    1. Replace <cluster ID> with the cluster ID from Step 2.
    2. The benchmark application is at s3://<your-bucket>/spark-benchmark-assembly-3.5.1.jar.
    3. Choose the correct Iceberg catalog warehouse location and database that has the created Iceberg tables. This should be the same as the one used for the open source TPC-DS benchmark run.
    4. The results will be in s3://<your-bucket>/benchmark_run.
aws emr add-steps   --cluster-id <cluster-id>
--steps Type=Spark,Name="SPARK Iceberg EMR TPCDS Benchmark Job",
Args=[--class,com.amazonaws.eks.tpcds.BenchmarkSQL,
--conf,spark.driver.cores=4,
--conf,spark.driver.memory=10g,
--conf,spark.executor.cores=16,
--conf,spark.executor.memory=100g,
--conf,spark.executor.instances=8,
--conf,spark.network.timeout=2000,
--conf,spark.executor.heartbeatInterval=300s,
--conf,spark.dynamicAllocation.enabled=false,
--conf,spark.shuffle.service.enabled=false,
--conf,spark.sql.iceberg.data-prefetch.enabled=true,
--conf,spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,
--conf,spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog,
--conf,spark.sql.catalog.local.type=hadoop,
--conf,spark.sql.catalog.local.warehouse=s3://<your-bucket>/<warehouse-path>,
--conf,spark.sql.defaultCatalog=local,
--conf,spark.sql.catalog.local.io-impl=org.apache.iceberg.aws.s3.S3FileIO,
s3://<your-bucket>/spark-benchmark-assembly-3.5.1.jar,
s3://<your-bucket>/benchmark_run,3000,1,false,
'q1-v2.13\,q10-v2.13\,q11-v2.13\,q12-v2.13\,q13-v2.13\,q14a-v2.13\,
q14b-v2.13\,q15-v2.13\,q16-v2.13\,q17-v2.13\,q18-v2.13\,q19-v2.13\,
q2-v2.13\,q20-v2.13\,q21-v2.13\,q22-v2.13\,q23a-v2.13\,q23b-v2.13\,
q24a-v2.13\,q24b-v2.13\,q25-v2.13\,q26-v2.13\,q27-v2.13\,q28-v2.13\,
q29-v2.13\,q3-v2.13\,q30-v2.13\,q31-v2.13\,q32-v2.13\,q33-v2.13\,
q34-v2.13\,q35-v2.13\,q36-v2.13\,q37-v2.13\,q38-v2.13\,q39a-v2.13\,
q39b-v2.13\,q4-v2.13\,q40-v2.13\,q41-v2.13\,q42-v2.13\,q43-v2.13\,
q44-v2.13\,q45-v2.13\,q46-v2.13\,q47-v2.13\,q48-v2.13\,q49-v2.13\,
q5-v2.13\,q50-v2.13\,q51-v2.13\,q52-v2.13\,q53-v2.13\,q54-v2.13\,
q55-v2.13\,q56-v2.13\,q57-v2.13\,q58-v2.13\,q59-v2.13\,q6-v2.13\,
q60-v2.13\,q61-v2.13\,q62-v2.13\,q63-v2.13\,q64-v2.13\,q65-v2.13\,
q66-v2.13\,q67-v2.13\,q68-v2.13\,q69-v2.13\,q7-v2.13\,q70-v2.13\,
q71-v2.13\,q72-v2.13\,q73-v2.13\,q74-v2.13\,q75-v2.13\,q76-v2.13\,
q77-v2.13\,q78-v2.13\,q79-v2.13\,q8-v2.13\,q80-v2.13\,q81-v2.13\,
q82-v2.13\,q83-v2.13\,q84-v2.13\,q85-v2.13\,q86-v2.13\,q87-v2.13\,
q88-v2.13\,q89-v2.13\,q9-v2.13\,q90-v2.13\,q91-v2.13\,q92-v2.13\,
q93-v2.13\,q94-v2.13\,q95-v2.13\,q96-v2.13\,q97-v2.13\,q98-v2.13\,
q99-v2.13\,ss_max-v2.13',true,<database>],ActionOnFailure=CONTINUE 
--region <aws-region>

Summarize the results

After the step is complete, you can see the summarized benchmark result at s3://<YOUR_S3_BUCKET>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv in the same way as the previous run and compute the average and geometric mean of the query runtimes.

Clean up

To prevent any future charges, delete the resources you created by following the instructions provided in the Cleanup section of the GitHub repository.

Summary

Amazon EMR is consistently enhancing the EMR runtime for Spark when used with Iceberg tables, achieving a performance that is 2.7 times faster than open source Spark 3.5.1 and Iceberg 1.5.2 on TPC-DS 3 TB, v2.13. We encourage you to keep up to date with the latest Amazon EMR releases to fully benefit from ongoing performance improvements.

To stay informed, subscribe to the AWS Big Data Blog’s RSS feed, where you can find updates on the EMR runtime for Spark and Iceberg, as well as tips on configuration best practices and tuning recommendations.


About the authors

Hari Kishore Chaparala is a software development engineer for Amazon EMR at Amazon Web Services.

Udit Mehrotra is an Engineering Manager for EMR at Amazon Web Services.

Unlock scalable analytics with a secure connectivity pattern in AWS Glue to read from or write to Snowflake

Post Syndicated from Caio Montovani original https://aws.amazon.com/blogs/big-data/unlock-scalable-analytics-with-a-secure-connectivity-pattern-in-aws-glue-to-read-from-or-write-to-snowflake/

In today’s data-driven world, the ability to seamlessly integrate and utilize diverse data sources is critical for gaining actionable insights and driving innovation. As organizations increasingly rely on data stored across various platforms, such as Snowflake, Amazon Simple Storage Service (Amazon S3), and various software as a service (SaaS) applications, the challenge of bringing these disparate data sources together has never been more pressing.

AWS Glue is a robust data integration service that facilitates the consolidation of data from different origins, empowering businesses to use the full potential of their data assets. By using AWS Glue to integrate data from Snowflake, Amazon S3, and SaaS applications, organizations can unlock new opportunities in generative artificial intelligence (AI), machine learning (ML), business intelligence (BI), and self-service analytics or feed data to underlying applications.

In this post, we explore how AWS Glue can serve as the data integration service to bring the data from Snowflake for your data integration strategy, enabling you to harness the power of your data ecosystem and drive meaningful outcomes across various use cases.

Use case

Consider a large ecommerce company that relies heavily on data-driven insights to optimize its operations, marketing strategies, and customer experiences. The company stores vast amounts of transactional data, customer information, and product catalogs in Snowflake. However, they also generate and collect data from various other sources, such as web logs stored in Amazon S3, social media platforms, and third-party data providers. To gain a comprehensive understanding of their business and make informed decisions, the company needs to integrate and analyze data from all these sources seamlessly.

One crucial business requirement for the ecommerce company is to generate a Pricing Summary Report that provides a detailed analysis of pricing and discounting strategies. This report is essential for understanding revenue streams, identifying opportunities for optimization, and making data-driven decisions regarding pricing and promotions. After the Pricing Summary Report is generated and stored in Amazon S3, the company can use AWS analytics services to generate interactive BI dashboards and run one-time queries on the report. This allows business analysts and decision-makers to gain valuable insights, visualize key metrics, and explore the data in depth, enabling informed decision-making and strategic planning for pricing and promotional strategies.

Solution overview

The following architecture diagram illustrates a secure and efficient solution for integrating Snowflake data with Amazon S3, using the native Snowflake connector in AWS Glue. This setup uses AWS PrivateLink to provide secure connectivity between AWS services across different virtual private clouds (VPCs), eliminating the need to expose data to the public internet, which is a critical need for organizations.


The following are the key components and steps in the integration process:

  1. Establish a secure, private connection between your AWS account and your Snowflake account using PrivateLink. This involves creating VPC endpoints in both the AWS and Snowflake VPCs, making sure data transfer remains within the AWS network.
  2. Use Amazon Route 53 to create a private hosted zone that resolves the Snowflake endpoint within your VPC. This allows AWS Glue jobs to connect to Snowflake using a private DNS name, maintaining the security and integrity of the data transfer.
  3. Create an AWS Glue job to handle the extract, transform, and load (ETL) process on data from Snowflake to Amazon S3. The AWS Glue job uses the secure connection established by the VPC endpoints to access Snowflake data. Snowflake credentials are securely stored in AWS Secrets Manager, and the AWS Glue job retrieves them at runtime to authenticate and connect to Snowflake, providing secure access management (a minimal sketch of this credential lookup follows this list). A Secrets Manager VPC endpoint enables the job to communicate with that service without traversing the public internet, enhancing security and performance.
  4. Store the extracted and transformed data in Amazon S3. Organize the data into appropriate structures, such as partitioned folders, to optimize query performance and data management. An Amazon S3 VPC endpoint is used to communicate with this service without traversing the public internet, enhancing security and performance. We also use Amazon S3 to store AWS Glue scripts, logs, and temporary data generated during the ETL process.
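
As referenced in step 3, the following is a minimal sketch of how an AWS Glue job script might retrieve the Snowflake credentials from Secrets Manager at runtime; the secret name and keys match those used later in this post, and the surrounding job logic is omitted.

import json

import boto3

# Retrieve the Snowflake credentials stored in Secrets Manager at job runtime.
secrets = boto3.client("secretsmanager")
secret = json.loads(
    secrets.get_secret_value(SecretId="blog-glue-snowflake-credentials")["SecretString"]
)

snowflake_options = {
    "sfUser": secret["sfUser"],
    "sfPassword": secret["sfPassword"],
    "sfWarehouse": secret["sfWarehouse"],
}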

This approach offers the following benefits:

  • Enhanced security – By using PrivateLink and VPC endpoints, data transfer between Snowflake and Amazon S3 is secured within the AWS network, reducing exposure to potential security threats.
  • Efficient data integration – AWS Glue simplifies the ETL process, providing a scalable and flexible solution for data integration between Snowflake and Amazon S3.
  • Cost-effectiveness – Using Amazon S3 for data storage, combined with the AWS Glue pay-as-you-go pricing model, helps optimize costs associated with data management and integration.
  • Scalability and flexibility – The architecture supports scalable data transfers and can be extended to integrate additional data sources and destinations as needed.

By following this architecture and taking advantage of the capabilities of AWS Glue, PrivateLink, and associated AWS services, organizations can achieve a robust, secure, and efficient data integration solution, enabling them to harness the full potential of their Snowflake and Amazon S3 data for advanced analytics and BI.

Prerequisites

Complete the following prerequisites before setting up the solution:

  1. Verify that you have access to an AWS account with the necessary permissions to provision resources in services such as Route 53, Amazon S3, AWS Glue, Secrets Manager, and Amazon Virtual Private Cloud (Amazon VPC) using AWS CloudFormation, which lets you model, provision, and manage AWS and third-party resources by treating infrastructure as code.
  2. Confirm that you have access to Snowflake hosted in AWS with required permissions to run the steps to configure PrivateLink. Refer to Enabling AWS PrivateLink in the Snowflake documentation to verify the steps, required access level, and service level to set the configurations. After you enable PrivateLink, save the value of the following parameters provided by Snowflake to use in the next step in this post:
    1. privatelink-vpce-id
    2. privatelink-account-url
    3. privatelink_ocsp-url
    4. regionless-snowsight-privatelink-url
  3. Make sure you have a Snowflake user snowflakeUser and password snowflakePassword with required permissions to read from and write to Snowflake. The user and password are used in the AWS Glue connection to authenticate within Snowflake.
  4. If your Snowflake user doesn’t have a default warehouse set, you will need a warehouse name. We use snowflakeWarehouse as a placeholder for the warehouse name; replace it with your actual warehouse name.
  5. If you’re new to Snowflake, consider completing the Snowflake in 20 Minutes tutorial. By the end of the tutorial, you should know how to create required Snowflake objects, including warehouses, databases, and tables for storing and querying data.

Create resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup of the base resources. You can review and customize it to suit your needs. The CloudFormation template generates the base resources used in this solution, including the VPC and subnets, the AWS Glue security group and service role, the Route 53 private hosted zone and VPC endpoints for Snowflake PrivateLink, the Secrets Manager secret, an S3 bucket, an AWS Glue Data Catalog database, and an Athena workgroup.

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack to launch the CloudFormation stack.
  3. Provide the CloudFormation stack parameters:
    1. For PrivateLinkAccountURL, enter the value of the parameter privatelink-account-url obtained in the prerequisites.
    2. For PrivateLinkOcspURL, enter the value of the parameter privatelink_ocsp-url obtained in the prerequisites.
    3. For PrivateLinkVpceId, enter the value of the parameter privatelink-vpce-id obtained in the prerequisites.
    4. For PrivateSubnet1CIDR, enter the IP addresses for your private subnet 1.
    5. For PrivateSubnet2CIDR, enter the IP addresses for your private subnet 2.
    6. For PrivateSubnet3CIDR, enter the IP addresses for your private subnet 3.
    7. For PublicSubnet1CIDR, enter the IP addresses for your public subnet 1.
    8. For RegionlessSnowsightPrivateLinkURL, enter the value of the parameter regionless-snowsight-privatelink-url obtained in the prerequisites.
    9. For VpcCIDR, enter the IP addresses for your VPC.
  4. Choose Next.
  5. Select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Submit and wait for the stack creation step to complete.

After the CloudFormation stack is successfully created, you can see all the resources created on the Resources tab.

Navigate to the Outputs tab to see the outputs provided by CloudFormation stack. Save the value of the outputs GlueSecurityGroupId, VpcId, and PrivateSubnet1Id to use in the next step in this post.


Update the Secrets Manager secret with Snowflake credentials for the AWS Glue connection

To update the Secrets Manager secret with user snowflakeUser, password snowflakePassword, and warehouse snowflakeWarehouse that you will use in the AWS Glue connection to establish a connection to Snowflake, complete the following steps:

  1. On the Secrets Manager console, choose Secrets in the navigation pane.
  2. Open the secret blog-glue-snowflake-credentials.
  3. Under Secret value, choose Retrieve secret value.


  4. Choose Edit.
  5. Enter the user snowflakeUser, password snowflakePassword, and warehouse snowflakeWarehouse for the keys sfUser, sfPassword, and sfWarehouse, respectively.
  6. Choose Save.

Create the AWS Glue connection for Snowflake

An AWS Glue connection is an AWS Glue Data Catalog object that stores login credentials, URI strings, VPC information, and more for a particular data store. AWS Glue crawlers, jobs, and development endpoints use connections in order to access certain types of data stores. To create an AWS Glue connection to Snowflake, complete the following steps:

  1. On the AWS Glue console, in the navigation pane, under Data catalog, choose Connections.
  2. Choose Create connection.
  3. For Data sources, search for and select Snowflake.
  4. Choose Next.


  5. For Snowflake URL, enter https://<privatelink-account-url>.

To obtain the Snowflake PrivateLink account URL, refer to parameters obtained in the prerequisites.

  6. For AWS Secret, choose the secret blog-glue-snowflake-credentials.
  7. For VPC, choose the VpcId value obtained from the CloudFormation stack output.
  8. For Subnet, choose the PrivateSubnet1Id value obtained from the CloudFormation stack output.
  9. For Security groups, choose the GlueSecurityGroupId value obtained from the CloudFormation stack output.
  10. Choose Next.


  11. In the Connection Properties section, for Name, enter glue-snowflake-connection.
  12. Choose Next.


  13. Choose Create connection.

Create an AWS Glue job

You’re now ready to define the AWS Glue job using the Snowflake connection. To create an AWS Glue job to read from Snowflake, complete the following steps:

  1. On the AWS Glue console, under ETL jobs in the navigation pane, choose Visual ETL.


  2. Choose the Job details tab.
  3. For Name, enter a name, for example, Pricing Summary Report Job.
  4. For Description, enter a meaningful description for the job.
  5. For IAM Role, choose the role that has access to the target S3 location the job writes to and the Snowflake source location it reads from, and that can run the AWS Glue job. You can find this role in your CloudFormation stack output, named blog-glue-snowflake-GlueServiceRole-*.
  6. Use the default options for Type, Glue version, Language, Worker type, Number of workers, Number of retries, and Job timeout.
  7. For Job bookmark, choose Disable.
  8. Choose Save to save the job.


  9. On the Visual tab, choose Add nodes.

  10. For Sources, choose Snowflake.

  11. Choose Data source – Snowflake in the AWS Glue Studio canvas.
  12. For Name, enter Snowflake_Pricing_Summary.
  13. For Snowflake connection, choose glue-snowflake-connection.
  14. For Snowflake source, select Enter a custom query.
  15. For Database, enter snowflake_sample_data.
  16. For Snowflake query, add the following Snowflake query:
SELECT l_returnflag
    , l_linestatus
    , Sum(l_quantity) AS sum_qty
    , Sum(l_extendedprice) AS sum_base_price
    , Sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price
    , Sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge
    , Avg(l_quantity) AS avg_qty
    , Avg(l_extendedprice) AS avg_price
    , Avg(l_discount) AS avg_disc
    , Count(*) AS count_order
FROM tpch_sf1.lineitem
WHERE l_shipdate <= Dateadd(day, - 90, To_date('1998-12-01'))
GROUP BY l_returnflag
    , l_linestatus
ORDER BY l_returnflag
    , l_linestatus;

The Pricing Summary Report provides a summary pricing report for all line items shipped as of a given date. The date is within 60–120 days of the greatest ship date contained in the database. The query lists totals for extended price, discounted extended price, discounted extended price plus tax, average quantity, average extended price, and average discount. These aggregates are grouped by RETURNFLAG and LINESTATUS, and listed in ascending order of RETURNFLAG and LINESTATUS. A count of the number of line items in each group is included.

  17. For Custom Snowflake properties, specify Key as sfSchema and Value as tpch_sf1.
  18. Choose Save.


Next, you add the destination as an S3 bucket.

  1. On the Visual tab, choose Add nodes.
  2. For Targets, choose Amazon S3.

  3. Choose Data target – S3 bucket in the AWS Glue Studio canvas.
  4. For Name, enter S3_Pricing_Summary.
  5. For Node parents, select Snowflake_Pricing_Summary.
  6. For Format, select Parquet.
  7. For S3 Target Location, enter s3://<YourBucketName>/pricing_summary_report/ (use the name of your bucket).
  8. For Data Catalog update options, select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  9. For Database, choose db_blog_glue_snowflake.
  10. For Table name, enter tb_pricing_summary.
  11. Choose Save.
  12. Choose Run to run the job, and monitor its status on the Runs tab.

You successfully completed the steps to create an AWS Glue job that reads data from Snowflake and loads the results into an S3 bucket using a secure connectivity pattern. If you want to transform the data before loading it into Amazon S3, you can use the AWS Glue transformations available in AWS Glue Studio. Using AWS Glue transformations is crucial when creating an AWS Glue job because they enable efficient data cleansing, enrichment, and restructuring, making sure the data is in the desired format and quality for downstream processes. Refer to Editing AWS Glue managed data transform nodes for more information.

Validate the results

After the job is complete, you can validate the output of the ETL job run in Athena, a serverless interactive analytics service. To validate the output, complete the following steps:

  1. On the Athena console, choose Launch Query Editor.
  2. For Workgroup, choose blog-workgroup.
  3. If the message “All queries run in the Workgroup, blog-workgroup, will use the following settings:” is displayed, choose Acknowledge.
  4. For Database, choose db_blog_glue_snowflake.
  5. For Query, enter the following statement:
SELECT l_returnflag
    , l_linestatus
    , sum_qty
    , sum_base_price
FROM db_blog_glue_snowflake.tb_pricing_summary
  6. Choose Run.

You have successfully validated your data for the AWS Glue job Pricing Summary Report Job.

Clean up

To clean up your resources, complete the following tasks:

  1. Delete the AWS Glue job Pricing Summary Report Job.
  2. Delete the AWS Glue connection glue-snowflake-connection.
  3. Stop any AWS Glue interactive sessions.
  4. Delete content from the S3 bucket blog-glue-snowflake-*.
  5. Delete the CloudFormation stack blog-glue-snowflake.

Conclusion

Using the native Snowflake connector in AWS Glue provides an efficient and secure way to integrate data from Snowflake into your data pipelines on AWS. By following the steps outlined in this post, you can establish a private connectivity channel between AWS Glue and your Snowflake account using PrivateLink, Amazon VPC, security groups, and Secrets Manager.

This architecture allows you to read data from and write data to Snowflake tables directly from AWS Glue jobs running on Spark. The secure connectivity pattern prevents data transfers over the public internet, enhancing data privacy and security.

Combining AWS data integration services like AWS Glue with data platforms like Snowflake allows you to build scalable, secure data lakes and pipelines to power analytics, BI, data science, and ML use cases.

In summary, the native Snowflake connector and private connectivity model outlined here provide a performant, secure way to include Snowflake data in AWS big data workflows. This unlocks scalable analytics while maintaining data governance, compliance, and access control. For more information on AWS Glue, visit AWS Glue.


About the Authors

Caio Sgaraboto Montovani is a Sr. Specialist Solutions Architect, Data Lake and AI/ML within AWS Professional Services, developing scalable solutions according to customer needs. His vast experience has helped customers in different industries such as life sciences and healthcare, retail, banking, and aviation build solutions in data analytics, machine learning, and generative AI. He is passionate about rock and roll and cooking, and loves to spend time with his family.

Kartikay Khator is a Solutions Architect within Global Life Sciences at AWS, where he dedicates his efforts to developing innovative and scalable solutions that cater to the evolving needs of customers. His expertise lies in harnessing the capabilities of AWS analytics services. Extending beyond his professional pursuits, he finds joy and fulfillment in the world of running and hiking. Having already completed two marathons, he is currently preparing for his next marathon challenge.

Navnit Shukla, an AWS Specialist Solution Architect specializing in Analytics, is passionate about helping clients uncover valuable insights from their data. Leveraging his expertise, he develops inventive solutions that empower businesses to make informed, data-driven decisions. Notably, Navnit is the accomplished author of the book “Data Wrangling on AWS,” showcasing his expertise in the field.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect, Amazon MWAA and AWS Glue ETL expert. He’s on a mission to make life easier for customers who are facing complex data integration and orchestration challenges. His secret weapon? Fully managed AWS services that can get the job done with minimal effort. Follow Kamen on LinkedIn to keep up to date with the latest Amazon MWAA and AWS Glue features and news!

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.

Amazon OpenSearch Serverless cost-effective search capabilities, at any scale

Post Syndicated from Satish Nandi original https://aws.amazon.com/blogs/big-data/amazon-opensearch-serverless-cost-effective-search-capabilities-at-any-scale/

We’re excited to announce the new lower entry cost for Amazon OpenSearch Serverless. With support for half (0.5) OpenSearch Compute Units (OCUs) for indexing and search workloads, the entry cost is cut in half. Amazon OpenSearch Serverless is a serverless deployment option for Amazon OpenSearch Service that you can use to run search and analytics workloads without the complexities of infrastructure management, shard tuning or data lifecycle management. OpenSearch Serverless automatically provisions and scales resources to provide consistently fast data ingestion rates and millisecond query response times during changing usage patterns and application demand. 

OpenSearch Serverless offers three types of collections to help meet your needs: Time-series, search, and vector. The new lower cost of entry benefits all collection types. Vector collections have come to the fore as a predominant workload when using OpenSearch Serverless as an Amazon Bedrock knowledge base. With the introduction of half OCUs, the cost for small vector workloads is halved. Time-series and search collections also benefit, especially for small workloads like proof-of-concept deployments and development and test environments.

A full OCU includes one vCPU, 6 GB of RAM, and 120 GB of storage. A half OCU offers half a vCPU, 3 GB of RAM, and 60 GB of storage. OpenSearch Serverless scales up from a half OCU first to one full OCU and then in one-OCU increments. Each OCU also uses Amazon Simple Storage Service (Amazon S3) as a backing store; you pay for data stored in Amazon S3 regardless of the OCU size. The number of OCUs needed for the deployment depends on the collection type, along with ingestion and search patterns. We will go over the details later in the post and contrast how the new half-OCU base brings benefits.

OpenSearch Serverless separates indexing and search computes, deploying sets of OCUs for each compute need. You can deploy OpenSearch Serverless in two forms: 1) Deployment with redundancy for production, and 2) Deployment without redundancy for development or testing.

Note: OpenSearch Serverless deploys two times the compute for both indexing and searching in redundant deployments.

OpenSearch Serverless Deployment Type

The following figure shows the architecture for OpenSearch Serverless in redundancy mode.

In redundancy mode, OpenSearch Serverless deploys two base OCUs for each compute set (indexing and search) across two Availability Zones. For small workloads under 60GB, OpenSearch Serverless uses half OCUs as the base size. The minimum deployment is four base units, two each for indexing and search. The minimum cost is approximately $350 per month (four half OCUs). All prices are quoted based on the US-East region and 30 days a month. During normal operation, all OCUs are in operation to serve traffic. OpenSearch Serverless scales up from this baseline as needed.

For non-redundant deployments, OpenSearch Serverless deploys one base OCU for each compute set, costing $174 per month (two half OCUs).

Redundant configurations are recommended for production deployments to maintain availability; if one Availability Zone goes down, the other can continue serving traffic. Non-redundant deployments are suitable for development and testing to reduce costs. In both configurations, you can set a maximum OCU limit to manage costs. The system will scale up to this limit during peak loads if necessary, but will not exceed it.
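
As a rough check of these figures, the following snippet estimates the minimum monthly cost for both deployment forms; the OCU hourly rate is an assumption of about $0.24 per OCU-hour (US East) and Amazon S3 storage charges are excluded.

OCU_HOURLY_RATE = 0.24      # assumed USD per OCU-hour
HOURS_PER_MONTH = 24 * 30


def monthly_cost(half_ocus: int) -> float:
    return half_ocus * 0.5 * OCU_HOURLY_RATE * HOURS_PER_MONTH


print(monthly_cost(4))  # redundant minimum: four half OCUs, ~$346/month (quoted as ~$350)
print(monthly_cost(2))  # non-redundant minimum: two half OCUs, ~$173/month (quoted as $174)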

OpenSearch Serverless collections and resource allocations

OpenSearch Serverless uses compute units differently depending on the type of collection, and keeps your data in Amazon S3. When you ingest data, OpenSearch Serverless writes it to the OCU disk and Amazon S3 before acknowledging the request, which provides both durability and consistent performance. Depending on the collection type, it additionally keeps data in the local storage of the OCUs, scaling to accommodate the storage and compute needs.

The time-series collection type is designed to be cost-efficient by limiting the amount of data kept in local storage and keeping the remainder in Amazon S3. The number of OCUs needed depends on the amount of data and the collection’s retention period. The number of OCUs OpenSearch Serverless uses for your workload is the larger of the default minimum OCUs, or the minimum number of OCUs needed to hold the most recent portion of your data, as defined by your OpenSearch Serverless data lifecycle policy. For example, if you ingest 1 TiB per day and have a 30-day retention period, the size of the most recent data will be 1 TiB. You will need 20 OCUs [10 OCUs x 2] for indexing and another 20 OCUs [10 OCUs x 2] for search (based on the 120 GiB of storage per OCU). Access to older data in Amazon S3 raises the latency of the query responses. This tradeoff in query latency for older data is made to save on OCU cost.

The vector collection type uses RAM to store vector graphs, as well as disk to store indexes. Vector collections keep index data in OCU local storage. When sizing for vector workloads, take both needs into account. OCU RAM limits are typically reached before OCU disk limits, so vector collections tend to be bound by RAM.

OpenSearch Serverless allocates OCU resources for vector collections as follows. Considering full OCUs, it uses 2 GB for the operating system, 2 GB for the Java heap, and the remaining 2 GB for vector graphs. It uses 120 GB of local storage for OpenSearch indices. The RAM required for a vector graph depends on the vector dimensions, number of vectors stored, and the algorithm chosen. See Choose the k-NN algorithm for your billion-scale use case with OpenSearch for a review and formulas to help you pre-calculate vector RAM needs for your OpenSearch Serverless deployment.

Note: The system behaviors described in this post are accurate as of June 2024. Check back in the coming months as new innovations continue to drive down cost.

Supported AWS Regions

Support for the new OCU minimums is now available in all AWS Regions that support OpenSearch Serverless. See the AWS Regional Services List for more information about OpenSearch Service availability, and see the documentation to learn more about OpenSearch Serverless.

Conclusion

The introduction of half OCUs gives you a significant reduction in the base cost of Amazon OpenSearch Serverless. If you have a smaller dataset and limited usage, you can now take advantage of this lower cost. The cost-effective nature of this solution and the simplified management of search and analytics workloads enable smooth operation even as traffic demands vary.


About the authors 

Satish Nandi is a Senior Product Manager with Amazon OpenSearch Service. He is focused on OpenSearch Serverless and Geospatial and has years of experience in networking, security and ML and AI. He holds a BEng in Computer Science and an MBA in Entrepreneurship. In his free time, he likes to fly airplanes, hang glide, and ride his motorcycle.

Jon Handler is a Senior Principal Solutions Architect at Amazon Web Services based in Palo Alto, CA. Jon works closely with OpenSearch and Amazon OpenSearch Service, providing help and guidance to a broad range of customers who have search and log analytics workloads that they want to move to the AWS Cloud. Prior to joining AWS, Jon’s career as a software developer included four years of coding a large-scale, eCommerce search engine. Jon holds a Bachelor of the Arts from the University of Pennsylvania, and a Master of Science and a Ph. D. in Computer Science and Artificial Intelligence from Northwestern University.

Unlock scalability, cost-efficiency, and faster insights with large-scale data migration to Amazon Redshift

Post Syndicated from Chanpreet Singh original https://aws.amazon.com/blogs/big-data/unlock-scalability-cost-efficiency-and-faster-insights-with-large-scale-data-migration-to-amazon-redshift/

Large-scale data warehouse migration to the cloud is a complex and challenging endeavor that many organizations undertake to modernize their data infrastructure, enhance data management capabilities, and unlock new business opportunities. As data volumes continue to grow exponentially, traditional data warehousing solutions may struggle to keep up with the increasing demands for scalability, performance, and advanced analytics.

Migrating to Amazon Redshift offers organizations the potential for improved price-performance, enhanced data processing, faster query response times, and better integration with technologies such as machine learning (ML) and artificial intelligence (AI). However, you might face significant challenges when planning for a large-scale data warehouse migration. These challenges can range from ensuring data quality and integrity during the migration process to addressing technical complexities related to data transformation, schema mapping, performance, and compatibility issues between the source and target data warehouses. Additionally, organizations must carefully consider factors such as cost implications, security and compliance requirements, change management processes, and the potential disruption to existing business operations during the migration. Effective planning, thorough risk assessment, and a well-designed migration strategy are crucial to mitigating these challenges and implementing a successful transition to the new data warehouse environment on Amazon Redshift.

In this post, we discuss best practices for assessing, planning, and implementing a large-scale data warehouse migration into Amazon Redshift.

Success criteria for large-scale migration

The following diagram illustrates a scalable migration pattern for an extract, load, and transform (ELT) scenario using Amazon Redshift data sharing patterns.

The following diagram illustrates a scalable migration pattern for extract, transform, and load (ETL) scenario.

Migration pattern extract, transform, and load (ETL) scenarios

Success criteria alignment by all stakeholders (producers, consumers, operators, auditors) is key for a successful transition to a new Amazon Redshift modern data architecture. The success criteria are the key performance indicators (KPIs) for each component of the data workflow. This includes the ETL processes that capture source data, the functional refinement and creation of data products, the aggregation for business metrics, and the consumption from analytics, business intelligence (BI), and ML.

KPIs make sure you can track and audit optimal implementation, achieve consumer satisfaction and trust, and minimize disruptions during the final transition. They measure workload trends, cost usage, data flow throughput, consumer data rendering, and real-life performance. This makes sure the new data platform can meet current and future business goals.

Migration from a large-scale mission-critical monolithic legacy data warehouse (such as Oracle, Netezza, Teradata, or Greenplum) is typically planned and implemented over 6–16 months, depending on the complexity of the existing implementation. The monolithic data warehouse environments that have been built over the last 30 years contain proprietary business logic and multiple data design patterns, including an operational data store, star or snowflake schemas, dimensions and facts, data warehouses and data marts, online transaction processing (OLTP) real-time dashboards, and online analytical processing (OLAP) cubes with multi-dimensional analytics. The data warehouse is highly business critical with minimal allowable downtime. If your data warehouse platform has gone through multiple enhancements over the years, your operational service levels documentation may not be current with the latest operational metrics and desired SLAs for each tenant (such as business unit, data domain, or organization group).

As part of the success criteria for operational service levels, you need to document the expected service levels for the new Amazon Redshift data warehouse environment. This includes the expected response time limits for dashboard queries or analytical queries, elapsed runtime for daily ETL jobs, desired elapsed time for data sharing with consumers, total number of tenants with concurrency of loads and reports, and mission-critical reports for executives or factory operations.

As part of your modern data architecture transition strategy, the migration goal of a new Amazon Redshift based platform is to use the scalability, performance, cost-optimization, and additional lake house capabilities of Amazon Redshift to improve the existing data consumption experience. Depending on your enterprise’s culture and goals, the migration of a legacy multi-tenant data platform to Amazon Redshift could follow one of several strategies.

A majority of organizations opt for the organic strategy (lift and shift) when migrating their large data platforms to Amazon Redshift. This approach uses AWS migration tools such as the AWS Schema Conversion Tool (AWS SCT) or the managed service version DMS Schema Conversion to rapidly meet goals around data center exit, cloud adoption, reducing legacy licensing costs, and replacing legacy platforms.

By establishing clear success criteria and monitoring KPIs, you can implement a smooth migration to Amazon Redshift that meets performance and operational goals. Thoughtful planning and optimization are crucial, including optimizing your Amazon Redshift configuration and workload management, addressing concurrency needs, implementing scalability, tuning performance for large result sets, minimizing schema locking, and optimizing join strategies. This will enable right-sizing the Redshift data warehouse to meet workload demands cost-effectively. Thorough testing and performance optimization will facilitate a smooth transition with minimal disruption to end-users, fostering exceptional user experiences and satisfaction. A successful migration can be accomplished through proactive planning, continuous monitoring, and performance fine-tuning, thereby aligning with and delivering on business objectives.

Migration involves the following phases, which we delve into in the subsequent sections:

  • Assessment
    • Discovery of workload and integrations
    • Dependency analysis
    • Effort estimation
    • Team sizing
    • Strategic wave planning
  • Functional and performance
    • Code conversion
    • Data validation
  • Measure and benchmark KPIs
    • Platform-level KPIs
    • Tenant-level KPIs
    • Consumer-level KPIs
    • Sample SQL
  • Monitoring Amazon Redshift performance and continual optimization
    • Identify top offending queries
    • Optimization strategies

To achieve a successful Amazon Redshift migration, it’s important to address infrastructure, security, and deployment considerations alongside these phases to enable a smooth and secure transition.

Assessment

In this section, we discuss the steps you can take in the assessment phase.

Discovery of workload and integrations

Conducting discovery and assessment for migrating a large on-premises data warehouse to Amazon Redshift is a critical step in the migration process. This phase helps identify potential challenges, assess the complexity of the migration, and gather the necessary information to plan and implement the migration effectively. You can use the following steps:

  • Data profiling and assessment – This involves analyzing the schema, data types, table sizes, and dependencies. Special attention should be given to complex data types such as arrays, JSON, or custom data types and custom user-defined functions (UDFs), because they may require specific handling during the migration process. Additionally, it’s essential to assess the volume of data and daily incremental data to be migrated, and estimate the required storage capacity in Amazon Redshift. Furthermore, analyzing the existing workload patterns, queries, and performance characteristics provides valuable insights into the resource requirements needed to optimize the performance of the migrated data warehouse in Amazon Redshift.
  • Code and query assessment – It’s crucial to assess the compatibility of existing SQL code, including queries, stored procedures, and functions. The AWS SCT can help identify any unsupported features, syntax, or functions that need to be rewritten or replaced to achieve a seamless integration with Amazon Redshift. Additionally, it’s essential to evaluate the complexity of the existing processes and determine if they require redesigning or optimization to align with Amazon Redshift best practices.
  • Performance and scalability assessment – This includes identifying performance bottlenecks, concurrency issues, or resource constraints that may be hindering optimal performance. This analysis helps determine the need for performance tuning or workload management techniques that may be required to achieve optimal performance and scalability in the Amazon Redshift environment.
  • Application integrations and mapping – Embarking on a data warehouse migration to a new platform necessitates a comprehensive understanding of the existing technology stack and business processes intertwined with the legacy data warehouse. Consider the following:
    • Meticulously document all ETL processes, BI tools, and scheduling mechanisms employed in conjunction with the current data warehouse. This includes commercial tools, custom scripts, and any APIs or connectors interfacing with source systems.
    • Take note of any custom code, frameworks, or mechanisms utilized in the legacy data warehouse for tasks such as managing slowly changing dimensions (SCDs), generating surrogate keys, implementing business logic, and other specialized functionalities. These components may require redevelopment or adaptation to operate seamlessly on the new platform.
    • Identify all upstream and downstream applications, as well as business processes that rely on the data warehouse. Map out their specific dependencies on database objects, tables, views, and other components. Trace the flow of data from its origins in the source systems, through the data warehouse, and ultimately to its consumption by reporting, analytics, and other downstream processes.
  • Security and access control assessment – This includes reviewing the existing security model, including user roles, permissions, access controls, data retention policies, and any compliance requirements and industry regulations that need to be adhered to.

Dependency analysis

Understanding dependencies between objects is crucial for a successful migration. You can use system catalog views and custom queries on your on-premises data warehouses to create a comprehensive object dependency report. This report shows how tables, views, and stored procedures rely on each other. This also involves analyzing indirect dependencies (for example, a view built on top of another view, which in turn uses a set of tables), and having a complete understanding of data usage patterns.

Effort estimation

The discovery phase serves as your compass for estimating the migration effort. You can translate those insights into a clear roadmap as follows:

  • Object classification and complexity assessment – Based on the discovery findings, categorize objects (tables, views, stored procedures, and so on) based on their complexity. Simple tables with minimal dependencies will require less effort to migrate than intricate views or stored procedures with complex logic.
  • Migration tools – Use the AWS SCT to estimate the base migration effort per object type. The AWS SCT can automate schema conversion, data type mapping, and function conversion, reducing manual effort.
  • Additional considerations – Factor in additional tasks beyond schema conversion. This may include data cleansing, schema optimization for Amazon Redshift performance, unit testing of migrated objects, and migration script development for complex procedures. The discovery phase sheds light on potential schema complexities, allowing you to accurately estimate the effort required for these tasks.

Team sizing

With a clear picture of the effort estimate, you can now size the team for the migration.

Person-months calculation

Divide the total estimated effort by the desired project duration to determine the total person-months required. This provides a high-level understanding of the team size needed.

For example, for an ELT migration project from an on-premises data warehouse to Amazon Redshift to be completed within 6 months, we estimate the team requirements based on the number of schemas or tenants (for example, 30), number of database tables (for example, 5,000), average migration estimate for a schema (for example, 4 weeks based on complexity of stored procedures, tables and views, platform-specific routines, and materialized views), and number of business functions (for example, 2,000 segmented by simple, medium, and complex patterns). We can determine the following are needed:

  • Migration time period (80% migration/20% for validation and transition) = 0.8 x 6 months = approximately 5 months, or 22 weeks
  • Dedicated teams = Number of tenants / (migration time period / average migration period for a tenant) = 30 / (5 / 1) = 6 teams
  • Migration team structure:
    • One to three data developers with stored procedure conversion expertise per team, performing over 25 conversions per week
    • One data validation engineer per team, testing over 50 objects per week
    • One to two data visualization experts per team, confirming consumer downstream applications are accurate and performant
  • A common shared DBA team with performance tuning expertise responding to standardization and challenges
  • A platform architecture team (3–5 individuals) focused on platform design, service levels, availability, operational standards, cost, observability, scalability, performance, and design pattern issue resolutions

Team composition expertise

Based on the skillsets required for various migration tasks, we assemble a team with the right expertise. Platform architects define a well-architected platform. Data engineers are crucial for schema conversion and data transformation, and DBAs can handle cluster configuration and workload monitoring. An engagement or project management team makes sure the project runs smoothly, on time, and within budget.

For example, for an ETL migration project from Informatica/Greenplum to a target Redshift lakehouse with an Amazon Simple Storage Service (Amazon S3) data lake to be completed within 12 months, we estimate the team requirements based on the number of schemas and tenants (for example, 50 schemas), number of database tables (for example, 10,000), average migration estimate for a schema (6 weeks based on complexity of database objects), and number of business functions (for example, 5,000 segmented by simple, medium, and complex patterns). We can determine the following are needed:

  • An open data format ingestion architecture processing the source dataset and refining the data in the S3 data lake. This requires a dedicated team of 3–7 members building a serverless data lake for all data sources. Ingestion migration implementation is segmented by tenants and type of ingestion patterns, such as internal database change data capture (CDC); data streaming, clickstream, and Internet of Things (IoT); public dataset capture; partner data transfer; and file ingestion patterns.
  • The migration team composition is tailored to the needs of a project wave. Depending on each migration wave and what is being done in the wave (development, testing, or performance tuning), the right people will be engaged. When the wave is complete, the people from that wave will move to another wave.
  • A loading team builds a producer-consumer architecture in Amazon Redshift to process concurrent near real-time publishing of data. This requires a dedicated team of 3–7 members building and publishing refined datasets in Amazon Redshift.
  • A shared DBA group of 3–5 individuals helping with schema standardization, migration challenges, and performance optimization outside the automated conversion.
  • Data transformation experts to convert database stored functions in the producer or consumer.
  • A migration sprint plan for 10 months with 2-week sprints and multiple waves to release tenants to the new architecture.
  • A validation team to confirm a reliable and complete migration.
  • One to two data visualization experts per team, confirming that consumer downstream applications are accurate and performant.
  • A platform architecture team (3–5 individuals) focused on platform design, service levels, availability, operational standards, cost, observability, scalability, performance, and design pattern issue resolutions.

Strategic wave planning

Migration waves can be determined as follows:

  • Dependency-based wave delineation – Objects can be grouped into migration waves based on their dependency relationships. Objects with no or minimal dependencies will be prioritized for earlier waves, whereas those with complex dependencies will be migrated in subsequent waves. This provides a smooth and sequential migration process.
  • Logical schema and business area alignment – You can further revise migration waves by considering logical schema and business areas. This allows you to migrate related data objects together, minimizing disruption to specific business functions.

Functional and performance

In this section, we discuss the steps for refactoring the legacy SQL codebase to use Amazon Redshift SQL best practices, building validation routines to ensure accuracy and completeness during the transition to Amazon Redshift, capturing KPIs to ensure similar or better service levels for consumption tools and downstream applications, and incorporating performance hooks and procedures for a scalable and performant Amazon Redshift platform.

Code conversion

We recommend using the AWS SCT as the first step in the code conversion journey. The AWS SCT is a powerful tool that can streamline the database schema and code migrations to Amazon Redshift. With its intuitive interface and automated conversion capabilities, the AWS SCT can significantly reduce the manual effort required during the migration process. Refer to Converting data warehouse schemas to Amazon Redshift using AWS SCT for instructions to convert your database schema, including tables, views, functions, and stored procedures, to Amazon Redshift format. For an Oracle source, you can also use the managed service version DMS Schema Conversion.

When the conversion is complete, the AWS SCT generates a detailed conversion report. This report highlights any potential issues, incompatibilities, or areas requiring manual intervention. Although the AWS SCT automates a significant portion of the conversion process, manual review and modifications are often necessary to address various complexities and optimizations.

Some common cases where manual review and modifications are typically required include:

  • Incompatible data types – The AWS SCT may not always handle custom or non-standard data types, requiring manual intervention to map them to compatible Amazon Redshift data types.
  • Database-specific SQL extensions or proprietary functions – If the source database uses SQL extensions or proprietary functions specific to the database vendor (for example, STRING_AGG() or ARRAY_UPPER functions, or custom UDFs for PostgreSQL), these may need to be manually rewritten or replaced with equivalent Amazon Redshift functions or UDFs. The AWS SCT extension pack is an add-on module that emulates functions present in a source database that are required when converting objects to the target database. See the rewrite sketch after this list for an example.
  • Performance optimization – Although the AWS SCT can convert the schema and code, manual optimization is often necessary to take advantage of the features and capabilities of Amazon Redshift. This may include adjusting distribution and sort keys, converting row-by-row operations to set-based operations, optimizing query plans, and other performance tuning techniques specific to Amazon Redshift.
  • Stored procedures and code conversion – The AWS SCT offers comprehensive capabilities to seamlessly migrate stored procedures and other code objects across platforms. Although its automated conversion process efficiently handles the majority of cases, certain intricate scenarios may necessitate manual intervention due to the complexity of the code and utilization of database-specific features or extensions. To achieve optimal compatibility and accuracy, it’s advisable to undertake testing and validation procedures during the migration process.
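For instance, a PostgreSQL-style string aggregation typically needs to be rewritten with the Amazon Redshift LISTAGG function. The following is a minimal sketch of such a manual rewrite, using hypothetical table and column names:

-- Source (PostgreSQL-style), shown for reference:
--   SELECT customer_id, STRING_AGG(product_name, ', ' ORDER BY product_name)
--   FROM orders GROUP BY customer_id;

-- Rewritten for Amazon Redshift using LISTAGG:
SELECT customer_id,
       LISTAGG(product_name, ', ') WITHIN GROUP (ORDER BY product_name) AS product_list
FROM   orders
GROUP  BY customer_id;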

After you address the issues identified during the manual review process, it’s crucial to thoroughly test the converted stored procedures, as well as other database objects and code, such as views, functions, and SQL extensions, in a non-production Redshift cluster before deploying them in the production environment. This exercise is mostly undertaken by QA teams. This phase also involves conducting holistic performance testing (individual queries, batch loads, consumption reports and dashboards in BI tools, data mining applications, ML algorithms, and other relevant use cases) in addition to functional testing to make sure the converted code meets the required performance expectations. The performance tests should simulate production-like workloads and data volumes to validate the performance under realistic conditions.

Data validation

When migrating data from an on-premises data warehouse to a Redshift cluster on AWS, data validation is a crucial step to confirm the integrity and accuracy of the migrated data. There are several approaches you can consider:

  • Custom scripts – Use scripting languages like Python, SQL, or Bash to develop custom data validation scripts tailored to your specific data validation requirements. These scripts can connect to both the source and target databases, extract data, perform comparisons, and generate reports.
  • Open source tools – Use open source data validation tools like Amazon Deequ or Great Expectations. These tools provide frameworks and utilities for defining data quality rules, validating data, and generating reports.
  • AWS native or commercial tools – Use AWS native tools such as AWS Glue Data Quality or commercial data validation tools like Collibra Data Quality. These tools often provide comprehensive features, user-friendly interfaces, and dedicated support.

The following are different types of validation checks to consider:

  • Structural comparisons – Compare the list of columns and data types of columns between the source and target (Amazon Redshift). Any mismatches should be flagged.
  • Row count validation – Compare the row counts of each core table in the source data warehouse with the corresponding table in the target Redshift cluster. This is the most basic validation step to make sure no data has been lost or duplicated during the migration process.
  • Column-level validation – Validate individual columns by comparing column-level statistics (min, max, count, sum, average) for each column between the source and target databases. This can help identify any discrepancies in data values or data types. See the sketch after this list for an example.
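As a minimal sketch of the row count and column-level checks, the following queries (with hypothetical schema, table, and column names) can be run against both the source and the target Redshift cluster and their outputs compared:

-- Row count validation: run on source and target, then compare
SELECT COUNT(*) AS row_count
FROM   sales.orders;

-- Column-level statistics for a numeric column: run on source and target, then compare
SELECT MIN(order_total)   AS min_total,
       MAX(order_total)   AS max_total,
       SUM(order_total)   AS sum_total,
       AVG(order_total)   AS avg_total,
       COUNT(order_total) AS non_null_count
FROM   sales.orders;

In practice, you would generate these statements for every in-scope table from the catalog metadata rather than writing them by hand.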

You can also consider the following validation strategies:

  • Data profiling – Perform data profiling on the source and target databases to understand the data characteristics, identify outliers, and detect potential data quality issues. For example, you can use the data profiling capabilities of AWS Glue Data Quality or the Amazon Deequ library.
  • Reconciliation reports – Produce detailed validation reports that highlight errors, mismatches, and data quality issues. Consider generating reports in various formats (CSV, JSON, HTML) for straightforward consumption and integration with monitoring tools.
  • Automate the validation process – Integrate the validation logic into your data migration or ETL pipelines using scheduling tools or workflow orchestrators like Apache Airflow or AWS Step Functions.

Lastly, keep in mind the following considerations for collaboration and communication:

  • Stakeholder involvement – Involve relevant stakeholders, such as business analysts, data owners, and subject matter experts, throughout the validation process to make sure business requirements and data quality expectations are met.
  • Reporting and sign-off – Establish a clear reporting and sign-off process for the validation results, involving all relevant stakeholders and decision-makers.

Measure and benchmark KPIs

For multi-tenant Amazon Redshift implementation, KPIs are segmented at the platform level, tenant level, and consumption tools level. KPIs evaluate the operational metrics, cost metrics, and end-user response time metrics. In this section, we discuss the KPIs needed for achieving a successful transition.

Platform-level KPIs

As new tenants are gradually migrated to the platform, it’s imperative to monitor the current state of the Amazon Redshift platform-level KPIs. The current state of the KPIs will help the platform team make the necessary scalability modifications (add nodes, add consumer clusters, add producer clusters, or increase concurrency scaling clusters). Amazon Redshift query monitoring rules (QMR) also help govern the overall state of the data platform, providing optimal performance for all tenants by managing outlier workloads.

The following table summarizes the relevant platform-level KPIs.

Component | KPI | Service Level and Success Criteria
ETL | Ingestion data volume | Daily or hourly peak volume in GBps, number of objects, number of threads.
ETL | Ingestion threads | Peak hourly ingestion threads (COPY or INSERT), number of dependencies, KPI segmented by tenants and domains.
ETL | Stored procedure volume | Peak hourly stored procedure invocations segmented by tenants and domains.
ETL | Concurrent load | Peak concurrent load supported by the producer cluster; distribution of ingestion pattern across multiple producer clusters using data sharing.
ETL | Data sharing dependency | Data sharing between producer clusters (objects refreshed, locks per hour, waits per hour).
Workload | Number of queries | Peak hour query volume supported by cluster segmented by short (less than 10 seconds), medium (less than 60 seconds), long (less than 5 minutes), very long (less than 30 minutes), and outlier (more than 30 minutes); segmented by tenant, domain, or sub-domain.
Workload | Number of queries per queue | Peak hour query volume supported by priority automatic WLM queue segmented by short (less than 10 seconds), medium (less than 60 seconds), long (less than 5 minutes), very long (less than 30 minutes), and outlier (more than 30 minutes); segmented by tenant, business group, domain, or sub-domain.
Workload | Runtime pattern | Total runtime per hour; max, median, and average run pattern; segmented by service class across clusters.
Workload | Wait time patterns | Total wait time per hour; max, median, and average wait pattern for queries waiting.
Performance | Leader node usage | Service level for leader node (recommended less than 80%).
Performance | Compute node CPU usage | Service level for compute node (recommended less than 90%).
Performance | Disk I/O usage per node | Service level for disk I/O per node.
Performance | QMR rules | Number of outlier queries stopped by QMR (large scan, large spilling disk, large runtime); logging thresholds for potential large queries running more than 5 minutes.
Performance | History of WLM queries | Historical trend of queries stored in a historical archive table for all instances of queries in STL_WLM_QUERY; trend analysis over 30, 60, and 90 days to fine-tune the workload across clusters.
Cost | Total cost per month of Amazon Redshift platform | Service level for mix of instances (reserved, on-demand, serverless), cost of Concurrency Scaling, cost of Amazon Redshift Spectrum usage. Use AWS tools like AWS Cost Explorer or the daily cost usage report to capture monthly costs for each component.
Cost | Daily Concurrency Scaling usage | Service limits to monitor cost for Concurrency Scaling; invoke for outlier activity on spikes.
Cost | Daily Amazon Redshift Spectrum usage | Service limits to monitor cost for using Amazon Redshift Spectrum; invoke for outlier activity.
Cost | Redshift Managed Storage usage cost | Track usage of Redshift Managed Storage, monitoring wastage on temporary, archival, and old data assets.
Localization | Remote or on-premises tools | Service level for rendering large datasets to remote destinations.
Localization | Data transfer to remote tools | Data transfer to BI tools or workstations outside the Redshift cluster VPC; separation of datasets to Amazon S3 using the unload feature, avoiding bottlenecks at the leader node.
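For the History of WLM queries KPI in the preceding table, one simple pattern is to periodically copy STL_WLM_QUERY (which only retains a few days of history) into a permanent archive table. The following is a sketch; the admin schema and table name are assumptions:

-- One-time setup: seed the archive table from the system table
CREATE TABLE admin.wlm_query_history AS
SELECT * FROM stl_wlm_query;

-- Scheduled incremental load: append only rows newer than the last archived row
INSERT INTO admin.wlm_query_history
SELECT w.*
FROM   stl_wlm_query w
WHERE  w.service_class_start_time >
       (SELECT COALESCE(MAX(h.service_class_start_time), '1900-01-01'::timestamp)
        FROM   admin.wlm_query_history h);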

Tenant-level KPIs

Tenant-level KPIs help capture current performance levels from the legacy system and document expected service levels for the data flow from the source capture to end-user consumption. The captured legacy KPIs assist in providing the best target modern Amazon Redshift platform (a single Redshift data warehouse, a lake house with Amazon Redshift Spectrum, and data sharing with the producer and consumer clusters). Cost usage tracking at the tenant level helps you spread the cost of a shared platform across tenants.

The following table summarizes the relevant tenant-level KPIs.

Component | KPI | Service Level and Success Criteria
Cost | Compute usage by tenant | Track usage by tenant, business group, or domain; capture query volume by business unit by associating the Redshift user identity to an internal business unit; data observability by consumer usage for data products, helping with cost attribution.
ETL | Orchestration SLA | Service level for daily data availability.
ETL | Runtime | Service level for data loading and transformation.
ETL | Data ingestion volume | Peak expected volume for service level guarantee.
Query consumption | Response time | Response time SLA for query patterns (dashboards, SQL analytics, ML analytics, BI tool caching).
Query consumption | Concurrency | Peak query consumers for the tenant.
Query consumption | Query volume | Peak hourly volume service levels and daily query volumes.
Query consumption | Individual query response for critical data consumption | Service level and success criteria for critical workloads.

Consumer-level KPIs

A multi-tenant modern data platform can set service levels for a variety of consumer tools. The service levels provide guidance to end-users of the capability of the new deployment.

The following table summarizes the relevant consumer-level KPIs.

Consumer | KPI | Service Level and Success Criteria
BI tools | Large data extraction | Service level for unloading data for caching or query rendering a large result dataset.
Dashboards | Response time | Service level for data refresh.
SQL query tools | Response time | Service level for response time by query type.
SQL query tools | Concurrency | Service level for concurrent query access by all consumers.
One-time analytics | Response time | Service level for large data unloads or aggregation.
ML analytics | Response time | Service level for large data unloads or aggregation.

Sample SQL

The post includes sample SQL to capture daily KPI metrics; a minimal sketch follows, and the example KPI dashboard trends shown after it assist in capturing historic workload patterns, identifying deviations in workload, and providing guidance on the platform workload capacity to meet the current workload and anticipated growth patterns.
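A minimal sketch of such daily KPI SQL, assuming access to the STL_WLM_QUERY system table (the 5-second queue threshold and 30-day window are illustrative), is the following:

-- Daily query volume and queueing KPI (sketch; thresholds are illustrative)
SELECT TRUNC(w.queue_start_time)                                            AS query_day,
       COUNT(*)                                                             AS total_queries,
       SUM(CASE WHEN w.total_queue_time / 1000000.0 >= 5 THEN 1 ELSE 0 END) AS queued_over_5s,
       ROUND(AVG(w.total_queue_time) / 1000000.0, 2)                        AS avg_queue_seconds,
       ROUND(AVG(w.total_exec_time) / 1000000.0, 2)                         AS avg_exec_seconds
FROM   stl_wlm_query w
WHERE  w.queue_start_time >= DATEADD(day, -30, GETDATE())
GROUP  BY 1
ORDER  BY 1;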

The following figure shows a daily query volume snapshot (queries per day and queued queries per day, which waited a minimum of 5 seconds).

The following figure shows a daily usage KPI. It monitors percentage waits and median wait for waiting queries (identifies the minimal threshold for wait to compute waiting queries and median of all wait times to infer deviation patterns).

The following figure illustrates concurrency usage (monitors concurrency compute usage for Concurrency Scaling clusters).

The following figure shows a 30-day pattern (computes volume in terms of total runtime and total wait time).

Monitoring Amazon Redshift performance and continual optimization

Amazon Redshift uses automatic table optimization (ATO) to choose the right distribution style, sort keys, and encoding when you create a table with AUTO options. Therefore, it’s a good practice to take advantage of the AUTO feature and create tables with DISTSTYLE AUTO, SORTKEY AUTO, and ENCODING AUTO. When tables are created with AUTO options, Amazon Redshift initially creates tables with optimal keys for the best first-time query performance possible using information such as the primary key and data types. In addition, Amazon Redshift analyzes the data volume and query usage patterns to evolve the distribution strategy and sort keys to optimize performance over time. Finally, Amazon Redshift performs table maintenance activities on your tables that reduce fragmentation and make sure statistics are up to date.
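For example, a new table can be created with AUTO settings so that Amazon Redshift chooses and evolves the physical design over time; the table and columns below are hypothetical:

-- Sketch: let automatic table optimization manage distribution, sort keys, and encodings
CREATE TABLE sales_orders (
    order_id    BIGINT,
    customer_id BIGINT,
    order_date  DATE,
    order_total DECIMAL(12,2)
)
DISTSTYLE AUTO
SORTKEY AUTO
ENCODE AUTO;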

During a large, phased migration, it’s important to monitor and measure Amazon Redshift performance against target KPIs at each phase and implement continual optimization. As new workloads are onboarded at each phase of the migration, it’s recommended to perform regular Redshift cluster reviews and analyze query pattern and performance. Cluster reviews can be done by engaging the Amazon Redshift specialist team through AWS Enterprise support or your AWS account team. The goal of a cluster review includes the following:

  • Use cases – Review the application use cases and determine if the design is suitable to solve for those use cases.
  • End-to-end architecture – Assess the current data pipeline architecture (ingestion, transformation, and consumption). For example, determine if too many small inserts are occurring and review their ETL pipeline. Determine if integration with other AWS services can be useful, such as AWS Lake Formation, Amazon Athena, Redshift Spectrum, or Amazon Redshift federation with PostgreSQL and MySQL.
  • Data model design – Review the data model and table design and provide recommendations for sort and distribution keys, keeping in mind best practices.
  • Performance – Review cluster performance metrics. Identify bottlenecks or irregularities and suggest recommendations. Dive deep into specific long-running queries to identify solutions specific to the customer’s workload.
  • Cost optimization – Provide recommendations to reduce costs where possible.
  • New features – Stay up to date with the new features in Amazon Redshift and identify where they can be used to meet these goals.

New workloads can introduce query patterns that could impact performance and miss target SLAs. A number of factors can affect query performance. In the following sections, we discuss aspects impacting query speed and optimizations for improving Redshift cluster performance.

Identify top offending queries

A compute node is partitioned into slices. More nodes mean more processors and more slices, which enables you to redistribute the data as needed across the slices. However, more nodes also mean greater expense, so you will need to find the balance of cost and performance that is appropriate for your system. For more information on Redshift cluster architecture, see Data warehouse system architecture. Each node type offers different sizes and limits to help you scale your cluster appropriately. The node size determines the storage capacity, memory, CPU, and price of each node in the cluster. For more information on node types, see Amazon Redshift pricing.

Redshift Test Drive is an open source tool that lets you evaluate which data warehouse configuration options are best suited to your workload. We created Redshift Test Drive from Simple Replay and Amazon Redshift Node Configuration Comparison (see Compare different node types for your workload using Amazon Redshift for more details) to provide a single entry point for finding the best Amazon Redshift configuration for your workload. Redshift Test Drive also provides additional features such as a self-hosted analysis UI and the ability to replicate external objects that a Redshift workload may interact with. With Amazon Redshift Serverless, you can start with a base Redshift Processing Unit (RPU), and Redshift Serverless automatically scales based on your workload needs.
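As a starting point for finding top offending queries on a provisioned cluster, a sketch like the following surfaces the longest-running user queries from the STL_QUERY system table; the 7-day look-back and 20-row limit are arbitrary choices you can adjust:

-- Sketch: top 20 longest-running user queries in the last 7 days
SELECT q.query,
       q.userid,
       DATEDIFF(second, q.starttime, q.endtime) AS run_seconds,
       TRIM(q.querytxt)                         AS sql_text
FROM   stl_query q
WHERE  q.starttime >= DATEADD(day, -7, GETDATE())
AND    q.userid > 1   -- exclude internal system queries
ORDER  BY run_seconds DESC
LIMIT  20;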

Optimization strategies

If you choose to fine-tune manually, the following are key concepts and considerations:

  • Data distribution – Amazon Redshift stores table data on the compute nodes according to a table’s distribution style. When you run a query, the query optimizer redistributes the data to the compute nodes as needed to perform any joins and aggregations. Choosing the right distribution style for a table helps minimize the impact of the redistribution step by locating the data where it needs to be before the joins are performed. For more information, see Working with data distribution styles.
  • Data sort order – Amazon Redshift stores table data on disk in sorted order according to a table’s sort keys. The query optimizer and query processor use the information about where the data is located to reduce the number of blocks that need to be scanned and thereby improve query speed. For more information, see Working with sort keys.
  • Dataset size – A higher volume of data in the cluster can slow query performance, because more rows need to be scanned and redistributed. You can mitigate this effect by regular vacuuming and archiving of data, and by using a predicate (a condition in the WHERE clause) to restrict the query dataset.
  • Concurrent operations – Amazon Redshift offers a powerful feature called automatic workload management (WLM) with query priorities, which enhances query throughput and overall system performance. By intelligently managing multiple concurrent operations and allocating resources dynamically, automatic WLM makes sure high-priority queries receive the necessary resources promptly, while lower-priority queries are processed efficiently without compromising system stability. This advanced queuing mechanism allows Amazon Redshift to optimize resource utilization, minimizing potential bottlenecks and maximizing query throughput, ultimately delivering a seamless and responsive experience for users running multiple operations simultaneously.
  • Query structure – How your query is written will affect its performance. As much as possible, write queries to process and return as little data as will meet your needs. For more information, see Amazon Redshift best practices for designing queries.
  • Queries with a long return time – Queries with a long return time can impact the processing of other queries and the overall performance of the cluster. It’s critical to identify and optimize them. You can optimize these queries by either moving clients to the same network or using the UNLOAD feature of Amazon Redshift, and then configuring the client to read the output from Amazon S3 (see the UNLOAD sketch after this list). To identify percentile and top running queries, you can download the sample SQL notebook of system queries and import it into Query Editor V2.0.
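The following is a minimal UNLOAD sketch for offloading a large result set to Amazon S3 instead of returning it to the client; the query, bucket, and IAM role ARN are placeholders:

-- Sketch: write a large result set to Amazon S3 as Parquet for the client to read
UNLOAD ('SELECT order_id, customer_id, order_total FROM sales_orders WHERE order_date >= ''2024-01-01''')
TO 's3://example-bucket/exports/large_result_'
IAM_ROLE 'arn:aws:iam::123456789012:role/example-redshift-unload-role'
FORMAT AS PARQUET;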

Conclusion

In this post, we discussed best practices for assessing, planning, and implementing a large-scale data warehouse migration into Amazon Redshift.

The assessment phase of a data migration project is critical for implementing a successful migration. It involves a comprehensive analysis of the existing workload, integrations, and dependencies to accurately estimate the effort required and determine the appropriate team size. Strategic wave planning is crucial for prioritizing and scheduling the migration tasks effectively. Establishing KPIs and benchmarking them helps measure progress and identify areas for improvement. Code conversion and data validation processes validate the integrity of the migrated data and applications. Monitoring Amazon Redshift performance, identifying and optimizing top offending queries, and conducting regular cluster reviews are essential for maintaining optimal performance and addressing any potential issues promptly.

By addressing these key aspects, organizations can seamlessly migrate their data workloads to Amazon Redshift while minimizing disruptions and maximizing the benefits of Amazon Redshift.

We hope this post provides you with valuable guidance. We welcome any thoughts or questions in the comments section.


About the authors

Chanpreet Singh is a Senior Lead Consultant at AWS, specializing in Data Analytics and AI/ML. He has over 17 years of industry experience and is passionate about helping customers build scalable data warehouses and big data solutions. In his spare time, Chanpreet loves to explore nature, read, and enjoy with his family.

Harshida Patel is an Analytics Specialist Principal Solutions Architect with AWS.

Raza Hafeez is a Senior Product Manager at Amazon Redshift. He has over 13 years of professional experience building and optimizing enterprise data warehouses and is passionate about enabling customers to realize the power of their data. He specializes in migrating enterprise data warehouses to AWS Modern Data Architecture.

Ram Bhandarkar is a Principal Data Architect at AWS based out of Northern Virginia. He helps customers with planning future Enterprise Data Strategy and assists them with transition to Modern Data Architecture platform on AWS. He has worked with building and migrating databases, data warehouses and data lake solutions for over 25 years.

Vijay Bagur is a Sr. Technical Account Manager. He works with enterprise customers to modernize and cost optimize workloads, improve security posture, and helps them build reliable and secure applications on the AWS platform. Outside of work, he loves spending time with his family, biking and traveling.

Synchronize data lakes with CDC-based UPSERT using open table format, AWS Glue, and Amazon MSK

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/synchronize-data-lakes-with-cdc-based-upsert-using-open-table-format-aws-glue-and-amazon-msk/

In the current industry landscape, data lakes have become a cornerstone of modern data architecture, serving as repositories for vast amounts of structured and unstructured data. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in a downstream system. Capturing every change from transactions in a source database and moving them to the target keeps the systems synchronized, and helps with analytics use cases and zero-downtime database migrations.

However, efficiently managing and synchronizing data within these lakes presents a significant challenge. Maintaining data consistency and integrity across distributed data lakes is crucial for decision-making and analytics. Inaccurate or outdated data can lead to flawed insights and business decisions. Businesses require synchronized data to gain actionable insights and respond swiftly to changing market conditions. Scalability is a critical concern for data lakes, because they need to accommodate growing volumes of data without compromising performance or incurring exorbitant costs.

To address these issues effectively, we propose using Amazon Managed Streaming for Apache Kafka (Amazon MSK), a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data. We use MSK Connect, an AWS managed service for deploying and running Kafka Connect, to build an end-to-end CDC application that uses the Debezium MySQL connector to process insert, update, and delete records from MySQL, and a Confluent Amazon Simple Storage Service (Amazon S3) sink connector to write to Amazon S3 as raw data that can be consumed by other downstream applications for further use cases. To process batch data effectively, we use AWS Glue, a serverless data integration service that uses the Spark framework to process the data from Amazon S3 and copy the data to the open table format layer. An open table format manages large collections of files as tables and supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. We chose Delta Lake as an example open table format, but you can achieve the same results using Apache Iceberg or Apache Hudi.

The post illustrates the construction of a comprehensive CDC system, enabling the processing of CDC data sourced from Amazon Relational Database Service (Amazon RDS) for MySQL. Initially, we create a raw data lake of all modified records in the database in near real time using Amazon MSK, writing to Amazon S3 as raw data. This raw data can then be used to build a data warehouse or even a special type of data storage that’s optimized for analytics, such as a Delta Lake on S3. Later, we use an AWS Glue extract, transform, and load (ETL) job for batch processing of CDC data from the S3 raw data lake. A key advantage of this setup is that you have complete control over the entire process, from capturing the changes in your database to transforming the data for your specific needs. This flexibility allows you to adapt the system to different use cases.

This is achieved through integration with MSK Connect using the Debezium MySQL connector, followed by writing data to Amazon S3 facilitated by the Confluent S3 Sink Connector. Subsequently, the data is processed from S3 using an AWS Glue ETL job, and then stored in the data lake layer. Finally, the Delta Lake table is queried using Amazon Athena.

Note: If you require real-time data processing of the CDC data, you can bypass the batch approach and use an AWS Glue streaming job instead. This job would directly connect to the Kafka topic in MSK, grabbing the data as soon as changes occur. It can then process and transform the data as needed, creating a Delta Lake on Amazon S3 that reflects the latest updates according to your business needs. This approach ensures you have the most up-to-date data available for real-time analytics.

Solution overview

The following diagram illustrates the architecture that you implement through this blog post. Each number represents a major component of the solution.

The workflow consists of the following:

  1. Near real-time data capture from MySQL and streaming to Amazon S3
    1. The process starts with data originating from Amazon RDS for MySQL.
    2. A Debezium connector is used to capture changes to the data in the RDS instance in near real time. Debezium is a distributed platform that converts information from your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases. Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors.
    3. The captured data changes are then streamed to an Amazon MSK topic. MSK is a managed service that simplifies running Apache Kafka on AWS.
    4. The processed data stream (topic) is streamed from MSK to Amazon S3 in JSON format. The Confluent S3 Sink Connector allows near real-time data transfer from an MSK cluster to an S3 bucket.
  2. Batch processing the CDC raw data and writing it into the data lake
    1. Set up an AWS Glue ETL job to process the raw CDC data.
    2. This job reads bookmarked data from an S3 raw bucket and writes it into the data lake in open file format (Delta). The job also creates the Delta Lake table in the AWS Glue Data Catalog (see the MERGE sketch after this list).
    3. Delta Lake is an open-source storage layer built on top of existing data lakes. It adds functionalities like ACID transactions and versioning to improve data reliability and manageability.
  3. Analyze the data using serverless interactive query service
    1. Athena, a serverless interactive query service, can be used to query the Delta Lake table created in Glue Data Catalog. This allows for interactive data analysis without managing infrastructure.
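To illustrate the UPSERT step, the following Spark SQL sketch shows how a staged batch of Debezium change records could be merged into the Delta Lake table. The staged_cdc_changes view, the cust_id join key, and the op column are assumptions based on this walkthrough; the AWS Glue job created by the CloudFormation template may implement this differently.

-- Sketch (Spark SQL on Delta Lake): apply staged CDC records to the target table
MERGE INTO glue_cdc_blog_db.blog_cdc_tbl AS target
USING staged_cdc_changes AS source                      -- batch read from the S3 raw bucket
ON target.cust_id = source.cust_id
WHEN MATCHED AND source.op = 'd' THEN DELETE            -- Debezium delete events
WHEN MATCHED THEN UPDATE SET *                          -- updates to existing customers
WHEN NOT MATCHED AND source.op <> 'd' THEN INSERT *;    -- newly inserted customers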

For this post, we create the solution resources in the us-east-1 AWS Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configure resources with AWS CloudFormation

In this post, you use the following two CloudFormation templates. The advantage of using two different templates is that you can decouple the resource creation of the CDC pipeline and the AWS Glue processing according to your use case, or if you only need to create resources for a specific part of the process.

  1. vpc-msk-mskconnect-rds-client.yaml – This template sets up the CDC pipeline resources such as a virtual private cloud (VPC), subnets, security groups, AWS Identity and Access Management (IAM) roles, NAT gateway, internet gateway, Amazon Elastic Compute Cloud (Amazon EC2) client, Amazon MSK, MSK Connect, RDS, and S3 buckets.
  2. gluejob-setup.yaml – This template sets up the data processing resources such as the AWS Glue database, table, and ETL job.

Configure MSK and MSK Connect

To start, you’ll configure MSK and MSK Connect using the Debezium connector to capture incremental changes in the table and write them into Amazon S3 using an S3 sink connector. The vpc-msk-mskconnect-rds-client.yaml stack creates a VPC, private and public subnets, security groups, S3 buckets, an Amazon MSK cluster, an EC2 instance with a Kafka client, an RDS database, and the MSK connectors and their worker configurations.

  1. Launch the stack vpc-msk-mskconnect-rds-client using the CloudFormation template.
  2. Provide the parameter values as listed in the following table.
Parameters | Description | Sample value
EnvironmentName | An environment name that is prefixed to resource names. | msk-delta-cdc-pipeline
DatabasePassword | Database admin account password. | S3cretPwd99
InstanceType | MSK client EC2 instance type. | t2.micro
LatestAmiId | Latest AMI ID of Amazon Linux 2023 for the EC2 instance. You can use the default value. | /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-6.1-x86_64
VpcCIDR | IP range (CIDR notation) for this VPC. | 10.192.0.0/16
PublicSubnet1CIDR | IP range (CIDR notation) for the public subnet in the first Availability Zone. | 10.192.10.0/24
PublicSubnet2CIDR | IP range (CIDR notation) for the public subnet in the second Availability Zone. | 10.192.11.0/24
PrivateSubnet1CIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone. | 10.192.20.0/24
PrivateSubnet2CIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone. | 10.192.21.0/24
PrivateSubnet3CIDR | IP range (CIDR notation) for the private subnet in the third Availability Zone. | 10.192.22.0/24
  3. The stack creation process can take approximately one hour to complete. Check the Outputs tab for the stack after the stack is created.

Next, you set up the AWS Glue data processing resources such as the AWS Glue database, table, and ETL job.

Implement UPSERT on an S3 data lake with Delta Lake using AWS Glue

The gluejob-setup.yaml CloudFormation template creates a database, an IAM role, and an AWS Glue ETL job. Retrieve the values for S3BucketNameForOutput and S3BucketNameForScript from the vpc-msk-mskconnect-rds-client stack’s Outputs tab to use in this template. Complete the following steps:

  1. Launch the stack gluejob-setup.
  2. Provide the parameter values as listed in the following table.
Parameters | Description | Sample value
EnvironmentName | Environment name that is prefixed to resource names. | gluejob-setup
GlueDataBaseName | Name of the Data Catalog database. | glue_cdc_blog_db
GlueTableName | Name of the Data Catalog table. | blog_cdc_tbl
S3BucketForGlueScript | Bucket name for the AWS Glue ETL script. Use the S3 bucket name from the previous stack. | aws-gluescript-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
GlueWorkerType | Worker type for the AWS Glue job. | G.1X
NumberOfWorkers | Number of workers in the AWS Glue job. | 3
S3BucketForOutput | Bucket name for writing data from the AWS Glue job. | aws-glueoutput-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
S3ConnectorTargetBucketname | Bucket name where the Amazon MSK S3 sink connector writes the data from the Kafka topic. | msk-lab-${AWS::AccountId}-target-bucket
  1. The stack creation process can take approximately 2 minutes to complete. Check the Outputs tab for the stack after the stack is created.

The gluejob-setup stack created an AWS Glue database and an AWS Glue job. For further clarity, you can examine the database and job that the CloudFormation template generated.

After successfully creating the CloudFormation stack, you can proceed with processing data using the AWS Glue ETL job.

Run the AWS Glue ETL job

To process the data created in the S3 bucket from Amazon MSK using the AWS Glue ETL job that you set up in the previous section, complete the following steps:

  1. On the CloudFormation console, choose the stack gluejob-setup.
  2. On the Outputs tab, retrieve the name of the AWS Glue ETL job from the GlueJobName key. In the following screenshot, the name is GlueCDCJob-glue-delta-cdc.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Search for the AWS Glue ETL job named GlueCDCJob-glue-delta-cdc.
  3. Choose the job name to open its details page.
  4. Choose Run to start the job. On the Runs tab, confirm that the job ran without failure.

  1. Retrieve the OutputBucketName from the gluejob-setup template output.
  2. On the Amazon S3 console, navigate to the S3 bucket to verify the data.

Note: We have enabled AWS Glue job bookmarks, which make sure the job processes only the new data in each job run.

Query the Delta Lake table using Athena

After the AWS Glue ETL job has successfully created the Delta Lake table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database glue_cdc_blog_db created by the gluejob-setup stack.
  4. To validate the data, run the following queries to preview the data and find the total count.
SELECT * FROM "glue_cdc_blog_db"."blog_cdc_tbl" ORDER BY cust_id DESC LIMIT 40;
SELECT COUNT(*) FROM "glue_cdc_blog_db"."blog_cdc_tbl";

The following screenshot shows the output of our example query.

Upload incremental (CDC) data for further processing

After we process the initial full load, let’s perform insert, update, and delete operations on records in MySQL, which will be processed by the Debezium MySQL connector and written to Amazon S3 using the Confluent S3 sink connector.

  1. On the Amazon EC2 console, go to the EC2 instance named KafkaClientInstance that you created using the CloudFormation template.

  1. Sign in to the EC2 instance using AWS Systems Manager Session Manager (SSM). Select KafkaClientInstance and then choose Connect.

  1. Run the following commands to insert the data into the RDS table. Use the database password from the CloudFormation stack parameter tab.
sudo su - ec2-user
RDS_AURORA_ENDPOINT=`aws rds describe-db-instances --region us-east-1 | jq -r '.DBInstances[] | select(.DBName == "salesdb") | .Endpoint.Address'`
mysql -f -u master -h $RDS_AURORA_ENDPOINT  --password
  1. Now perform the insert into the CUSTOMER table.
use salesdb;
INSERT into CUSTOMER values(8887,'Customer Name 8887','Market segment 8887');
INSERT into CUSTOMER values(8888,'Customer Name 8888','Market segment 8888');
INSERT into CUSTOMER values(8889,'Customer Name 8889','Market segment 8889');

  1. Run the AWS Glue job again to update the Delta Lake table with new records.
  2. Use the Athena console to validate the data.
  3. Perform the insert, update, and delete in the CUSTOMER table.
    UPDATE CUSTOMER SET NAME='Customer Name update 8888',MKTSEGMENT='Market segment update 8888' where CUST_ID = 8888;
    UPDATE CUSTOMER SET NAME='Customer Name update 8889',MKTSEGMENT='Market segment update 8889' where CUST_ID = 8889;
    DELETE FROM CUSTOMER where CUST_ID = 8887;
    INSERT into CUSTOMER values(9000,'Customer Name 9000','Market segment 9000');
    

  4. Run the AWS Glue job again to update the Delta Lake table with the insert, update, and delete records.
  5. Use the Athena console to validate the data to verify the update and delete records in the Delta Lake table.

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack gluejob-setup.
  2. Delete the CloudFormation stack vpc-msk-mskconnect-rds-client.

Conclusion

Organizations continually seek high-performance, cost-effective, and scalable analytical solutions to extract value from their operational data sources in near real time. The analytical platform must be capable of receiving updates to operational data as they happen. Traditional data lake solutions often struggle with managing changes in source data, but the Delta Lake framework addresses this challenge. This post illustrates the process of constructing an end-to-end change data capture (CDC) application using Amazon MSK, MSK Connect, AWS Glue, and native Delta Lake tables, alongside guidance on querying Delta Lake tables from Amazon Athena. This architectural pattern can be adapted to other data sources employing various Kafka connectors, enabling the creation of data lakes supporting UPSERT operations using AWS Glue and native Delta Lake tables. For further insights, see the MSK Connect examples.


About the authors

Shubham Purwar is a Cloud Engineer (ETL) at AWS Bengaluru specializing in AWS Glue and Athena. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS, specializing in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.

Manage Amazon Redshift provisioned clusters with Terraform

Post Syndicated from Amit Ghodke original https://aws.amazon.com/blogs/big-data/manage-amazon-redshift-provisioned-clusters-with-terraform/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it straightforward and cost-effective to analyze all your data using standard SQL and your existing extract, transform, and load (ETL); business intelligence (BI); and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics.

HashiCorp Terraform is an infrastructure as code (IaC) tool that lets you define cloud resources in human-readable configuration files that you can version, reuse, and share. You can then use a consistent workflow to provision and manage your infrastructure throughout its lifecycle.

In this post, we demonstrate how to use Terraform to manage common Redshift cluster operations, such as:

  • Creating a new provisioned Redshift cluster using Terraform code and adding an AWS Identity and Access Management (IAM) role to it
  • Scheduling pause, resume, and resize operations for the Redshift cluster

Solution overview

The following diagram illustrates the solution architecture for provisioning a Redshift cluster using Terraform.

Solution Overview

In addition to Amazon Redshift, the solution uses the following AWS services:

  • Amazon Elastic Compute Cloud (Amazon EC2) offers the broadest and deepest compute platform, with over 750 instances and choice of the latest processors, storage, networking, operating system (OS), and purchase model to help you best match the needs of your workload. For this post, we use an m5.xlarge instance with the Windows Server 2022 Datacenter Edition. The choice of instance type and Windows OS is flexible; you can choose a configuration that suits your use case.
  • IAM allows you to securely manage identities and access to AWS services and resources. We use IAM roles and policies to securely access services and perform relevant operations. An IAM role is an AWS identity that you can assume to gain temporary access to AWS services and resources. Each IAM role has a set of permissions defined by IAM policies. These policies determine the actions and resources the role can access.
  • AWS Secrets Manager allows you to securely store the user name and password needed to log in to Amazon Redshift.

In this post, we demonstrate how to set up an environment that connects AWS and Terraform. The following are the high-level tasks involved:

  1. Set up an EC2 instance with Windows OS in AWS.
  2. Install Terraform on the instance.
  3. Configure your environment variables (Windows OS).
  4. Define an IAM policy to have minimum access to perform activities on a Redshift cluster, including pause, resume, and resize.
  5. Establish an IAM role using the policy you created.
  6. Create a provisioned Redshift cluster using Terraform code.
  7. Attach the IAM role you created to the Redshift cluster.
  8. Write the Terraform code to schedule cluster operations like pause, resume, and resize.

Prerequisites

To complete the activities described in this post, you need an AWS account and administrator privileges on the account to use the key AWS services and create the necessary IAM roles.

Create an EC2 instance

We begin with creating an EC2 instance. Complete the following steps to create a Windows OS EC2 instance:

  1. On the Amazon EC2 console, choose Launch Instance.
  2. Choose a Windows Server Amazon Machine Image (AMI) that suits your requirements.
  3. Select an appropriate instance type for your use case.
  4. Configure the instance details:
    1. Choose the VPC and subnet where you want to launch the instance.
    2. Enable Auto-assign Public IP.
    3. For Add storage, configure the desired storage options for your instance.
    4. Add any necessary tags to the instance.
  5. For Configure security group, select or create a security group that allows the necessary inbound and outbound traffic to your instance.
  6. Review the instance configuration and choose Launch to start the instance creation process.
  7. For Select an existing key pair or create a new key pair, choose an existing key pair or create a new one.
  8. Choose Launch instance.
  9. When the instance is running, you can connect to it using the Remote Desktop Protocol (RDP) and the administrator password obtained from the Get Windows password option.

Install Terraform on the EC2 instance

Install Terraform on the Windows EC2 instance using the following steps:

  1. RDP into the EC2 instance you created.
  2. Install Terraform on the EC2 instance.

You need to update the environment variables to point to the directory where the Terraform executable is available.

  1. Under System Properties, on the Advanced tab, choose Environment Variables.

Environment Variables

  1. Choose the path variable.

Path Variables

  1. Choose New and enter the path where Terraform is installed. For this post, it’s in the C:\ directory.

Add Terraform to path variable

  1. Confirm Terraform is installed by entering the following command:

terraform -v

Check Terraform version

Optionally, you can use an editor like Visual Studio Code (VS Code) and add the Terraform extension to it.

Create a user for accessing AWS through code (AWS CLI and Terraform)

Next, we create an administrator user in IAM, which performs the operations on AWS through Terraform and the AWS Command Line Interface (AWS CLI). Complete the following steps:

  1. Create a new IAM user.
  2. On the IAM console, download and save the user’s access key ID and secret access key.

Create New IAM User

  1. Install the AWS CLI.
  2. Launch the AWS CLI and run aws configure and pass the access key ID, secret access key, and default AWS Region.

This prevents the AWS access keys from being visible in plain text in the Terraform code and prevents accidental sharing when the code is committed to a code repository.

AWS Configure

Create a user for accessing Amazon Redshift through code (Terraform)

Because we’re creating a Redshift cluster and performing subsequent operations, the administrator user name and password required for these processes (different from the admin user we created earlier for logging in to the AWS Management Console) need to be referenced in the code. To do this securely, we use Secrets Manager to store the user name and password, and we write Terraform code to access these credentials during the cluster create operation. Complete the following steps:

  1. On the Secrets Manager console, choose Secrets in the navigation pane.
  2. Choose Store a new secret.

Store a New Secret

  1. For Secret type, select Credentials for Amazon Redshift data warehouse.
  2. Enter your credentials.

Choose Secret Type

Set up Terraform

Complete the following steps to set up Terraform:

  1. Create a folder or directory for storing all your Terraform code.
  2. Open the VS Code editor and browse to your folder.
  3. Choose New File and enter a name for the file using the .tf extension

Now we’re ready to start writing our code starting with defining providers. The providers definition is a way for Terraform to get the necessary APIs to interact with AWS.

  1. Configure a provider for Terraform:
terraform {
required_providers {
aws = {
source  = "hashicorp/aws"
version = "5.53.0"
}
}
}

# Configure the AWS Provider
provider "aws" {
region = "us-east-1"
}
  1. Access the admin credentials for the Amazon Redshift admin user:
data "aws_secretsmanager_secret_version" "creds" {
# Fill in the name you gave to your secret
secret_id = "terraform-creds"
}
/*json decode to parse the secret*/
locals {
terraform-creds = jsondecode(
data.aws_secretsmanager_secret_version.creds.secret_string
)
}

Create a Redshift cluster

To create a Redshift cluster, use the aws_redshift_cluster resource:

# Create an encrypted Amazon Redshift cluster

resource "aws_redshift_cluster" "dw_cluster" {
cluster_identifier = "tf-example-redshift-cluster"
database_name      = "dev"
master_username    = local.terraform-creds.username
master_password    = local.terraform-creds.password
node_type          = "ra3.xlplus"
cluster_type       = "multi-node"
publicly_accessible = "false"
number_of_nodes    = 2
encrypted         = true
kms_key_id        = local.RedshiftClusterEncryptionKeySecret.arn
enhanced_vpc_routing = true
cluster_subnet_group_name="<<your-cluster-subnet-groupname>>"
}

In this example, we create a Redshift cluster called tf-example-redshift-cluster that uses the ra3.xlplus node type with two nodes. We use the credentials from Secrets Manager and jsondecode to access these values. This makes sure the user name and password aren’t passed in plain text.

Add an IAM role to the cluster

Because we didn’t have the option to associate an IAM role during cluster creation, we do so now with the following code:

resource "aws_redshift_cluster_iam_roles" "cluster_iam_role" {
cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
iam_role_arns      = ["arn:aws:iam::yourawsaccountId:role/service-role/yourIAMrolename"]
}

Enable Redshift cluster operations

Scheduling operations on the Redshift cluster, such as pause, resume, and resize, makes these operations more practical to use. Therefore, we create two policies: one that allows the Amazon Redshift scheduler service and one that allows the cluster pause, resume, and resize operations. Then we create a role that has both policies attached to it.

You can perform these steps directly from the console and then reference them in Terraform code. The following example demonstrates the code snippets to create the policies and a role, and then attach the policies to the role.

  1. Create the Amazon Redshift scheduler policy document and create the role that assumes this policy:
#define policy document to establish the Trust Relationship between the role and the entity (Redshift scheduler)

data "aws_iam_policy_document" "assume_role_scheduling" {
statement {
effect = "Allow"
principals {
type        = "Service"
identifiers = ["scheduler.redshift.amazonaws.com"]
}
actions = ["sts:AssumeRole"]
}
}

#create a role that has the above trust relationship attached to it, so that it can invoke the redshift scheduling service
resource "aws_iam_role" "scheduling_role" {
name               = "redshift_scheduled_action_role"
assume_role_policy = data.aws_iam_policy_document.assume_role_scheduling.json
}
  1. Create a policy document and policy for Amazon Redshift operations:
/*define the policy document for other redshift operations*/

data "aws_iam_policy_document" "redshift_operations_policy_definition" {
statement {
effect = "Allow"
actions = [
"redshift:PauseCluster",
"redshift:ResumeCluster",
"redshift:ResizeCluster",
]
resources = ["arn:aws:redshift:*:youraccountid:cluster:*"]
}
}

/*create the policy and add the above data (json) to the policy*/
resource "aws_iam_policy" "scheduling_actions_policy" {
name   = "redshift_scheduled_action_policy"
policy = data.aws_iam_policy_document.redshift_operations_policy_definition.json
}
  1. Attach the policy to the IAM role:
/*connect the policy and the role*/
resource "aws_iam_role_policy_attachment" "role_policy_attach" {
policy_arn = aws_iam_policy.scheduling_actions_policy.arn
role       = aws_iam_role.scheduling_role.name
}
  1. Pause the Redshift cluster:
#pause a cluster
resource "aws_redshift_scheduled_action" "pause_operation" {
name     = "tf-redshift-scheduled-action-pause"
schedule = "cron(00 22 * * ? *)"
iam_role = aws_iam_role.scheduling_role.arn
target_action {
pause_cluster {
cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
}
}
}

In the preceding example, we created a scheduled action called tf-redshift-scheduled-action-pause that pauses the cluster at 10:00 PM every day as a cost-saving action.

  1. Resume the Redshift cluster:
name     = "tf-redshift-scheduled-action-resume"
schedule = "cron(15 07 * * ? *)"
iam_role = aws_iam_role.scheduling_role.arn
target_action {
resume_cluster {
cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
}
}
}

In the preceding example, we created a scheduled action called tf-redshift-scheduled-action-resume that resumes the cluster at 7:15 AM every day in time for business operations to start using the Redshift cluster.

  1. Resize the Redshift cluster:
#resize a cluster
resource "aws_redshift_scheduled_action" "resize_operation" {
name     = "tf-redshift-scheduled-action-resize"
schedule = "cron(15 14 * * ? *)"
iam_role = aws_iam_role.scheduling_role.arn
target_action {
resize_cluster {
cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
cluster_type = "multi-node"
node_type = "ra3.xlplus"
number_of_nodes = 4 /*increase the number of nodes using resize operation*/
classic = true /*default behavior is to use elastic resize; set this boolean to true if we want to use classic resize*/
}
}
}

In the preceding example, we created a scheduled action called tf-redshift-scheduled-action-resize that increases the number of nodes from 2 to 4. You can perform other operations, such as changing the node type, as well. By default, elastic resize is used, but if you want to use classic resize, you have to pass the parameter classic = true as shown in the preceding code. This can be a scheduled action to anticipate the needs of peak periods and resize appropriately for that duration. You can then downsize using similar code during non-peak times.

Test the solution

We apply the following code to test the solution. Change the resource details accordingly, such as account ID and Region name.

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "5.53.0"
    }
  }
}

# Configure the AWS Provider
provider "aws" {
  region = "us-east-1"
}

# access secrets stored in secret manager
data "aws_secretsmanager_secret_version" "creds" {
  # Fill in the name you gave to your secret
  secret_id = "terraform-creds"
}

/*json decode to parse the secret*/
locals {
  terraform-creds = jsondecode(
    data.aws_secretsmanager_secret_version.creds.secret_string
  )
}

#Store the arn of the KMS key to be used for encrypting the redshift cluster

data "aws_secretsmanager_secret_version" "encryptioncreds" {
  secret_id = "RedshiftClusterEncryptionKeySecret"
}
locals {
  RedshiftClusterEncryptionKeySecret = jsondecode(
    data.aws_secretsmanager_secret_version.encryptioncreds.secret_string
  )
}

# Create an encrypted Amazon Redshift cluster
resource "aws_redshift_cluster" "dw_cluster" {
  cluster_identifier = "tf-example-redshift-cluster"
  database_name      = "dev"
  master_username    = local.terraform-creds.username
  master_password    = local.terraform-creds.password
  node_type          = "ra3.xlplus"
  cluster_type       = "multi-node"
  publicly_accessible = "false"
  number_of_nodes    = 2
  encrypted         = true
  kms_key_id        = local.RedshiftClusterEncryptionKeySecret.arn
  enhanced_vpc_routing = true
  cluster_subnet_group_name="redshiftclustersubnetgroup-yuu4sywme0bk"
}

#add IAM Role to the Redshift cluster

resource "aws_redshift_cluster_iam_roles" "cluster_iam_role" {
  cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
  iam_role_arns      = ["arn:aws:iam::youraccountid:role/service-role/yourrolename"]
}

#for audit logging please create an S3 bucket which has read write privileges for Redshift service, this example does not include S3 bucket creation.

resource "aws_redshift_logging" "redshiftauditlogging" {
  cluster_identifier   = aws_redshift_cluster.dw_cluster.cluster_identifier
  log_destination_type = "s3"
  bucket_name          = "your-s3-bucket-name"
}

#to do operations like pause, resume, resize on a schedule we need to first create a role that has permissions to perform these operations on the cluster

#define policy document to establish the Trust Relationship between the role and the entity (Redshift scheduler)

data "aws_iam_policy_document" "assume_role_scheduling" {
  statement {
    effect = "Allow"
    principals {
      type        = "Service"
      identifiers = ["scheduler.redshift.amazonaws.com"]
    }

    actions = ["sts:AssumeRole"]
  }
}

#create a role that has the above trust relationship attached to it, so that it can invoke the redshift scheduling service
resource "aws_iam_role" "scheduling_role" {
  name               = "redshift_scheduled_action_role"
  assume_role_policy = data.aws_iam_policy_document.assume_role_scheduling.json
}

/*define the policy document for other redshift operations*/

data "aws_iam_policy_document" "redshift_operations_policy_definition" {
  statement {
    effect = "Allow"
    actions = [
      "redshift:PauseCluster",
      "redshift:ResumeCluster",
      "redshift:ResizeCluster",
    ]

    resources =  ["arn:aws:redshift:*:youraccountid:cluster:*"]
  }
}

/*create the policy and add the above data (json) to the policy*/

resource "aws_iam_policy" "scheduling_actions_policy" {
  name   = "redshift_scheduled_action_policy"
  policy = data.aws_iam_policy_document.redshift_operations_policy_definition.json
}

/*connect the policy and the role*/

resource "aws_iam_role_policy_attachment" "role_policy_attach" {
  policy_arn = aws_iam_policy.scheduling_actions_policy.arn
  role       = aws_iam_role.scheduling_role.name
}

#pause a cluster

resource "aws_redshift_scheduled_action" "pause_operation" {
  name     = "tf-redshift-scheduled-action-pause"
  schedule = "cron(00 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    pause_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
    }
  }
}

#resume a cluster

resource "aws_redshift_scheduled_action" "resume_operation" {
  name     = "tf-redshift-scheduled-action-resume"
  schedule = "cron(15 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    resume_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
    }
  }
}

#resize a cluster

resource "aws_redshift_scheduled_action" "resize_operation" {
  name     = "tf-redshift-scheduled-action-resize"
  schedule = "cron(15 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    resize_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
      cluster_type = "multi-node"
      node_type = "ra3.xlplus"
      number_of_nodes = 4 /*increase the number of nodes using resize operation*/
      classic = true /*default behavior is to use elastic resize; set this boolean to true if we want to use classic resize*/
    }
  }
}

Run terraform plan to see a list of changes that will be made, as shown in the following screenshot.

Terraform plan

After you have reviewed the changes, use terraform apply to create the resources you defined.

Terraform Apply

You will be asked to enter yes or no before Terraform starts creating the resources.

Confirmation of apply

You can confirm that the cluster is being created on the Amazon Redshift console.

redshift cluster creation

After the cluster is created, the IAM roles and schedules for pause, resume, and resize operations are added, as shown in the following screenshot.

Terraform actions

You can also view these scheduled operations on the Amazon Redshift console.

Scheduled Actions

Clean up

If you deployed resources such as the Redshift cluster, IAM roles, or any of the other associated resources by running terraform apply, run terraform destroy to tear these resources down and clean up your environment so you don’t incur charges on your AWS account.

Conclusion

Terraform offers a powerful and flexible solution for managing your infrastructure as code using a declarative approach, with a cloud-agnostic nature, resource orchestration capabilities, and strong community support. This post provided a comprehensive guide to using Terraform to deploy a Redshift cluster and perform important operations such as resize, resume, and pause on the cluster. Embracing IaC and using the right tools, such as Workflow Studio, VS Code, and Terraform, will enable you to build scalable and maintainable distributed applications, and automate processes.


About the Authors

Amit Ghodke is an Analytics Specialist Solutions Architect based out of Austin. He has worked with databases, data warehouses and analytical applications for the past 16 years. He loves to help customers implement analytical solutions at scale to derive maximum business value.

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Amazon Managed Service for Apache Flink now supports Apache Flink version 1.19

Post Syndicated from Francisco Morillo original https://aws.amazon.com/blogs/big-data/amazon-managed-service-for-apache-flink-now-supports-apache-flink-version-1-19/

Apache Flink is an open source distributed processing engine, offering powerful programming interfaces for both stream and batch processing, with first-class support for stateful processing and event time semantics. Apache Flink supports multiple programming languages (Java, Python, Scala, SQL) and multiple APIs with different levels of abstraction, which can be used interchangeably in the same application.

Amazon Managed Service for Apache Flink offers a fully managed, serverless experience in running Apache Flink applications and now supports Apache Flink 1.19.1, the latest stable version of Apache Flink at the time of writing. AWS led the community release of the version 1.19.1, which introduces a number of bug fixes over version 1.19.0, released in March 2024.

In this post, we discuss some of the interesting new features and configuration changes available for Managed Service for Apache Flink introduced with this new release. In every Apache Flink release, there are exciting new experimental features. However, in this post, we are going to focus on the features most accessible to the user with this release.

Connectors

With the release of version 1.19.1, the Apache Flink community also released new connector versions for the 1.19 runtime. Starting from 1.16, Apache Flink introduced a new connector version numbering, following the pattern <connector-version>-<flink-version>. It’s recommended to use connectors for the runtime version you are using. Refer to Using Apache Flink connectors to stay updated on any future changes regarding connector versions and compatibility.

SQL

Apache Flink 1.19 brings new features and improvements, particularly in the SQL API. These enhancements are designed to provide more flexibility, better performance, and ease of use for developers working with Flink’s SQL API. In this section, we delve into some of the most notable SQL enhancements introduced in this release.

State TTL per operator

Configuring state TTL at the operator level was introduced in Apache Flink 1.18 but wasn’t easily accessible to the end-user. To modify an operator TTL, you had to export the plan at development time, modify it manually, and force Apache Flink to use the edited plan instead of generating a new one when the application starts. The new features added to Flink SQL in 1.19 simplify this process by allowing TTL configurations directly through SQL hints, eliminating the need for JSON plan manipulation.

The following code shows examples of how to use SQL hints to set state TTL:

-- State TTL for Joins
SELECT /*+ STATE_TTL('Orders' = '1d', 'Customers' = '20d') */ 
  *
FROM Orders 
LEFT OUTER JOIN Customers 
  ON Orders.o_custkey = Customers.c_custkey;

-- State TTL for Aggregations
SELECT /*+ STATE_TTL('o' = '1d') */ 
  o_orderkey, SUM(o_totalprice) AS revenue 
FROM Orders AS o 
GROUP BY o_orderkey;

Session window table-valued functions

Windows are at the heart of processing infinite streams in Apache Flink, splitting the stream into finite buckets for computations. Before 1.19, Apache Flink provided the following types of window table-value functions (TVFs):

  • Tumble windows – Fixed-size, non-overlapping windows
  • Hop windows – Fixed-size, overlapping windows with a specified hop interval
  • Cumulate windows – Increasingly larger windows that start at the same point but grow over time

With the Apache Flink 1.19 release, the SQL API adds support for session window TVFs in streaming mode, allowing for more sophisticated and flexible windowing operations directly within SQL queries. Applications can now create dynamic windows that group elements based on session gaps. The following code shows an example:

-- Session window with partition keys
SELECT 
  * 
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));

-- Apply aggregation on the session windowed table with partition keys
SELECT 
  window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;

Mini-batch optimization for regular joins

When using the Table API or SQL, regular joins—standard equi-joins like a table SQL join, where time is not a factor—may induce a considerable overhead for the state backend, especially when using RocksDB.

Normally, Apache Flink processes standard joins one record at a time, looking up the state for a matching record in the other side of the join, updating the state with the input record, and emitting the resulting record. This may add considerable pressure on RocksDB, with multiple reads and writes for each record.

Apache Flink 1.19 introduces the ability to use mini-batch processing with equi-joins (FLIP-415). When enabled, Apache Flink will process regular joins not one record at a time, but in small batches, substantially reducing the pressure on the RocksDB state backend. Mini-batching adds some latency, which is controllable by the user. See, for example, the following SQL code (embedded in Java):

TableConfig tableConfig = tableEnv.getConfig();
tableConfig.set("table.exec.mini-batch.enabled", "true");
tableConfig.set("table.exec.mini-batch.allow-latency", "5s");
tableConfig.set("table.exec.mini-batch.size", "5000");

tableEnv.executeSql("CREATE TEMPORARY VIEW ab AS " +
  "SELECT a.id as a_id, a.a_content, b.id as b_id, b.b_content " +
  "FROM a LEFT JOIN b ON a.id = b.id";

With this configuration, Apache Flink will buffer up to 5,000 records or up to 5 seconds, whichever comes first, before processing the join for the entire mini-batch.

In Apache Flink 1.19, mini-batching only works for regular joins, not windowed or temporal joins. Mini-batching is disabled by default, and you have to explicitly enable it and set the batch size and latency for Flink to use it. Also, mini-batch settings are global, applied to all regular joins of your application. At the time of writing, it’s not possible to set mini-batching per join statement.

AsyncScalarFunction

Before version 1.19, an important limitation of SQL and the Table API, compared to the Java DataStream API, was the lack of asynchronous I/O support. Any request to an external system, for example a database or a REST API, or even any AWS API call using the AWS SDK, is synchronous and blocking. An Apache Flink subtask waits for the response before completing the processing of a record and proceeding to the next one. Practically, the round-trip latency of each request is added to the processing latency of each processed record. Apache Flink’s Async I/O API removes this limitation, but it’s only available for the DataStream API and Java. Until version 1.19, there was no simple efficient workaround in SQL, the Table API, or Python.

Apache Flink 1.19 introduces the new AsyncScalarFunction, a user-defined function (UDF) that can be implemented using non-blocking calls to the external system, to support use cases similar to asynchronous I/O in SQL and the Table API.

This new type of UDF is only available in streaming mode. At the moment, it only supports ordered output. DataStream Async I/O also supports unordered output, which may further reduce latency when strict ordering isn’t required.
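The following is a minimal sketch of what an AsyncScalarFunction might look like in Java. The LookupSegment class, the LOOKUP_SEGMENT function name, and the datagen-backed Orders table are illustrative assumptions, and a real implementation would complete the future from an actual non-blocking client (for example, an asynchronous HTTP or AWS SDK client) instead of a local thread pool.

import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.AsyncScalarFunction;

public class AsyncUdfSketch {

    // Hypothetical asynchronous UDF: the first eval() parameter is the future to complete.
    public static class LookupSegment extends AsyncScalarFunction {
        public void eval(CompletableFuture<String> result, String customerId) {
            // Placeholder for a non-blocking call to an external system.
            CompletableFuture.supplyAsync(() -> "segment-for-" + customerId)
                .whenComplete((value, error) -> {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(value);
                    }
                });
        }
    }

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Bounded sample table so the sketch is self-contained.
        tableEnv.executeSql(
            "CREATE TEMPORARY TABLE Orders (o_orderkey BIGINT, o_custkey STRING) "
                + "WITH ('connector' = 'datagen', 'number-of-rows' = '5')");

        // Register the asynchronous UDF and call it from SQL like any scalar function.
        tableEnv.createTemporarySystemFunction("LOOKUP_SEGMENT", new LookupSegment());
        tableEnv.executeSql(
            "SELECT o_orderkey, LOOKUP_SEGMENT(o_custkey) AS segment FROM Orders").print();
    }
}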

Python 3.11 support

Python 3.11 is now supported, and Python 3.7 support has been completely removed (FLINK-33029). Managed Service for Apache Flink currently uses the Python 3.11 runtime to run PyFlink applications. Python 3.11 is a bugfix only version of the runtime. Python 3.11 introduced several performance improvements and bug fixes, but no API breaking changes.

Performance improvements: Dynamic checkpoint interval

In the latest release of Apache Flink 1.19, significant enhancements have been made to improve checkpoint behavior. This new release gives the application the capability to adjust checkpointing intervals dynamically based on whether the source is processing backlog data (FLIP-309).

In Apache Flink 1.19, you can now specify different checkpointing intervals based on whether a source operator is processing backlog data. This flexibility optimizes job performance by reducing checkpoint frequency during backlog phases, enhancing overall throughput. Extending checkpoint intervals allows Apache Flink to prioritize processing throughput over frequent state snapshots, thereby improving efficiency and performance.

To enable it, you need to define the execution.checkpointing.interval parameter for regular intervals and execution.checkpointing.interval-during-backlog to specify a longer interval when sources report processing backlog.

For example, if you want to run checkpoints every 60 seconds during normal processing, but extend to 10 minutes during the processing of backlogs, you can set the following:

  • execution.checkpointing.interval = 60s
  • execution.checkpointing.interval-during-backlog = 10m
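Outside of Managed Service for Apache Flink, where these settings are managed for you, a minimal sketch of applying both properties on a self-managed Apache Flink deployment might look like the following. The class name and the sample pipeline are illustrative; the two property keys are the ones described above.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BacklogAwareCheckpointing {
    public static void main(String[] args) throws Exception {
        // Regular checkpoint interval, plus a longer interval while sources report backlog.
        Configuration conf = new Configuration();
        conf.setString("execution.checkpointing.interval", "60s");
        conf.setString("execution.checkpointing.interval-during-backlog", "10m");

        // The rest of the job is built on this environment as usual.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromSequence(1, 10).print();
        env.execute("backlog-aware-checkpointing-sketch");
    }
}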

In Amazon Managed Service for Apache Flink, the default checkpointing interval is configured by the application configuration (60 seconds by default). You don’t need to set the configuration parameter. To set a longer checkpointing interval during backlog processing, you can raise a support case to modify execution.checkpointing.interval-during-backlog. See Modifiable Flink configuration properties for further details about modifying Apache Flink configurations.

At the time of writing, dynamic checkpointing intervals are only supported by Apache Kafka source and FileSystem source connectors. If you use any other source connector, intervals during backlog are ignored, and Apache Flink runs a checkpoint at the default interval during backlog processing.

In Apache Flink, checkpoints are always injected in the flow from the sources. This feature only involves source connectors. The sink connectors you use in your application don’t affect this feature. For a deep dive into the Apache Flink checkpoint mechanism, see Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints.

More troubleshooting information: Job initialization and checkpoint traces

With FLIP-384, Apache Flink 1.19 introduces trace reporters, which show checkpointing and job initialization traces. As of 1.19, this trace information can be sent to the logs using Slf4j. In Managed Service for Apache Flink, this is now enabled by default. You can find checkpoint and job initialization details in Amazon CloudWatch Logs, with the other logs from the application.

Checkpoint traces contain valuable information about each checkpoint. You can find similar information on the Apache Flink Dashboard, but only for the latest checkpoints and only while the application is running. Conversely, in the logs, you can find the full history of checkpoints. The following is an example of a checkpoint trace:

SimpleSpan{
  scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker, 
  name=Checkpoint, 
  startTsMillis=1718779769305, 
  endTsMillis=1718779769542, 
  attributes={
    jobId=1b418a2404cbcf47ef89071f83f2dff9, 
    checkpointId=9774, 
    checkpointStatus=COMPLETED, 
    fullSize=9585, 
    checkpointedSize=9585
  }
}

Job initialization traces are generated when the job starts and recovers the state from a checkpoint or savepoint. You can find valuable statistics you can’t normally find elsewhere, including the Apache Flink Dashboard. The following is an example of a job initialization trace:

SimpleSpan{
  scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker,
  name=JobInitialization,
  startTsMillis=1718781201463,
  endTsMillis=1718781409657,
  attributes={
    maxReadOutputDataDurationMs=89,
    initializationStatus=COMPLETED,
    fullSize=26167879378,
    sumMailboxStartDurationMs=621,
    sumGateRestoreDurationMs=29,
    sumDownloadStateDurationMs=199482,
    sumRestoredStateSizeBytes.LOCAL_MEMORY=46764,
    checkpointId=270,
    sumRestoredStateSizeBytes.REMOTE=26167832614,
    maxDownloadStateDurationMs=199482,
    sumReadOutputDataDurationMs=90,
    maxRestoredStateSizeBytes.REMOTE=26167832614,
    maxInitializeStateDurationMs=201122,
    sumInitializeStateDurationMs=201241,
    jobId=8edb291c9f1c91c088db51b48de42308,
    maxGateRestoreDurationMs=22,
    maxMailboxStartDurationMs=391,
    maxRestoredStateSizeBytes.LOCAL_MEMORY=46764
  }
}

Checkpoint and job initialization traces are logged at INFO level. You can find them in CloudWatch Logs only if you configure a logging level of INFO or DEBUG in your Managed Service for Apache Flink application.

Managed Service for Apache Flink behavior change

As a fully managed service, Managed Service for Apache Flink controls some runtime configuration parameters to guarantee the stability of your application. For details about the Apache Flink settings that can be modified, see Apache Flink settings.

With the 1.19 runtime, if you programmatically modify a configuration parameter that is directly controlled by Managed Service for Apache Flink, you receive an explicit ProgramInvocationException when the application starts, explaining what parameter is causing the problem and preventing the application from starting. With runtime 1.18 or earlier, changes to parameters controlled by the managed service were silently ignored.

To learn more about how Managed Service for Apache Flink handles configuration changes in runtime 1.19 or later, refer to FlinkRuntimeException: “Not allowed configuration change(s) were detected”.

Conclusion

In this post, we explored some of the new relevant features and configuration changes introduced with Apache Flink 1.19, now supported by Managed Service for Apache Flink. This latest version brings numerous enhancements aimed at improving performance, flexibility, and usability for developers working with Apache Flink.

With the support of Apache Flink 1.19, Managed Service for Apache Flink now supports the latest released Apache Flink version. We have seen some of the interesting new features available for Flink SQL and PyFlink.

You can find more details about recent releases in the Apache Flink blog and release notes.

If you’re new to Apache Flink, we recommend our guide to choosing the right API and language and following the getting started guide to start using Managed Service for Apache Flink.

If you’re already running an application in Managed Service for Apache Flink, you can safely upgrade it in-place to the new 1.19 runtime.


About the Authors

Francisco Morillo is a Streaming Solutions Architect at AWS, specializing in real-time analytics architectures. With over five years in the streaming data space, Francisco has worked as a data analyst for startups and as a big data engineer for consultancies, building streaming data pipelines. He has deep expertise in Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink. Francisco collaborates closely with AWS customers to build scalable streaming data solutions and advanced streaming data lakes, ensuring seamless data processing and real-time insights.

Lorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Automate data loading from your database into Amazon Redshift using AWS Database Migration Service (DMS), AWS Step Functions, and the Redshift Data API

Post Syndicated from Ritesh Sinha original https://aws.amazon.com/blogs/big-data/automate-data-loading-from-your-database-into-amazon-redshift-using-aws-database-migration-service-dms-aws-step-functions-and-the-redshift-data-api/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing ETL (extract, transform, and load), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics.

As more and more data is being generated, collected, processed, and stored in many different systems, making the data available for end-users at the right place and right time is a very important aspect for data warehouse implementation. A fully automated and highly scalable ETL process helps minimize the operational effort that you must invest in managing the regular ETL pipelines. It also provides timely refreshes of data in your data warehouse.

You can approach the data integration process in two ways:

  • Full load – This method involves completely reloading all the data within a specific data warehouse table or dataset
  • Incremental load – This method focuses on updating or adding only the changed or new data to the existing dataset in a data warehouse

This post discusses how to automate ingestion of source data that changes completely and has no way to track the changes. This is useful for customers who want to use this data in Amazon Redshift; some examples of such data are products and bills of materials without tracking details at the source.

We show how to build an automatic extract and load process from various relational database systems into a data warehouse for full load only. A full load is performed from SQL Server to Amazon Redshift using AWS Database Migration Service (AWS DMS). When Amazon EventBridge receives a full load completion notification from AWS DMS, ETL processes are run on Amazon Redshift to process data. AWS Step Functions is used to orchestrate this ETL pipeline. Alternatively, you could use Amazon Managed Workflows for Apache Airflow (Amazon MWAA), a managed orchestration service for Apache Airflow that makes it straightforward to set up and operate end-to-end data pipelines in the cloud.

Solution overview

The workflow consists of the following steps:

  1. The solution uses an AWS DMS migration task that replicates the full load dataset from the configured SQL Server source to a target Redshift cluster in a staging area.
  2. AWS DMS publishes the replicationtaskstopped event to EventBridge when the replication task is complete, which invokes an EventBridge rule.
  3. EventBridge routes the event to a Step Functions state machine.
  4. The state machine calls a Redshift stored procedure through the Redshift Data API, which loads the dataset from the staging area to the target production tables. With this API, you can also access Redshift data with web-based service applications, including AWS Lambda.
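To illustrate the same Data API pattern from application code (for example, inside an AWS Lambda function), the following is a minimal sketch using the AWS SDK for Java v2. The cluster identifier, database name, and secret ARN are the placeholders used elsewhere in this post, and the polling loop mirrors the wait-and-describe logic in the state machine shown later.

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.redshiftdata.RedshiftDataClient;
import software.amazon.awssdk.services.redshiftdata.model.DescribeStatementRequest;
import software.amazon.awssdk.services.redshiftdata.model.ExecuteStatementRequest;
import software.amazon.awssdk.services.redshiftdata.model.ExecuteStatementResponse;

public class RedshiftDataApiSketch {
    public static void main(String[] args) throws InterruptedException {
        try (RedshiftDataClient client = RedshiftDataClient.builder()
                .region(Region.US_WEST_2)
                .build()) {

            // Submit the stored procedure call asynchronously; placeholder cluster, database, and secret.
            ExecuteStatementResponse submitted = client.executeStatement(ExecuteStatementRequest.builder()
                    .clusterIdentifier("redshiftcluster-abcd")
                    .database("dev")
                    .secretArn("arn:aws:secretsmanager:us-west-2:xxx:secret:rs-cluster-secret-abcd")
                    .sql("call dbo.sp_load_cust_dim()")
                    .build());

            // The call returns immediately; poll describe-statement until the SQL finishes or fails.
            String status;
            do {
                Thread.sleep(5_000);
                status = client.describeStatement(DescribeStatementRequest.builder()
                        .id(submitted.id())
                        .build())
                        .statusAsString();
                System.out.println("Statement " + submitted.id() + " status: " + status);
            } while (!"FINISHED".equals(status) && !"FAILED".equals(status) && !"ABORTED".equals(status));
        }
    }
}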

The following architecture diagram highlights the end-to-end solution using AWS services.

In the following sections, we demonstrate how to create the full load AWS DMS task, configure the ETL orchestration on Amazon Redshift, create the EventBridge rule, and test the solution.

Prerequisites

To complete this walkthrough, you must have the following prerequisites:

  • An AWS account
  • A SQL Server database configured as a replication source for AWS DMS
  • A Redshift cluster to serve as the target database
  • An AWS DMS replication instance to migrate data from source to target
  • A source endpoint pointing to the SQL Server database
  • A target endpoint pointing to the Redshift cluster

Create the full load AWS DMS task

Complete the following steps to set up your migration task:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Choose Create task.
  3. For Task identifier, enter a name for your task, such as dms-full-dump-task.
  4. Choose your replication instance.
  5. Choose your source endpoint.
  6. Choose your target endpoint.
  7. For Migration type, choose Migrate existing data.

  1. In the Table mapping section, under Selection rules, choose Add new selection rule
  2. For Schema, choose Enter a schema.
  3. For Schema name, enter a name (for example, dms_sample).
  4. Keep the remaining settings as default and choose Create task.

The following screenshot shows your completed task on the AWS DMS console.

Create Redshift tables

Create the following tables on the Redshift cluster using the Redshift query editor:

  • dbo.dim_cust – Stores customer attributes:
CREATE TABLE dbo.dim_cust (
cust_key integer ENCODE az64,
cust_id character varying(10) ENCODE lzo,
cust_name character varying(100) ENCODE lzo,
cust_city character varying(50) ENCODE lzo,
cust_rev_flg character varying(1) ENCODE lzo
)

DISTSTYLE AUTO;
  • dbo.fact_sales – Stores customer sales transactions:
CREATE TABLE dbo.fact_sales (
order_number character varying(20) ENCODE lzo,
cust_key integer ENCODE az64,
order_amt numeric(18,2) ENCODE az64
)

DISTSTYLE AUTO;
  • dbo.fact_sales_stg – Stores daily customer incremental sales transactions:
CREATE TABLE dbo.fact_sales_stg (
order_number character varying(20) ENCODE lzo,
cust_id character varying(10) ENCODE lzo,
order_amt numeric(18,2) ENCODE az64
)

DISTSTYLE AUTO;

Use the following INSERT statements to load sample data into the sales staging table:

insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (100,1,200);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (101,1,300);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (102,2,25);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (103,2,35);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (104,3,80);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (105,3,45);

Create the stored procedures

In the Redshift query editor, create the following stored procedures to process customer and sales transaction data:

  • Sp_load_cust_dim() – This procedure compares the customer dimension with incremental customer data in staging and populates the customer dimension:
CREATE OR REPLACE PROCEDURE dbo.sp_load_cust_dim()
LANGUAGE plpgsql
AS $$
BEGIN
truncate table dbo.dim_cust;
insert into dbo.dim_cust(cust_key,cust_id,cust_name,cust_city) values (1,100,'abc','chicago');
insert into dbo.dim_cust(cust_key,cust_id,cust_name,cust_city) values (2,101,'xyz','dallas');
insert into dbo.dim_cust(cust_key,cust_id,cust_name,cust_city) values (3,102,'yrt','new york');
update dbo.dim_cust
set cust_rev_flg=case when cust_city='new york' then 'Y' else 'N' end
where cust_rev_flg is null;
END;
$$
  • sp_load_fact_sales() – This procedure does the transformation for incremental order data by joining with the date dimension and customer dimension and populates the primary keys from the respective dimension tables in the final sales fact table:
CREATE OR REPLACE PROCEDURE dbo.sp_load_fact_sales()
LANGUAGE plpgsql
AS $$
BEGIN
--Process Fact Sales
insert into dbo.fact_sales
select
sales_fct.order_number,
cust.cust_key as cust_key,
sales_fct.order_amt
from dbo.fact_sales_stg sales_fct
--join to customer dim
inner join (select * from dbo.dim_cust) cust on sales_fct.cust_id=cust.cust_id;
END;
$$

Create the Step Functions state machine

Complete the following steps to create the state machine redshift-elt-load-customer-sales. This state machine is invoked as soon as the AWS DMS full load task for the customer table is complete.

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Choose Create state machine.
  3. For Template, choose Blank.
  4. On the Actions dropdown menu, choose Import definition to import the workflow definition of the state machine.

  1. Open your preferred text editor and save the following code as a file with the ASL extension (for example, redshift-elt-load-customer-sales.asl). Provide your Redshift cluster ID and the secret ARN for your Redshift cluster.
{
"Comment": "State Machine to process ETL for Customer Sales Transactions",
"StartAt": "Load_Customer_Dim",
"States": {
"Load_Customer_Dim": {
"Type": "Task",
"Parameters": {
"ClusterIdentifier": "redshiftcluster-abcd",
"Database": "dev",
"Sql": "call dbo.sp_load_cust_dim()",
"SecretArn": "arn:aws:secretsmanager:us-west-2:xxx:secret:rs-cluster-secret-abcd"
},
"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
"Next": "Wait on Load_Customer_Dim"
},
"Wait on Load_Customer_Dim": {
"Type": "Wait",
"Seconds": 30,
"Next": "Check_Status_Load_Customer_Dim"
},

"Check_Status_Load_Customer_Dim": {
"Type": "Task",
"Next": "Choice",
"Parameters": {
"Id.$": "$.Id"
},

"Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement"
},

"Choice": {
"Type": "Choice",
"Choices": [
{
"Not": {
"Variable": "$.Status",
"StringEquals": "FINISHED"
},
"Next": "Wait on Load_Customer_Dim"
}
],
"Default": "Load_Sales_Fact"
},
"Load_Sales_Fact": {
"Type": "Task",
"End": true,
"Parameters": {
"ClusterIdentifier": "redshiftcluster-abcdef”,
"Database": "dev",
"Sql": "call dbo.sp_load_fact_sales()",
"SecretArn": "arn:aws:secretsmanager:us-west-2:xxx:secret:rs-cluster-secret-abcd"
},

"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement"
}
}
}
  1. Choose Choose file and upload the ASL file to create a new state machine.

  1. For State machine name, enter a name for the state machine (for example, redshift-elt-load-customer-sales).
  2. Choose Create.

After the successful creation of the state machine, you can verify the details as shown in the following screenshot.

The following diagram illustrates the state machine workflow.

The state machine includes the following steps:

  • Load_Customer_Dim – Performs the following actions:
    • Passes the stored procedure sp_load_cust_dim to the execute-statement API to run in the Redshift cluster to load the incremental data for the customer dimension
    • Sends the identifier of the SQL statement back to the state machine
  • Wait_on_Load_Customer_Dim – Waits for at least 15 seconds
  • Check_Status_Load_Customer_Dim – Invokes the Data API’s describeStatement to get the status of the API call
  • is_run_Load_Customer_Dim_complete – Routes the next step of the ETL workflow depending on its status:
    • FINISHED – Passes the stored procedure Load_Sales_Fact to the execute-statement API to run in the Redshift cluster, which loads the incremental data for fact sales and populates the corresponding keys from the customer and date dimensions
    • All other statuses – Goes back to the wait_on_load_customer_dim step to wait for the SQL statements to finish

The state machine redshift-elt-load-customer-sales loads the dim_cust, fact_sales_stg, and fact_sales tables when invoked by the EventBridge rule.

As an optional step, you can set up event-based notifications on completion of the state machine to invoke any downstream actions, such as Amazon Simple Notification Service (Amazon SNS) or further ETL processes.

Create an EventBridge rule

EventBridge sends event notifications to the Step Functions state machine when the full load is complete. You can also turn event notifications on or off in EventBridge.

Complete the following steps to create the EventBridge rule:

  1. On the EventBridge console, in the navigation pane, choose Rules.
  2. Choose Create rule.
  3. For Name, enter a name (for example, dms-test).
  4. Optionally, enter a description for the rule.
  5. For Event bus, choose the event bus to associate with this rule. If you want this rule to match events that come from your account, select AWS default event bus. When an AWS service in your account emits an event, it always goes to your account’s default event bus.
  6. For Rule type, choose Rule with an event pattern.
  7. Choose Next.
  8. For Event source, choose AWS events or EventBridge partner events.
  9. For Method, select Use pattern form.
  10. For Event source, choose AWS services.
  11. For AWS service, choose Database Migration Service.
  12. For Event type, choose All Events.
  13. For Event pattern, enter the following JSON expression, which looks for the REPLICATION_TASK_STOPPED status for the AWS DMS task:
{
"source": ["aws.dms"],
"detail": {
"eventId": ["DMS-EVENT-0079"],
"eventType": ["REPLICATION_TASK_STOPPED"],
"detailMessage": ["Stop Reason FULL_LOAD_ONLY_FINISHED"],
"type": ["REPLICATION_TASK"],
"category": ["StateChange"]
}
}

  1. For Target type, choose AWS service.
  2. For AWS service, choose Step Functions state machine.
  3. For State machine name, enter redshift-elt-load-customer-sales.
  4. Choose Create rule.

The following screenshot shows the details of the rule created for this post.

Test the solution

Run the task and wait for the workload to complete. This workflow moves the full volume data from the source database to the Redshift cluster.

The following screenshot shows the load statistics for the customer table full load.

AWS DMS provides notifications when an AWS DMS event occurs, for example the completion of a full load or if a replication task has stopped.

After the full load is complete, AWS DMS sends events to the default event bus for your account. The following screenshot shows an example of invoking the target Step Functions state machine using the rule you created.

We configured the Step Functions state machine as a target in EventBridge. This enables EventBridge to invoke the Step Functions workflow in response to the completion of an AWS DMS full load task.

Validate the state machine orchestration

When the entire customer sales data pipeline is complete, you may go through the entire event history for the Step Functions state machine, as shown in the following screenshots.

Limitations

The Data API and Step Functions AWS SDK integration offers a robust mechanism to build highly distributed ETL applications with minimal developer overhead. Consider the service quotas and limitations of the Data API and Step Functions when designing your workflows, and refer to the respective service documentation for current limits.

Clean up

To avoid incurring future charges, delete the Redshift cluster, AWS DMS full load task, AWS DMS replication instance, and Step Functions state machine that you created as part of this post.

Conclusion

In this post, we demonstrated how to build an ETL orchestration for full loads from operational data stores using the Redshift Data API, EventBridge, Step Functions with AWS SDK integration, and Redshift stored procedures.

To learn more about the Data API, see Using the Amazon Redshift Data API to interact with Amazon Redshift clusters and Using the Amazon Redshift Data API.


About the authors

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Praveen Kadipikonda is a Senior Analytics Specialist Solutions Architect at AWS based out of Dallas. He helps customers build efficient, performant, and scalable analytic solutions. He has worked with building databases and data warehouse solutions for over 15 years.

Jagadish Kumar (Jag) is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS.

How Cloudinary transformed their petabyte scale streaming data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/how-cloudinary-transformed-their-petabyte-scale-streaming-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Amit Gilad, Alex Dickman and Itay Takersman from Cloudinary. 

Enterprises and organizations across the globe want to harness the power of data to make better decisions by putting data at the center of every decision-making process. Data-driven decisions lead to more effective responses to unexpected events, increase innovation and allow organizations to create better experiences for their customers. However, throughout history, data services have held dominion over their customers’ data. Despite the potential separation of storage and compute in terms of architecture, they are often effectively fused together. This amalgamation empowers vendors with authority over a diverse range of workloads by virtue of owning the data. This authority extends across realms such as business intelligence, data engineering, and machine learning thus limiting the tools and capabilities that can be used.

The landscape of data technology is swiftly advancing, driven frequently by projects led by the open source community in general and the Apache foundation specifically. This evolving open source landscape allows customers complete control over data storage, processing engines and permissions expanding the array of available options significantly. This approach also encourages vendors to compete based on the value they provide to businesses, rather than relying on potential fusing of storage and compute. This fosters a competitive environment that prioritizes customer acquisition and prompts vendors to differentiate themselves through unique features and offerings that cater directly to the specific needs and preferences of their clientele.

A modern data strategy redefines and enables sharing data across the enterprise and allows for both reading and writing of a singular instance of the data using an open table format. The open table format accelerates companies’ adoption of a modern data strategy because it allows them to use various tools on top of a single copy of the data.

Cloudinary is a cloud-based media management platform that provides a comprehensive set of tools and services for managing, optimizing, and delivering images, videos, and other media assets on websites and mobile applications. It’s widely used by developers, content creators, and businesses to streamline their media workflows, enhance user experiences, and optimize content delivery.

In this blog post, we dive into different data aspects and how Cloudinary breaks the two concerns of vendor locking and cost efficient data analytics by using Apache Iceberg, Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon EMR, and AWS Glue.

Short overview of Cloudinary’s infrastructure

Cloudinary’s infrastructure handles over 20 billion requests daily, with every request generating event logs. Various data pipelines process these logs and store petabytes (PBs) of data per month on Amazon S3; after processing, the data is loaded into Snowflake Data Cloud. These datasets serve as a critical resource for Cloudinary internal teams and data science groups, enabling detailed analytics and advanced use cases.

Until recently, this data was mostly prepared by automated processes and aggregated into results tables, used by only a few internal teams. Cloudinary struggled to use this data for additional teams who had more online, real time, lower-granularity, dynamic usage requirements. Making petabytes of data accessible for ad-hoc reports became a challenge as query time increased and costs skyrocketed along with growing compute resource requirements. Cloudinary data retention for the specific analytical data discussed in this post was defined as 30 days. However, new use cases drove the need for increased retention, which would have led to significantly higher cost.

The data is flowing from Cloudinary log providers into files written into Amazon S3 and notified through events pushed to Amazon Simple Queue Service (Amazon SQS). Those SQS events are ingested by a Spark application running in Amazon EMR Spark, which parses and enriches the data. The processed logs are written in Apache Parquet format back to Amazon S3 and then automatically loaded to a Snowflake table using Snowpipe.

Why Cloudinary chose Apache Iceberg

Apache Iceberg is a high-performance table format for huge analytic workloads. Apache Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for processing engines such as Apache Spark, Trino, Apache Flink, Presto, Apache Hive, and Impala to safely work with the same tables at the same time.

A solution based on Apache Iceberg encompasses complete data management, featuring simple built-in table optimization capabilities within an existing storage solution. These capabilities, along with the ability to use multiple engines on top of a singular instance of data, helps avoid the need for data movement between various solutions.

While exploring the various controls and options in configuring Apache Iceberg, Cloudinary had to adapt its data to use AWS Glue Data Catalog, as well as move a significant volume of data to Apache Iceberg on Amazon S3. At this point it became clear that costs would be significantly reduced, and while it had been a key factor since the planning phase, it was now possible to get concrete numbers. One example is that Cloudinary was now able to store 6 months of data for the same storage price that was previously paid for storing 1 month of data. This cost saving was achieved by using Amazon S3 storage tiers as well as improved compression (Zstandard), further enhanced by the fact that Parquet files were sorted.

Since Apache Iceberg is well supported by AWS data services and Cloudinary was already using Spark on Amazon EMR, they could integrate writing to Data Catalog and start an additional Spark cluster to handle data maintenance and compaction. As exploration continued with Apache Iceberg, some interesting performance metrics were found. For example, for certain queries, Athena runtime was 2x–4x faster than Snowflake.

Integration of Apache Iceberg

The integration of Apache Iceberg was done before loading data to Snowflake. The data is written to an Iceberg table using Apache Parquet data format and AWS Glue as the data catalog. In addition, a Spark application on Amazon EMR runs in the background handling compaction of the Parquet files to optimal size for querying through various tools such as Athena, Trino running on top of EMR, and Snowflake.

Challenges faced

Cloudinary faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing ingestion
  • Solving the small files problem to improve query performance
  • Cost effectively maintaining Apache Iceberg tables
  • Choosing the right query engine

In this section, we describe each of these challenges and the solutions implemented to address them. Many of the tests to check performance and volumes of data scanned used Athena because it provides a simple-to-use, fully serverless, cost-effective interface without the need to set up infrastructure.

Determining optimal table partitioning

Apache Iceberg makes partitioning easier for the user by implementing hidden partitioning. Rather than forcing the user to supply a separate partition filter at query time, Iceberg tables can be configured to map regular columns to the partition keys. Users don’t need to maintain partition columns or even understand the physical table layout to get fast and accurate query results.

Iceberg has several partitioning options. One example is when partitioning timestamps, which can be done by year, month, day, and hour. Iceberg keeps track of the relationship between a column value and its partition without requiring additional columns. Iceberg can also partition categorical column values by identity, hash buckets, or truncation. In addition, Iceberg partitioning is user-friendly because it also allows partition layouts to evolve over time without breaking pre-written queries. For example, when using daily partitions and the query pattern changes over time to be based on hours, it’s possible to evolve the partitions to hourly ones, thus making queries more efficient. When evolving such a partition definition, the data in the table prior to the change is unaffected, as is its metadata. Only data that is written to the table after the evolution is partitioned with the new definition, and the metadata for this new set of data is kept separately. When querying, each partition layout’s respective metadata is used to identify the files that need to be accessed; this is called split-planning. Split-planning is one of many Iceberg features that are made possible due to the table metadata, which creates a separation between the physical and the logical storage. This concept makes Iceberg extremely versatile.

Determining the correct partitioning is key when working with large data sets because it affects query performance and the amount of data being scanned. Because this migration was from existing tables from Snowflake native storage to Iceberg, it was crucial to test and provide a solution with the same or better performance for the existing workload and types of queries.

These tests were possible due to Apache Iceberg’s:

  1. Hidden partitions
  2. Partition transformations
  3. Partition evolution

These allowed altering table partitions and testing which strategy works best without data rewrite.

Here are a few partitioning strategies that were tested:

  1. PARTITIONED BY (days(day), customer_id)
  2. PARTITIONED BY (days(day), hour(timestamp))
  3. PARTITIONED BY (days(day), bucket(N, customer_id))
  4. PARTITIONED BY (days(day))

Each partitioning strategy that was reviewed generated significantly different results both during writing as well as during query time. After careful results analysis, Cloudinary decided to partition the data by day and combine it with sorting, which allows them to sort data within partitions as would be elaborated in the compaction section.
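For reference, the chosen layout can be expressed with Spark SQL on an Iceberg table. The following PySpark sketch assumes a Spark session configured with the Iceberg SQL extensions and an AWS Glue catalog named glue_catalog; the table name and columns are illustrative and not Cloudinary’s actual schema:

# Create a day-partitioned Iceberg table and declare a write sort order, so writes and
# compaction cluster rows by requested_at and customer_id within each daily partition.
spark.sql("""
    CREATE TABLE IF NOT EXISTS glue_catalog.db.events (
        requested_at TIMESTAMP,
        customer_id  BIGINT,
        day          DATE,
        payload      STRING
    )
    USING iceberg
    PARTITIONED BY (days(day))
""")

spark.sql("""
    ALTER TABLE glue_catalog.db.events
    WRITE ORDERED BY (requested_at, customer_id)
""")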

Optimizing ingestion

Cloudinary receives billions of events in files from its providers in various formats and sizes and stores those on Amazon S3, resulting in terabytes of data processed and stored every day.

Because the data doesn’t come in a consistent manner and it’s not possible to predict the incoming rate and file size of the data, it was necessary to find a way of keeping cost down while maintaining high throughput.

This was achieved by using EventBridge to push each file received into Amazon SQS, where it was processed using Spark running on Amazon EMR in batches. This allowed processing the incoming data at high throughput and scale clusters according to queue size while keeping costs down.

Example of fetching 100 messages (files) from Amazon SQS with Spark:

// One SQS client per executor JVM; the client isn't serializable, so it can't be shipped from the driver
object DistributedSQSReceiver { lazy val client = AmazonSQSClientBuilder.standard().withRegion("us-east-1").build() }
// Each call pulls up to 10 messages, so 10 parallel tasks fetch roughly 100 messages in total
def getMessageBatch: List[Message] = DistributedSQSReceiver.client.receiveMessage(
  new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(10)).getMessages.asScala.toList
sparkSession.sparkContext.parallelize(1 to 10).map(_ => getMessageBatch).collect().flatMap(_.toList).toList

When dealing with a high data ingestion rate for a specific partition prefix, Amazon S3 might potentially throttle requests and return a 503 status code (service unavailable). To address this scenario, Cloudinary used an Iceberg table property called write.object-storage.enabled, which incorporates a hash prefix into the stored Amazon S3 object path. This approach was deemed efficient and effectively mitigated Amazon S3 throttling problems.

Solving the small file problem and improving query performance

In modern data architectures, stream processing engines such as Amazon EMR are often used to ingest continuous streams of data into data lakes using Apache Iceberg. Streaming ingestion to Iceberg tables can suffer from two problems:

  • It generates many small files that lead to longer query planning, which in turn can impact read performance.
  • Poor data clustering, which can make file pruning less effective. This typically occurs in the streaming process when there is insufficient new data to generate optimal file sizes for reading, such as 512 MB.

Because partition is a key factor in the number of files produced and Cloudinary’s data is time based and most queries use a time filter, it was decided to address the optimization of our data lake in multiple ways.

First, Cloudinary set all the necessary configurations that helped reduce the number of files while appending data in the table by setting write.target-file-size-bytes, which allows defining the default target file size. Setting spark.sql.shuffle.partitions in Spark can reduce the number of output files by controlling the number of partitions used during shuffle operations, which affects how data is distributed across tasks, consequently minimizing the number of output files generated after transformations or aggregations.
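These settings map to an Iceberg table property and a Spark session configuration. The following sketch reuses the hypothetical glue_catalog.db.events table from earlier; the 512 MB target and the partition count are illustrative values, and write.object-storage.enabled is included as described in the ingestion section:

# Target roughly 512 MB data files on write, and hash object prefixes to spread S3 request load
spark.sql("""
    ALTER TABLE glue_catalog.db.events SET TBLPROPERTIES (
        'write.target-file-size-bytes' = '536870912',
        'write.object-storage.enabled' = 'true'
    )
""")

# Fewer shuffle partitions produce fewer, larger output files after aggregations
spark.conf.set("spark.sql.shuffle.partitions", "200")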

Because the above approach only addressed the small file problem but didn’t eliminate it entirely, Cloudinary used another capability of Apache Iceberg that can compact data files in parallel using Spark with the rewriteDataFiles action. This action combines small files into larger files to reduce metadata overhead and minimize the amount of Amazon S3 GetObject API operation usage.

Here is where it can get complicated. When running compaction, Cloudinary needed to choose which strategy to apply out of the three that Apache Iceberg offers; each one having its own advantages and disadvantages:

  1. Binpack – simply rewrites smaller files to a target size
  2. Sort – data sorting based on different columns
  3. Z-order – a technique to colocate related data in the same set of files

At first, the Binpack compaction strategy was evaluated. This strategy works fastest and combines small files together to reach the target file size defined and after running it a significant improvement in query performance was observed.

As mentioned previously, data was partitioned by day and most queries ran on a specific time range. Because data comes from external vendors and sometimes arrives late, it was noticed that when running queries on compacted days, a lot of data was being scanned, because the specific time range could reside across many files. The query engine (Athena, Snowflake, and Trino with Amazon EMR) needed to scan the entire partition to fetch only the relevant rows.

To increase query performance even further, Cloudinary decided to change the compaction process to use sort, so now data is partitioned by day and sorted by requested_at (timestamp when the action occurred) and customer ID.

This strategy is costlier for compaction because it needs to shuffle the data in order to sort it. However, after adopting this sort strategy, two things were noticeable: the same queries that ran before now scanned around 50 percent less data, and query run time was improved by 30 percent to 50 percent.
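The sort-based compaction can be scheduled with the Iceberg rewrite_data_files Spark procedure. The following is a minimal sketch against the same hypothetical table; the sort columns mirror the ones described above, and the target file size is illustrative:

# Combine small files and sort rows within each partition by requested_at and customer_id
spark.sql("""
    CALL glue_catalog.system.rewrite_data_files(
        table => 'db.events',
        strategy => 'sort',
        sort_order => 'requested_at ASC NULLS LAST, customer_id ASC NULLS LAST',
        options => map('target-file-size-bytes', '536870912')
    )
""")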

Cost effectively maintaining Apache Iceberg tables

Maintaining Apache Iceberg tables is crucial for optimizing performance, reducing storage costs, and ensuring data integrity. Iceberg provides several maintenance operations to keep your tables in good shape. By incorporating these operations, Cloudinary was able to cost-effectively manage their Iceberg tables.

Expire snapshots

Each write to an Iceberg table creates a new snapshot, or version, of a table. Snapshots can be used for time-travel queries, or the table can be rolled back to any valid snapshot.

Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Cloudinary decided to retain snapshots for up to 7 days to allow easier troubleshooting and handling of corrupted data, which sometimes arrives from external sources and isn't identified upon arrival.

SparkActions.get().expireSnapshots(iceTable).expireOlderThan(TimeUnit.DAYS.toMillis(7)).execute()

Remove old metadata files

Iceberg keeps track of table metadata using JSON files. Each change to a table produces a new metadata file to provide atomicity.

Old metadata files are kept for history by default. Tables with frequent commits, like those written by streaming jobs, might need to regularly clean metadata files.

Configuring the following properties will make sure that only the latest ten metadata files are kept and anything older is deleted.

write.metadata.delete-after-commit.enabled=true 
write.metadata.previous-versions-max=10

Delete orphan files

In Spark and other distributed processing engines, when tasks or jobs fail, they might leave behind files that aren’t accounted for in the table metadata. Moreover, in certain instances, the standard snapshot expiration process might fail to identify files that are no longer necessary and not delete them.

Apache Iceberg offers a deleteOrphanFiles action that will take care of unreferenced files. This action might take a long time to complete if there are a large number of files in the data and metadata directories. A metadata or data file is considered orphan if it isn’t reachable by any valid snapshot. The set of actual files is built by listing the underlying storage using the Amazon S3 ListObjects operation, which makes this operation expensive. It’s recommended to run this operation periodically to avoid increased storage usage; however, too frequent runs can potentially offset this cost benefit.

A good example of how critical it is to run this procedure is to look at the following diagram, which shows how this procedure removed 112 TB of storage.
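Orphan file cleanup can also be run as a scheduled Spark procedure. The following sketch uses the same hypothetical table; the older_than timestamp is illustrative and should be far enough in the past that files from in-flight writes are not removed:

# Delete files under the table location that aren't referenced by any valid snapshot
spark.sql("""
    CALL glue_catalog.system.remove_orphan_files(
        table => 'db.events',
        older_than => TIMESTAMP '2024-01-01 00:00:00'
    )
""")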

Rewriting manifest files

Apache Iceberg uses metadata in its manifest list and manifest files to speed up query planning and to prune unnecessary data files. Manifests in the metadata tree are automatically compacted in the order that they’re added, which makes queries faster when the write pattern aligns with read filters.

If a table’s write pattern doesn’t align with the query read filter pattern, metadata can be rewritten to re-group data files into manifests using rewriteManifests.

While Cloudinary already had a compaction process that optimized data files, they noticed that manifest files also required optimization. It turned out that in certain cases, Cloudinary reached over 300 manifest files, each small and often under 8 MB in size, and due to late-arriving data, manifest files were pointing to data in different partitions. This caused query planning to run for 12 seconds for each query.

Cloudinary initiated a separate scheduled process of rewriteManifests, and after it ran, the number of manifest files was reduced to approximately 170 files and as a result of more alignment between manifests and query filters (based on partitions), query planning was improved by three times to approximately 4 seconds.
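The scheduled manifest rewrite maps to another Iceberg Spark procedure; a minimal sketch against the hypothetical table follows:

# Re-group manifest entries so manifests align with the daily partitioning used by query filters
spark.sql("CALL glue_catalog.system.rewrite_manifests('db.events')")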

Choosing the right query engine

As part of Cloudinary exploration aimed at testing various query engines, they initially outlined several key performance indicators (KPIs) to guide their search, including support for Apache Iceberg alongside integration with existing data sources such as MySQL and Snowflake, the availability of a web interface for effortless one-time queries, and cost optimization. In line with these criteria, they opted to evaluate various solutions including Trino on Amazon EMR, Athena, and Snowflake with Apache Iceberg support (at that time it was available as a Private Preview). This approach allowed for the assessment of each solution against defined KPIs, facilitating a comprehensive understanding of their capabilities and suitability for Cloudinary’s requirements.

Two of the more quantifiable KPIs that Cloudinary was planning to evaluate were cost and performance. Cloudinary realized early in the process that different queries and usage types can potentially benefit from different runtime engines. They decided to focus on four runtime engines.

Engine Details
Snowflake native XL data warehouse on top of data stored within Snowflake
Snowflake with Apache Iceberg support XL data warehouse on top of data stored in S3 in Apache Iceberg tables
Athena On-demand mode
Amazon EMR Trino Opensource Trino on top of eight nodes (m6g.12xl) cluster

The test included four types of queries that represent different production workloads that Cloudinary is running. They’re ordered by size and complexity from the simplest one to the most heavy and complex.

Query Description Data scanned Returned results set
Q1 Multi-day aggregation on a single tenant Single digit GBs <10 rows
Q2 Single-day aggregation by tenant across multiple tenants Dozens of GBs 100 thousand rows
Q3 Multi-day aggregation across multiple tenants Hundreds of GBs <10 rows
Q4 Heavy series of aggregations and transformations on a multi-tenant dataset to derive access metrics Single digit TBs >1 billion rows

The following graphs show the cost and performance of the four engines across the different queries. To avoid chart scaling issues, all costs and query durations were normalized based on Trino running on Amazon EMR. Cloudinary considered Query 4 to be less suitable for Athena because it involved processing and transforming extremely large volumes of complex data.

Some important aspects to consider are:

  • Cost for EMR running Trino was derived based on query duration only, without considering cluster set up, which on average launches in just under 5 minutes.
  • Cost for Snowflake (both options) was derived based on query duration only, without considering cold start (more than 10 seconds on average) and a Snowflake warehouse minimum charge of 1 minute.
  • Cost for Athena was based on the amount of data scanned; Athena doesn’t require cluster set up and the query queue time is less than 1 second.
  • All costs are based on list on-demand (OD) prices.
  • Snowflake prices are based on Standard edition.

The above chart shows that, from a cost perspective, Amazon EMR running Trino on top of Apache Iceberg tables was superior to other engines, in certain cases up to ten times less expensive. However, Amazon EMR setup requires additional expertise and skills compared to the no-code, no infrastructure management offered by Snowflake and Athena.

In terms of query duration, it’s noticeable that there’s no clear engine of choice for all types of queries. In fact, Amazon EMR, which was the most cost-effective option, was only fastest in two out of the four query types. Another interesting point is that Snowflake’s performance on top of Apache Iceberg is almost on-par with data stored within Snowflake, which adds another great option for querying their Apache Iceberg data-lake. The following table shows the cost and time for each query and product.

Query Amazon EMR Trino Snowflake (XL) Snowflake (XL) Iceberg Athena
Query1 $0.01 / 5 seconds $0.08 / 8 seconds $0.07 / 8 seconds $0.02 / 11 seconds
Query2 $0.12 / 107 seconds $0.25 / 28 seconds $0.35 / 39 seconds $0.18 / 94 seconds
Query3 $0.17 / 147 seconds $1.07 / 120 seconds $1.88 / 211 seconds $1.22 / 26 seconds
Query4 $6.43 / 1,237 seconds $11.73 / 1,324 seconds $12.71 / 1,430 seconds N/A

Benchmarking conclusions

While every solution presents its own set of advantages and drawbacks—whether in terms of pricing, scalability, optimizing for Apache Iceberg, or the contrast between open source versus closed source—the beauty lies in not being constrained to a single choice. Embracing Apache Iceberg frees you from relying solely on a single solution. In certain scenarios where queries must be run frequently while scanning up to hundreds of gigabytes of data with an aim to evade warm-up periods and keep costs down, Athena emerged as the best choice. Conversely, when tackling hefty aggregations that demanded significant memory allocation while being mindful of cost, the preference leaned towards using Trino on Amazon EMR. Amazon EMR was significantly more cost efficient when running longer queries, because boot time cost could be discarded. Snowflake stood out as a great option when queries could be joined with other tables already residing within Snowflake. This flexibility allowed harnessing the strengths of each service, strategically applying them to suit the specific needs of various tasks without being confined to a singular solution.

In essence, the true power lies in the ability to tailor solutions to diverse requirements, using the strengths of different environments to optimize performance, cost, and efficiency.

Conclusion

Data lakes built on Amazon S3 and analytics services such as Amazon EMR and Amazon Athena, along with the open source Apache Iceberg framework, provide a scalable, cost-effective foundation for modern data architectures. It enables organizations to quickly construct robust, high-performance data lakes that support ACID transactions and analytics workloads. This combination is the most refined way to have an enterprise-grade open data environment. The availability of managed services and open source software helps companies to implement data lakes that meet their needs.

Since building a data lake solution on top of Apache Iceberg, Cloudinary has seen major enhancements. The data lake infrastructure enables Cloudinary to extend their data retention by six times while lowering the cost of storage by over 25 percent. Furthermore, query costs dropped by more than 25–40 percent thanks to the efficient querying capabilities of Apache Iceberg and the query optimizations provided in the Athena version 3, which is now based on Trino as its engine. The ability to retain data for longer as well as providing it to various stakeholders while reducing cost is a key component in allowing Cloudinary to be more data driven in their operation and decision-making processes.

Using a transactional data lake architecture that uses Amazon S3, Apache Iceberg, and AWS Analytics services can greatly enhance an organization’s data infrastructure. This allows for sophisticated analytics and machine learning, fueling innovation while keeping costs down and allowing the use of a plethora of tools and services without limits.


About the Authors

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan is an Apache Iceberg evangelist.

Amit Gilad is a Senior Data Engineer on the Data Infrastructure team at Cloudinary. He is currently leading the strategic transition from traditional data warehouses to a modern data lakehouse architecture, utilizing Apache Iceberg to enhance scalability and flexibility.

Alex Dickman is a Staff Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on engaging with various internal teams to consolidate the team’s data infrastructure and create new opportunities for data applications, ensuring robust and scalable data solutions for Cloudinary’s diverse requirements.

Itay Takersman is a Senior Data Engineer on the data infrastructure team at Cloudinary. He focuses on building resilient data flows and aggregation pipelines to support Cloudinary’s data requirements.

Revolutionizing data querying: Amazon Redshift and Visual Studio Code integration

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/revolutionizing-data-querying-amazon-redshift-and-visual-studio-code-integration/

In today’s data-driven landscape, the efficiency and accessibility of querying tools play a crucial role in driving businesses forward. Amazon Redshift recently announced integration with Visual Studio Code (VS Code), a change that transforms the way data practitioners engage with Amazon Redshift and reshapes your interactions and practices in data management. This innovation not only unlocks new possibilities, but also tackles long-standing challenges in data analytics and query handling.

While the Amazon Redshift query editor v2 (QE v2) offers a smooth experience for data analysts and business users, many organizations have data engineers and developers who rely on VS Code as their primary development tool. Traditionally, they had to use QE v2 for their development tasks, which wasn’t the most optimal solution. However, this new feature resolves that issue by enabling data engineers and developers to seamlessly integrate their development work within VS Code, enhancing their workflow efficiency.

Visual Studio Code’s integration simplifies access to database objects within Redshift data warehouses, offering an interface you’re already familiar with to run and troubleshoot your code.

By integrating Amazon Redshift provisioned clusters and Amazon Redshift Serverless with the popular and free VS Code, you can alleviate concerns about costs associated with third-party tools. This integration allows you to reduce or eliminate licensing expenses for query authoring and data visualization, because these functionalities are now available within the free VS Code editor.

The support for Amazon Redshift within VS Code marks a significant leap towards a more streamlined, cost-effective, and user-friendly data querying experience.

In this post, we explore how to kickstart your journey with Amazon Redshift using the AWS Toolkit for VS Code.

Solution overview

This post outlines the procedure for creating a secure and direct connection between your local VS Code environment and the Redshift cluster. Emphasizing both security and accessibility, this solution allows you to operate within the familiar VS Code interface while seamlessly engaging with your Redshift database.

The following diagram illustrates the VS Code connection to Amazon Redshift deployed in a private VPC.

To connect to a data warehouse using VS Code from the Toolkit, you can choose from the following methods:

  • Use a database user name and password
  • Use AWS Secrets Manager
  • Use temporary credentials (this option is only available with Amazon Redshift Provisioned cluster)

In the following sections, we show how to establish a connection with a database situated on an established provisioned cluster or a serverless data warehouse from the Toolkit.

Prerequisites

Before you begin using Amazon Redshift provisioned clusters and Amazon Redshift Serverless with the AWS Toolkit for Visual Studio Code, make sure you’ve completed the following requirements:

  1. Connect to your AWS account using the Toolkit.
  2. Set up an Amazon Redshift provisioned or Amazon Redshift Serverless data warehouse.

Establish a connection to your data warehouse using user credentials

To connect using the database user name and password, complete the following steps:

  1. Navigate through the Toolkit explorer, expanding the AWS Region housing your data warehouse (for example, US East (N. Virginia)).
  2. In the Toolkit, expand the Redshift section and choose your specific data warehouse.
  3. In the Select a Connection Type dialog, choose Database user name and password and provide the necessary information requested by the prompts.

After the Toolkit establishes the connection to your data warehouse, you will be able to view your available databases, tables, and schemas directly in the Toolkit explorer.

Establish a connection to your data warehouse using Secrets Manager

To connect using Secrets Manager, complete the following steps:

  1. Navigate through the Toolkit explorer, expanding the AWS Region housing your data warehouse.
  2. In the Toolkit, expand the Redshift section and choose your specific data warehouse.
  3. In the Select a Connection Type dialog, choose Secrets Manager and fill in the information requested at each prompt.

After the Toolkit establishes a successful connection to your data warehouse, you’ll gain visibility into your databases, tables, and schemas directly in the Toolkit explorer.

Establish a connection to your Amazon Redshift provisioned cluster using temporary credentials

To connect using temporary credentials, complete the following steps:

  1. Navigate through the Toolkit explorer, expanding the AWS Region housing your data warehouse.
  2. In the Toolkit, expand the Redshift section and choose your specific data warehouse.
  3. In the Select a Connection Type dialog, choose Temporary Credentials and fill in the information requested at each prompt.

Run SQL statements

We have successfully established the connection. The next step involves running some SQL. The steps outlined in this section detail the process of generating and running SQL statements within your database using the Toolkit for Visual Studio Code.

  1. Navigate to the Toolkit explorer and expand Redshift, then choose the data warehouse that stores the desired database for querying.
  2. Choose Create Notebook and specify a file name and location for saving your notebook locally.
  3. Choose OK to open the notebook in your VS Code editor.
  4. Enter the following SQL statements into the VS Code editor, which will be stored in this notebook:
    create table promotion
    (
        p_promo_sk                integer               not null,
        p_promo_id                char(16)              not null,
        p_start_date_sk           integer                       ,
        p_end_date_sk             integer                       ,
        p_item_sk                 integer                       ,
        p_cost                    decimal(15,2)                 ,
        p_response_target         integer                       ,
        p_promo_name              char(50)                      ,
        p_channel_dmail           char(1)                       ,
        p_channel_email           char(1)                       ,
        p_channel_catalog         char(1)                       ,
        p_channel_tv              char(1)                       ,
        p_channel_radio           char(1)                       ,
        p_channel_press           char(1)                       ,
        p_channel_event           char(1)                       ,
        p_channel_demo            char(1)                       ,
        p_channel_details         varchar(100)                  ,
        p_purpose                 char(15)                      ,
        p_discount_active         char(1)                       ,
        primary key (p_promo_sk)
    ) diststyle all;
    
    create table reason
    (
        r_reason_sk               integer               not null,
        r_reason_id               char(16)              not null,
        r_reason_desc             char(100)                     ,
        primary key (r_reason_sk)
    ) diststyle all ;
    
    
    create table ship_mode
    (
        sm_ship_mode_sk           integer               not null,
        sm_ship_mode_id           char(16)              not null,
        sm_type                   char(30)                      ,
        sm_code                   char(10)                      ,
        sm_carrier                char(20)                      ,
        sm_contract               char(20)                      ,
        primary key (sm_ship_mode_sk)
    ) diststyle all;
    
    
    copy promotion from 's3://redshift-downloads/TPC-DS/2.13/1TB/promotion/' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';
    copy reason from 's3://redshift-downloads/TPC-DS/2.13/1TB/reason/' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';
    copy ship_mode from 's3://redshift-downloads/TPC-DS/2.13/1TB/ship_mode/' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';
    
    
    select * from promotion limit 10;
    
    drop table promotion;
    drop table reason;
    drop table ship_mode;

  5. Choose Run All to run the SQL statements.

The output corresponding to your SQL statements will be visible below the entered statements within the editor.

Include markdown in a notebook

To include markdown in your notebook, complete the following steps:

  1. Access your notebook within the VS Code editor and choose Markdown to create a markdown cell.
  2. Enter your markdown content within the designated cell.
  3. Use the editing tools in the upper-right corner of the markdown cell to modify the markdown content as needed.

Congratulations, you have learned the art of using the VS Code editor to effectively interface with your Redshift environment.

Clean up

To remove the connection, complete the following steps:

  1. In the Toolkit explorer, expand Redshift, and choose the data warehouse containing your database.
  2. Choose the database (right-click) and choose Delete Connection.

Conclusion

In this post, we explored the process of using VS Code to establish a connection with Amazon Redshift, streamlining access to database objects within Redshift data warehouses.

You can learn more about Amazon Redshift from the Getting started with Amazon Redshift guide. To learn how to write and run SQL queries directly in VS Code, see the new AWS Toolkit for VS Code integration.


About the Author

Navnit Shukla, an AWS Specialist Solution Architect specializing in Analytics, is passionate about helping clients uncover valuable insights from their data. Leveraging his expertise, he develops inventive solutions that empower businesses to make informed, data-driven decisions. Notably, Navnit Shukla is the accomplished author of the book “Data Wrangling on AWS,” showcasing his expertise in the field.

Dive deep into security management: The Data on EKS Platform

Post Syndicated from Yuzhu Xiao original https://aws.amazon.com/blogs/big-data/dive-deep-into-security-management-the-data-on-eks-platform/

The construction of big data applications based on open source software has become increasingly uncomplicated since the advent of projects like Data on EKS, an open source project from AWS to provide blueprints for building data and machine learning (ML) applications on Amazon Elastic Kubernetes Service (Amazon EKS). In the realm of big data, securing data on cloud applications is crucial. This post explores the deployment of Apache Ranger for permission management within the Hadoop ecosystem on Amazon EKS. We show how Ranger integrates with Hadoop components like Apache Hive, Spark, Trino, Yarn, and HDFS, providing secure and efficient data management in a cloud environment. Join us as we navigate these advanced security strategies in the context of Kubernetes and cloud computing.

Overview of solution

The Amber Group’s Data on EKS Platform (DEP) is a Kubernetes-based, cloud-centered big data platform that revolutionizes the way we handle data in EKS environments. Developed by Amber Group’s Data Team, DEP integrates with familiar components like Apache Hive, Spark, Flink, Trino, HDFS, and more, making it a versatile and comprehensive solution for data management and BI platforms.

The following diagram illustrates the solution architecture.

Effective permission management is crucial for several key reasons:

  • Enhanced security – With proper permission management, sensitive data is only accessible to authorized individuals, thereby safeguarding against unauthorized access and potential security breaches. This is especially important in industries handling large volumes of sensitive or personal data.
  • Operational efficiency – By defining clear user roles and permissions, organizations can streamline workflows and reduce administrative overhead. This system simplifies managing user access, saves time for data security administrators, and minimizes the risk of configuration errors.
  • Scalability and compliance – As businesses grow and evolve, a scalable permission management system helps with smoothly adjusting user roles and access rights. This adaptability is essential for maintaining compliance with various data privacy regulations like GDPR and HIPAA, making sure that the organization’s data practices are legally sound and up to date.
  • Addressing big data challenges – Big data comes with unique challenges, like managing large volumes of rapidly evolving data across multiple platforms. Effective permission management helps tackle these challenges by controlling how data is accessed and used, providing data integrity and minimizing the risk of data breaches.

Apache Ranger is a comprehensive framework designed for data governance and security in Hadoop ecosystems. It provides a centralized framework to define, administer, and manage security policies consistently across various Hadoop components. Ranger specializes in fine-grained access control, offering detailed management of user permissions and auditing capabilities.

Ranger’s architecture is designed to integrate smoothly with various big data tools such as Hadoop, Hive, HBase, and Spark. The key components of Ranger include:

  • Ranger Admin – This is the central component where all security policies are created and managed. It provides a web-based user interface for policy management and an API for programmatic configuration.
  • Ranger UserSync – This service is responsible for syncing user and group information from a directory service like LDAP or AD into Ranger.
  • Ranger plugins – These are installed on each component of the Hadoop ecosystem (like Hive and HBase). Plugins pull policies from the Ranger Admin service and enforce them locally.
  • Ranger Auditing – Ranger captures access audit logs and stores them for compliance and monitoring purposes. It can integrate with external tools for advanced analytics on these audit logs.
  • Ranger Key Management Store (KMS) – Ranger KMS provides encryption and key management, extending Hadoop’s HDFS Transparent Data Encryption (TDE).

The following flowchart illustrates the priority levels for matching policies.


The priority levels are as follows:

  • Deny list takes precedence over allow list
  • Deny list exclude has a higher priority than deny list
  • Allow list exclude has a higher priority than allow list

Our Amazon EKS-based deployment includes the following components:

  • S3 buckets – We use Amazon Simple Storage Service (Amazon S3) for scalable and durable Hive data storage
  • MySQL database – The database stores Hive metadata, facilitating efficient metadata retrieval and management
  • EKS cluster – The cluster is comprised of three distinct node groups: platform, Hadoop, and Trino, each tailored for specific operational needs
  • Hadoop cluster applications – These applications include HDFS for distributed storage and YARN for managing cluster resources
  • Trino cluster application – This application enables us to run distributed SQL queries for analytics
  • Apache Ranger – Ranger serves as the central security management tool for access policy across the big data components
  • OpenLDAP – This is integrated as the LDAP service to provide a centralized user information repository, essential for user authentication and authorization
  • Other cloud services resources – Other resources include a dedicated VPC for network security and isolation

By the end of this deployment process, we will have realized the following benefits:

  • A high-performing, scalable big data platform that can handle complex data workflows with ease
  • Enhanced security through centralized management of authentication and authorization, provided by the integration of OpenLDAP and Apache Ranger
  • Cost-effective infrastructure management and operation, thanks to the containerized nature of services on Amazon EKS
  • Compliance with stringent data security and privacy regulations, due to Apache Ranger’s policy enforcement capabilities

Deploy a big data cluster on Amazon EKS and configure Ranger for access control

In this section, we outline the process of deploying a big data cluster on AWS EKS and configuring Ranger for access control. We use AWS CloudFormation templates for quick deployment of a big data environment on Amazon EKS with Apache Ranger.

Complete the following steps:

  1. Upload the provided template to AWS CloudFormation, configure the stack options, and launch the stack to automate the deployment of the entire infrastructure, including the EKS cluster and Apache Ranger integration.


    After a few minutes, you’ll have a fully functional big data environment with robust security management ready for your analytical workloads, as shown in the following screenshot.

  2. On the AWS web console, find the name of your EKS cluster. In this case, it’s dep-demo-eks-cluster-ap-northeast-1. For example:
    aws eks update-kubeconfig --name dep-demo-eks-cluster-ap-northeast-1 --region ap-northeast-1
    
    ## Check pod status.
    
    kubectl get pods --namespace hadoop
    
    kubectl get pods --namespace platform
    
    kubectl get pods --namespace trino

  3. Forward the Ranger Admin service to port 6080 of localhost (for example, with kubectl port-forward), then go to localhost:6080 in your browser.
  4. Log in with user name admin and the password you entered earlier.

By default, the template has already created two policies, Hive and Trino, and granted all access to the LDAP user you created (depadmin in this case).

Also, the LDAP user sync service is set up and will automatically sync all users from the LDAP service created in this template.

Example permission configuration

In a practical application within a company, permissions for tables and fields in the data warehouse are divided based on business departments, isolating sensitive data for different business units. This provides data security and orderly conduct of daily business operations. The following screenshots show an example business configuration.

The following is an example of an Apache Ranger permission configuration.

The following screenshots show users associated with roles.

When performing data queries, using Hive and Spark as examples, we can demonstrate the comparison before and after permission configuration.

The following screenshot shows an example of Hive SQL (running on superset) with privileges denied.

The following screenshot shows an example of Spark SQL (running on IDE) with privileges denied.

The following screenshot shows an example of Spark SQL (running on IDE) with permissions granted.

Based on this example and considering your enterprise requirements, it becomes feasible and flexible to manage permissions in the data warehouse effectively.
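Policies like these can also be created programmatically through the Ranger Admin REST API, which is convenient when onboarding many business units. The following Python sketch is an illustration only: the Ranger URL, credentials, service name, database, and group are hypothetical, and the payload shows a simplified subset of the Ranger v2 policy model, so check the API reference for your Ranger version before relying on specific fields.

import requests

RANGER_URL = "http://localhost:6080"        # assumption: Ranger Admin forwarded to localhost:6080
AUTH = ("admin", "your-admin-password")     # assumption: replace with your Ranger admin credentials

# Simplified Hive policy: allow the finance group to run SELECT on one database
policy = {
    "service": "hivedev",                   # hypothetical Ranger Hive service name
    "name": "finance-read-only",
    "resources": {
        "database": {"values": ["finance_dw"]},
        "table": {"values": ["*"]},
        "column": {"values": ["*"]},
    },
    "policyItems": [
        {
            "groups": ["finance"],
            "accesses": [{"type": "select", "isAllowed": True}],
        }
    ],
}

response = requests.post(f"{RANGER_URL}/service/public/v2/api/policy", json=policy, auth=AUTH)
response.raise_for_status()
print("Created policy with ID:", response.json().get("id"))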

Conclusion

This post provided a comprehensive guide on permission management in big data, particularly within the Amazon EKS platform using Apache Ranger, that equips you with the essential knowledge and tools for robust data security and management. By implementing the strategies and understanding the components detailed in this post, you can effectively manage permissions, implementing data security and compliance in your big data environments.


About the Authors


Yuzhu Xiao is a Senior Data Development Engineer at Amber Group with extensive experience in cloud data platform architecture. He has many years of experience in AWS Cloud platform data architecture and development, primarily focusing on efficiency optimization and cost control of enterprise cloud architectures.


Xin Zhang is an AWS Solutions Architect, responsible for solution consulting and design based on the AWS Cloud platform. He has a rich experience in R&D and architecture practice in the fields of system architecture, data warehousing, and real-time computing.

Run interactive workloads on Amazon EMR Serverless from Amazon EMR Studio

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/run-interactive-workloads-on-amazon-emr-serverless-from-amazon-emr-studio/

Starting from release 6.14, Amazon EMR Studio supports interactive analytics on Amazon EMR Serverless. You can now use EMR Serverless applications as the compute, in addition to Amazon EMR on EC2 clusters and Amazon EMR on EKS virtual clusters, to run JupyterLab notebooks from EMR Studio Workspaces.

EMR Studio is an integrated development environment (IDE) that makes it straightforward for data scientists and data engineers to develop, visualize, and debug analytics applications written in PySpark, Python, and Scala. EMR Serverless is a serverless option for Amazon EMR that makes it straightforward to run open source big data analytics frameworks such as Apache Spark without configuring, managing, and scaling clusters or servers.

In the post, we demonstrate how to do the following:

  • Create an EMR Serverless endpoint for interactive applications
  • Attach the endpoint to an existing EMR Studio environment
  • Create a notebook and run an interactive application
  • Seamlessly diagnose interactive applications from within EMR Studio

Prerequisites

In a typical organization, an AWS account administrator will set up AWS resources such as AWS Identity and Access Management (IAM) roles, Amazon Simple Storage Service (Amazon S3) buckets, and Amazon Virtual Private Cloud (Amazon VPC) resources for internet access and access to other resources in the VPC. They assign EMR Studio administrators who manage setting up EMR Studios and assigning users to a specific EMR Studio. Once they’re assigned, EMR Studio developers can use EMR Studio to develop and monitor workloads.

Make sure you set up resources like your S3 bucket, VPC subnets, and EMR Studio in the same AWS Region.

Complete the following steps to deploy these prerequisites:

  1. Launch the following AWS CloudFormation stack.
  2. Enter values for AdminPassword and DevPassword and make a note of the passwords you create.
  3. Choose Next.
  4. Keep the settings as default and choose Next again.
  5. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  6. Choose Submit.

We have also provided instructions to deploy these resources manually with sample IAM policies in the GitHub repo.

Set up EMR Studio and a serverless interactive application

After the AWS account administrator completes the prerequisites, the EMR Studio administrator can log in to the AWS Management Console to create an EMR Studio, Workspace, and EMR Serverless application.

Create an EMR Studio and Workspace

The EMR Studio administrator should log in to the console using the emrs-interactive-app-admin-user user credentials. If you deployed the prerequisite resources using the provided CloudFormation template, use the password that you provided as an input parameter.

  1. On the Amazon EMR console, choose EMR Serverless in the navigation pane.
  2. Choose Get started.
  3. Select Create and launch EMR Studio.

This creates a Studio with the default name studio_1 and a Workspace with the default name My_First_Workspace. A new browser tab will open for the Studio_1 user interface.


Create an EMR Serverless application

Complete the following steps to create an EMR Serverless application:

  1. On the EMR Studio console, choose Applications in the navigation pane.
  2. Create a new application.
  3. For Name, enter a name (for example, my-serverless-interactive-application).
  4. For Application setup options, select Use custom settings for interactive workloads.

For interactive applications, as a best practice, we recommend keeping the driver and workers pre-initialized by configuring the pre-initialized capacity at the time of application creation. This effectively creates a warm pool of workers for an application and keeps the resources ready to be consumed, enabling the application to respond in seconds. For further best practices for creating EMR Serverless applications, see Define per-team resource limits for big data workloads using Amazon EMR Serverless.

  5. In the Interactive endpoint section, select Enable Interactive endpoint.
  6. In the Network connections section, choose the VPC, private subnets, and security group you created previously.

If you deployed the CloudFormation stack provided in this post, choose emr-serverless-sg as the security group.

A VPC is needed for the workload to be able to access the internet from within the EMR Serverless application in order to download external Python packages. The VPC also allows you to access resources such as Amazon Relational Database Service (Amazon RDS) and Amazon Redshift that are in the VPC from this application. Attaching a serverless application to a VPC can lead to IP exhaustion in the subnet, so make sure there are sufficient IP addresses in your subnet.

  7. Choose Create and start application.


On the applications page, you can verify that the status of your serverless application changes to Started.

  8. Select your application and choose How it works.
  9. Choose View and launch workspaces.
  10. Choose Configure studio.

  11. For Service role, provide the EMR Studio service role you created as a prerequisite (emr-studio-service-role).
  12. For Workspace storage, enter the path of the S3 bucket you created as a prerequisite (emrserverless-interactive-blog-<account-id>-<region-name>).
  13. Choose Save changes.


  14. Navigate to the Studios console by choosing Studios in the left navigation menu in the EMR Studio section. Note the Studio access URL from the Studios console and provide it to your developers to run their Spark applications.

Run your first Spark application

After the EMR Studio administrator has created the Studio, Workspace, and serverless application, the Studio user can use the Workspace and application to develop and monitor Spark workloads.

Launch the Workspace and attach the serverless application

Complete the following steps:

  1. Using the Studio URL provided by the EMR Studio administrator, log in using the emrs-interactive-app-dev-user user credentials shared by the AWS account admin.

If you deployed the prerequisite resources using the provided CloudFormation template, use the password that you provided as an input parameter.

On the Workspaces page, you can check the status of your Workspace. When the Workspace is launched, you will see the status change to Ready.

  2. Launch the workspace by choosing the workspace name (My_First_Workspace).

This will open a new tab. Make sure your browser allows pop-ups.

  1. In the Workspace, choose Compute (cluster icon) in the navigation pane.
  2. For EMR Serverless application, choose your application (my-serverless-interactive-application).
  3. For Interactive runtime role, choose an interactive runtime role (for this post, we use emr-serverless-runtime-role).
  4. Choose Attach to attach the serverless application as the compute type for all the notebooks in this Workspace.

Choose my-serverless-interactive-application as your app and emr-serverless-runtime-role and attach

Run your Spark application interactively

Complete the following steps:

  1. Choose the Notebook samples (three dots icon) in the navigation pane and open the Getting-started-with-emr-serverless notebook.
  2. Choose Save to Workspace.

There are three choices of kernels for our notebook: Python 3, PySpark, and Spark (for Scala).

  1. When prompted, choose PySpark as the kernel.
  2. Choose Select.

Choose PySpark as kernel

Now you can run your Spark application. To do so, use the %%configure Sparkmagic command, which configures the session creation parameters. Interactive applications support Python virtual environments. We use a custom environment in the worker nodes by specifying a path for a different Python runtime for the executor environment using spark.executorEnv.PYSPARK_PYTHON. See the following code:

%%configure -f
{
  "conf": {
    "spark.pyspark.virtualenv.enabled": "true",
    "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv",
    "spark.pyspark.virtualenv.type": "native",
    "spark.pyspark.python": "/usr/bin/python3",
    "spark.executorEnv.PYSPARK_PYTHON": "/usr/bin/python3"
  }
}

Install external packages

Now that you have an independent virtual environment for the workers, EMR Studio notebooks allow you to install external packages from within the serverless application by using the Spark install_pypi_package function through the Spark context. Using this function makes the package available for all the EMR Serverless workers.

First, install matplotlib, a Python package, from PyPI:

sc.install_pypi_package("matplotlib")

If the preceding step doesn’t respond, check your VPC setup and make sure it is configured correctly for internet access.

Now you can use a dataset and visualize your data.

Create visualizations

To create visualizations, we use a public dataset on NYC yellow taxis:

file_name = "s3://athena-examples-us-east-1/notebooks/yellow_tripdata_2016-01.parquet"
taxi_df = (spark.read.format("parquet").option("header", "true") \
.option("inferSchema", "true").load(file_name))

In the preceding code block, you read the Parquet file from a public bucket in Amazon S3. The file has headers, and we want Spark to infer the schema. You then use a Spark dataframe to group and count specific columns from taxi_df:

taxi1_df = taxi_df.groupBy("VendorID", "passenger_count").count()
taxi1_df.show()

Use %%display magic to view the result in table format:

%%display
taxi1_df

Table shows vendor_id, passenger_count and count columns

You can also quickly visualize your data with five types of charts. You can choose the display type and the chart will change accordingly. In the following screenshot, we use a bar chart to visualize our data.

bar chart showing passenger_count against each vendor_id

Interact with EMR Serverless using Spark SQL

You can interact with tables in the AWS Glue Data Catalog using Spark SQL on EMR Serverless. In the sample notebook, we show how you can transform data using a Spark dataframe.

First, create a new temporary view called taxis. This allows you to use Spark SQL to select data from this view. Then create a taxi dataframe for further processing:

taxi_df.createOrReplaceTempView("taxis")
sqlDF = spark.sql(
    "SELECT DOLocationID, sum(total_amount) as sum_total_amount \
     FROM taxis where DOLocationID < 25 Group by DOLocationID ORDER BY DOLocationID"
)
sqlDF.show(5)

Table shows DOLocationID and sum_total_amount columns

In each cell in your EMR Studio notebook, you can expand Spark Job Progress to view the various stages of the job submitted to EMR Serverless while running this specific cell. You can see the time taken to complete each stage. In the following example, stage 14 of the job has 12 completed tasks. In addition, if there is any failure, you can see the logs, making troubleshooting a seamless experience. We discuss this more in the next section.

Job[14]: showString at NativeMethodAccessorImpl.java:0 and Job[15]: showString at NativeMethodAccessorImpl.java:0

Use the following code to visualize the processed dataframe with the matplotlib package, plotting the dropoff location and the total amount as a bar chart.

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
plt.clf()
df = sqlDF.toPandas()
plt.bar(df.DOLocationID, df.sum_total_amount)
%matplot plt

Diagnose interactive applications

You can get the session information for your Livy endpoint using the %%info Sparkmagic. This gives you links to access the Spark UI as well as the driver log right in your notebook.
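
For example, running the following magic in an empty notebook cell prints the current session details along with those links:

%%info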

The following screenshot is a driver log snippet for our application, which we opened via the link in our notebook.

driver log screenshot

Similarly, you can choose the link below Spark UI to open the UI. The following screenshot shows the Executors tab, which provides access to the driver and executor logs.

The following screenshot shows stage 14, which corresponds to the Spark SQL step we saw earlier, in which we calculated the location-wise sum of total taxi collections, broken down into 12 tasks. Through the Spark UI, the interactive application provides fine-grained task-level status, I/O, and shuffle details, as well as links to the corresponding logs for each task in this stage, right from your notebook, enabling a seamless troubleshooting experience.

Clean up

If you no longer want to keep the resources created in this post, complete the following cleanup steps:

  1. Delete the EMR Serverless application.
  2. Delete the EMR Studio and the associated workspaces and notebooks.
  3. To delete the rest of the resources, navigate to the AWS CloudFormation console, select the stack, and choose Delete.

All of the resources will be deleted except the S3 bucket, which has its deletion policy set to retain.

Conclusion

This post showed how to run interactive PySpark workloads in EMR Studio using EMR Serverless as the compute. You can also build and monitor Spark applications in an interactive JupyterLab Workspace.

In an upcoming post, we’ll discuss additional capabilities of EMR Serverless Interactive applications, such as:

  • Working with resources such as Amazon RDS and Amazon Redshift in your VPC (for example, for JDBC/ODBC connectivity)
  • Running transactional workloads using serverless endpoints

If this is your first time exploring EMR Studio, we recommend checking out the Amazon EMR workshops and referring to Create an EMR Studio.


About the Authors

Sekar Srinivasan is a Principal Specialist Solutions Architect at AWS focused on Data Analytics and AI. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions modernizing their architecture and generating insights from their data. In his spare time he likes to work on non-profit projects, focused on underprivileged Children’s education.

Disha Umarwani is a Sr. Data Architect with Amazon Professional Services within Global Health Care and LifeSciences. She has worked with customers to design, architect and implement Data Strategy at scale. She specializes in architecting Data Mesh architectures for Enterprise platforms.

Automate large-scale data validation using Amazon EMR and Apache Griffin

Post Syndicated from Dipal Mahajan original https://aws.amazon.com/blogs/big-data/automate-large-scale-data-validation-using-amazon-emr-and-apache-griffin/

Many enterprises are migrating their on-premises data stores to the AWS Cloud. During data migration, a key requirement is to validate all the data that has been moved from source to target. This data validation is a critical step, and if not done correctly, may result in the failure of the entire project. However, developing custom solutions to determine migration accuracy by comparing the data between the source and target can often be time-consuming.

In this post, we walk through a step-by-step process to validate large datasets after migration using a configuration-based tool built with Amazon EMR and the Apache Griffin open source library. Griffin is an open source data quality solution for big data that supports both batch and streaming modes.

In today’s data-driven landscape, where organizations deal with petabytes of data, the need for automated data validation frameworks has become increasingly critical. Manual validation processes are not only time-consuming but also prone to errors, especially when dealing with vast volumes of data. Automated data validation frameworks offer a streamlined solution by efficiently comparing large datasets, identifying discrepancies, and ensuring data accuracy at scale. With such frameworks, organizations can save valuable time and resources while maintaining confidence in the integrity of their data, thereby enabling informed decision-making and enhancing overall operational efficiency.

The following are standout features for this framework:

  • Utilizes a configuration-driven framework
  • Offers plug-and-play functionality for seamless integration
  • Conducts count comparison to identify any disparities
  • Implements robust data validation procedures
  • Ensures data quality through systematic checks
  • Provides access to a file containing mismatched records for in-depth analysis
  • Generates comprehensive reports for insights and tracking purposes

Solution overview

This solution uses the following services:

  • Amazon Simple Storage Service (Amazon S3) or Hadoop Distributed File System (HDFS) as the source and target.
  • Amazon EMR to run the PySpark script. We use a Python wrapper on top of Griffin to validate data between Hadoop tables created over HDFS or Amazon S3.
  • AWS Glue to catalog the technical table, which stores the results of the Griffin job.
  • Amazon Athena to query the output table to verify the results.

We use tables that store the count for each source and target table and also create files that show the difference of records between source and target.

The following diagram illustrates the solution architecture.

Architecture_Diagram

In the depicted architecture and our typical data lake use case, our data either resides in Amazon S3 or is migrated from on premises to Amazon S3 using replication tools such as AWS DataSync or AWS Database Migration Service (AWS DMS). Although this solution is designed to seamlessly interact with both Hive Metastore and the AWS Glue Data Catalog, we use the Data Catalog as our example in this post.

This framework operates within Amazon EMR, automatically running scheduled tasks on a daily basis, as per the defined frequency. It generates and publishes reports in Amazon S3, which are then accessible via Athena. A notable feature of this framework is its capability to detect count mismatches and data discrepancies, in addition to generating a file in Amazon S3 containing full records that didn’t match, facilitating further analysis.

In this example, we use three tables in an on-premises database to validate between source and target: balance_sheet, covid, and survery_financial_report.

Prerequisites

Before getting started, make sure you have the following prerequisites:

Deploy the solution

To make it straightforward for you to get started, we have created a CloudFormation template that automatically configures and deploys the solution for you. Complete the following steps:

  1. Create an S3 bucket in your AWS account called bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region} (provide your AWS account ID and AWS Region).
  2. Unzip the following file to your local system.
  3. After unzipping the file to your local system, change <bucket name> to the one you created in your account (bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region}) in the following files:
    1. bootstrap-bdb-3070-datavalidation.sh
    2. Validation_Metrics_Athena_tables.hql
    3. datavalidation/totalcount/totalcount_input.txt
    4. datavalidation/accuracy/accuracy_input.txt
  4. Upload all the folders and files in your local folder to your S3 bucket:
    aws s3 cp . s3://<bucket_name>/ --recursive

  5. Run the following CloudFormation template in your account.

The CloudFormation template creates a database called griffin_datavalidation_blog and an AWS Glue crawler called griffin_data_validation_blog on top of the data folder in the .zip file.

  1. Choose Next.
    Cloudformation_template_1
  2. Choose Next again.
  3. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

aws cloudformation describe-stacks --stack-name <stack-name> --region us-east-1 --query Stacks[0].Outputs

  1. Run the AWS Glue crawler and verify that six tables have been created in the Data Catalog.
  2. Run the following CloudFormation template in your account.

This template creates an EMR cluster with a bootstrap script to copy Griffin-related JARs and artifacts. It also runs three EMR steps:

  • Create two Athena tables and two Athena views to see the validation matrix produced by the Griffin framework
  • Run count validation for all three tables to compare the source and target tables
  • Run record-level and column-level validations for all three tables to compare the source and target tables

  1. For SubnetID, enter your subnet ID.
  2. Choose Next.
    Cloudformation_template_2
  3. Choose Next again.
  4. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create stack.

You can view the stack outputs on the console or by using the following AWS CLI command:

aws cloudformation describe-stacks --stack-name <stack-name> --region us-east-1 --query Stacks[0].Outputs

It takes approximately 5 minutes for the deployment to complete. When the stack is complete, you should see the EMRCluster resource launched and available in your account.

When the EMR cluster is launched, it runs the following steps as part of the post-cluster launch:

  • Bootstrap action – It installs the Griffin JAR file and directories for this framework. It also downloads sample data files to use in the next step.
  • Athena_Table_Creation – It creates tables in Athena to read the result reports.
  • Count_Validation – It runs the job to compare the data count between source and target data from the Data Catalog table and stores the results in an S3 bucket, which will be read via an Athena table.
  • Accuracy – It runs the job to compare the data rows between the source and target data from the Data Catalog table and stores the results in an S3 bucket, which will be read via the Athena table.

Athena_table

When the EMR steps are complete, your table comparison is done and ready to view in Athena automatically. No manual intervention is needed for validation.

Validate data with Python Griffin

When your EMR cluster is ready and all the jobs are complete, the count validation and data validation are done. The results are stored in Amazon S3, and the Athena tables are already created on top of that data. You can query the Athena tables to view the results.
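
If you want to query the results programmatically instead of through the Athena console, the following hedged boto3 sketch shows one way to do it. The table name count_validation_summary is hypothetical; the actual table and view names are defined in Validation_Metrics_Athena_tables.hql.

import time
import boto3

athena = boto3.client("athena")

# Hypothetical table name; the real tables and views are created by
# Validation_Metrics_Athena_tables.hql in the griffin_datavalidation_blog database.
query = "SELECT * FROM griffin_datavalidation_blog.count_validation_summary LIMIT 10"

query_id = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "griffin_datavalidation_blog"},
    ResultConfiguration={"OutputLocation": "s3://<bucket_name>/athena-query-results/"},
)["QueryExecutionId"]

# Wait for the query to finish, then print the result rows
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state not in ("QUEUED", "RUNNING"):
        break
    time.sleep(2)

for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
    print([col.get("VarCharValue") for col in row["Data"]])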

The following screenshot shows the count results for all tables.

Summary_table

The following screenshot shows the data accuracy results for all tables.

Detailed_view

The following screenshot shows the files created for each table with mismatched records. Individual folders are generated for each table directly from the job.

mismatched_records

Every table folder contains a directory for each day the job is run.

S3_path_mismatched

Within that specific date, a file named __missRecords contains records that do not match.

S3_path_mismatched_2

The following screenshot shows the contents of the __missRecords file.

__missRecords

Clean up

To avoid incurring additional charges, complete the following steps to clean up your resources when you’re done with the solution:

  1. Drop the AWS Glue database griffin_datavalidation_blog with the CASCADE option to remove its tables.
  2. Delete the prefixes and objects you created from the bucket bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region}.
  3. Delete the CloudFormation stack, which removes your additional resources.

Conclusion

This post showed how you can use Python Griffin to accelerate the post-migration data validation process. Python Griffin helps you run count, row-level, and column-level validations, identifying mismatched records without writing any code.

For more information about data quality use cases, refer to Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog and AWS Glue Data Quality.


About the Authors

Dipal Mahajan serves as a Lead Consultant at Amazon Web Services, providing expert guidance to global clients in developing highly secure, scalable, reliable, and cost-efficient cloud applications. With a wealth of experience in software development, architecture, and analytics across diverse sectors such as finance, telecom, retail, and healthcare, he brings invaluable insights to his role. Beyond the professional sphere, Dipal enjoys exploring new destinations, having already visited 14 out of 30 countries on his wish list.

Akhil is a Lead Consultant at AWS Professional Services. He helps customers design & build scalable data analytics solutions and migrate data pipelines and data warehouses to AWS. In his spare time, he loves travelling, playing games and watching movies.

Ramesh Raghupathy is a Senior Data Architect with WWCO ProServe at AWS. He works with AWS customers to architect, deploy, and migrate to data warehouses and data lakes on the AWS Cloud. While not at work, Ramesh enjoys traveling, spending time with family, and yoga.

How Amazon optimized its high-volume financial reconciliation process with Amazon EMR for higher scalability and performance

Post Syndicated from Jeeshan Khetrapal original https://aws.amazon.com/blogs/big-data/how-amazon-optimized-its-high-volume-financial-reconciliation-process-with-amazon-emr-for-higher-scalability-and-performance/

Account reconciliation is an important step to ensure the completeness and accuracy of financial statements. Specifically, companies must reconcile balance sheet accounts that could contain significant or material misstatements. Accountants go through each account in the general ledger of accounts and verify that the balance listed is complete and accurate. When discrepancies are found, accountants investigate and take appropriate corrective action.

As part of Amazon’s FinTech organization, we offer a software platform that empowers the internal accounting teams at Amazon to conduct account reconciliations. To optimize the reconciliation process, these users require high performance transformation with the ability to scale on demand, as well as the ability to process variable file sizes ranging from as low as a few MBs to more than 100 GB. It’s not always possible to fit data onto a single machine or process it with one single program in a reasonable time frame. This computation has to be done fast enough to provide practical services where programming logic and underlying details (data distribution, fault tolerance, and scheduling) can be separated.

We can achieve these simultaneous computations on multiple machines or threads of the same function across groups of elements of a dataset by using distributed data processing solutions. This encouraged us to reinvent our reconciliation service powered by AWS services, including Amazon EMR and the Apache Spark distributed processing framework, which uses PySpark. This service enables users to process files over 100 GB containing up to 100 million transactions in less than 30 minutes. The reconciliation service has become a powerhouse for data processing, and now users can seamlessly perform a variety of operations, such as Pivot, JOIN (like an Excel VLOOKUP operation), arithmetic operations, and more, providing a versatile and efficient solution for reconciling vast datasets. This enhancement is a testament to the scalability and speed achieved through the adoption of distributed data processing solutions.

In this post, we explain how we integrated Amazon EMR to build a highly available and scalable system that enabled us to run a high-volume financial reconciliation process.

Architecture before migration

The following diagram illustrates our previous architecture.

Our legacy service was built with Amazon Elastic Container Service (Amazon ECS) on AWS Fargate. We processed the data sequentially using Python. However, due to its lack of parallel processing capability, we frequently had to increase the cluster size vertically to support larger datasets. For context, 5 GB of data with 50 operations took around 3 hours to process. This service was configured to scale horizontally to five ECS instances that polled messages from Amazon Simple Queue Service (Amazon SQS), which fed the transformation requests. Each instance was configured with 4 vCPUs and 30 GB of memory to allow horizontal scaling. However, we couldn't improve its performance because the process ran sequentially, picking chunks of data from Amazon Simple Storage Service (Amazon S3) for processing. For example, a VLOOKUP operation where two files are to be joined required both files to be read into memory chunk by chunk to obtain the output. This became an obstacle for users because they had to wait for long periods of time to process their datasets.

As part of our re-architecture and modernization, we wanted to achieve the following:

  • High availability – The data processing clusters should be highly available, providing three 9s of availability (99.9%)
  • Throughput – The service should handle 1,500 runs per day
  • Latency – It should be able to process 100 GB of data within 30 minutes
  • Heterogeneity – The cluster should be able to support a wide variety of workloads, with files ranging from a few MBs to hundreds of GBs
  • Query concurrency – The implementation demands the ability to support a minimum of 10 degrees of concurrency
  • Reliability of jobs and data consistency – Jobs need to run reliably and consistently to avoid breaking Service Level Agreements (SLAs)
  • Cost-effective and scalable – It must be scalable based on the workload, making it cost-effective
  • Security and compliance – Given the sensitivity of data, it must support fine-grained access control and appropriate security implementations
  • Monitoring – The solution must offer end-to-end monitoring of the clusters and jobs

Why Amazon EMR

Amazon EMR is the industry-leading cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning (ML) using open source frameworks such as Apache Spark, Apache Hive, and Presto. With these frameworks and related open-source projects, you can process data for analytics purposes and BI workloads. Amazon EMR lets you transform and move large amounts of data in and out of other AWS data stores and databases, such as Amazon S3 and Amazon DynamoDB.

A notable advantage of Amazon EMR lies in its effective use of parallel processing with PySpark, marking a significant improvement over traditional sequential Python code. This innovative approach streamlines the deployment and scaling of Apache Spark clusters, allowing for efficient parallelization on large datasets. The distributed computing infrastructure not only enhances performance, but also enables the processing of vast amounts of data at unprecedented speeds. Equipped with libraries, PySpark facilitates Excel-like operations on DataFrames, and the higher-level abstraction of DataFrames simplifies intricate data manipulations, reducing code complexity. Combined with automatic cluster provisioning, dynamic resource allocation, and integration with other AWS services, Amazon EMR proves to be a versatile solution suitable for diverse workloads, ranging from batch processing to ML. The inherent fault tolerance in PySpark and Amazon EMR promotes robustness, even in the event of node failures, making it a scalable, cost-effective, and high-performance choice for parallel data processing on AWS.

Amazon EMR extends its capabilities beyond the basics, offering a variety of deployment options to cater to diverse needs. Whether it’s Amazon EMR on EC2, Amazon EMR on EKS, Amazon EMR Serverless, or Amazon EMR on AWS Outposts, you can tailor your approach to specific requirements. For those seeking a serverless environment for Spark jobs, integrating AWS Glue is also a viable option. In addition to supporting various open-source frameworks, including Spark, Amazon EMR provides flexibility in choosing deployment modes, Amazon Elastic Compute Cloud (Amazon EC2) instance types, scaling mechanisms, and numerous cost-saving optimization techniques.

Amazon EMR stands as a dynamic force in the cloud, delivering unmatched capabilities for organizations seeking robust big data solutions. Its seamless integration, powerful features, and adaptability make it an indispensable tool for navigating the complexities of data analytics and ML on AWS.

Redesigned architecture

The following diagram illustrates our redesigned architecture.

The solution operates under an API contract, where clients can submit transformation configurations, defining the set of operations alongside the S3 dataset location for processing. The request is queued through Amazon SQS, then directed to Amazon EMR via a Lambda function. This process initiates the creation of an Amazon EMR step for Spark framework implementation on a dedicated EMR cluster. Although Amazon EMR accommodates an unlimited number of steps over a long-running cluster’s lifetime, only 256 steps can be running or pending simultaneously. For optimal parallelization, the step concurrency is set at 10, allowing 10 steps to run concurrently. In case of request failures, the Amazon SQS dead-letter queue (DLQ) retains the event. Spark processes the request, translating Excel-like operations into PySpark code for an efficient query plan. Resilient DataFrames store input, output, and intermediate data in-memory, optimizing processing speed, reducing disk I/O cost, enhancing workload performance, and delivering the final output to the specified Amazon S3 location.
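
To illustrate the kind of translation described above, the following simplified PySpark sketch (not the actual service code) shows how an Excel-style VLOOKUP between two datasets can be expressed as a DataFrame join. The S3 paths and the account_id join key are placeholder assumptions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("reconciliation-step").getOrCreate()

# Illustrative input locations; the real service receives them through the API contract.
ledger = spark.read.parquet("s3://example-bucket/input/ledger/")
reference = spark.read.parquet("s3://example-bucket/input/reference/")

# An Excel VLOOKUP on account_id becomes a left join, evaluated in parallel
# across the cluster instead of chunk by chunk on a single instance.
reconciled = ledger.join(reference, on="account_id", how="left")

reconciled.write.mode("overwrite").parquet("s3://example-bucket/output/reconciled/")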

We define our SLA in two dimensions: latency and throughput. Latency is defined as the amount of time taken to perform one job against a deterministic dataset size and the number of operations performed on the dataset. Throughput is defined as the maximum number of simultaneous jobs the service can perform without breaching the latency SLA of one job. The overall scalability SLA of the service depends on the balance of horizontal scaling of elastic compute resources and vertical scaling of individual servers.

Because we had to run 1,500 processes per day with minimal latency and high performance, we chose the Amazon EMR on EC2 deployment mode with managed scaling enabled to support processing variable file sizes.

The EMR cluster configuration provides many different selections:

  • EMR node types – Primary, core, or task nodes
  • Instance purchasing options – On-Demand Instances, Reserved Instances, or Spot Instances
  • Configuration options – EMR instance fleet or uniform instance group
  • Scaling options – Auto Scaling or Amazon EMR managed scaling

Based on our variable workload, we configured an EMR instance fleet (for best practices, see Reliability). We also decided to use Amazon EMR managed scaling to scale the core and task nodes (for scaling scenarios, refer to Node allocation scenarios). Lastly, we chose memory-optimized AWS Graviton instances, which provide up to 30% lower cost and up to 15% improved performance for Spark workloads.

The following code provides a snapshot of our cluster configuration:

Concurrent steps:10

EMR Managed Scaling:
minimumCapacityUnits: 64
maximumCapacityUnits: 512
maximumOnDemandCapacityUnits: 512
maximumCoreCapacityUnits: 512

Master Instance Fleet:
r6g.xlarge
- 4 vCore, 30.5 GiB memory, EBS only storage
- EBS Storage:250 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 1 units
r6g.2xlarge
- 8 vCore, 61 GiB memory, EBS only storage
- EBS Storage:250 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 1 units

Core Instance Fleet:
r6g.2xlarge
- 8 vCore, 61 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 8 units
r6g.4xlarge
- 16 vCore, 122 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 16 units

Task Instances:
r6g.2xlarge
- 8 vCore, 61 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 8 units
r6g.4xlarge
- 16 vCore, 122 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 16 units
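
The managed scaling limits in this snapshot map directly onto the Amazon EMR API. The following boto3 sketch applies the same limits to an existing cluster; the cluster ID is a placeholder.

import boto3

emr = boto3.client("emr")

# Apply the managed scaling limits shown above to an existing cluster.
# Replace the cluster ID with your own; j-XXXXXXXXXXXXX is a placeholder.
emr.put_managed_scaling_policy(
    ClusterId="j-XXXXXXXXXXXXX",
    ManagedScalingPolicy={
        "ComputeLimits": {
            "UnitType": "InstanceFleetUnits",
            "MinimumCapacityUnits": 64,
            "MaximumCapacityUnits": 512,
            "MaximumOnDemandCapacityUnits": 512,
            "MaximumCoreCapacityUnits": 512,
        }
    },
)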

Performance

With our migration to Amazon EMR, we were able to achieve a system performance capable of handling a variety of datasets, ranging from as low as 273 B to as high as 88.5 GB with a p99 of 491 seconds (approximately 8 minutes).

The following figure illustrates the variety of file sizes processed.

The following figure shows our latency.

To compare against sequential processing, we took two datasets containing 53 million records and ran a VLOOKUP operation against each other, along with 49 other Excel-like operations. This took 26 minutes to process in the new service, compared to 5 days in the legacy service. This is an improvement of almost 300 times over the previous architecture in terms of performance.

Considerations

Keep in mind the following when considering this solution:

  • Right-sizing clusters – Although Amazon EMR is resizable, it’s important to right-size the clusters. Right-sizing mitigates a slow cluster, if undersized, or higher costs, if the cluster is oversized. To anticipate these issues, you can calculate the number and type of nodes that will be needed for the workloads.
  • Parallel steps – Running steps in parallel allows you to run more advanced workloads, increase cluster resource utilization, and reduce the amount of time taken to complete your workload. The number of steps allowed to run at one time is configurable and can be set when a cluster is launched and any time after the cluster has started. You need to consider and optimize the CPU/memory usage per job when multiple jobs are running in a single shared cluster.
  • Job-based transient EMR clusters – If applicable, it is recommended to use a job-based transient EMR cluster, which delivers superior isolation, verifying that each task operates within its dedicated environment. This approach optimizes resource utilization, helps prevent interference between jobs, and enhances overall performance and reliability. The transient nature enables efficient scaling, providing a robust and isolated solution for diverse data processing needs.
  • EMR Serverless – EMR Serverless is the ideal choice if you prefer not to handle the management and operation of clusters. It allows you to effortlessly run applications using open-source frameworks available within EMR Serverless, offering a straightforward and hassle-free experience.
  • Amazon EMR on EKS – Amazon EMR on EKS offers distinct advantages, such as faster startup times and improved scalability that resolves compute capacity challenges, which is particularly beneficial for Graviton and Spot Instance users. The inclusion of a broader range of compute types enhances cost-efficiency, allowing tailored resource allocation. Furthermore, Multi-AZ support provides increased availability. These compelling features provide a robust solution for managing big data workloads with improved performance, cost optimization, and reliability across various computing scenarios.

Conclusion

In this post, we explained how Amazon optimized its high-volume financial reconciliation process with Amazon EMR for higher scalability and performance. If you have a monolithic application that’s dependent on vertical scaling to process additional requests or datasets, then migrating it to a distributed processing framework such as Apache Spark and choosing a managed service such as Amazon EMR for compute may help reduce the runtime to lower your delivery SLA, and also may help reduce the Total Cost of Ownership (TCO).

As we embrace Amazon EMR for this particular use case, we encourage you to explore further possibilities in your data innovation journey. Consider evaluating AWS Glue, along with other dynamic Amazon EMR deployment options such as EMR Serverless or Amazon EMR on EKS, to discover the best AWS service tailored to your unique use case. The future of the data innovation journey holds exciting possibilities and advancements to be explored further.


About the Authors

Jeeshan Khetrapal is a Sr. Software Development Engineer at Amazon, where he develops fintech products based on cloud computing serverless architectures that are responsible for companies’ IT general controls, financial reporting, and controllership for governance, risk, and compliance.

Sakti Mishra is a Principal Solutions Architect at AWS, where he helps customers modernize their data architecture and define their end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

Improve healthcare services through patient 360: A zero-ETL approach to enable near real-time data analytics

Post Syndicated from Saeed Barghi original https://aws.amazon.com/blogs/big-data/improve-healthcare-services-through-patient-360-a-zero-etl-approach-to-enable-near-real-time-data-analytics/

Healthcare providers have an opportunity to improve the patient experience by collecting and analyzing broader and more diverse datasets. This includes patient medical history, allergies, immunizations, family disease history, and individuals’ lifestyle data such as workout habits. Having access to those datasets and forming a 360-degree view of patients allows healthcare providers such as claim analysts to see a broader context about each patient and personalize the care they provide for every individual. This is underpinned by building a complete patient profile that enables claim analysts to identify patterns, trends, potential gaps in care, and adherence to care plans. They can then use the result of their analysis to understand a patient’s health status, treatment history, and past or upcoming doctor consultations to make more informed decisions, streamline the claim management process, and improve operational outcomes. Achieving this will also improve general public health through better and more timely interventions, identify health risks through predictive analytics, and accelerate the research and development process.

AWS has invested in a zero-ETL (extract, transform, and load) future so that builders can focus more on creating value from data, instead of having to spend time preparing data for analysis. The solution proposed in this post follows a zero-ETL approach to data integration to facilitate near real-time analytics and deliver a more personalized patient experience. The solution uses AWS services such as AWS HealthLake, Amazon Redshift, Amazon Kinesis Data Streams, and AWS Lake Formation to build a 360 view of patients. These services enable you to collect and analyze data in near real time and put a comprehensive data governance framework in place that uses granular access control to secure sensitive data from unauthorized users.

Zero-ETL refers to a set of features on the AWS Cloud that enable integrating different data sources with Amazon Redshift:

Solution overview

Organizations in the healthcare industry are currently spending a significant amount of time and money on building complex ETL pipelines for data movement and integration. This means data will be replicated across multiple data stores via bespoke and in some cases hand-written ETL jobs, resulting in data inconsistency, latency, and potential security and privacy breaches.

With support for querying cross-account Apache Iceberg tables via Amazon Redshift, you can now build a more comprehensive patient-360 analysis by querying all patient data from one place. This means you can seamlessly combine information such as clinical data stored in HealthLake with data stored in operational databases such as a patient relationship management system, together with data produced from wearable devices in near real-time. Having access to all this data enables healthcare organizations to form a holistic view of patients, improve care coordination across multiple organizations, and provide highly personalized care for each individual.

The following diagram depicts the high-level solution we build to achieve these outcomes.

Deploy the solution

You can use the following AWS CloudFormation template to deploy the solution components:

This stack creates the following resources and necessary permissions to integrate the services:

AWS Solution setup

AWS HealthLake

AWS HealthLake enables organizations in the health industry to securely store, transform, transact, and analyze health data. It stores data in HL7 FHIR format, which is an interoperability standard designed for quick and efficient exchange of health data. When you create a HealthLake data store, a Fast Healthcare Interoperability Resources (FHIR) data repository is made available via a RESTful API endpoint. Simultaneously and as part of AWS HealthLake managed service, the nested JSON FHIR data undergoes an ETL process and is stored in Apache Iceberg open table format in Amazon S3.

To create an AWS HealthLake data store, refer to Getting started with AWS HealthLake. Make sure to select the option Preload sample data when creating your data store.

In real-world scenarios and when you use AWS HealthLake in production environments, you don’t need to load sample data into your AWS HealthLake data store. Instead, you can use FHIR REST API operations to manage and search resources in your AWS HealthLake data store.
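
If you prefer to create the data store programmatically rather than through the console, the following minimal boto3 sketch does so; the data store name is a placeholder, and preloading Synthea sample data mirrors the Preload sample data console option.

import boto3

healthlake = boto3.client("healthlake")

# Create an R4 FHIR data store preloaded with Synthea-generated sample data
response = healthlake.create_fhir_datastore(
    DatastoreName="patient360-blog-datastore",
    DatastoreTypeVersion="R4",
    PreloadDataConfig={"PreloadDataType": "SYNTHEA"},
)
print(response["DatastoreId"], response["DatastoreEndpoint"])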

We use two tables from the sample data stored in HealthLake: patient and allergyintolerance.

Query AWS HealthLake tables with Redshift Serverless

Amazon Redshift is the data warehousing service available on the AWS Cloud that provides up to six times better price-performance than any other cloud data warehouses in the market, with a fully managed, AI-powered, massively parallel processing (MPP) data warehouse built for performance, scale, and availability. With continuous innovations added to Amazon Redshift, it is now more than just a data warehouse. It enables organizations of different sizes and in different industries to access all the data they have in their AWS environments and analyze it from one single location with a set of features under the zero-ETL umbrella. Amazon Redshift integrates with AWS HealthLake and data lakes through Redshift Spectrum and Amazon S3 auto-copy features, enabling you to query data directly from files on Amazon S3.

Query AWS HealthLake data with Amazon Redshift

Amazon Redshift makes it straightforward to query the data stored in S3-based data lakes with automatic mounting of an AWS Glue Data Catalog in the Redshift query editor v2. This means you no longer have to create an external schema in Amazon Redshift to use the data lake tables cataloged in the Data Catalog. To get started with this feature, see Querying the AWS Glue Data Catalog. After it is set up and you’re connected to the Redshift query editor v2, complete the following steps:

  1. Validate that your tables are visible in the query editor V2. The Data Catalog objects are listed under the awsdatacatalog database.

FHIR data stored in AWS HealthLake is highly nested. To learn about how to un-nest semi-structured data with Amazon Redshift, see Tutorial: Querying nested data with Amazon Redshift Spectrum.

  1. Use the following query to un-nest the allergyintolerance and patient tables, join them together, and get patient details and their allergies:
    WITH patient_allergy AS 
    (
        SELECT
            resourcetype, 
            c AS allery_category,
            a."patient"."reference",
            SUBSTRING(a."patient"."reference", 9, LEN(a."patient"."reference")) AS patient_id,
            a.recordeddate AS allergy_record_date,
            NVL(cd."code", 'NA') AS allergy_code,
            NVL(cd.display, 'NA') AS allergy_description
    
        FROM "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."allergyintolerance" a
                LEFT JOIN a.category c ON TRUE
                LEFT JOIN a.reaction r ON TRUE
                LEFT JOIN r.manifestation m ON TRUE
                LEFT JOIN m.coding cd ON TRUE
    ), patinet_info AS
    (
        SELECT id,
                gender,
                g as given_name,
                n.family as family_name,
                pr as prefix
    
        FROM "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."patient" p
                LEFT JOIN p.name n ON TRUE
                LEFT JOIN n.given g ON TRUE
                LEFT JOIN n.prefix pr ON TRUE
    )
    SELECT DISTINCT p.id, 
            p.gender, 
            p.prefix,
            p.given_name,
            p.family_name,
            pa.allery_category,
            pa.allergy_code,
            pa.allergy_description
    from patient_allergy pa
        JOIN patinet_info p
            ON pa.patient_id = p.id
    ORDER BY p.id, pa.allergy_code
    ;
    

To eliminate the need for Amazon Redshift to un-nest data every time a query is run, you can create a materialized view to hold un-nested and flattened data. Materialized views are an effective mechanism to deal with complex and repeating queries. They contain a precomputed result set, based on a SQL query over one or more base tables. You can issue SELECT statements to query a materialized view, in the same way that you can query other tables or views in the database.

  1. Use the following SQL to create a materialized view. You use it later to build a complete view of patients:
    CREATE MATERIALIZED VIEW patient_allergy_info AUTO REFRESH YES AS
    WITH patient_allergy AS 
    (
        SELECT
            resourcetype, 
            c AS allery_category,
            a."patient"."reference",
            SUBSTRING(a."patient"."reference", 9, LEN(a."patient"."reference")) AS patient_id,
            a.recordeddate AS allergy_record_date,
            NVL(cd."code", 'NA') AS allergy_code,
            NVL(cd.display, 'NA') AS allergy_description
    
        FROM
            "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."allergyintolerance" a
                LEFT JOIN a.category c ON TRUE
                LEFT JOIN a.reaction r ON TRUE
                LEFT JOIN r.manifestation m ON TRUE
                LEFT JOIN m.coding cd ON TRUE
    ), patinet_info AS
    (
        SELECT id,
                gender,
                g as given_name,
                n.family as family_name,
                pr as prefix
    
        FROM "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."patient" p
                LEFT JOIN p.name n ON TRUE
                LEFT JOIN n.given g ON TRUE
                LEFT JOIN n.prefix pr ON TRUE
    )
    SELECT DISTINCT p.id, 
            p.gender, 
            p.prefix,
            p.given_name,
            p.family_name,
            pa.allery_category,
            pa.allergy_code,
            pa.allergy_description
    from patient_allergy pa
        JOIN patinet_info p
            ON pa.patient_id = p.id
    ORDER BY p.id, pa.allergy_code
    ;
    

You have confirmed you can query data in AWS HealthLake via Amazon Redshift. Next, you set up zero-ETL integration between Amazon Redshift and Amazon Aurora MySQL.

Set up zero-ETL integration between Amazon Aurora MySQL and Redshift Serverless

Applications such as front-desk software, which are used to schedule appointments and register new patients, store data in OLTP databases such as Aurora. To get data out of OLTP databases and have them ready for analytics use cases, data teams might have to spend a considerable amount of time to build, test, and deploy ETL jobs that are complex to maintain and scale.

With the Amazon Redshift zero-ETL integration with Amazon Aurora MySQL, you can run analytics on the data stored in OLTP databases and combine them with the rest of the data in Amazon Redshift and AWS HealthLake in near real time. In the next steps in this section, we connect to a MySQL database and set up zero-ETL integration with Amazon Redshift.

Connect to an Aurora MySQL database and set up data

Connect to your Aurora MySQL database using your editor of choice, with the AdminUsername and AdminPassword values that you entered when running the CloudFormation stack. (For simplicity, they are the same for Amazon Redshift and Aurora.)

When you’re connected to your database, complete the following steps:

  1. Create a new database by running the following command:
    CREATE DATABASE front_desk_app_db;

  2. Create a new table. This table simulates storing patient information as they visit clinics and other healthcare centers. For simplicity and to demonstrate specific capabilities, we assume that patient IDs are the same in AWS HealthLake and the front-of-office application. In real-world scenarios, this can be a hashed version of a national health care number:
    CREATE TABLE patient_appointment ( 
          patient_id varchar(250), 
          gender varchar(1), 
          date_of_birth date, 
          appointment_datetime datetime, 
          phone_number varchar(15), 
          PRIMARY KEY (patient_id, appointment_datetime) 
    );

Having a primary key in the table is mandatory for zero-ETL integration to work.

  1. Insert new records into the source table in the Aurora MySQL database. To demonstrate the required functionalities, make sure the patient_id of the sample records inserted into the MySQL database match the ones in AWS HealthLake. Replace [patient_id_1] and [patient_id_2] in the following query with the ones from the Redshift query you ran previously (the query that joined allergyintolerance and patient):
    INSERT INTO front_desk_app_db.patient_appointment (patient_id, gender, date_of_birth, appointment_datetime, phone_number)
    
    VALUES([PATIENT_ID_1], 'F', '1988-7-04', '2023-12-19 10:15:00', '0401401401'),
    ([PATIENT_ID_1], 'F', '1988-7-04', '2023-09-19 11:00:00', '0401401401'),
    ([PATIENT_ID_1], 'F', '1988-7-04', '2023-06-06 14:30:00', '0401401401'),
    ([PATIENT_ID_2], 'F', '1972-11-14', '2023-12-19 08:15:00', '0401401402'),
    ([PATIENT_ID_2], 'F', '1972-11-14', '2023-01-09 12:15:00', '0401401402');

Now that your source table is populated with sample records, you can set up zero-ETL and have data ingested into Amazon Redshift.

Set up zero-ETL integration between Amazon Aurora MySQL and Amazon Redshift

Complete the following steps to create your zero-ETL integration:

  1. On the Amazon RDS console, choose Databases in the navigation pane.
  2. Choose the DB identifier of your cluster (not the instance).
  3. On the Zero-ETL Integration tab, choose Create zero-ETL integration.
  4. Follow the steps to create your integration.
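
If you prefer to script this step, the following hedged boto3 sketch creates the same integration; the integration name and both ARNs are placeholders for your Aurora MySQL cluster and your Redshift Serverless namespace (or provisioned cluster).

import boto3

rds = boto3.client("rds")

# Both ARNs below are placeholders; replace them with your own resources.
integration = rds.create_integration(
    IntegrationName="patient360-zero-etl",
    SourceArn="arn:aws:rds:us-east-1:111122223333:cluster:my-aurora-mysql-cluster",
    TargetArn="arn:aws:redshift-serverless:us-east-1:111122223333:namespace/your-namespace-id",
)
print(integration["IntegrationArn"])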

Create a Redshift database from the integration

Next, you create a target database from the integration. You can do this by running a couple of simple SQL commands on Amazon Redshift. Log in to the query editor V2 and run the following commands:

  1. Get the integration ID of the zero-ETL you set up between your source database and Amazon Redshift:
    SELECT * FROM svv_integration;

  2. Create a database using the integration ID:
    CREATE DATABASE ztl_demo FROM INTEGRATION '[INTEGRATION_ID]';

  3. Query the database and validate that a new table is created and populated with data from your source MySQL database:
    SELECT * FROM ztl_demo.front_desk_app_db.patient_appointment;

It might take a few seconds for the first set of records to appear in Amazon Redshift.

This shows that the integration is working as expected. To validate it further, you can insert a new record in your Aurora MySQL database, and it will be available in Amazon Redshift for querying in near real time within a few seconds.

Set up streaming ingestion for Amazon Redshift

Another aspect of zero-ETL on AWS, for real-time and streaming data, is realized through Amazon Redshift Streaming Ingestion. It provides low-latency, high-speed ingestion of streaming data from Kinesis Data Streams and Amazon MSK. It lowers the effort required to have data ready for analytics workloads, lowers the cost of running such workloads on the cloud, and decreases the operational burden of maintaining the solution.

In the context of healthcare, understanding an individual’s exercise and movement patterns can help with overall health assessment and better treatment planning. In this section, you send simulated data from wearable devices to Kinesis Data Streams and integrate it with the rest of the data you already have access to from your Redshift Serverless data warehouse.
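
If you prefer to script the data generation instead of using the Kinesis Data Generator referenced in the following steps, the boto3 sketch below pushes simulated wearable readings into the wearables_stream data stream; the placeholder patient IDs should match the ones you inserted into Aurora MySQL earlier.

import json
import random
import boto3

kinesis = boto3.client("kinesis")

# Placeholder patient IDs; use the same IDs you inserted into the Aurora MySQL table earlier.
patient_ids = ["[PATIENT_ID_1]", "[PATIENT_ID_2]"]

for _ in range(100):
    record = {
        "patient_id": random.choice(patient_ids),
        "steps_increment": random.choice([0, 1]),
        "heart_rate": random.randint(45, 120),
    }
    kinesis.put_record(
        StreamName="wearables_stream",
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=record["patient_id"],
    )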

For step-by-step instructions, refer to Real-time analytics with Amazon Redshift streaming ingestion. Note the following steps when you set up streaming ingestion for Amazon Redshift:

  1. Select wearables_stream and use the following template when sending data to Amazon Kinesis Data Streams via the Kinesis Data Generator to simulate data generated by wearable devices. Replace [PATIENT_ID_1] and [PATIENT_ID_2] with the patient IDs you used earlier when inserting new records into your Aurora MySQL table:
    {
       "patient_id": "{{random.arrayElement(["[PATIENT_ID_1]"," [PATIENT_ID_2]"])}}",
       "steps_increment": "{{random.arrayElement(
          [0,1]
       )}}",
       "heart_rate": {{random.number( 
          {
             "min":45,
             "max":120}
       )}}
    }

  2. Create an external schema called from_kds by running the following query and replacing [IAM_ROLE_ARN] with the ARN of the role created by the CloudFormation stack (Patient360BlogRole):
    CREATE EXTERNAL SCHEMA from_kds
    FROM KINESIS
    IAM_ROLE '[IAM_ROLE_ARN]';

  3. Use the following SQL when creating a materialized view to consume data from the stream:
    CREATE MATERIALIZED VIEW patient_wearable_data AUTO REFRESH YES AS 
    SELECT approximate_arrival_timestamp, 
          JSON_PARSE(kinesis_data) as Data FROM from_kds."wearables_stream" 
    WHERE CAN_JSON_PARSE(kinesis_data);

  4. To validate that streaming ingestion works as expected, refresh the materialized view to get the data you already sent to the data stream and query the table to make sure data has landed in Amazon Redshift:
    REFRESH MATERIALIZED VIEW patient_wearable_data;
    
    SELECT *
    FROM patient_wearable_data
    ORDER BY approximate_arrival_timestamp DESC;

Query and analyze patient wearable data

The results in the data column of the preceding query are in JSON format. Amazon Redshift makes it straightforward to work with semi-structured data in JSON format. It uses PartiQL language to offer SQL-compatible access to relational, semi-structured, and nested data. Use the following query to flatten data:

SELECT data."patient_id"::varchar AS patient_id,       
      data."steps_increment"::integer as steps_increment,       
      data."heart_rate"::integer as heart_rate, 
      approximate_arrival_timestamp 
FROM patient_wearable_data 
ORDER BY approximate_arrival_timestamp DESC;

The result looks like the following screenshot.

Now that you know how to flatten JSON data, you can analyze it further. Use the following query to get the number of minutes a patient has been physically active per day, based on their heart rate (greater than 80):

WITH patient_wearble_flattened AS
(
   SELECT data."patient_id"::varchar AS patient_id,
      data."steps_increment"::integer as steps_increment,
      data."heart_rate"::integer as heart_rate,
      approximate_arrival_timestamp,
      DATE(approximate_arrival_timestamp) AS date_received,
      extract(hour from approximate_arrival_timestamp) AS    hour_received,
      extract(minute from approximate_arrival_timestamp) AS minute_received
   FROM patient_wearable_data
), patient_active_minutes AS
(
   SELECT patient_id,
      date_received,
      hour_received,
      minute_received,
      avg(heart_rate) AS heart_rate
   FROM patient_wearble_flattened
   GROUP BY patient_id,
      date_received,
      hour_received,
      minute_received
   HAVING avg(heart_rate) > 80
)
SELECT patient_id,
      date_received,
      COUNT(heart_rate) AS active_minutes_count
FROM patient_active_minutes
GROUP BY patient_id,
      date_received
ORDER BY patient_id,
      date_received;

Create a complete patient 360

Now that you are able to query all patient data with Redshift Serverless, you can combine the three datasets you used in this post and form a comprehensive patient 360 view with the following query:

WITH patient_appointment_info AS
(
      SELECT "patient_id",
         "gender",
         "date_of_birth",
         "appointment_datetime",
         "phone_number"
      FROM ztl_demo.front_desk_app_db.patient_appointment
),
patient_wearble_flattened AS
(
      SELECT data."patient_id"::varchar AS patient_id,
         data."steps_increment"::integer as steps_increment,
         data."heart_rate"::integer as heart_rate,
         approximate_arrival_timestamp,
         DATE(approximate_arrival_timestamp) AS date_received,
         extract(hour from approximate_arrival_timestamp) AS hour_received,
         extract(minute from approximate_arrival_timestamp) AS minute_received
      FROM patient_wearable_data
), patient_active_minutes AS
(
      SELECT patient_id,
         date_received,
         hour_received,
         minute_received,
         avg(heart_rate) AS heart_rate
      FROM patient_wearble_flattened
      GROUP BY patient_id,
         date_received,
         hour_received,
         minute_received
         HAVING avg(heart_rate) > 80
), patient_active_minutes_count AS
(
      SELECT patient_id,
         date_received,
         COUNT(heart_rate) AS active_minutes_count
      FROM patient_active_minutes
      GROUP BY patient_id,
         date_received
)
SELECT pai.patient_id,
      pai.gender,
      pai.prefix,
      pai.given_name,
      pai.family_name,
      pai.allery_category,
      pai.allergy_code,
      pai.allergy_description,
      ppi.date_of_birth,
      ppi.appointment_datetime,
      ppi.phone_number,
      pamc.date_received,
      pamc.active_minutes_count
FROM patient_allergy_info pai
      LEFT JOIN patient_active_minutes_count pamc
            ON pai.patient_id = pamc.patient_id
      LEFT JOIN patient_appointment_info ppi
            ON pai.patient_id = ppi.patient_id
GROUP BY pai.patient_id,
      pai.gender,
      pai.prefix,
      pai.given_name,
      pai.family_name,
      pai.allery_category,
      pai.allergy_code,
      pai.allergy_description,
      ppi.date_of_birth,
      ppi.appointment_datetime,
      ppi.phone_number,
      pamc.date_received,
      pamc.active_minutes_count
ORDER BY pai.patient_id,
      pai.gender,
      pai.prefix,
      pai.given_name,
      pai.family_name,
      pai.allery_category,
      pai.allergy_code,
      pai.allergy_description,
      ppi.date_of_birth DESC,
      ppi.appointment_datetime DESC,
      ppi.phone_number DESC,
      pamc.date_received,
      pamc.active_minutes_count

You can use the solution and queries used here to expand the datasets used in your analysis. For example, you can include other tables from AWS HealthLake as needed.

Clean up

To clean up resources you created, complete the following steps:

  1. Delete the zero-ETL integration between Amazon RDS and Amazon Redshift.
  2. Delete the CloudFormation stack.
  3. Delete the AWS HealthLake data store.

Conclusion

Forming a comprehensive 360 view of patients by integrating data from various sources offers numerous benefits for organizations operating in the healthcare industry. It enables healthcare providers to gain a holistic understanding of a patient’s medical journey, enhances clinical decision-making, and allows for more accurate diagnosis and tailored treatment plans. With zero-ETL features for data integration on AWS, you can build this view of patients securely, cost-effectively, and with minimal effort.

You can then use visualization tools such as Amazon QuickSight to build dashboards or use Amazon Redshift ML to enable data analysts and database developers to train machine learning (ML) models with the data integrated through Amazon Redshift zero-ETL. The result is a set of ML models that are trained with a broader view into patients, their medical history, and their lifestyle, and that therefore enable you to make more accurate predictions about their upcoming health needs.
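
For illustration, a Redshift ML model trained on the integrated data could look like the following sketch. The training table, feature columns, and label (for example, a readmission flag) are hypothetical placeholders, and the S3 bucket is one you provide for training artifacts:

CREATE MODEL patient_readmission_model
FROM (
    SELECT age,                    -- hypothetical feature columns
           active_minutes_count,
           appointment_count,
           readmitted_flag         -- hypothetical label column
    FROM analytics.patient_360_training
)
TARGET readmitted_flag
FUNCTION predict_readmission
IAM_ROLE default
SETTINGS (S3_BUCKET 'your-redshift-ml-artifacts-bucket');

After training completes, you can call the generated predict_readmission function in regular SQL queries to score patients.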


About the Authors

Saeed Barghi is a Sr. Analytics Specialist Solutions Architect specializing in architecting enterprise data platforms. He has extensive experience in the fields of data warehousing, data engineering, data lakes, and AI/ML. Based in Melbourne, Australia, Saeed works with public sector customers in Australia and New Zealand.

Satesh Sonti is a Sr. Analytics Specialist Solutions Architect based out of Atlanta, specialized in building enterprise data platforms, data warehousing, and analytics solutions. He has over 17 years of experience in building data assets and leading complex data platform programs for banking and insurance clients across the globe.

Create an end-to-end data strategy for Customer 360 on AWS

Post Syndicated from Ismail Makhlouf original https://aws.amazon.com/blogs/big-data/create-an-end-to-end-data-strategy-for-customer-360-on-aws/

Customer 360 (C360) provides a complete and unified view of a customer’s interactions and behavior across all touchpoints and channels. This view is used to identify patterns and trends in customer behavior, which can inform data-driven decisions to improve business outcomes. For example, you can use C360 to segment and create marketing campaigns that are more likely to resonate with specific groups of customers.

In 2022, AWS commissioned a study conducted by the American Productivity and Quality Center (APQC) to quantify the Business Value of Customer 360. The following figure shows some of the metrics derived from the study. Organizations using C360 achieved 43.9% reduction in sales cycle duration, 22.8% increase in customer lifetime value, 25.3% faster time to market, and 19.1% improvement in net promoter score (NPS) rating.

Without C360, businesses face missed opportunities, inaccurate reports, and disjointed customer experiences, leading to customer churn. However, building a C360 solution can be complicated. A Gartner Marketing survey found only 14% of organizations have successfully implemented a C360 solution, due to lack of consensus on what a 360-degree view means, challenges with data quality, and lack of cross-functional governance structure for customer data.

In this post, we discuss how you can use purpose-built AWS services to create an end-to-end data strategy for C360 that unifies and governs customer data and addresses these challenges. We structure it in five pillars that power C360: data collection, unification, analytics, activation, and data governance, along with a solution architecture that you can use for your implementation.

The five pillars of a mature C360

When you embark on creating a C360, you work with multiple use cases, types of customer data, and users and applications that require different tools. Building a C360 on the right datasets, adding new datasets over time while maintaining the quality of data, and keeping it secure needs an end-to-end data strategy for your customer data. You also need to provide tools that make it straightforward for your teams to build products that mature your C360.

We recommend building your data strategy around five pillars of C360, as shown in the following figure. This starts with basic data collection, unifying and linking data from various channels related to unique customers, and progresses towards basic to advanced analytics for decision-making, and personalized engagement through various channels. As you mature in each of these pillars, you progress towards responding to real-time customer signals.

The following diagram illustrates the functional architecture that combines the building blocks of a Customer Data Platform on AWS with additional components used to design an end-to-end C360 solution. This is aligned to the five pillars we discuss in this post.

Pillar 1: Data collection

As you start building your customer data platform, you have to collect data from various systems and touchpoints, such as your sales systems, customer support, web and social media, and data marketplaces. Think of the data collection pillar as a combination of ingestion, storage, and processing capabilities.

Data ingestion

You have to build ingestion pipelines based on factors like types of data sources (on-premises data stores, files, SaaS applications, third-party data), and flow of data (unbounded streams or batch data). AWS provides different services for building data ingestion pipelines:

  • AWS Glue is a serverless data integration service that ingests data in batches from on-premises databases and data stores in the cloud. It connects to more than 70 data sources and helps you build extract, transform, and load (ETL) pipelines without having to manage pipeline infrastructure. AWS Glue Data Quality checks for and alerts on poor data, making it straightforward to spot and fix issues before they harm your business.
  • Amazon AppFlow ingests data from software as a service (SaaS) applications like Google Analytics, Salesforce, SAP, and Marketo, giving you the flexibility to ingest data from more than 50 SaaS applications.
  • AWS Data Exchange makes it straightforward to find, subscribe to, and use third-party data for analytics. You can subscribe to data products that help enrich customer profiles, for example demographics data, advertising data, and financial markets data.
  • Amazon Kinesis ingests streaming events in real time from point-of-sales systems, clickstream data from mobile apps and websites, and social media data. You could also consider using Amazon Managed Streaming for Apache Kafka (Amazon MSK) for streaming events in real time.

The following diagram illustrates the different pipelines to ingest data from various source systems using AWS services.

Data storage

Structured, semi-structured, or unstructured batch data is stored in object storage because it is cost-efficient and durable. Amazon Simple Storage Service (Amazon S3) is a managed storage service with archiving features that can store petabytes of data with eleven nines of durability. Streaming data with low-latency needs is stored in Amazon Kinesis Data Streams for real-time consumption. This allows immediate analytics and actions for various downstream consumers, as seen with Riot Games’ central Riot Event Bus.

Data processing

Raw data is often cluttered with duplicates and irregular formats. You need to process this to make it ready for analysis. If you are consuming batch data and streaming data, consider using a framework that can handle both. A pattern such as the Kappa architecture views everything as a stream, simplifying the processing pipelines. Consider using Amazon Managed Service for Apache Flink to handle the processing work. With Managed Service for Apache Flink, you can clean and transform the streaming data and direct it to the appropriate destination based on latency requirements. You can also implement batch data processing using Amazon EMR on open source frameworks such as Apache Spark at 3.5 times better performance than the self-managed version. The architecture decision of using a batch or streaming processing system will depend on various factors; however, if you want to enable real-time analytics on your customer data, we recommend using a Kappa architecture pattern.
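
As a minimal sketch of this pattern, the following Flink SQL (which you can run in a Managed Service for Apache Flink Studio notebook) reads raw events from a hypothetical Kinesis data stream, drops malformed records, and writes the cleaned events to a second stream. The stream names, fields, and Region are placeholders:

-- Source: raw events arriving on a Kinesis data stream (hypothetical names)
CREATE TABLE raw_clickstream (
    customer_id VARCHAR,
    event_type  VARCHAR,
    event_ts    TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'raw-clickstream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json'
);

-- Sink: cleaned events for downstream consumers
CREATE TABLE clean_clickstream (
    customer_id VARCHAR,
    event_type  VARCHAR,
    event_ts    TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'clean-clickstream',
    'aws.region' = 'us-east-1',
    'format' = 'json'
);

-- Keep only well-formed events
INSERT INTO clean_clickstream
SELECT customer_id, event_type, event_ts
FROM raw_clickstream
WHERE customer_id IS NOT NULL
  AND event_type IS NOT NULL;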

Pillar 2: Unification

To link the diverse data arriving from various touchpoints to a unique customer, you need to build an identity processing solution that identifies anonymous logins, stores useful customer information, links them to external data for better insights, and groups customers in domains of interest. Although the identity processing solution helps build the unified customer profile, we recommend considering this as part of your data processing capabilities. The following diagram illustrates the components of such a solution.

The key components are as follows:

  • Identity resolution – Identity resolution is a deduplication solution, where records are matched to identify a unique customer and prospects by linking multiple identifiers such as cookies, device identifiers, IP addresses, email IDs, and internal enterprise IDs to a known person or anonymous profile using privacy-compliant methods. This can be achieved using AWS Entity Resolution, which enables using rules and machine learning (ML) techniques to match records and resolve identities. Alternatively, you can build identity graphs using Amazon Neptune for a single unified view of your customers.
  • Profile aggregation – When you’ve uniquely identified a customer, you can build applications in Managed Service for Apache Flink to consolidate all their metadata, from name to interaction history. Then, you transform this data into a concise format. Instead of showing every transaction detail, you can offer an aggregated spend value and a link to their Customer Relationship Management (CRM) record. For customer service interactions, provide an average CSAT score and a link to the call center system for a deeper dive into their communication history.
  • Profile enrichment – After you resolve a customer to a single identity, enhance their profile using various data sources. Enrichment typically involves adding demographic, behavioral, and geolocation data. You can use third-party data products from AWS Marketplace delivered through AWS Data Exchange to gain insights on income, consumption patterns, credit risk scores, and many more dimensions to further refine the customer experience.
  • Customer segmentation – After uniquely identifying and enriching a customer’s profile, you can segment them based on demographics like age, spend, income, and location using applications in Managed Service for Apache Flink. As you advance, you can incorporate AI services for more precise targeting techniques.

After you have done the identity processing and segmentation, you need a storage capability to store the unique customer profile and provide search and query capabilities on top of it for downstream consumers to use the enriched customer data.

The following diagram illustrates the unification pillar for a unified customer profile and single view of the customer for downstream applications.

Unified customer profile

Graph databases excel in modeling customer interactions and relationships, offering a comprehensive view of the customer journey. If you are dealing with billions of profiles and interactions, you can consider using Neptune, a managed graph database service on AWS. Organizations such as Zeta and Activision have successfully used Neptune to store and query billions of unique identifiers per month and millions of queries per second at millisecond response time.

Single customer view

Although graph databases provide in-depth insights, they can be complex for regular applications. It is prudent to consolidate this data into a single customer view, serving as a primary reference for downstream applications, ranging from ecommerce platforms to CRM systems. This consolidated view acts as a liaison between the data platform and customer-centric applications. For such purposes, we recommend using Amazon DynamoDB for its adaptability, scalability, and performance, resulting in an up-to-date and efficient customer database. This database also accepts a high volume of writes back from the activation systems that learn new information about the customers and feed it back.
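
As an illustration, assuming a hypothetical DynamoDB table named customer_profile keyed on customer_id, a downstream application (or the DynamoDB console’s PartiQL editor) could fetch the single customer view with a statement like the following; the table and attribute names are placeholders:

SELECT customer_id, full_name, lifetime_spend, avg_csat_score, crm_record_url
FROM "customer_profile"
WHERE customer_id = 'C-1001';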

Pillar 3: Analytics

The analytics pillar defines capabilities that help you generate insights on top of your customer data. Your analytics strategy applies to the wider organizational needs, not just C360. You can use the same capabilities to serve financial reporting, measure operational performance, or even monetize data assets. Strategize based on how your teams explore data, run analyses, wrangle data for downstream requirements, and visualize data at different levels. Plan on how you can enable your teams to use ML to move from descriptive to prescriptive analytics.

The AWS modern data architecture shows a way to build a purpose-built, secure, and scalable data platform in the cloud. Learn from this to build querying capabilities across your data lake and the data warehouse.

The following diagram breaks down the analytics capability into data exploration, visualization, data warehousing, and data collaboration. Let’s find out what role each of these components play in the context of C360.

Data exploration

Data exploration helps unearth inconsistencies, outliers, or errors. By spotting these early on, your teams can have cleaner data integration for C360, which in turn leads to more accurate analytics and predictions. Consider the personas exploring the data, their technical skills, and the time to insight. For instance, data analysts who know how to write SQL can directly query the data residing in Amazon S3 using Amazon Athena. Users interested in visual exploration can do so using AWS Glue DataBrew. Data scientists or engineers can use Amazon EMR Studio or Amazon SageMaker Studio to explore data from the notebook, and for a low-code experience, you can use Amazon SageMaker Data Wrangler. Because these services directly query S3 buckets, you can explore the data as it lands in the data lake, reducing time to insight.
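
For example, an analyst could profile raw engagement data directly in the data lake with an Athena query along these lines; the database, table, and columns are hypothetical and assume the data is registered in the AWS Glue Data Catalog:

SELECT customer_id,
       COUNT(*) AS events,
       COUNT(DISTINCT session_id) AS sessions,
       MIN(event_time) AS first_seen,
       MAX(event_time) AS last_seen
FROM web_analytics.raw_click_events
WHERE event_date = DATE '2024-01-15'
GROUP BY customer_id
ORDER BY events DESC
LIMIT 100;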

Visualization

Turning complex datasets into intuitive visuals unravels hidden patterns in the data, and is crucial for C360 use cases. With this capability, you can design reports for different levels catering to varying needs: executive reports offering strategic overviews, management reports highlighting operational metrics, and detailed reports diving into the specifics. Such visual clarity helps your organization make informed decisions across all tiers, centralizing the customer’s perspective.

Amazon QuickSight offers scalable, serverless visualization capabilities. You can benefit from its ML integrations for automated insights like forecasting and anomaly detection or natural language querying with Amazon Q in QuickSight, direct data connectivity from various sources, and pay-per-session pricing. With QuickSight, you can embed dashboards into external websites and applications, and the SPICE engine enables rapid, interactive data visualization at scale. The following screenshot shows an example C360 dashboard built on QuickSight.

Data warehouse

Data warehouses are efficient in consolidating structured data from multifarious sources and serving analytics queries from a large number of concurrent users. Data warehouses can provide a unified, consistent view of a vast amount of customer data for C360 use cases. Amazon Redshift addresses this need by adeptly handling large volumes of data and diverse workloads. It provides strong consistency across datasets, allowing organizations to derive reliable, comprehensive insights about their customers, which is essential for informed decision-making. Amazon Redshift offers real-time insights and predictive analytics capabilities for analyzing data from terabytes to petabytes. With Amazon Redshift ML, you can embed ML on top of the data stored in the data warehouse with minimum development overhead. Amazon Redshift Serverless simplifies application building and makes it straightforward for companies to embed rich data analytics capabilities.

Data collaboration

You can securely collaborate and analyze collective datasets from your partners without sharing or copying one another’s underlying data using AWS Clean Rooms. You can bring together disparate data from across engagement channels and partner datasets to form a 360-degree view of your customers. AWS Clean Rooms can enhance C360 by enabling use cases like cross-channel marketing optimization, advanced customer segmentation, and privacy-compliant personalization. By safely merging datasets, it offers richer insights and robust data privacy, meeting business needs and regulatory standards.

Pillar 4: Activation

The value of data diminishes the older it gets, leading to higher opportunity costs over time. In a survey conducted by Intersystems, 75% of the organizations surveyed believe untimely data inhibited business opportunities. In another survey, 58% of organizations (out of 560 respondents of HBR Advisory council and readers) stated they saw an increase in customer retention and loyalty using real-time customer analytics.

You can achieve a maturity in C360 when you build the ability to act on all the insights acquired from the previous pillars we discussed in real time. For example, at this maturity level, you can act on customer sentiment based on the context you automatically derived with an enriched customer profile and integrated channels. For this you need to implement prescriptive decision-making on how to address the customer’s sentiment. To do this at scale, you have to use AI/ML services for decision-making. The following diagram illustrates the architecture to activate insights using ML for prescriptive analytics and AI services for targeting and segmentation.

Use ML for the decision-making engine

With ML, you can improve the overall customer experience—you can create predictive customer behavior models, design hyper-personalized offers, and target the right customer with the right incentive. You can build them using Amazon SageMaker, which features a suite of managed services mapped to the data science lifecycle, including data wrangling, model training, model hosting, model inference, model drift detection, and feature storage. SageMaker enables you to build and operationalize your ML models, infusing them back into your applications to produce the right insight to the right person at the right time.

Amazon Personalize supports contextual recommendations, through which you can improve the relevance of recommendations by generating them within a context—for instance, device type, location, or time of day. Your team can get started without any prior ML experience using APIs to build sophisticated personalization capabilities in a few clicks. For more information, see Customize your recommendations by promoting specific items using business rules with Amazon Personalize.

Activate channels across marketing, advertising, direct-to-consumer, and loyalty

Now that you know who your customers are and who to reach out to, you can build solutions to run targeting campaigns at scale. With Amazon Pinpoint, you can personalize and segment communications to engage customers across multiple channels. For example, you can use Amazon Pinpoint to build engaging customer experiences through various communication channels like email, SMS, push notifications, and in-app notifications.

Pillar 5: Data governance

Establishing the right governance that balances control and access gives users trust and confidence in data. Imagine offering promotions on products that a customer doesn’t need, or bombarding the wrong customers with notifications. Poor data quality can lead to such situations, and ultimately results in customer churn. You have to build processes that validate data quality and take corrective actions. AWS Glue Data Quality can help you build solutions that validate the quality of data at rest and in transit, based on predefined rules.

To set up a cross-functional governance structure for customer data, you need a capability for governing and sharing data across your organization. With Amazon DataZone, admins and data stewards can manage and govern access to data, and consumers such as data engineers, data scientists, product managers, analysts, and other business users can discover, use, and collaborate with that data to drive insights. It streamlines data access, letting you find and use customer data, promotes team collaboration with shared data assets, and provides personalized analytics either via a web app or API on a portal. AWS Lake Formation makes sure data is accessed securely, guaranteeing the right people see the right data for the right reasons, which is crucial for effective cross-functional governance in any organization. Business metadata is stored and managed by Amazon DataZone and is underpinned by technical metadata and schema information registered in the AWS Glue Data Catalog. This technical metadata is used both by governance services such as Lake Formation and Amazon DataZone and by analytics services such as Amazon Redshift, Athena, and AWS Glue.

Bringing it all together

Using the following diagram as a reference, you can create projects and teams for building and operating different capabilities. For example, you can have a data integration team focus on the data collection pillar—you can then align functional roles, like data architects and data engineers. You can build your analytics and data science practices to focus on the analytics and activation pillars, respectively. Then you can create a specialized team for customer identity processing and for building the unified view of the customer. You can establish a data governance team with data stewards from different functions, security admins, and data governance policymakers to design and automate policies.

Conclusion

Building a robust C360 capability is fundamental for your organization to gain insights into your customer base. AWS Databases, Analytics, and AI/ML services can help streamline this process, providing scalability and efficiency. Following the five pillars to guide your thinking, you can build an end-to-end data strategy that defines the C360 view across the organization, makes sure data is accurate, and establishes cross-functional governance for customer data. You can categorize and prioritize the products and features you have to build within each pillar, select the right tool for the job, and build the skills you need in your teams.

Visit AWS for Data Customer Stories to learn how AWS is transforming customer journeys, from the world’s largest enterprises to growing startups.


About the Authors

Ismail Makhlouf is a Senior Specialist Solutions Architect for Data Analytics at AWS. Ismail focuses on architecting solutions for organizations across their end-to-end data analytics estate, including batch and real-time streaming, big data, data warehousing, and data lake workloads. He primarily works with organizations in retail, ecommerce, FinTech, HealthTech, and travel to achieve their business objectives with well architected data platforms.

Sandipan Bhaumik (Sandi) is a Senior Analytics Specialist Solutions Architect at AWS. He helps customers modernize their data platforms in the cloud to perform analytics securely at scale, reduce operational overhead, and optimize usage for cost-effectiveness and sustainability.

Enrich your customer data with geospatial insights using Amazon Redshift, AWS Data Exchange, and Amazon QuickSight

Post Syndicated from Tony Stricker original https://aws.amazon.com/blogs/big-data/enrich-your-customer-data-with-geospatial-insights-using-amazon-redshift-aws-data-exchange-and-amazon-quicksight/

It always pays to know more about your customers, and AWS Data Exchange makes it straightforward to use publicly available census data to enrich your customer dataset.

The United States Census Bureau conducts the US census every 10 years and gathers household survey data. This data is anonymized, aggregated, and made available for public use. The smallest geographic area for which the Census Bureau collects and aggregates data is the census block, which is formed by streets, roads, railroads, streams and other bodies of water, other visible physical and cultural features, and the legal boundaries shown on Census Bureau maps.

If you know the census block in which a customer lives, you are able to make general inferences about their demographic characteristics. With these new attributes, you are able to build a segmentation model to identify distinct groups of customers that you can target with personalized messaging. This data is available to subscribe to on AWS Data Exchange—and with data sharing, you don’t need to pay to store a copy of it in your account in order to query it.

In this post, we show how to use customer addresses to enrich a dataset with additional demographic details from the US Census Bureau dataset.

Solution overview

The solution includes the following high-level steps:

  1. Set up an Amazon Redshift Serverless endpoint and load customer data.
  2. Set up a place index in Amazon Location Service.
  3. Write an AWS Lambda user-defined function (UDF) to call Location Service from Amazon Redshift.
  4. Subscribe to census data on AWS Data Exchange.
  5. Use geospatial queries to tag addresses to census blocks.
  6. Create a new customer dataset in Amazon Redshift.
  7. Evaluate new customer data in Amazon QuickSight.

The following diagram illustrates the solution architecture.

architecture diagram

Prerequisites

You can use the following AWS CloudFormation template to deploy the required infrastructure. Before deployment, you need to sign up for QuickSight access through the AWS Management Console.

Load generic address data to Amazon Redshift

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. Redshift Serverless makes it straightforward to run analytics workloads of any size without having to manage data warehouse infrastructure.

To load our address data, we first create a Redshift Serverless workgroup. Then we use Amazon Redshift Query Editor v2 to load customer data from Amazon Simple Storage Service (Amazon S3).

Create a Redshift Serverless workgroup

There are two primary components of the Redshift Serverless architecture:

  • Namespace – A collection of database objects and users. Namespaces group together all of the resources you use in Redshift Serverless, such as schemas, tables, users, datashares, and snapshots.
  • Workgroup – A collection of compute resources. Workgroups have network and security settings that you can configure using the Redshift Serverless console, the AWS Command Line Interface (AWS CLI), or the Redshift Serverless APIs.

To create your namespace and workgroup, refer to Creating a data warehouse with Amazon Redshift Serverless. For this exercise, name your workgroup sandbox and your namespace adx-demo.

Use Query Editor v2 to load customer data from Amazon S3

You can use Query Editor v2 to submit queries and load data to your data warehouse through a web interface. To configure Query Editor v2 for your AWS account, refer to Data load made easy and secure in Amazon Redshift using Query Editor V2. After it’s configured, complete the following steps:

  • Use the following SQL to create the customer_data schema within the dev database in your data warehouse:
CREATE SCHEMA customer_data;
  • Use the following SQL DDL to create your target table into which you’ll load your customer address data:
CREATE TABLE customer_data.customer_addresses (
    address character varying(256) ENCODE lzo,
    unitnumber character varying(256) ENCODE lzo,
    municipality character varying(256) ENCODE lzo,
    region character varying(256) ENCODE lzo,
    postalcode character varying(256) ENCODE lzo,
    country character varying(256) ENCODE lzo,
    customer_id integer ENCODE az64
) DISTSTYLE AUTO;

The file has no column headers and is pipe delimited (|). For information on how to load data from either Amazon S3 or your local desktop, refer to Loading data into a database.
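
For reference, a COPY command to load the pipe-delimited file from Amazon S3 might look like the following sketch; the S3 path is a placeholder, and IAM_ROLE default assumes you associated a default IAM role with your namespace:

COPY customer_data.customer_addresses
FROM 's3://your-bucket/customer-addresses/customer_addresses.txt'
IAM_ROLE default
DELIMITER '|';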

Use Location Service to geocode and enrich address data

Location Service lets you add location data and functionality to applications, which includes capabilities such as maps, points of interest, geocoding, routing, geofences, and tracking.

Our data is in Amazon Redshift, so we need to access the Location Service APIs using SQL statements. Each row of data contains an address that we want to enrich and geotag using the Location Service APIs. Amazon Redshift allows developers to create UDFs using a SQL SELECT clause, Python, or Lambda.

Lambda is a compute service that lets you run code without provisioning or managing servers. With Lambda UDFs, you can write custom functions with complex logic and integrate with third-party components. Scalar Lambda UDFs return one result per invocation of the function—in this case, the Lambda function runs one time for each row of data it receives.

For this post, we write a Lambda function that uses the Location Service API to geotag and validate our customer addresses. Then we register this Lambda function as a UDF with our Redshift instance, allowing us to call the function from a SQL command.

For instructions to create a Location Service place index and create your Lambda function and scalar UDF, refer to Access Amazon Location Service from Amazon Redshift. For this post, we use ESRI as a provider and name the place index placeindex.redshift.
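
For orientation, registering the Lambda function as a scalar UDF follows this general shape; the Lambda function name and IAM role ARN are placeholders, and the linked post has the complete walkthrough:

CREATE OR REPLACE EXTERNAL FUNCTION public.f_geocode_address(varchar, varchar, varchar, varchar, varchar)
RETURNS varchar
STABLE
LAMBDA 'f-geocode-address'                                       -- placeholder Lambda function name
IAM_ROLE 'arn:aws:iam::111122223333:role/redshift-lambda-role';  -- placeholder IAM role ARN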

Test your new function with the following code, which returns the coordinates of the White House in Washington, DC:

select public.f_geocode_address('1600 Pennsylvania Ave.','Washington','DC','20500','USA');

Subscribe to demographic data from AWS Data Exchange

AWS Data Exchange is a data marketplace with more than 3,500 products from over 300 providers delivered—through files, APIs, or Amazon Redshift queries—directly to the data lakes, applications, analytics, and machine learning models that use it.

First, we need to give our Redshift namespace permission via AWS Identity and Access Management (IAM) to access subscriptions on AWS Data Exchange. Then we can subscribe to our sample demographic data. Complete the following steps:

  1. On the IAM console, add the AWSDataExchangeSubscriberFullAccess managed policy to the Amazon Redshift commands access role that you assigned when creating the namespace.
  2. On the AWS Data Exchange console, navigate to the dataset ACS – Sociodemographics (USA, Census Block Groups, 2019), provided by CARTO.
  3. Choose Continue to subscribe, then choose Subscribe.

The subscription may take a few minutes to configure.

  4. When your subscription is in place, navigate back to the Redshift Serverless console.
  5. In the navigation pane, choose Datashares.
  6. On the Subscriptions tab, choose the datashare that you just subscribed to.
  7. On the datashare details page, choose Create database from datashare.
  8. Choose the namespace you created earlier and provide a name for the new database that will hold the shared objects from the dataset you subscribed to.

In Query Editor v2, you should see the new database you just created and two new tables: one that holds the block group polygons and another that holds the demographic information for each block group.

Query Editor v2 data source explorer

Join geocoded customer data to census data with geospatial queries

There are two primary types of spatial data: raster and vector data. Raster data is represented as a grid of pixels and is beyond the scope of this post. Vector data is comprised of vertices, edges, and polygons. With geospatial data, vertices are represented as latitude and longitude points and edges are the connections between pairs of vertices. Think of the road connecting two intersections on a map. A polygon is a set of vertices with a series of connecting edges that form a continuous shape. A simple rectangle is a polygon, just as the state border of Ohio can be represented as a polygon. The geography_usa_blockgroup_2019 dataset that you subscribed to has 220,134 rows, each representing a single census block group and its geographic shape.

Amazon Redshift supports the storage and querying of vector-based spatial data with the GEOMETRY and GEOGRAPHY data types. You can use Redshift SQL functions to perform queries such as a point in polygon operation to determine if a given latitude/longitude point falls within the boundaries of a given polygon (such as state or county boundary). In this dataset, you can observe that the geom column in geography_usa_blockgroup_2019 is of type GEOMETRY.

Our goal is to determine which census block (polygon) each of our geotagged addresses falls within so we can enrich our customer records with details that we know about the census block. Complete the following steps:

  • Build a new table with the geocoding results from our UDF:
CREATE TABLE customer_data.customer_addresses_geocoded AS 
select address
    ,unitnumber
    ,municipality
    ,region
    ,postalcode
    ,country
    ,customer_id
    ,public.f_geocode_address(address||' '||unitnumber,municipality,region,postalcode,country) as geocode_result
FROM customer_data.customer_addresses;
  • Use the following code to extract the different address fields and latitude/longitude coordinates from the JSON column and create a new table with the results:
CREATE TABLE customer_data.customer_addresses_points AS
SELECT customer_id
    ,geo_address
    ,address
    ,unitnumber
    ,municipality
    ,region
    ,postalcode
    ,country
    ,longitude
    ,latitude
    ,ST_SetSRID(ST_MakePoint(Longitude, Latitude),4326) as address_point
            --create new geom column of type POINT, set new point SRID = 4326
FROM
(
select customer_id
    ,address
    ,unitnumber
    ,municipality
    ,region
    ,postalcode
    ,country
    ,cast(json_extract_path_text(geocode_result, 'Label', true) as VARCHAR) as geo_address
    ,cast(json_extract_path_text(geocode_result, 'Longitude', true) as float) as longitude
    ,cast(json_extract_path_text(geocode_result, 'Latitude', true) as float) as latitude
        --use json function to extract fields from geocode_result
from customer_data.customer_addresses_geocoded) a;

This code uses the ST_MakePoint function to create a new column from the latitude/longitude coordinates called address_point of type GEOMETRY and subtype POINT. It uses the ST_SetSRID geospatial function to set the spatial reference identifier (SRID) of the new column to 4326.

The SRID defines the spatial reference system to be used when evaluating the geometry data. It’s important when joining or comparing geospatial data that they have matching SRIDs. You can check the SRID of an existing geometry column by using the ST_SRID function. For more information on SRIDs and GEOMETRY data types, refer to Querying spatial data in Amazon Redshift.
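
For example, the following query confirms that the points you just created carry the expected SRID:

SELECT DISTINCT ST_SRID(address_point) AS srid
FROM customer_data.customer_addresses_points;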

  • Now that your customer addresses are geocoded as latitude/longitude points in a geometry column, you can use a join to identify which census block shape your new point falls within:
CREATE TABLE customer_data.customer_addresses_with_census AS
select c.*
    ,shapes.geoid as census_group_shape
    ,demo.*
from customer_data.customer_addresses_points c
inner join "carto_census_data"."carto".geography_usa_blockgroup_2019 shapes
on ST_Contains(shapes.geom, c.address_point)
    --join tables where the address point falls within the census block geometry
inner join carto_census_data.usa_acs.demographics_sociodemographics_usa_blockgroup_2019_yearly_2019 demo
on demo.geoid = shapes.geoid;

The preceding code creates a new table called customer_addresses_with_census, which joins the customer addresses to the census block in which they belong as well as the demographic data associated with that census block.

To do this, you used the ST_CONTAINS function, which accepts two geometry data types as an input and returns TRUE if the 2D projection of the first input geometry contains the second input geometry. In our case, we have census blocks represented as polygons and addresses represented as points. The join in the SQL statement succeeds when the point falls within the boundaries of the polygon.

Visualize the new demographic data with QuickSight

QuickSight is a cloud-scale business intelligence (BI) service that you can use to deliver easy-to-understand insights to the people who you work with, wherever they are. QuickSight connects to your data in the cloud and combines data from many different sources.

First, let’s build some new calculated fields that will help us better understand the demographics of our customer base. We can do this in QuickSight, or we can use SQL to build the columns in a Redshift view. The following is the code for a Redshift view:

CREATE VIEW customer_data.customer_features AS (
SELECT customer_id 
    ,postalcode
    ,region
    ,municipality
    ,geoid as census_geoid
    ,longitude
    ,latitude
    ,total_pop
    ,median_age
    ,white_pop/total_pop as perc_white
    ,black_pop/total_pop as perc_black
    ,asian_pop/total_pop as perc_asian
    ,hispanic_pop/total_pop as perc_hispanic
    ,amerindian_pop/total_pop as perc_amerindian
    ,median_income
    ,income_per_capita
    ,median_rent
    ,percent_income_spent_on_rent
    ,unemployed_pop/nullif(pop_in_labor_force, 0) as perc_unemployment
    ,(associates_degree + bachelors_degree + masters_degree + doctorate_degree)/total_pop as perc_college_ed
    ,(household_language_total - household_language_english)/nullif(household_language_total, 0) as perc_other_than_english
FROM "dev"."customer_data"."customer_addresses_with_census" t );

To connect QuickSight to your Redshift Serverless endpoint and create a new dataset, complete the following steps:

  • On the QuickSight console, choose Datasets in the navigation pane.
  • Choose New dataset.

create a new dataset in quicksight

  • We want to create a dataset from a new data source and use the Redshift: Manual connect option.

Redshift manual connection

  • Provide the connection information for your Redshift Serverless workgroup.

You will need the endpoint for your workgroup and the user name and password that you created when you set up your workgroup. You can find your workgroup’s endpoint on the Redshift Serverless console by navigating to your workgroup configuration. The following screenshot is an example of the connection settings needed. Notice the connection type is the name of the VPC connection that you previously configured in QuickSight. When you copy the endpoint from the Redshift console, be sure to remove the database and port number from the end of the URL before entering it in the field.

Redshift edit data source

  • Save the new data source configuration.

You’ll be prompted to choose the table you want to use for your dataset.

  • Choose the new view that you created that has your new derived fields.

Quicksight choose your table

  • Select Directly query your data.

This will connect your visualizations directly to the data in the database rather than ingesting data into the QuickSight in-memory data store.

Directly query your data

  • To create a histogram of median income level, choose the blank visual on Sheet1 and then choose the histogram visual icon under Visual types.
  • Choose median_income under Fields list and drag it to the Value field well.

This builds a histogram showing the distribution of median_income for our customers based on the census block group in which they live.

QuickSight histogram

Conclusion

In this post, we demonstrated how companies can use open census data available on AWS Data Exchange to effortlessly gain a high-level understanding of their customer base from a demographic standpoint. This basic understanding of customers based on where they live can serve as the foundation for more targeted marketing campaigns and even influence product development and service offerings.

As always, AWS welcomes your feedback. Please leave your thoughts and questions in the comments section.


About the Author

Tony Stricker is a Principal Technologist on the Data Strategy team at AWS, where he helps senior executives adopt a data-driven mindset and align their people/process/technology in ways that foster innovation and drive towards specific, tangible business outcomes. He has a background as a data warehouse architect and data scientist and has delivered solutions into production across multiple industries including oil and gas, financial services, public sector, and manufacturing. In his spare time, Tony likes to hang out with his dog and cat, work on home improvement projects, and restore vintage Airstream campers.

How the GoDaddy data platform achieved over 60% cost reduction and 50% performance boost by adopting Amazon EMR Serverless

Post Syndicated from Brandon Abear original https://aws.amazon.com/blogs/big-data/how-the-godaddy-data-platform-achieved-over-60-cost-reduction-and-50-performance-boost-by-adopting-amazon-emr-serverless/

This is a guest post co-written with Brandon Abear, Dinesh Sharma, John Bush, and Ozcan Ilikhan from GoDaddy.

GoDaddy empowers everyday entrepreneurs by providing all the help and tools to succeed online. With more than 20 million customers worldwide, GoDaddy is the place people come to name their ideas, build a professional website, attract customers, and manage their work.

At GoDaddy, we take pride in being a data-driven company. Our relentless pursuit of valuable insights from data fuels our business decisions and ensures customer satisfaction. Our commitment to efficiency is unwavering, and we’ve undertaken an exciting initiative to optimize our batch processing jobs. In this journey, we have identified a structured approach that we refer to as the seven layers of improvement opportunities. This methodology has become our guide in the pursuit of efficiency.

In this post, we discuss how we enhanced operational efficiency with Amazon EMR Serverless. We share our benchmarking results and methodology, and insights into the cost-effectiveness of EMR Serverless vs. fixed capacity Amazon EMR on EC2 transient clusters on our data workflows orchestrated using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). We share our strategy for the adoption of EMR Serverless in areas where it excels. Our findings reveal significant benefits, including over 60% cost reduction, 50% faster Spark workloads, a remarkable five-times improvement in development and testing speed, and a significant reduction in our carbon footprint.

Background

In late 2020, GoDaddy’s data platform initiated its AWS Cloud journey, migrating an 800-node Hadoop cluster with 2.5 PB of data from its data center to EMR on EC2. This lift-and-shift approach facilitated a direct comparison between on-premises and cloud environments, ensuring a smooth transition to AWS pipelines, minimizing data validation issues and migration delays.

By early 2022, we successfully migrated our big data workloads to EMR on EC2. Using best practices learned from the AWS FinHack program, we fine-tuned resource-intensive jobs, converted Pig and Hive jobs to Spark, and reduced our batch workload spend by 22.75% in 2022. However, scalability challenges emerged due to the multitude of jobs. This prompted GoDaddy to embark on a systematic optimization journey, establishing a foundation for more sustainable and efficient big data processing.

Seven layers of improvement opportunities

In our quest for operational efficiency, we have identified seven distinct layers of opportunities for optimization within our batch processing jobs, as shown in the following figure. These layers range from precise code-level enhancements to more comprehensive platform improvements. This multi-layered approach has become our strategic blueprint in the ongoing pursuit of better performance and higher efficiency.

Seven layers of improvement opportunities

The layers are as follows:

  • Code optimization – Focuses on refining the code logic and how it can be optimized for better performance. This involves performance enhancements through selective caching, partition and projection pruning, join optimizations, and other job-specific tuning. Using AI coding solutions is also an integral part of this process.
  • Software updates – Updating to the latest versions of open source software (OSS) to capitalize on new features and improvements. For example, Adaptive Query Execution in Spark 3 brings significant performance and cost improvements.
  • Custom Spark configurations – Tuning of custom Spark configurations to maximize resource utilization, memory, and parallelism. We can achieve significant improvements by right-sizing tasks, such as through spark.sql.shuffle.partitions, spark.sql.files.maxPartitionBytes, spark.executor.cores, and spark.executor.memory. However, these custom configurations might be counterproductive if they are not compatible with the specific Spark version.
  • Resource provisioning time – The time it takes to launch resources like ephemeral EMR clusters on Amazon Elastic Compute Cloud (Amazon EC2). Although some factors influencing this time are outside of an engineer’s control, identifying and addressing the factors that can be optimized can help reduce overall provisioning time.
  • Fine-grained scaling at task level – Dynamically adjusting resources such as CPU, memory, disk, and network bandwidth based on each stage’s needs within a task. The aim here is to avoid fixed cluster sizes that could result in resource waste.
  • Fine-grained scaling across multiple tasks in a workflow – Given that each task has unique resource requirements, maintaining a fixed resource size may result in under- or over-provisioning for certain tasks within the same workflow. Traditionally, the size of the largest task determines the cluster size for a multi-task workflow. However, dynamically adjusting resources across multiple tasks and steps within a workflow results in a more cost-effective implementation.
  • Platform-level enhancements – Enhancements at preceding layers can only optimize a given job or a workflow. Platform improvement aims to attain efficiency at the company level. We can achieve this through various means, such as updating or upgrading the core infrastructure, introducing new frameworks, allocating appropriate resources for each job profile, balancing service usage, optimizing the use of Savings Plans and Spot Instances, or implementing other comprehensive changes to boost efficiency across all tasks and workflows.

Layers 1–3: Previous cost reductions

After we migrated from on premises to AWS Cloud, we primarily focused our cost-optimization efforts on the first three layers shown in the diagram. By transitioning our most costly legacy Pig and Hive pipelines to Spark and optimizing Spark configurations for Amazon EMR, we achieved significant cost savings.

For example, a legacy Pig job took 10 hours to complete and ranked among the top 10 most expensive EMR jobs. Upon reviewing TEZ logs and cluster metrics, we discovered that the cluster was vastly over-provisioned for the data volume being processed and remained under-utilized for most of the runtime. Transitioning from Pig to Spark was more efficient. Although no automated tools were available for the conversion, manual optimizations were made, including:

  • Reduced unnecessary disk writes, saving serialization and deserialization time (Layer 1)
  • Replaced Airflow task parallelization with Spark, simplifying the Airflow DAG (Layer 1)
  • Eliminated redundant Spark transformations (Layer 1)
  • Upgraded from Spark 2 to 3, using Adaptive Query Execution (Layer 2)
  • Addressed skewed joins and optimized smaller dimension tables (Layer 3)

As a result, job cost decreased by 95%, and job completion time was reduced to 1 hour. However, this approach was labor-intensive and not scalable for numerous jobs.

Layers 4–6: Find and adopt the right compute solution

In late 2022, following our significant accomplishments in optimization at the previous levels, our attention moved towards enhancing the remaining layers.

Understanding the state of our batch processing

We use Amazon MWAA to orchestrate our data workflows in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as workflows. In this post, the terms workflow and job are used interchangeably, referring to the Directed Acyclic Graphs (DAGs) consisting of tasks orchestrated by Amazon MWAA. Each workflow’s DAG contains sequential or parallel tasks (or a combination of both) between the create_emr and terminate_emr tasks, running on a transient EMR cluster with fixed compute capacity throughout the workflow run. Even after optimizing a portion of our workload, we still had numerous non-optimized workflows that were under-utilized due to over-provisioning of compute resources based on the most resource-intensive task in the workflow, as shown in the following figure.

This highlighted the impracticality of static resource allocation and led us to recognize the necessity of a dynamic resource allocation (DRA) system. Before proposing a solution, we gathered extensive data to thoroughly understand our batch processing. Analyzing the cluster step time, excluding provisioning and idle time, revealed significant insights: a right-skewed distribution with over half of the workflows completing in 20 minutes or less and only 10% taking more than 60 minutes. This distribution guided our choice of a fast-provisioning compute solution, dramatically reducing workflow runtimes. The following diagram illustrates step times (excluding provisioning and idle time) of EMR on EC2 transient clusters in one of our batch processing accounts.

Furthermore, based on the step time (excluding provisioning and idle time) distribution of the workflows, we categorized our workflows into three groups:

  • Quick run – Lasting 20 minutes or less
  • Medium run – Lasting between 20 and 60 minutes
  • Long run – Exceeding 60 minutes, often spanning several hours or more

Another factor we needed to consider was the extensive use of transient clusters for reasons such as security, job and cost isolation, and purpose-built clusters. Additionally, there was a significant variation in resource needs between peak hours and periods of low utilization.

Instead of fixed-size clusters, we could potentially use managed scaling on EMR on EC2 to achieve some cost benefits. However, migrating to EMR Serverless appears to be a more strategic direction for our data platform. In addition to potential cost benefits, EMR Serverless offers additional advantages such as a one-click upgrade to the newest Amazon EMR versions, a simplified operational and debugging experience, and automatic upgrades to the latest generations upon rollout. These features collectively simplify the process of operating a platform on a larger scale.

Evaluating EMR Serverless: A case study at GoDaddy

EMR Serverless is a serverless option in Amazon EMR that eliminates the complexities of configuring, managing, and scaling clusters when running big data frameworks like Apache Spark and Apache Hive. With EMR Serverless, businesses can enjoy numerous benefits, including cost-effectiveness, faster provisioning, simplified developer experience, and improved resilience to Availability Zone failures.

Recognizing the potential of EMR Serverless, we conducted an in-depth benchmark study using real production workflows. The study aimed to assess EMR Serverless performance and efficiency while also creating an adoption plan for large-scale implementation. The findings were highly encouraging, showing EMR Serverless can effectively handle our workloads.

Benchmarking methodology

We split our data workflows into three categories based on total step time (excluding provisioning and idle time): quick run (0–20 minutes), medium run (20–60 minutes), and long run (over 60 minutes). We analyzed the impact of the EMR deployment type (Amazon EC2 vs. EMR Serverless) on two key metrics: cost-efficiency and total runtime speedup, which served as our overall evaluation criteria. Although we did not formally measure ease of use and resiliency, these factors were considered throughout the evaluation process.

The high-level steps to assess the environment are as follows:

  1. Prepare the data and environment:
    1. Choose three to five random production jobs from each job category.
    2. Implement required adjustments to prevent interference with production.
  2. Run tests:
    1. Run scripts over several days or through multiple iterations to gather precise and consistent data points.
    2. Perform tests using EMR on EC2 and EMR Serverless.
  3. Validate data and test runs:
    1. Validate input and output datasets, partitions, and row counts to ensure identical data processing.
  4. Gather metrics and analyze results:
    1. Gather relevant metrics from the tests.
    2. Analyze results to draw insights and conclusions.

Benchmark results

Our benchmark results showed significant enhancements across all three job categories for both runtime speedup and cost-efficiency. The improvements were most pronounced for quick jobs, directly resulting from faster startup times. For instance, a 20-minute (including cluster provisioning and shut down) data workflow running on an EMR on EC2 transient cluster of fixed compute capacity finishes in 10 minutes on EMR Serverless, providing a shorter runtime with cost benefits. Overall, the shift to EMR Serverless delivered substantial performance improvements and cost reductions at scale across job brackets, as seen in the following figure.

Historically, we devoted more time to tuning our long-run workflows. Interestingly, we discovered that the existing custom Spark configurations for these jobs did not always translate well to EMR Serverless. In cases where the results were insignificant, a common approach was to discard previous Spark configurations related to executor cores. By allowing EMR Serverless to autonomously manage these Spark configurations, we often observed improved outcomes. The following graph shows the average runtime and cost improvement per job when comparing EMR Serverless to EMR on EC2.

Per Job Improvement

The following table shows a sample comparison of results for the same workflow running on different deployment options of Amazon EMR (EMR on EC2 and EMR Serverless).

Metric | EMR on EC2 (Average) | EMR Serverless (Average) | EMR on EC2 vs. EMR Serverless
Total Run Cost ($) | $ 5.82 | $ 2.60 | 55%
Total Run Time (Minutes) | 53.40 | 39.40 | 26%
Provisioning Time (Minutes) | 10.20 | 0.05 | .
Provisioning Cost ($) | $ 1.19 | . | .
Steps Time (Minutes) | 38.20 | 39.16 | -3%
Steps Cost ($) | $ 4.30 | . | .
Idle Time (Minutes) | 4.80 | . | .
EMR Release Label | emr-6.9.0 | . | .
Hadoop Distribution | Amazon 3.3.3 | . | .
Spark Version | Spark 3.3.0 | . | .
Hive/HCatalog Version | Hive 3.1.3, HCatalog 3.1.3 | . | .
Job Type | Spark | . | .

AWS Graviton2 on EMR Serverless performance evaluation

After seeing compelling results with EMR Serverless for our workloads, we decided to further analyze the performance of the AWS Graviton2 (arm64) architecture within EMR Serverless. AWS had benchmarked Spark workloads on Graviton2 EMR Serverless using the TPC-DS 3TB scale, showing a 27% overall price-performance improvement.

To better understand the integration benefits, we ran our own study using GoDaddy’s production workloads on a daily schedule and observed an impressive 23.8% price-performance enhancement across a range of jobs when using Graviton2. For more details about this study, see GoDaddy benchmarking results in up to 24% better price-performance for their Spark workloads with AWS Graviton2 on Amazon EMR Serverless.

Adoption strategy for EMR Serverless

We strategically implemented a phased rollout of EMR Serverless via deployment rings, enabling systematic integration. This gradual approach let us validate improvements and halt further adoption of EMR Serverless, if needed. It served both as a safety net to catch issues early and a means to refine our infrastructure. The process mitigated change impact through smooth operations while building team expertise of our Data Engineering and DevOps teams. Additionally, it fostered tight feedback loops, allowing prompt adjustments and ensuring efficient EMR Serverless integration.

We divided our workflows into three main adoption groups, as shown in the following image:

  • Canaries – This group aids in detecting and resolving any potential problems early in the deployment stage.
  • Early adopters – This is the second batch of workflows that adopt the new compute solution after initial issues have been identified and rectified by the canaries group.
  • Broad deployment rings – The largest group of rings, this group represents the wide-scale deployment of the solution. These are deployed after successful testing and implementation in the previous two groups.

Rings

We further broke down these workflows into granular deployment rings to adopt EMR Serverless, as shown in the following table.

Ring # | Name | Details
Ring 0 | Canary | Low adoption risk jobs that are expected to yield some cost saving benefits.
Ring 1 | Early Adopters | Low risk quick-run Spark jobs that are expected to yield high gains.
Ring 2 | Quick-run | Rest of the quick-run (step_time <= 20 min) Spark jobs.
Ring 3 | LargerJobs_EZ | High potential gain, easy move, medium-run and long-run Spark jobs.
Ring 4 | LargerJobs | Rest of the medium-run and long-run Spark jobs with potential gains.
Ring 5 | Hive | Hive jobs with potentially higher cost savings.
Ring 6 | Redshift_EZ | Easy migration Redshift jobs that suit EMR Serverless.
Ring 7 | Glue_EZ | Easy migration Glue jobs that suit EMR Serverless.

Production adoption results summary

The encouraging benchmarking and canary adoption results generated considerable interest in wider EMR Serverless adoption at GoDaddy. To date, the EMR Serverless rollout remains underway. Thus far, it has reduced costs by 62.5% and accelerated total batch workflow completion by 50.4%.

Based on preliminary benchmarks, our team expected substantial gains for quick jobs. To our surprise, actual production deployments surpassed projections, averaging 64.4% faster vs. 42% projected, and 71.8% cheaper vs. 40% predicted.

Remarkably, long-running jobs also saw significant performance improvements due to the rapid provisioning of EMR Serverless and aggressive scaling enabled by dynamic resource allocation. We observed substantial parallelization during high-resource segments, resulting in a 40.5% faster total runtime compared to traditional approaches. The following chart illustrates the average enhancements per job category.

Prod Jobs Savings

Additionally, we observed the highest degree of dispersion for speed improvements within the long-run job category, as shown in the following box-and-whisker plot.

Whisker Plot

Sample workflows that adopted EMR Serverless

For a large workflow migrated to EMR Serverless, comparing 3-week averages pre- and post-migration revealed impressive cost savings: a 75.30% decrease based on retail pricing, along with a 10% improvement in total runtime, boosting operational efficiency. The following graph illustrates the cost trend.

Although quick-run jobs realized only small absolute cost reductions per run, they delivered the most significant percentage cost savings. With thousands of these workflows running daily, the accumulated savings are substantial. The following graph shows the cost trend for a small workload migrated from EMR on EC2 to EMR Serverless. Comparing 3-week pre- and post-migration averages revealed a remarkable 92.43% cost savings on the retail on-demand pricing, alongside an 80.6% acceleration in total runtime.

Sample workflows adopted EMR Serverless 2

Layer 7: Platform-wide improvements

We aim to revolutionize compute operations at GoDaddy, providing simplified yet powerful solutions for all users with our Intelligent Compute Platform. Built on AWS compute solutions such as EMR Serverless and EMR on EC2, it provides optimized runs of data processing and machine learning (ML) workloads. An ML-powered job broker intelligently determines when and how to run jobs based on various parameters, while still allowing power users to customize. Additionally, an ML-powered compute resource manager pre-provisions resources based on load and historical data, providing efficient, fast provisioning at optimum cost. Intelligent compute empowers users with out-of-the-box optimization, catering to diverse personas without compromising flexibility for power users.

The following diagram shows a high-level illustration of the intelligent compute architecture.

Insights and recommended best practices

The following section discusses the insights we’ve gathered and the recommended best practices we’ve developed during our preliminary and wider adoption stages.

Infrastructure preparation

Although EMR Serverless is a deployment method within EMR, it requires some infrastructure preparedness to optimize its potential. Consider the following requirements and practical guidance on implementation:

  • Use large subnets across multiple Availability Zones – When running EMR Serverless workloads within your VPC, make sure the subnets span across multiple Availability Zones and are not constrained by IP addresses. Refer to Configuring VPC access and Best practices for subnet planning for details.
  • Modify maximum concurrent vCPU quota – For extensive compute requirements, it is recommended to increase your max concurrent vCPUs per account service quota.
  • Amazon MWAA version compatibility – When adopting EMR Serverless, GoDaddy’s decentralized Amazon MWAA ecosystem for data pipeline orchestration created compatibility issues from disparate AWS Providers versions. Directly upgrading Amazon MWAA was more efficient than updating numerous DAGs. We facilitated adoption by upgrading Amazon MWAA instances ourselves, documenting issues, and sharing findings and effort estimates for accurate upgrade planning.
  • GoDaddy EMR operator – To streamline migrating numerous Airflow DAGs from EMR on EC2 to EMR Serverless, we developed custom operators adapting existing interfaces. This allowed seamless transitions while retaining familiar tuning options. Data engineers could easily migrate pipelines with simple find-replace imports and immediately use EMR Serverless; a simplified sketch of this wrapper pattern follows this list.
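The following is a hypothetical sketch of that wrapper pattern built on the Amazon provider’s EmrServerlessStartJobOperator. GoDaddy’s internal operator is not public, and the default application ID, role ARN, and log location shown here are assumptions.

# Hypothetical wrapper; defaults are placeholders, not GoDaddy's actual values.
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator


class MigratedEmrServerlessOperator(EmrServerlessStartJobOperator):
    """Thin wrapper that fills in team-level defaults so existing DAGs can be
    migrated with a simple find-replace of the import and operator name."""

    def __init__(self, *, entry_point, spark_submit_parameters="", **kwargs):
        super().__init__(
            application_id="00example1234",  # hypothetical shared application
            execution_role_arn="arn:aws:iam::111122223333:role/EmrServerlessJobRole",
            job_driver={
                "sparkSubmit": {
                    "entryPoint": entry_point,
                    "sparkSubmitParameters": spark_submit_parameters,
                }
            },
            configuration_overrides={
                "monitoringConfiguration": {
                    "s3MonitoringConfiguration": {
                        "logUri": "s3://example-logs/emr-serverless/"
                    }
                }
            },
            **kwargs,
        )

With this pattern, a DAG that previously created an EMR on EC2 cluster only needs to swap its operator import and pass the Spark entry point, keeping familiar tuning options available through spark_submit_parameters.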

Unexpected behavior mitigation

The following are unexpected behaviors we ran into and what we did to mitigate them:

  • Spark DRA aggressive scaling – For some jobs (8.33% of initial benchmarks, 13.6% of production), cost increased after migrating to EMR Serverless. This was due to Spark DRA excessively assigning new workers briefly, prioritizing performance over cost. To counteract this, we set maximum executor thresholds by adjusting spark.dynamicAllocation.maxExecutors, effectively limiting EMR Serverless scaling aggression. When migrating from EMR on EC2, we suggest observing the max core count in the Spark History UI to replicate similar compute limits in EMR Serverless, such as --conf spark.executor.cores and --conf spark.dynamicAllocation.maxExecutors.
  • Managing disk space for large-scale jobs – When transitioning jobs that process large data volumes with substantial shuffles and significant disk requirements to EMR Serverless, we recommend configuring spark.emr-serverless.executor.disk by referring to existing Spark job metrics. Furthermore, configurations like spark.executor.cores combined with spark.emr-serverless.executor.disk and spark.dynamicAllocation.maxExecutors allow control over the underlying worker size and total attached storage when advantageous. For example, a shuffle-heavy job with relatively low disk usage may benefit from using a larger worker to increase the likelihood of local shuffle fetches. A sketch of these configurations follows this list.
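As a hedged example, the following snippet shows how such caps might be expressed as Spark submit parameters for an EMR Serverless job run; the specific values are placeholders to be derived from your own Spark History UI metrics rather than recommended settings.

# Placeholder values; derive them from the job's historical metrics on EMR on EC2.
spark_submit_parameters = " ".join([
    "--conf spark.executor.cores=4",
    "--conf spark.dynamicAllocation.maxExecutors=50",  # cap DRA scale-out to control cost
    "--conf spark.emr-serverless.executor.disk=100g",  # per-executor disk for shuffle-heavy jobs
])

This string can then be passed as the sparkSubmitParameters field of the job driver when starting the job run.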

Conclusion

As discussed in this post, our experiences with adopting EMR Serverless on arm64 have been overwhelmingly positive. The impressive results we’ve achieved, including a 60% reduction in cost, 50% faster runs of batch Spark workloads, and an astounding five-times improvement in development and testing speed, speak volumes about the potential of this technology. Furthermore, our current results suggest that by widely adopting Graviton2 on EMR Serverless, we could potentially reduce the carbon footprint by up to 60% for our batch processing.

However, it’s crucial to understand that these results are not a one-size-fits-all scenario. The enhancements you can expect are subject to factors including, but not limited to, the specific nature of your workflows, cluster configurations, resource utilization levels, and fluctuations in computational capacity. Therefore, we strongly advocate for a data-driven, ring-based deployment strategy when considering the integration of EMR Serverless, which can help optimize its benefits to the fullest.

Special thanks to Mukul Sharma and Boris Berlin for their contributions to benchmarking. Many thanks to Travis Muhlestein (CDO), Abhijit Kundu (VP Eng), Vincent Yung (Sr. Director Eng.), and Wai Kin Lau (Sr. Director Data Eng.) for their continued support.


About the Authors

Brandon Abear is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He enjoys all things big data. In his spare time, he enjoys traveling, watching movies, and playing rhythm games.

Dinesh Sharma is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about user experience and developer productivity, always looking for ways to optimize engineering processes and save costs. In his spare time, he loves reading and is an avid manga fan.

John Bush is a Principal Software Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about making it easier for organizations to manage data and use it to drive their businesses forward. In his spare time, he loves hiking, camping, and riding his ebike.

Ozcan Ilikhan is the Director of Engineering for the Data and ML Platform at GoDaddy. He has over two decades of multidisciplinary leadership experience, spanning startups to global enterprises. He has a passion for leveraging data and AI in creating solutions that delight customers, empower them to achieve more, and boost operational efficiency. Outside of his professional life, he enjoys reading, hiking, gardening, volunteering, and embarking on DIY projects.

Harsh Vardhan is an AWS Solutions Architect, specializing in big data and analytics. He has over 8 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Data governance in the age of generative AI

Post Syndicated from Krishna Rupanagunta original https://aws.amazon.com/blogs/big-data/data-governance-in-the-age-of-generative-ai/

Data is your generative AI differentiator, and a successful generative AI implementation depends on a robust data strategy incorporating a comprehensive data governance approach. Working with large language models (LLMs) for enterprise use cases requires the implementation of quality and privacy considerations to drive responsible AI. However, enterprise data generated from siloed sources combined with the lack of a data integration strategy creates challenges for provisioning the data for generative AI applications. The need for an end-to-end strategy for data management and data governance at every step of the journey—from ingesting, storing, and querying data to analyzing, visualizing, and running artificial intelligence (AI) and machine learning (ML) models—continues to be of paramount importance for enterprises.

In this post, we discuss the data governance needs of generative AI application data pipelines, a critical building block to govern data used by LLMs to improve the accuracy and relevance of their responses to user prompts in a safe, secure, and transparent manner. Enterprises are doing this by using proprietary data with approaches like Retrieval Augmented Generation (RAG), fine-tuning, and continued pre-training with foundation models.

Data governance is a critical building block across all these approaches, and we see two emerging areas of focus. First, many LLM use cases rely on enterprise knowledge that needs to be drawn from unstructured data such as documents, transcripts, and images, in addition to structured data from data warehouses. Unstructured data is typically stored across siloed systems in varying formats, and generally not managed or governed with the same level of rigor as structured data. Second, generative AI applications introduce a higher number of data interactions than conventional applications, which requires that the data security, privacy, and access control policies be implemented as part of the generative AI user workflows.

In this post, we cover data governance for building generative AI applications on AWS with a lens on structured and unstructured enterprise knowledge sources, and the role of data governance during the user request-response workflows.

Use case overview

Let’s explore an example of a customer support AI assistant. The following figure shows the typical conversational workflow that is initiated with a user prompt.

The workflow includes the following key data governance steps:

  1. Prompt user access control and security policies.
  2. Access policies to extract permissions based on relevant data and filter out results based on the prompt user role and permissions.
  3. Enforce data privacy policies such as personally identifiable information (PII) redactions.
  4. Enforce fine-grained access control.
  5. Grant the user role permissions for sensitive information and compliance policies.

To provide a response that includes the enterprise context, each user prompt needs to be augmented with a combination of insights from structured data from the data warehouse and unstructured data from the enterprise data lake. On the backend, the batch data engineering processes refreshing the enterprise data lake need to expand to ingest, transform, and manage unstructured data. As part of the transformation, the objects need to be treated to ensure data privacy (for example, PII redaction). Finally, access control policies also need to be extended to the unstructured data objects and to vector data stores.

Let’s look at how data governance can be applied to the enterprise knowledge source data pipelines and the user request-response workflows.

Enterprise knowledge: Data management

The following figure summarizes data governance considerations for data pipelines and the workflow for applying data governance.

Data governance steps in data pipelines

In the above figure, the data engineering pipelines include the following data governance steps:

  1. Create and update a catalog through data evolution.
  2. Implement data privacy policies.
  3. Implement data quality by data type and source.
  4. Link structured and unstructured datasets.
  5. Implement unified fine-grained access controls for structured and unstructured datasets.

Let’s look at some of the key changes in the data pipelines, namely data cataloging, data quality, and vector embedding security, in more detail.

Data discoverability

Unlike structured data, which is managed in well-defined rows and columns, unstructured data is stored as objects. For users to be able to discover and comprehend the data, the first step is to build a comprehensive catalog using the metadata that is generated and captured in the source systems. This starts with the objects (such as documents and transcript files) being ingested from the relevant source systems into the raw zone in the data lake in Amazon Simple Storage Service (Amazon S3) in their respective native formats (as illustrated in the preceding figure). From here, object metadata (such as file owner, creation date, and confidentiality level) is extracted and queried using Amazon S3 capabilities. Metadata can vary by data source, and it’s important to examine the fields and, where required, derive the necessary fields to complete all the necessary metadata. For instance, if an attribute like content confidentiality is not tagged at a document level in the source application, this may need to be derived as part of the metadata extraction process and added as an attribute in the data catalog. The ingestion process needs to capture object updates (changes, deletions) in addition to new objects on an ongoing basis. For detailed implementation guidance, refer to Unstructured data management and governance using AWS AI/ML and analytics services. To further simplify the discovery and introspection between business glossaries and technical data catalogs, you can use Amazon DataZone for business users to discover and share data stored across data silos.
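As a minimal sketch of this metadata extraction step, the following example reads S3 object metadata with boto3 and derives a confidentiality attribute when the source system did not provide one. The bucket, key, and custom metadata field names are hypothetical.

import boto3

s3 = boto3.client("s3")

# Hypothetical bucket and key in the raw zone; metadata fields vary by source system.
response = s3.head_object(Bucket="example-raw-zone", Key="transcripts/2024/call-1234.json")

catalog_entry = {
    "object_key": "transcripts/2024/call-1234.json",
    "last_modified": response["LastModified"].isoformat(),
    "size_bytes": response["ContentLength"],
    # Custom object metadata (x-amz-meta-*) set at ingestion, e.g. owner and confidentiality.
    "owner": response.get("Metadata", {}).get("owner", "unknown"),
    # Derive the attribute downstream if the source did not tag it.
    "confidentiality": response.get("Metadata", {}).get("confidentiality", "to-be-derived"),
}
print(catalog_entry)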

Data privacy

Enterprise knowledge sources often contain PII and other sensitive data (such as addresses and Social Security numbers). Based on your data privacy policies, these elements need to be treated (masked, tokenized, or redacted) from the sources before they can be used for downstream use cases. From the raw zone in Amazon S3, the objects need to be processed before they can be consumed by downstream generative AI models. A key requirement here is PII identification and redaction, which you can implement with Amazon Comprehend. It’s important to remember that it will not always be feasible to strip away all the sensitive data without impacting the context of the data. Semantic context is one of the key factors that drive the accuracy and relevance of generative AI model outputs, and it’s critical to work backward from the use case and strike the necessary balance between privacy controls and model performance.
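The following is a hedged sketch of PII redaction with Amazon Comprehend, masking detected entities by type before an object is written to the transformed zone; the sample text and masking format are illustrative only.

import boto3

comprehend = boto3.client("comprehend")

text = "Customer Jane Doe, SSN 123-45-6789, called about her claim."

# Detect PII entities and mask them in place before writing to the transformed zone.
entities = comprehend.detect_pii_entities(Text=text, LanguageCode="en")["Entities"]

redacted = text
for entity in sorted(entities, key=lambda e: e["BeginOffset"], reverse=True):
    redacted = (
        redacted[: entity["BeginOffset"]]
        + f"[{entity['Type']}]"
        + redacted[entity["EndOffset"]:]
    )

print(redacted)  # for example: "Customer [NAME], SSN [SSN], called about her claim."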

Data enrichment

Additionally, more metadata may need to be extracted from the objects. Amazon Comprehend provides capabilities for entity recognition (for example, identifying domain-specific data like policy numbers and claim numbers) and custom classification (for example, categorizing a customer care chat transcript based on the issue description). Furthermore, you may need to combine the unstructured and structured data to create a holistic picture of key entities, like customers. For example, in an airline loyalty scenario, there would be significant value in linking unstructured data captured from customer interactions (such as customer chat transcripts and customer reviews) with structured data signals (such as ticket purchases and miles redemption) to create a more complete customer profile that can then enable the delivery of better and more relevant trip recommendations. AWS Entity Resolution is an ML service that helps in matching and linking records. This service helps link related sets of information to create deeper, more connected data about key entities like customers, products, and so on, which can further improve the quality and relevance of LLM outputs. This is available in the transformed zone in Amazon S3 and is ready to be consumed downstream for vector stores, fine-tuning, or training of LLMs. After these transformations, data can be made available in the curated zone in Amazon S3.

Data quality

A critical factor in realizing the full potential of generative AI is the quality of the data that is used to train the models as well as the data that is used to augment and enhance the model response to a user input. The accuracy, bias, and reliability of the models and their outcomes are directly proportional to the quality of the data used to build and train them.

Amazon SageMaker Model Monitor provides a proactive detection of deviations in model data quality drift and model quality metrics drift. It also monitors bias drift in your model’s predictions and feature attribution. For more details, refer to Monitoring in-production ML models at large scale using Amazon SageMaker Model Monitor. Detecting bias in your model is a fundamental building block to responsible AI, and Amazon SageMaker Clarify helps detect potential bias that can produce a negative or a less accurate result. To learn more, see Learn how Amazon SageMaker Clarify helps detect bias.

A newer area of focus in generative AI is the use and quality of data in prompts from enterprise and proprietary data stores. An emerging best practice to consider here is shift-left, which puts a strong emphasis on early and proactive quality assurance mechanisms. In the context of data pipelines designed to process data for generative AI applications, this implies identifying and resolving data quality issues earlier upstream to mitigate the potential impact of data quality issues later. AWS Glue Data Quality not only measures and monitors the quality of your data at rest in your data lakes, data warehouses, and transactional databases, but also allows early detection and correction of quality issues for your extract, transform, and load (ETL) pipelines to ensure your data meets the quality standards before it is consumed. For more details, refer to Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog.
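As an illustrative sketch of this shift-left approach, the following creates an AWS Glue Data Quality ruleset against a Data Catalog table so quality checks can run before the data is consumed downstream. The database name, table name, and DQDL rules are assumptions for a hypothetical document-metadata table.

import boto3

glue = boto3.client("glue")

# Hypothetical database and table; the DQDL rules below are illustrative.
ruleset = """
Rules = [
    RowCount > 0,
    IsComplete "document_id",
    IsComplete "confidentiality",
    ColumnValues "confidentiality" in ["public", "internal", "restricted"]
]
"""

glue.create_data_quality_ruleset(
    Name="curated-zone-docs-dq",
    Ruleset=ruleset,
    TargetTable={"DatabaseName": "enterprise_knowledge", "TableName": "documents"},
)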

Vector store governance

Embeddings in vector databases elevate the intelligence and capabilities of generative AI applications by enabling features such as semantic search and reducing hallucinations. Embeddings typically contain private and sensitive data, and encrypting the data is a recommended step in the user input workflow. Amazon OpenSearch Serverless stores and searches your vector embeddings, and encrypts your data at rest with AWS Key Management Service (AWS KMS). For more details, see Introducing the vector engine for Amazon OpenSearch Serverless, now in preview. Similarly, additional vector engine options on AWS, including Amazon Kendra and Amazon Aurora, encrypt your data at rest with AWS KMS. For more information, refer to Encryption at rest and Protecting data using encryption.
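The following sketch shows how encryption at rest with a customer managed AWS KMS key might be declared for an OpenSearch Serverless vector collection. The collection name pattern and key ARN are hypothetical, and the policy field names should be verified against the current OpenSearch Serverless encryption policy format.

import boto3
import json

aoss = boto3.client("opensearchserverless")

# Hypothetical collection name pattern and KMS key ARN.
encryption_policy = {
    "Rules": [{"ResourceType": "collection", "Resource": ["collection/rag-embeddings*"]}],
    "AWSOwnedKey": False,
    "KmsARN": "arn:aws:kms:us-east-1:111122223333:key/EXAMPLE-KEY-ID",
}

aoss.create_security_policy(
    name="rag-embeddings-encryption",
    type="encryption",
    policy=json.dumps(encryption_policy),
)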

As embeddings are generated and stored in a vector store, controlling access to the data with role-based access control (RBAC) becomes a key requirement to maintaining overall security. Amazon OpenSearch Service provides fine-grained access controls (FGAC) features with AWS Identity and Access Management (IAM) rules that can be associated with Amazon Cognito users. Corresponding user access control mechanisms are also provided by OpenSearch Serverless, Amazon Kendra, and Aurora. To learn more, refer to Data access control for Amazon OpenSearch Serverless, Controlling user access to documents with tokens, and Identity and access management for Amazon Aurora, respectively.
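As a hedged companion example, the following sketch grants an application role read-only access to the embeddings indexes through an OpenSearch Serverless data access policy; the role ARN, collection pattern, and permission names are assumptions to adapt to your own RBAC model.

import boto3
import json

aoss = boto3.client("opensearchserverless")

# Hypothetical role and collection names; permissions use the aoss:* scheme.
data_access_policy = [
    {
        "Rules": [
            {
                "ResourceType": "index",
                "Resource": ["index/rag-embeddings/*"],
                "Permission": ["aoss:ReadDocument", "aoss:DescribeIndex"],
            }
        ],
        "Principal": ["arn:aws:iam::111122223333:role/RagQueryRole"],
    }
]

aoss.create_access_policy(
    name="rag-embeddings-read-only",
    type="data",
    policy=json.dumps(data_access_policy),
)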

User request-response workflows

Controls in the data governance plane need to be integrated into the generative AI application as part of the overall solution deployment to ensure compliance with data security (based on role-based access controls) and data privacy (based on role-based access to sensitive data) policies. The following figure illustrates the workflow for applying data governance.

Data governance in user prompt workflow

The workflow includes the following key data governance steps:

  1. Provide a valid input prompt for alignment with compliance policies (for example, bias and toxicity).
  2. Generate a query by mapping prompt keywords with the data catalog.
  3. Apply FGAC policies based on user role.
  4. Apply RBAC policies based on user role.
  5. Apply data and content redaction to the response based on user role permissions and compliance policies.

As part of the prompt cycle, the user prompt must be parsed and keywords extracted to ensure alignment with compliance policies using a service like Amazon Comprehend (see New for Amazon Comprehend – Toxicity Detection) or Guardrails for Amazon Bedrock (preview). When that is validated, if the prompt requires structured data to be extracted, the keywords can be used against the data catalog (business or technical) to extract the relevant data tables and fields and construct a query from the data warehouse. The user permissions are evaluated using AWS Lake Formation to filter the relevant data. In the case of unstructured data, the search results are restricted based on the user permission policies implemented in the vector store. As a final step, the output response from the LLM needs to be evaluated against user permissions (to ensure data privacy and security) and compliance with safety (for example, bias and toxicity guidelines).
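As a minimal sketch of the prompt screening step, the following example checks a user prompt with the Amazon Comprehend toxicity detection API before any retrieval or generation takes place. The threshold is a hypothetical policy value, and the response fields used here should be verified against the current API documentation.

import boto3

comprehend = boto3.client("comprehend")

prompt = "Why was my claim denied? This is ridiculous."

# Screen the prompt before retrieval and generation; the threshold is illustrative.
result = comprehend.detect_toxic_content(
    TextSegments=[{"Text": prompt}],
    LanguageCode="en",
)["ResultList"][0]

if result["Toxicity"] > 0.7:  # hypothetical compliance policy threshold
    raise ValueError("Prompt rejected by compliance policy")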

Although this process is specific to a RAG implementation and is applicable to other LLM implementation strategies, there are additional controls:

  • Prompt engineering – Access to the prompt templates to invoke need to be restricted based on access controls augmented by business logic.
  • Fine-tuning models and training foundation models – In cases where objects from the curated zone in Amazon S3 are used as training data for fine-tuning the foundation models, the permissions policies need to be configured with Amazon S3 identity and access management at the bucket or object level based on the requirements.

Summary

Data governance is critical to enabling organizations to build enterprise generative AI applications. As enterprise use cases continue to evolve, there will be a need to expand the data infrastructure to govern and manage new, diverse, unstructured datasets to ensure alignment with privacy, security, and quality policies. These policies need to be implemented and managed as part of data ingestion, storage, and management of the enterprise knowledge base along with the user interaction workflows. This helps ensure that generative AI applications not only minimize the risk of sharing inaccurate information, but also protect against bias and toxicity that can lead to harmful or libelous outcomes. To learn more about data governance on AWS, see What is Data Governance?

In subsequent posts, we will provide implementation guidance on how to expand the governance of the data infrastructure to support generative AI use cases.


About the Authors

Krishna Rupanagunta leads a team of Data and AI Specialists at AWS. He and his team work with customers to help them innovate faster and make better decisions using Data, Analytics, and AI/ML. He can be reached via LinkedIn.

Imtiaz (Taz) Sayed is the WW Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Raghvender Arni (Arni) leads the Customer Acceleration Team (CAT) within AWS Industries. The CAT is a global cross-functional team of customer-facing cloud architects, software engineers, data scientists, and AI/ML experts and designers that drives innovation via advanced prototyping and cloud operational excellence via specialized technical expertise.