Tag Archives: AWS Big Data

Run interactive workloads on Amazon EMR Serverless from Amazon EMR Studio

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/run-interactive-workloads-on-amazon-emr-serverless-from-amazon-emr-studio/

Starting from release 6.14, Amazon EMR Studio supports interactive analytics on Amazon EMR Serverless. You can now use EMR Serverless applications as the compute, in addition to Amazon EMR on EC2 clusters and Amazon EMR on EKS virtual clusters, to run JupyterLab notebooks from EMR Studio Workspaces.

EMR Studio is an integrated development environment (IDE) that makes it straightforward for data scientists and data engineers to develop, visualize, and debug analytics applications written in PySpark, Python, and Scala. EMR Serverless is a serverless option for Amazon EMR that makes it straightforward to run open source big data analytics frameworks such as Apache Spark without configuring, managing, and scaling clusters or servers.

In this post, we demonstrate how to do the following:

  • Create an EMR Serverless endpoint for interactive applications
  • Attach the endpoint to an existing EMR Studio environment
  • Create a notebook and run an interactive application
  • Seamlessly diagnose interactive applications from within EMR Studio

Prerequisites

In a typical organization, an AWS account administrator will set up AWS resources such as AWS Identity and Access Management (IAM) roles, Amazon Simple Storage Service (Amazon S3) buckets, and Amazon Virtual Private Cloud (Amazon VPC) resources for internet access and access to other resources in the VPC. They also designate EMR Studio administrators, who manage setting up EMR Studios and assigning users to a specific EMR Studio. Once assigned, EMR Studio developers can use EMR Studio to develop and monitor workloads.

Make sure you set up resources like your S3 bucket, VPC subnets, and EMR Studio in the same AWS Region.

Complete the following steps to deploy these prerequisites:

  1. Launch the following AWS CloudFormation stack.
  2. Enter values for AdminPassword and DevPassword and make a note of the passwords you create.
  3. Choose Next.
  4. Keep the settings as default and choose Next again.
  5. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  6. Choose Submit.

We have also provided instructions to deploy these resources manually with sample IAM policies in the GitHub repo.

Set up EMR Studio and a serverless interactive application

After the AWS account administrator completes the prerequisites, the EMR Studio administrator can log in to the AWS Management Console to create an EMR Studio, Workspace, and EMR Serverless application.

Create an EMR Studio and Workspace

The EMR Studio administrator should log in to the console using the emrs-interactive-app-admin-user user credentials. If you deployed the prerequisite resources using the provided CloudFormation template, use the password that you provided as an input parameter.

  1. On the Amazon EMR console, choose EMR Serverless in the navigation pane.
  2. Choose Get started.
  3. Select Create and launch EMR Studio.

This creates a Studio with the default name studio_1 and a Workspace with the default name My_First_Workspace. A new browser tab will open for the studio_1 user interface.


Create an EMR Serverless application

Complete the following steps to create an EMR Serverless application:

  1. On the EMR Studio console, choose Applications in the navigation pane.
  2. Create a new application.
  3. For Name, enter a name (for example, my-serverless-interactive-application).
  4. For Application setup options, select Use custom settings for interactive workloads.

For interactive applications, as a best practice, we recommend keeping the driver and workers pre-initialized by configuring the pre-initialized capacity at the time of application creation. This effectively creates a warm pool of workers for an application and keeps the resources ready to be consumed, enabling the application to respond in seconds. For further best practices for creating EMR Serverless applications, see Define per-team resource limits for big data workloads using Amazon EMR Serverless.

  1. In the Interactive endpoint section, select Enable Interactive endpoint.
  2. In the Network connections section, choose the VPC, private subnets, and security group you created previously.

If you deployed the CloudFormation stack provided in this post, choose emr-serverless-sg as the security group.

A VPC is needed for the workload to be able to access the internet from within the EMR Serverless application in order to download external Python packages. The VPC also allows you to access resources such as Amazon Relational Database Service (Amazon RDS) and Amazon Redshift that are in the VPC from this application. Attaching a serverless application to a VPC can lead to IP exhaustion in the subnet, so make sure there are sufficient IP addresses in your subnet.

  1. Choose Create and start application.
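
If you script your environment setup rather than using the console, the following is a minimal boto3 sketch of the equivalent application creation. The release label, capacity values, subnet, and security group IDs are illustrative placeholders, and the interactive settings mirror the console options described above:

import boto3

emr_serverless = boto3.client("emr-serverless")

response = emr_serverless.create_application(
    name="my-serverless-interactive-application",
    releaseLabel="emr-6.14.0",  # interactive workloads require release 6.14 or later
    type="SPARK",
    # Pre-initialized (warm) capacity so interactive sessions respond in seconds
    initialCapacity={
        "DRIVER": {
            "workerCount": 1,
            "workerConfiguration": {"cpu": "4 vCPU", "memory": "16 GB"},
        },
        "EXECUTOR": {
            "workerCount": 4,
            "workerConfiguration": {"cpu": "4 vCPU", "memory": "16 GB"},
        },
    },
    # Enable the interactive endpoint used by EMR Studio notebooks
    interactiveConfiguration={"studioEnabled": True, "livyEndpointEnabled": True},
    # VPC access for downloading external Python packages and reaching VPC resources
    networkConfiguration={
        "subnetIds": ["subnet-xxxxxxxx"],
        "securityGroupIds": ["sg-xxxxxxxx"],  # for example, emr-serverless-sg
    },
)

# Equivalent of choosing Create and start application on the console
emr_serverless.start_application(applicationId=response["applicationId"])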


On the applications page, you can verify that the status of your serverless application changes to Started.

  1. Select your application and choose How it works.
  2. Choose View and launch workspaces.
  3. Choose Configure studio.

  1. For Service role, provide the EMR Studio service role you created as a prerequisite (emr-studio-service-role).
  2. For Workspace storage, enter the path of the S3 bucket you created as a prerequisite (emrserverless-interactive-blog-<account-id>-<region-name>).
  3. Choose Save changes.


  4. Navigate to the Studios console by choosing Studios in the left navigation menu, in the EMR Studio section. Note the Studio access URL from the Studios console and provide it to your developers to run their Spark applications.

Run your first Spark application

After the EMR Studio administrator has created the Studio, Workspace, and serverless application, the Studio user can use the Workspace and application to develop and monitor Spark workloads.

Launch the Workspace and attach the serverless application

Complete the following steps:

  1. Using the Studio URL provided by the EMR Studio administrator, log in using the emrs-interactive-app-dev-user user credentials shared by the AWS account admin.

If you deployed the prerequisite resources using the provided CloudFormation template, use the password that you provided as an input parameter.

On the Workspaces page, you can check the status of your Workspace. When the Workspace is launched, you will see the status change to Ready.

  1. Launch the workspace by choosing the workspace name (My_First_Workspace).

This will open a new tab. Make sure your browser allows pop-ups.

  1. In the Workspace, choose Compute (cluster icon) in the navigation pane.
  2. For EMR Serverless application, choose your application (my-serverless-interactive-application).
  3. For Interactive runtime role, choose an interactive runtime role (for this post, we use emr-serverless-runtime-role).
  4. Choose Attach to attach the serverless application as the compute type for all the notebooks in this Workspace.

Choose my-serverless-interactive-application as your app and emr-serverless-runtime-role and attach

Run your Spark application interactively

Complete the following steps:

  1. Choose the Notebook samples (three dots icon) in the navigation pane and open Getting-started-with-emr-serverless notebook.
  2. Choose Save to Workspace.

There are three choices of kernels for our notebook: Python 3, PySpark, and Spark (for Scala).

  1. When prompted, choose PySpark as the kernel.
  2. Choose Select.


Now you can run your Spark application. To do so, use the %%configure Sparkmagic command, which configures the session creation parameters. Interactive applications support Python virtual environments. We use a custom environment in the worker nodes by specifying a path for a different Python runtime for the executor environment using spark.executorEnv.PYSPARK_PYTHON. See the following code:

%%configure -f
{
  "conf": {
    "spark.pyspark.virtualenv.enabled": "true",
    "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv",
    "spark.pyspark.virtualenv.type": "native",
    "spark.pyspark.python": "/usr/bin/python3",
    "spark.executorEnv.PYSPARK_PYTHON": "/usr/bin/python3"
  }
}

Install external packages

Now that you have an independent virtual environment for the workers, EMR Studio notebooks allow you to install external packages from within the serverless application by using the Spark install_pypi_package function through the Spark context. Using this function makes the package available for all the EMR Serverless workers.

First, install matplotlib, a Python package, from PyPI:

sc.install_pypi_package("matplotlib")

If the preceding step doesn’t respond, check your VPC setup and make sure it is configured correctly for internet access.

Now you can use a dataset and visualize your data.

Create visualizations

To create visualizations, we use a public dataset on NYC yellow taxis:

file_name = "s3://athena-examples-us-east-1/notebooks/yellow_tripdata_2016-01.parquet"
taxi_df = (spark.read.format("parquet").option("header", "true") \
.option("inferSchema", "true").load(file_name))

In the preceding code block, you read the Parquet file from a public bucket in Amazon S3. The file has headers, and we want Spark to infer the schema. You then use a Spark dataframe to group and count specific columns from taxi_df:

taxi1_df = taxi_df.groupBy("VendorID", "passenger_count").count()
taxi1_df.show()

Use %%display magic to view the result in table format:

%%display
taxi1_df

Table shows vendor_id, passenger_count and count columns

You can also quickly visualize your data with five types of charts. You can choose the display type and the chart will change accordingly. In the following screenshot, we use a bar chart to visualize our data.

bar chart showing passenger_count against each vendor_id

Interact with EMR Serverless using Spark SQL

You can interact with tables in the AWS Glue Data Catalog using Spark SQL on EMR Serverless. In the sample notebook, we show how you can transform data using a Spark dataframe.

First, create a new temporary view called taxis. This allows you to use Spark SQL to select data from this view. Then create a taxi dataframe for further processing:

taxi_df.createOrReplaceTempView("taxis")
sqlDF = spark.sql(
    "SELECT DOLocationID, sum(total_amount) as sum_total_amount \
     FROM taxis where DOLocationID < 25 Group by DOLocationID ORDER BY DOLocationID"
)
sqlDF.show(5)

Table shows DOLocationID and sum_total_amount columns

In each cell in your EMR Studio notebook, you can expand Spark Job Progress to view the various stages of the job submitted to EMR Serverless while running this specific cell. You can see the time taken to complete each stage. In the following example, stage 14 of the job has 12 completed tasks. In addition, if there is any failure, you can see the logs, making troubleshooting a seamless experience. We discuss this more in the next section.

Job[14]: showString at NativeMethodAccessorImpl.java:0 and Job[15]: showString at NativeMethodAccessorImpl.java:0

Use the following code to visualize the processed dataframe using the matplotlib package, plotting the drop-off location and the total amount as a bar chart.

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
plt.clf()
df = sqlDF.toPandas()
plt.bar(df.DOLocationID, df.sum_total_amount)
%matplot plt

Diagnose interactive applications

You can get the session information for your Livy endpoint using the %%info Sparkmagic. This gives you links to access the Spark UI as well as the driver log right in your notebook.
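
For example, run the magic on its own in a notebook cell to print the current session details, including the Spark UI and driver log links:

%%info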

The following screenshot is a driver log snippet for our application, which we opened via the link in our notebook.


Similarly, you can choose the link below Spark UI to open the UI. The following screenshot shows the Executors tab, which provides access to the driver and executor logs.

The following screenshot shows stage 14, which corresponds to the Spark SQL step we saw earlier, in which we calculated the location-wise sum of total taxi collections; the stage was broken down into 12 tasks. Through the Spark UI, the interactive application provides fine-grained task-level status, I/O, and shuffle details, as well as links to the corresponding logs for each task in this stage, right from your notebook, enabling a seamless troubleshooting experience.

Clean up

If you no longer want to keep the resources created in this post, complete the following cleanup steps:

  1. Delete the EMR Serverless application.
  2. Delete the EMR Studio and the associated workspaces and notebooks.
  3. To delete the rest of the resources, navigate to the AWS CloudFormation console, select the stack, and choose Delete.

All of the resources will be deleted except the S3 bucket, which has its deletion policy set to retain.

Conclusion

This post showed how to run interactive PySpark workloads in EMR Studio using EMR Serverless as the compute. You can also build and monitor Spark applications in an interactive JupyterLab Workspace.

In an upcoming post, we’ll discuss additional capabilities of EMR Serverless Interactive applications, such as:

  • Working with resources such as Amazon RDS and Amazon Redshift in your VPC (for example, for JDBC/ODBC connectivity)
  • Running transactional workloads using serverless endpoints

If this is your first time exploring EMR Studio, we recommend checking out the Amazon EMR workshops and referring to Create an EMR Studio.


About the Authors

Sekar Srinivasan is a Principal Specialist Solutions Architect at AWS focused on Data Analytics and AI. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions, modernize their architecture, and generate insights from their data. In his spare time, he likes to work on non-profit projects focused on underprivileged children's education.

Disha Umarwani is a Sr. Data Architect with Amazon Professional Services within Global Health Care and LifeSciences. She has worked with customers to design, architect and implement Data Strategy at scale. She specializes in architecting Data Mesh architectures for Enterprise platforms.

Automate large-scale data validation using Amazon EMR and Apache Griffin

Post Syndicated from Dipal Mahajan original https://aws.amazon.com/blogs/big-data/automate-large-scale-data-validation-using-amazon-emr-and-apache-griffin/

Many enterprises are migrating their on-premises data stores to the AWS Cloud. During data migration, a key requirement is to validate all the data that has been moved from source to target. This data validation is a critical step, and if not done correctly, may result in the failure of the entire project. However, developing custom solutions to determine migration accuracy by comparing the data between the source and target can often be time-consuming.

In this post, we walk through a step-by-step process to validate large datasets after migration using a configuration-based tool built with Amazon EMR and the Apache Griffin open source library. Griffin is an open source data quality solution for big data that supports both batch and streaming modes.

In today’s data-driven landscape, where organizations deal with petabytes of data, the need for automated data validation frameworks has become increasingly critical. Manual validation processes are not only time-consuming but also prone to errors, especially when dealing with vast volumes of data. Automated data validation frameworks offer a streamlined solution by efficiently comparing large datasets, identifying discrepancies, and ensuring data accuracy at scale. With such frameworks, organizations can save valuable time and resources while maintaining confidence in the integrity of their data, thereby enabling informed decision-making and enhancing overall operational efficiency.

The following are standout features for this framework:

  • Utilizes a configuration-driven framework
  • Offers plug-and-play functionality for seamless integration
  • Conducts count comparison to identify any disparities
  • Implements robust data validation procedures
  • Ensures data quality through systematic checks
  • Provides access to a file containing mismatched records for in-depth analysis
  • Generates comprehensive reports for insights and tracking purposes

Solution overview

This solution uses the following services:

  • Amazon Simple Storage Service (Amazon S3) or Hadoop Distributed File System (HDFS) as the source and target.
  • Amazon EMR to run the PySpark script. We use a Python wrapper on top of Griffin to validate data between Hadoop tables created over HDFS or Amazon S3.
  • AWS Glue to catalog the technical table, which stores the results of the Griffin job.
  • Amazon Athena to query the output table to verify the results.

We use tables that store the count for each source and target table and also create files that show the difference of records between source and target.

The following diagram illustrates the solution architecture.


In the depicted architecture and our typical data lake use case, our data either resides in Amazon S3 or is migrated from on premises to Amazon S3 using replication tools such as AWS DataSync or AWS Database Migration Service (AWS DMS). Although this solution is designed to seamlessly interact with both Hive Metastore and the AWS Glue Data Catalog, we use the Data Catalog as our example in this post.

This framework operates within Amazon EMR, automatically running scheduled tasks on a daily basis, as per the defined frequency. It generates and publishes reports in Amazon S3, which are then accessible via Athena. A notable feature of this framework is its capability to detect count mismatches and data discrepancies, in addition to generating a file in Amazon S3 containing full records that didn’t match, facilitating further analysis.

In this example, we use three tables in an on-premises database to validate between source and target: balance_sheet, covid, and survery_financial_report.

Prerequisites

Before getting started, make sure you have the following prerequisites:

Deploy the solution

To make it straightforward for you to get started, we have created a CloudFormation template that automatically configures and deploys the solution for you. Complete the following steps:

  1. Create an S3 bucket in your AWS account called bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region} (provide your AWS account ID and AWS Region).
  2. Unzip the following file to your local system.
  3. After unzipping the file to your local system, change <bucket name> to the one you created in your account (bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region}) in the following files:
    1. bootstrap-bdb-3070-datavalidation.sh
    2. Validation_Metrics_Athena_tables.hql
    3. datavalidation/totalcount/totalcount_input.txt
    4. datavalidation/accuracy/accuracy_input.txt
  4. Upload all the folders and files in your local folder to your S3 bucket:
    aws s3 cp . s3://<bucket_name>/ --recursive

  5. Run the following CloudFormation template in your account.

The CloudFormation template creates a database called griffin_datavalidation_blog and an AWS Glue crawler called griffin_data_validation_blog on top of the data folder in the .zip file.

  1. Choose Next.
  2. Choose Next again.
  3. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

aws cloudformation describe-stacks --stack-name <stack-name> --region us-east-1 --query Stacks[0].Outputs
  1. Run the AWS Glue crawler and verify that six tables have been created in the Data Catalog.
  2. Run the following CloudFormation template in your account.

This template creates an EMR cluster with a bootstrap script to copy Griffin-related JARs and artifacts. It also runs three EMR steps:

  • Create two Athena tables and two Athena views to see the validation metrics produced by the Griffin framework
  • Run count validation for all three tables to compare the source and target tables
  • Run record-level and column-level validations for all three tables to compare the source and target tables
  1. For SubnetID, enter your subnet ID.
  2. Choose Next.
  3. Choose Next again.
  4. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create stack.

You can view the stack outputs on the console or by using the following AWS CLI command:

aws cloudformation describe-stacks --stack-name <stack-name> --region us-east-1 --query Stacks[0].Outputs

It takes approximately 5 minutes for the deployment to complete. When the stack is complete, you should see the EMRCluster resource launched and available in your account.

When the EMR cluster is launched, it runs the following steps as part of the post-cluster launch:

  • Bootstrap action – It copies the Griffin JAR file and creates the directories for this framework. It also downloads sample data files to use in the next step.
  • Athena_Table_Creation – It creates tables in Athena to read the result reports.
  • Count_Validation – It runs the job to compare the data count between source and target data from the Data Catalog table and stores the results in an S3 bucket, which will be read via an Athena table.
  • Accuracy – It runs the job to compare the data rows between the source and target data from the Data Catalog table and stores the results in an S3 bucket, which will be read via an Athena table.


When the EMR steps are complete, your table comparison is done and ready to view in Athena automatically. No manual intervention is needed for validation.

Validate data with Python Griffin

When your EMR cluster is ready and all the jobs are complete, the count validation and data validation are done. The results are stored in Amazon S3, and the Athena tables are already created on top of them. You can query the Athena tables to view the results, as shown in the following screenshots.
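
If you prefer to pull these results programmatically instead of using the Athena console, the following is a minimal boto3 sketch. The summary table name and the query result location are assumptions for illustration; substitute the objects the framework created in your account:

import time
import boto3

athena = boto3.client("athena")

# Hypothetical table name; the database is the one created by the CloudFormation stack
query = "SELECT * FROM griffin_datavalidation_blog.count_validation_summary LIMIT 10"

execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "griffin_datavalidation_blog"},
    ResultConfiguration={"OutputLocation": "s3://<bucket_name>/athena-query-results/"},
)
query_id = execution["QueryExecutionId"]

# Wait for the query to finish, then print the rows
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    for row in rows:
        print([col.get("VarCharValue") for col in row["Data"]])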

The following screenshot shows the count results for all tables.


The following screenshot shows the data accuracy results for all tables.


The following screenshot shows the files created for each table with mismatched records. Individual folders are generated for each table directly from the job.


Every table folder contains a directory for each day the job is run.


Within that specific date, a file named __missRecords contains records that do not match.
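
To locate these files programmatically, a short boto3 sketch such as the following can list the mismatch output for a given run; the bucket name and prefix are placeholders based on this walkthrough:

import boto3

s3 = boto3.client("s3")

bucket = "bdb-3070-griffin-datavalidation-blog-<account-id>-<region>"  # placeholder
prefix = "datavalidation/accuracy/"  # placeholder prefix for the accuracy output

response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in response.get("Contents", []):
    if "__missRecords" in obj["Key"]:
        print(obj["Key"], obj["Size"])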


The following screenshot shows the contents of the __missRecords file.


Clean up

To avoid incurring additional charges, complete the following steps to clean up your resources when you’re done with the solution:

  1. Drop the AWS Glue database griffin_datavalidation_blog (drop database griffin_datavalidation_blog cascade) so the database and its tables are removed.
  2. Delete the prefixes and objects you created from the bucket bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region}.
  3. Delete the CloudFormation stack, which removes your additional resources.

Conclusion

This post showed how you can use Python Griffin to accelerate the post-migration data validation process. Python Griffin helps you run count, row-level, and column-level validations, identifying mismatched records without writing any code.

For more information about data quality use cases, refer to Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog and AWS Glue Data Quality.


About the Authors

Dipal Mahajan serves as a Lead Consultant at Amazon Web Services, providing expert guidance to global clients in developing highly secure, scalable, reliable, and cost-efficient cloud applications. With a wealth of experience in software development, architecture, and analytics across diverse sectors such as finance, telecom, retail, and healthcare, he brings invaluable insights to his role. Beyond the professional sphere, Dipal enjoys exploring new destinations, having already visited 14 out of 30 countries on his wish list.

Akhil is a Lead Consultant at AWS Professional Services. He helps customers design & build scalable data analytics solutions and migrate data pipelines and data warehouses to AWS. In his spare time, he loves travelling, playing games and watching movies.

Ramesh Raghupathy is a Senior Data Architect with WWCO ProServe at AWS. He works with AWS customers to architect, deploy, and migrate to data warehouses and data lakes on the AWS Cloud. While not at work, Ramesh enjoys traveling, spending time with family, and yoga.

How Amazon optimized its high-volume financial reconciliation process with Amazon EMR for higher scalability and performance

Post Syndicated from Jeeshan Khetrapal original https://aws.amazon.com/blogs/big-data/how-amazon-optimized-its-high-volume-financial-reconciliation-process-with-amazon-emr-for-higher-scalability-and-performance/

Account reconciliation is an important step to ensure the completeness and accuracy of financial statements. Specifically, companies must reconcile balance sheet accounts that could contain significant or material misstatements. Accountants go through each account in the general ledger of accounts and verify that the balance listed is complete and accurate. When discrepancies are found, accountants investigate and take appropriate corrective action.

As part of Amazon’s FinTech organization, we offer a software platform that empowers the internal accounting teams at Amazon to conduct account reconciliations. To optimize the reconciliation process, these users require high-performance transformation with the ability to scale on demand, as well as the ability to process variable file sizes ranging from as low as a few MBs to more than 100 GB. It’s not always possible to fit data onto a single machine or process it with one single program in a reasonable time frame. This computation has to be done fast enough to provide practical services where programming logic and underlying details (data distribution, fault tolerance, and scheduling) can be separated.

We can achieve these simultaneous computations on multiple machines or threads of the same function across groups of elements of a dataset by using distributed data processing solutions. This encouraged us to reinvent our reconciliation service powered by AWS services, including Amazon EMR and the Apache Spark distributed processing framework, which uses PySpark. This service enables users to process files over 100 GB containing up to 100 million transactions in less than 30 minutes. The reconciliation service has become a powerhouse for data processing, and now users can seamlessly perform a variety of operations, such as Pivot, JOIN (like an Excel VLOOKUP operation), arithmetic operations, and more, providing a versatile and efficient solution for reconciling vast datasets. This enhancement is a testament to the scalability and speed achieved through the adoption of distributed data processing solutions.
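
To illustrate the kind of Excel-like operation the service translates into Spark, the following is a minimal PySpark sketch of a VLOOKUP-style lookup expressed as a DataFrame join; the S3 paths and column names are illustrative, not the actual reconciliation code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("reconciliation-example").getOrCreate()

# Illustrative inputs: a ledger extract and a lookup file, both stored on Amazon S3
ledger = spark.read.parquet("s3://example-bucket/ledger/")            # account_id, amount, ...
accounts = spark.read.parquet("s3://example-bucket/account-lookup/")  # account_id, account_name, ...

# VLOOKUP-like enrichment: left join the ledger against the lookup on the key column
reconciled = ledger.join(accounts, on="account_id", how="left")

# Spark distributes the join across executors instead of reading files chunk by chunk
reconciled.write.mode("overwrite").parquet("s3://example-bucket/output/reconciled/")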

In this post, we explain how we integrated Amazon EMR to build a highly available and scalable system that enabled us to run a high-volume financial reconciliation process.

Architecture before migration

The following diagram illustrates our previous architecture.

Our legacy service was built with Amazon Elastic Container Service (Amazon ECS) on AWS Fargate. We processed the data sequentially using Python. However, due to its lack of parallel processing capability, we frequently had to increase the cluster size vertically to support larger datasets. For context, 5 GB of data with 50 operations took around 3 hours to process. This service was configured to scale horizontally to five ECS instances that polled messages from Amazon Simple Queue Service (Amazon SQS), which fed the transformation requests. Each instance was configured with 4 vCPUs and 30 GB of memory to allow horizontal scaling. However, we couldn’t scale its performance further because the processing happened sequentially, picking chunks of data from Amazon Simple Storage Service (Amazon S3) for processing. For example, a VLOOKUP operation where two files are to be joined required both files to be read in memory chunk by chunk to obtain the output. This became an obstacle for users because they had to wait for long periods of time to process their datasets.

As part of our re-architecture and modernization, we wanted to achieve the following:

  • High availability – The data processing clusters should be highly available, providing three 9s of availability (99.9%)
  • Throughput – The service should handle 1,500 runs per day
  • Latency – It should be able to process 100 GB of data within 30 minutes
  • Heterogeneity – The cluster should be able to support a wide variety of workloads, with files ranging from a few MBs to hundreds of GBs
  • Query concurrency – The implementation demands the ability to support a minimum of 10 degrees of concurrency
  • Reliability of jobs and data consistency – Jobs need to run reliably and consistently to avoid breaking Service Level Agreements (SLAs)
  • Cost-effective and scalable – It must be scalable based on the workload, making it cost-effective
  • Security and compliance – Given the sensitivity of data, it must support fine-grained access control and appropriate security implementations
  • Monitoring – The solution must offer end-to-end monitoring of the clusters and jobs

Why Amazon EMR

Amazon EMR is the industry-leading cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning (ML) using open source frameworks such as Apache Spark, Apache Hive, and Presto. With these frameworks and related open-source projects, you can process data for analytics purposes and BI workloads. Amazon EMR lets you transform and move large amounts of data in and out of other AWS data stores and databases, such as Amazon S3 and Amazon DynamoDB.

A notable advantage of Amazon EMR lies in its effective use of parallel processing with PySpark, marking a significant improvement over traditional sequential Python code. This innovative approach streamlines the deployment and scaling of Apache Spark clusters, allowing for efficient parallelization on large datasets. The distributed computing infrastructure not only enhances performance, but also enables the processing of vast amounts of data at unprecedented speeds. Equipped with libraries, PySpark facilitates Excel-like operations on DataFrames, and the higher-level abstraction of DataFrames simplifies intricate data manipulations, reducing code complexity. Combined with automatic cluster provisioning, dynamic resource allocation, and integration with other AWS services, Amazon EMR proves to be a versatile solution suitable for diverse workloads, ranging from batch processing to ML. The inherent fault tolerance in PySpark and Amazon EMR promotes robustness, even in the event of node failures, making it a scalable, cost-effective, and high-performance choice for parallel data processing on AWS.

Amazon EMR extends its capabilities beyond the basics, offering a variety of deployment options to cater to diverse needs. Whether it’s Amazon EMR on EC2, Amazon EMR on EKS, Amazon EMR Serverless, or Amazon EMR on AWS Outposts, you can tailor your approach to specific requirements. For those seeking a serverless environment for Spark jobs, integrating AWS Glue is also a viable option. In addition to supporting various open-source frameworks, including Spark, Amazon EMR provides flexibility in choosing deployment modes, Amazon Elastic Compute Cloud (Amazon EC2) instance types, scaling mechanisms, and numerous cost-saving optimization techniques.

Amazon EMR stands as a dynamic force in the cloud, delivering unmatched capabilities for organizations seeking robust big data solutions. Its seamless integration, powerful features, and adaptability make it an indispensable tool for navigating the complexities of data analytics and ML on AWS.

Redesigned architecture

The following diagram illustrates our redesigned architecture.

The solution operates under an API contract, where clients can submit transformation configurations, defining the set of operations alongside the S3 dataset location for processing. The request is queued through Amazon SQS, then directed to Amazon EMR via a Lambda function. This process initiates the creation of an Amazon EMR step for Spark framework implementation on a dedicated EMR cluster. Although Amazon EMR accommodates an unlimited number of steps over a long-running cluster’s lifetime, only 256 steps can be running or pending simultaneously. For optimal parallelization, the step concurrency is set at 10, allowing 10 steps to run concurrently. In case of request failures, the Amazon SQS dead-letter queue (DLQ) retains the event. Spark processes the request, translating Excel-like operations into PySpark code for an efficient query plan. Resilient DataFrames store input, output, and intermediate data in-memory, optimizing processing speed, reducing disk I/O cost, enhancing workload performance, and delivering the final output to the specified Amazon S3 location.
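
The following is a minimal sketch of how such a Lambda function might submit an EMR step for each queued request. The cluster ID, script location, and message structure are placeholders, not the production implementation:

import json
import boto3

emr = boto3.client("emr")

CLUSTER_ID = "j-XXXXXXXXXXXXX"  # placeholder for the long-running EMR cluster

def handler(event, context):
    # Each SQS record carries one transformation request (structure is illustrative)
    for record in event["Records"]:
        request = json.loads(record["body"])
        emr.add_job_flow_steps(
            JobFlowId=CLUSTER_ID,
            Steps=[
                {
                    "Name": f"reconciliation-{request['job_id']}",
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args": [
                            "spark-submit",
                            "--deploy-mode", "cluster",
                            "s3://example-bucket/scripts/reconcile.py",  # placeholder script
                            "--input", request["s3_input"],
                            "--operations", json.dumps(request["operations"]),
                        ],
                    },
                }
            ],
        )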

We define our SLA in two dimensions: latency and throughput. Latency is defined as the amount of time taken to perform one job against a deterministic dataset size and the number of operations performed on the dataset. Throughput is defined as the maximum number of simultaneous jobs the service can perform without breaching the latency SLA of one job. The overall scalability SLA of the service depends on the balance of horizontal scaling of elastic compute resources and vertical scaling of individual servers.

Because we had to run 1,500 processes per day with minimal latency and high performance, we chose the Amazon EMR on EC2 deployment mode with managed scaling enabled to support processing variable file sizes.

The EMR cluster configuration provides many different selections:

  • EMR node types – Primary, core, or task nodes
  • Instance purchasing options – On-Demand Instances, Reserved Instances, or Spot Instances
  • Configuration options – EMR instance fleet or uniform instance group
  • Scaling options – Auto Scaling or Amazon EMR managed scaling

Based on our variable workload, we configured an EMR instance fleet (for best practices, see Reliability). We also decided to use Amazon EMR managed scaling to scale the core and task nodes (for scaling scenarios, refer to Node allocation scenarios). Lastly, we chose memory-optimized AWS Graviton instances, which provide up to 30% lower cost and up to 15% improved performance for Spark workloads.

The following code provides a snapshot of our cluster configuration:

Concurrent steps:10

EMR Managed Scaling:
minimumCapacityUnits: 64
maximumCapacityUnits: 512
maximumOnDemandCapacityUnits: 512
maximumCoreCapacityUnits: 512

Master Instance Fleet:
r6g.xlarge
- 4 vCore, 30.5 GiB memory, EBS only storage
- EBS Storage:250 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 1 units
r6g.2xlarge
- 8 vCore, 61 GiB memory, EBS only storage
- EBS Storage:250 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 1 units

Core Instance Fleet:
r6g.2xlarge
- 8 vCore, 61 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 8 units
r6g.4xlarge
- 16 vCore, 122 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 16 units

Task Instances:
r6g.2xlarge
- 8 vCore, 61 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 8 units
r6g.4xlarge
- 16 vCore, 122 GiB memory, EBS only storage
- EBS Storage:100 GiB
- Maximum Spot price: 100 % of On-demand price
- Each instance counts as 16 units
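
The same limits can be applied programmatically. The following boto3 sketch mirrors the snapshot above; the cluster ID is a placeholder:

import boto3

emr = boto3.client("emr")
cluster_id = "j-XXXXXXXXXXXXX"  # placeholder

# Managed scaling limits, expressed in instance fleet units as in the snapshot above
emr.put_managed_scaling_policy(
    ClusterId=cluster_id,
    ManagedScalingPolicy={
        "ComputeLimits": {
            "UnitType": "InstanceFleetUnits",
            "MinimumCapacityUnits": 64,
            "MaximumCapacityUnits": 512,
            "MaximumOnDemandCapacityUnits": 512,
            "MaximumCoreCapacityUnits": 512,
        }
    },
)

# Allow up to 10 steps to run concurrently on the cluster
emr.modify_cluster(ClusterId=cluster_id, StepConcurrencyLevel=10)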

Performance

With our migration to Amazon EMR, we were able to achieve a system performance capable of handling a variety of datasets, ranging from as low as 273 B to as high as 88.5 GB with a p99 of 491 seconds (approximately 8 minutes).

The following figure illustrates the variety of file sizes processed.

The following figure shows our latency.

To compare against sequential processing, we took two datasets containing 53 million records and ran a VLOOKUP operation against each other, along with 49 other Excel-like operations. This took 26 minutes to process in the new service, compared to 5 days in the legacy service, an improvement of almost 300 times in terms of performance.

Considerations

Keep in mind the following when considering this solution:

  • Right-sizing clusters – Although Amazon EMR is resizable, it’s important to right-size the clusters. Right-sizing mitigates a slow cluster, if undersized, or higher costs, if the cluster is oversized. To anticipate these issues, you can calculate the number and type of nodes that will be needed for the workloads.
  • Parallel steps – Running steps in parallel allows you to run more advanced workloads, increase cluster resource utilization, and reduce the amount of time taken to complete your workload. The number of steps allowed to run at one time is configurable and can be set when a cluster is launched and any time after the cluster has started. You need to consider and optimize the CPU/memory usage per job when multiple jobs are running in a single shared cluster.
  • Job-based transient EMR clusters – If applicable, it is recommended to use a job-based transient EMR cluster, which delivers superior isolation, verifying that each task operates within its dedicated environment. This approach optimizes resource utilization, helps prevent interference between jobs, and enhances overall performance and reliability. The transient nature enables efficient scaling, providing a robust and isolated solution for diverse data processing needs.
  • EMR Serverless – EMR Serverless is the ideal choice if you prefer not to handle the management and operation of clusters. It allows you to effortlessly run applications using open-source frameworks available within EMR Serverless, offering a straightforward and hassle-free experience.
  • Amazon EMR on EKS – Amazon EMR on EKS offers distinct advantages, such as faster startup times and improved scalability that resolves compute capacity challenges, which is particularly beneficial for Graviton and Spot Instance users. The inclusion of a broader range of compute types enhances cost-efficiency, allowing tailored resource allocation. Furthermore, Multi-AZ support provides increased availability. These compelling features provide a robust solution for managing big data workloads with improved performance, cost optimization, and reliability across various computing scenarios.

Conclusion

In this post, we explained how Amazon optimized its high-volume financial reconciliation process with Amazon EMR for higher scalability and performance. If you have a monolithic application that’s dependent on vertical scaling to process additional requests or datasets, then migrating it to a distributed processing framework such as Apache Spark and choosing a managed service such as Amazon EMR for compute may help reduce the runtime so you can meet a tighter delivery SLA, and may also help reduce the total cost of ownership (TCO).

As we embrace Amazon EMR for this particular use case, we encourage you to explore further possibilities in your data innovation journey. Consider evaluating AWS Glue, along with other dynamic Amazon EMR deployment options such as EMR Serverless or Amazon EMR on EKS, to discover the best AWS service tailored to your unique use case. The future of the data innovation journey holds exciting possibilities and advancements to be explored further.


About the Authors

Jeeshan Khetrapal is a Sr. Software Development Engineer at Amazon, where he develops fintech products based on cloud computing serverless architectures that are responsible for companies’ IT general controls, financial reporting, and controllership for governance, risk, and compliance.

Sakti Mishra is a Principal Solutions Architect at AWS, where he helps customers modernize their data architecture and define their end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

Improve healthcare services through patient 360: A zero-ETL approach to enable near real-time data analytics

Post Syndicated from Saeed Barghi original https://aws.amazon.com/blogs/big-data/improve-healthcare-services-through-patient-360-a-zero-etl-approach-to-enable-near-real-time-data-analytics/

Healthcare providers have an opportunity to improve the patient experience by collecting and analyzing broader and more diverse datasets. This includes patient medical history, allergies, immunizations, family disease history, and individuals’ lifestyle data such as workout habits. Having access to those datasets and forming a 360-degree view of patients allows healthcare providers such as claim analysts to see a broader context about each patient and personalize the care they provide for every individual. This is underpinned by building a complete patient profile that enables claim analysts to identify patterns, trends, potential gaps in care, and adherence to care plans. They can then use the result of their analysis to understand a patient’s health status, treatment history, and past or upcoming doctor consultations to make more informed decisions, streamline the claim management process, and improve operational outcomes. Achieving this will also improve general public health through better and more timely interventions, identify health risks through predictive analytics, and accelerate the research and development process.

AWS has invested in a zero-ETL (extract, transform, and load) future so that builders can focus more on creating value from data, instead of having to spend time preparing data for analysis. The solution proposed in this post follows a zero-ETL approach to data integration to facilitate near real-time analytics and deliver a more personalized patient experience. The solution uses AWS services such as AWS HealthLake, Amazon Redshift, Amazon Kinesis Data Streams, and AWS Lake Formation to build a 360-degree view of patients. These services enable you to collect and analyze data in near real time and put a comprehensive data governance framework in place that uses granular access control to secure sensitive data from unauthorized users.

Zero-ETL refers to a set of features on the AWS Cloud that enable integrating different data sources with Amazon Redshift:

Solution overview

Organizations in the healthcare industry are currently spending a significant amount of time and money on building complex ETL pipelines for data movement and integration. This means data will be replicated across multiple data stores via bespoke and in some cases hand-written ETL jobs, resulting in data inconsistency, latency, and potential security and privacy breaches.

With support for querying cross-account Apache Iceberg tables via Amazon Redshift, you can now build a more comprehensive patient-360 analysis by querying all patient data from one place. This means you can seamlessly combine information such as clinical data stored in HealthLake with data stored in operational databases such as a patient relationship management system, together with data produced from wearable devices in near real-time. Having access to all this data enables healthcare organizations to form a holistic view of patients, improve care coordination across multiple organizations, and provide highly personalized care for each individual.

The following diagram depicts the high-level solution we build to achieve these outcomes.

Deploy the solution

You can use the following AWS CloudFormation template to deploy the solution components:

This stack creates the following resources and necessary permissions to integrate the services:

AWS Solution setup

AWS HealthLake

AWS HealthLake enables organizations in the health industry to securely store, transform, transact, and analyze health data. It stores data in HL7 Fast Healthcare Interoperability Resources (FHIR) format, which is an interoperability standard designed for quick and efficient exchange of health data. When you create a HealthLake data store, a FHIR data repository is made available via a RESTful API endpoint. Simultaneously and as part of the AWS HealthLake managed service, the nested JSON FHIR data undergoes an ETL process and is stored in Apache Iceberg open table format in Amazon S3.

To create an AWS HealthLake data store, refer to Getting started with AWS HealthLake. Make sure to select the option Preload sample data when creating your data store.

In real-world scenarios and when you use AWS HealthLake in production environments, you don’t need to load sample data into your AWS HealthLake data store. Instead, you can use FHIR REST API operations to manage and search resources in your AWS HealthLake data store.

We use two tables from the sample data stored in HealthLake: patient and allergyintolerance.

Query AWS HealthLake tables with Redshift Serverless

Amazon Redshift is the data warehousing service available on the AWS Cloud that provides up to six times better price-performance than any other cloud data warehouse in the market, with a fully managed, AI-powered, massively parallel processing (MPP) data warehouse built for performance, scale, and availability. With continuous innovations added to Amazon Redshift, it is now more than just a data warehouse. It enables organizations of different sizes and in different industries to access all the data they have in their AWS environments and analyze it from one single location with a set of features under the zero-ETL umbrella. Amazon Redshift integrates with AWS HealthLake and data lakes through Redshift Spectrum and Amazon S3 auto-copy features, enabling you to query data directly from files on Amazon S3.

Query AWS HealthLake data with Amazon Redshift

Amazon Redshift makes it straightforward to query the data stored in S3-based data lakes with automatic mounting of an AWS Glue Data Catalog in the Redshift query editor v2. This means you no longer have to create an external schema in Amazon Redshift to use the data lake tables cataloged in the Data Catalog. To get started with this feature, see Querying the AWS Glue Data Catalog. After it is set up and you’re connected to the Redshift query editor v2, complete the following steps:

  1. Validate that your tables are visible in the query editor V2. The Data Catalog objects are listed under the awsdatacatalog database.

FHIR data stored in AWS HealthLake is highly nested. To learn about how to un-nest semi-structured data with Amazon Redshift, see Tutorial: Querying nested data with Amazon Redshift Spectrum.

  1. Use the following query to un-nest the allergyintolerance and patient tables, join them together, and get patient details and their allergies:
    WITH patient_allergy AS 
    (
        SELECT
            resourcetype, 
            c AS allery_category,
            a."patient"."reference",
            SUBSTRING(a."patient"."reference", 9, LEN(a."patient"."reference")) AS patient_id,
            a.recordeddate AS allergy_record_date,
            NVL(cd."code", 'NA') AS allergy_code,
            NVL(cd.display, 'NA') AS allergy_description
    
        FROM "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."allergyintolerance" a
                LEFT JOIN a.category c ON TRUE
                LEFT JOIN a.reaction r ON TRUE
                LEFT JOIN r.manifestation m ON TRUE
                LEFT JOIN m.coding cd ON TRUE
    ), patinet_info AS
    (
        SELECT id,
                gender,
                g as given_name,
                n.family as family_name,
                pr as prefix
    
        FROM "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."patient" p
                LEFT JOIN p.name n ON TRUE
                LEFT JOIN n.given g ON TRUE
                LEFT JOIN n.prefix pr ON TRUE
    )
    SELECT DISTINCT p.id, 
            p.gender, 
            p.prefix,
            p.given_name,
            p.family_name,
            pa.allery_category,
            pa.allergy_code,
            pa.allergy_description
    from patient_allergy pa
        JOIN patinet_info p
            ON pa.patient_id = p.id
    ORDER BY p.id, pa.allergy_code
    ;
    

To eliminate the need for Amazon Redshift to un-nest data every time a query is run, you can create a materialized view to hold un-nested and flattened data. Materialized views are an effective mechanism to deal with complex and repeating queries. They contain a precomputed result set, based on a SQL query over one or more base tables. You can issue SELECT statements to query a materialized view, in the same way that you can query other tables or views in the database.

  1. Use the following SQL to create a materialized view. You use it later to build a complete view of patients:
    CREATE MATERIALIZED VIEW patient_allergy_info AUTO REFRESH YES AS
    WITH patient_allergy AS 
    (
        SELECT
            resourcetype, 
            c AS allery_category,
            a."patient"."reference",
            SUBSTRING(a."patient"."reference", 9, LEN(a."patient"."reference")) AS patient_id,
            a.recordeddate AS allergy_record_date,
            NVL(cd."code", 'NA') AS allergy_code,
            NVL(cd.display, 'NA') AS allergy_description
    
        FROM
            "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."allergyintolerance" a
                LEFT JOIN a.category c ON TRUE
                LEFT JOIN a.reaction r ON TRUE
                LEFT JOIN r.manifestation m ON TRUE
                LEFT JOIN m.coding cd ON TRUE
    ), patinet_info AS
    (
        SELECT id,
                gender,
                g as given_name,
                n.family as family_name,
                pr as prefix
    
        FROM "awsdatacatalog"."datastore_01_179674d36391d68926a8d74c12599306_healthlake_view"."patient" p
                LEFT JOIN p.name n ON TRUE
                LEFT JOIN n.given g ON TRUE
                LEFT JOIN n.prefix pr ON TRUE
    )
    SELECT DISTINCT p.id, 
            p.gender, 
            p.prefix,
            p.given_name,
            p.family_name,
            pa.allery_category,
            pa.allergy_code,
            pa.allergy_description
    from patient_allergy pa
        JOIN patinet_info p
            ON pa.patient_id = p.id
    ORDER BY p.id, pa.allergy_code
    ;
    

You have confirmed you can query data in AWS HealthLake via Amazon Redshift. Next, you set up zero-ETL integration between Amazon Redshift and Amazon Aurora MySQL.

Set up zero-ETL integration between Amazon Aurora MySQL and Redshift Serverless

Applications such as front-desk software, which are used to schedule appointments and register new patients, store data in OLTP databases such as Aurora. To get data out of OLTP databases and have them ready for analytics use cases, data teams might have to spend a considerable amount of time to build, test, and deploy ETL jobs that are complex to maintain and scale.

With the Amazon Redshift zero-ETL integration with Amazon Aurora MySQL, you can run analytics on the data stored in OLTP databases and combine them with the rest of the data in Amazon Redshift and AWS HealthLake in near real time. In the next steps in this section, we connect to a MySQL database and set up zero-ETL integration with Amazon Redshift.

Connect to an Aurora MySQL database and set up data

Connect to your Aurora MySQL database with your editor of choice, using the AdminUsername and AdminPassword values that you entered when running the CloudFormation stack. (For simplicity, they are the same for Amazon Redshift and Aurora.)

When you’re connected to your database, complete the following steps:

  1. Create a new database by running the following command:
    CREATE DATABASE front_desk_app_db;

  2. Create a new table. This table simulates storing patient information as they visit clinics and other healthcare centers. For simplicity and to demonstrate specific capabilities, we assume that patient IDs are the same in AWS HealthLake and the front-of-office application. In real-world scenarios, this can be a hashed version of a national health care number:
    CREATE TABLE patient_appointment ( 
          patient_id varchar(250), 
          gender varchar(1), 
          date_of_birth date, 
          appointment_datetime datetime, 
          phone_number varchar(15), 
          PRIMARY KEY (patient_id, appointment_datetime) 
    );

Having a primary key in the table is mandatory for zero-ETL integration to work.

  1. Insert new records into the source table in the Aurora MySQL database. To demonstrate the required functionalities, make sure the patient_id of the sample records inserted into the MySQL database match the ones in AWS HealthLake. Replace [PATIENT_ID_1] and [PATIENT_ID_2] in the following query with the ones from the Redshift query you ran previously (the query that joined allergyintolerance and patient):
    INSERT INTO front_desk_app_db.patient_appointment (patient_id, gender, date_of_birth, appointment_datetime, phone_number)
    
    VALUES([PATIENT_ID_1], 'F', '1988-7-04', '2023-12-19 10:15:00', '0401401401'),
    ([PATIENT_ID_1], 'F', '1988-7-04', '2023-09-19 11:00:00', '0401401401'),
    ([PATIENT_ID_1], 'F', '1988-7-04', '2023-06-06 14:30:00', '0401401401'),
    ([PATIENT_ID_2], 'F', '1972-11-14', '2023-12-19 08:15:00', '0401401402'),
    ([PATIENT_ID_2], 'F', '1972-11-14', '2023-01-09 12:15:00', '0401401402');

Now that your source table is populated with sample records, you can set up zero-ETL and have data ingested into Amazon Redshift.

Set up zero-ETL integration between Amazon Aurora MySQL and Amazon Redshift

Complete the following steps to create your zero-ETL integration:

  1. On the Amazon RDS console, choose Databases in the navigation pane.
  2. Choose the DB identifier of your cluster (not the instance).
  3. On the Zero-ETL Integration tab, choose Create zero-ETL integration.
  4. Follow the steps to create your integration.

Create a Redshift database from the integration

Next, you create a target database from the integration. You can do this by running a couple of simple SQL commands on Amazon Redshift. Log in to the query editor V2 and run the following commands:

  1. Get the integration ID of the zero-ETL you set up between your source database and Amazon Redshift:
    SELECT * FROM svv_integration;

  2. Create a database using the integration ID:
    CREATE DATABASE ztl_demo FROM INTEGRATION '[INTEGRATION_ID]';

  3. Query the database and validate that a new table is created and populated with data from your source MySQL database:
    SELECT * FROM ztl_demo.front_desk_app_db.patient_appointment;

It might take a few seconds for the first set of records to appear in Amazon Redshift.

This shows that the integration is working as expected. To validate it further, you can insert a new record in your Aurora MySQL database, and it will be available in Amazon Redshift for querying in near real time within a few seconds.

Set up streaming ingestion for Amazon Redshift

Another aspect of zero-ETL on AWS, for real-time and streaming data, is realized through Amazon Redshift Streaming Ingestion. It provides low-latency, high-speed ingestion of streaming data from Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK). It lowers the effort required to have data ready for analytics workloads, lowers the cost of running such workloads on the cloud, and decreases the operational burden of maintaining the solution.

In the context of healthcare, understanding an individual’s exercise and movement patterns can help with overall health assessment and better treatment planning. In this section, you send simulated data from wearable devices to Kinesis Data Streams and integrate it with the rest of the data you already have access to from your Redshift Serverless data warehouse.

For step-by-step instructions, refer to Real-time analytics with Amazon Redshift streaming ingestion. Note the following steps when you set up streaming ingestion for Amazon Redshift:

  1. Select wearables_stream and use the following template when sending data to Amazon Kinesis Data Streams via Kinesis Data Generator to simulate data generated by wearable devices. Replace [PATIENT_ID_1] and [PATIENT_ID_2] with the patient IDs you used earlier when inserting new records into your Aurora MySQL table:
    {
       "patient_id": "{{random.arrayElement(["[PATIENT_ID_1]"," [PATIENT_ID_2]"])}}",
       "steps_increment": "{{random.arrayElement(
          [0,1]
       )}}",
       "heart_rate": {{random.number( 
          {
             "min":45,
             "max":120}
       )}}
    }

  2. Create an external schema called from_kds by running the following query and replacing [IAM_ROLE_ARN] with the ARN of the role created by the CloudFormation stack (Patient360BlogRole):
    CREATE EXTERNAL SCHEMA from_kds
    FROM KINESIS
    IAM_ROLE '[IAM_ROLE_ARN]';

  3. Use the following SQL when creating a materialized view to consume data from the stream:
    CREATE MATERIALIZED VIEW patient_wearable_data AUTO REFRESH YES AS 
    SELECT approximate_arrival_timestamp, 
          JSON_PARSE(kinesis_data) as Data FROM from_kds."wearables_stream" 
    WHERE CAN_JSON_PARSE(kinesis_data);

  4. To validate that streaming ingestion works as expected, refresh the materialized view to get the data you already sent to the data stream and query the table to make sure data has landed in Amazon Redshift:
    REFRESH MATERIALIZED VIEW patient_wearable_data;
    
    SELECT *
    FROM patient_wearable_data
    ORDER BY approximate_arrival_timestamp DESC;

Query and analyze patient wearable data

The results in the data column of the preceding query are in JSON format. Amazon Redshift makes it straightforward to work with semi-structured data in JSON format. It uses the PartiQL language to offer SQL-compatible access to relational, semi-structured, and nested data. Use the following query to flatten the data:

SELECT data."patient_id"::varchar AS patient_id,       
      data."steps_increment"::integer as steps_increment,       
      data."heart_rate"::integer as heart_rate, 
      approximate_arrival_timestamp 
FROM patient_wearable_data 
ORDER BY approximate_arrival_timestamp DESC;

The result looks like the following screenshot.

Now that you know how to flatten JSON data, you can analyze it further. Use the following query to get the number of minutes a patient has been physically active per day, based on their heart rate (greater than 80):

WITH patient_wearble_flattened AS
(
   SELECT data."patient_id"::varchar AS patient_id,
      data."steps_increment"::integer as steps_increment,
      data."heart_rate"::integer as heart_rate,
      approximate_arrival_timestamp,
      DATE(approximate_arrival_timestamp) AS date_received,
      extract(hour from approximate_arrival_timestamp) AS hour_received,
      extract(minute from approximate_arrival_timestamp) AS minute_received
   FROM patient_wearable_data
), patient_active_minutes AS
(
   SELECT patient_id,
      date_received,
      hour_received,
      minute_received,
      avg(heart_rate) AS heart_rate
   FROM patient_wearble_flattened
   GROUP BY patient_id,
      date_received,
      hour_received,
      minute_received
   HAVING avg(heart_rate) > 80
)
SELECT patient_id,
      date_received,
      COUNT(heart_rate) AS active_minutes_count
FROM patient_active_minutes
GROUP BY patient_id,
      date_received
ORDER BY patient_id,
      date_received;

Create a complete patient 360

Now that you are able to query all patient data with Redshift Serverless, you can combine the three datasets you used in this post and form a comprehensive patient 360 view with the following query:

WITH patient_appointment_info AS
(
      SELECT "patient_id",
         "gender",
         "date_of_birth",
         "appointment_datetime",
         "phone_number"
      FROM ztl_demo.front_desk_app_db.patient_appointment
),
patient_wearble_flattened AS
(
      SELECT data."patient_id"::varchar AS patient_id,
         data."steps_increment"::integer as steps_increment,
         data."heart_rate"::integer as heart_rate,
         approximate_arrival_timestamp,
         DATE(approximate_arrival_timestamp) AS date_received,
         extract(hour from approximate_arrival_timestamp) AS hour_received,
         extract(minute from approximate_arrival_timestamp) AS minute_received
      FROM patient_wearable_data
), patient_active_minutes AS
(
      SELECT patient_id,
         date_received,
         hour_received,
         minute_received,
         avg(heart_rate) AS heart_rate
      FROM patient_wearble_flattened
      GROUP BY patient_id,
         date_received,
         hour_received,
         minute_received
         HAVING avg(heart_rate) > 80
), patient_active_minutes_count AS
(
      SELECT patient_id,
         date_received,
         COUNT(heart_rate) AS active_minutes_count
      FROM patient_active_minutes
      GROUP BY patient_id,
         date_received
)
SELECT pai.patient_id,
      pai.gender,
      pai.prefix,
      pai.given_name,
      pai.family_name,
      pai.allery_category,
      pai.allergy_code,
      pai.allergy_description,
      ppi.date_of_birth,
      ppi.appointment_datetime,
      ppi.phone_number,
      pamc.date_received,
      pamc.active_minutes_count
FROM patient_allergy_info pai
      LEFT JOIN patient_active_minutes_count pamc
            ON pai.patient_id = pamc.patient_id
      LEFT JOIN patient_appointment_info ppi
            ON pai.patient_id = ppi.patient_id
GROUP BY pai.patient_id,
      pai.gender,
      pai.prefix,
      pai.given_name,
      pai.family_name,
      pai.allery_category,
      pai.allergy_code,
      pai.allergy_description,
      ppi.date_of_birth,
      ppi.appointment_datetime,
      ppi.phone_number,
      pamc.date_received,
      pamc.active_minutes_count
ORDER BY pai.patient_id,
      pai.gender,
      pai.prefix,
      pai.given_name,
      pai.family_name,
      pai.allery_category,
      pai.allergy_code,
      pai.allergy_description,
      ppi.date_of_birth DESC,
      ppi.appointment_datetime DESC,
      ppi.phone_number DESC,
      pamc.date_received,
      pamc.active_minutes_count;

You can use the solution and queries used here to expand the datasets used in your analysis. For example, you can include other tables from AWS HealthLake as needed.

Clean up

To clean up resources you created, complete the following steps:

  1. Delete the zero-ETL integration between Amazon RDS and Amazon Redshift.
  2. Delete the CloudFormation stack.
  3. Delete the AWS HealthLake data store.
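If you also want to remove the Amazon Redshift objects created in this walkthrough, a sketch like the following covers the ones used in this post; run the DROP DATABASE statement while connected to a different database, such as dev:

-- Remove the streaming ingestion objects
DROP MATERIALIZED VIEW IF EXISTS patient_wearable_data;
DROP SCHEMA IF EXISTS from_kds;

-- Remove the database created from the zero-ETL integration
DROP DATABASE ztl_demo;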

Conclusion

Forming a comprehensive 360 view of patients by integrating data from various sources offers numerous benefits for organizations operating in the healthcare industry. It enables healthcare providers to gain a holistic understanding of a patient's medical journey, enhances clinical decision-making, and allows for more accurate diagnoses and tailored treatment plans. With zero-ETL features for data integration on AWS, you can build such a view securely, cost-effectively, and with minimal effort.

You can then use visualization tools such as Amazon QuickSight to build dashboards or use Amazon Redshift ML to enable data analysts and database developers to train machine learning (ML) models with the data integrated through Amazon Redshift zero-ETL. The result is a set of ML models that are trained with a broader view into patients, their medical history, and their lifestyle, and therefore enable you to make more accurate predictions about their upcoming health needs.


About the Authors

Saeed Barghi is a Sr. Analytics Specialist Solutions Architect specializing in architecting enterprise data platforms. He has extensive experience in the fields of data warehousing, data engineering, data lakes, and AI/ML. Based in Melbourne, Australia, Saeed works with public sector customers in Australia and New Zealand.

Satesh Sonti is a Sr. Analytics Specialist Solutions Architect based out of Atlanta, specialized in building enterprise data platforms, data warehousing, and analytics solutions. He has over 17 years of experience in building data assets and leading complex data platform programs for banking and insurance clients across the globe.

Create an end-to-end data strategy for Customer 360 on AWS

Post Syndicated from Ismail Makhlouf original https://aws.amazon.com/blogs/big-data/create-an-end-to-end-data-strategy-for-customer-360-on-aws/

Customer 360 (C360) provides a complete and unified view of a customer’s interactions and behavior across all touchpoints and channels. This view is used to identify patterns and trends in customer behavior, which can inform data-driven decisions to improve business outcomes. For example, you can use C360 to segment and create marketing campaigns that are more likely to resonate with specific groups of customers.

In 2022, AWS commissioned a study conducted by the American Productivity and Quality Center (APQC) to quantify the Business Value of Customer 360. The following figure shows some of the metrics derived from the study. Organizations using C360 achieved 43.9% reduction in sales cycle duration, 22.8% increase in customer lifetime value, 25.3% faster time to market, and 19.1% improvement in net promoter score (NPS) rating.

Without C360, businesses face missed opportunities, inaccurate reports, and disjointed customer experiences, leading to customer churn. However, building a C360 solution can be complicated. A Gartner Marketing survey found only 14% of organizations have successfully implemented a C360 solution, due to lack of consensus on what a 360-degree view means, challenges with data quality, and lack of cross-functional governance structure for customer data.

In this post, we discuss how you can use purpose-built AWS services to create an end-to-end data strategy for C360 that unifies and governs customer data and addresses these challenges. We structure it around five pillars that power C360: data collection, unification, analytics, activation, and data governance, along with a solution architecture that you can use for your implementation.

The five pillars of a mature C360

When you embark on creating a C360, you work with multiple use cases, types of customer data, and users and applications that require different tools. Building a C360 on the right datasets, adding new datasets over time while maintaining the quality of data, and keeping it secure needs an end-to-end data strategy for your customer data. You also need to provide tools that make it straightforward for your teams to build products that mature your C360.

We recommend building your data strategy around five pillars of C360, as shown in the following figure. This starts with basic data collection, unifying and linking data from various channels related to unique customers, and progresses towards basic to advanced analytics for decision-making, and personalized engagement through various channels. As you mature in each of these pillars, you progress towards responding to real-time customer signals.

The following diagram illustrates the functional architecture that combines the building blocks of a Customer Data Platform on AWS with additional components used to design an end-to-end C360 solution. This is aligned to the five pillars we discuss in this post.

Pillar 1: Data collection

As you start building your customer data platform, you have to collect data from various systems and touchpoints, such as your sales systems, customer support, web and social media, and data marketplaces. Think of the data collection pillar as a combination of ingestion, storage, and processing capabilities.

Data ingestion

You have to build ingestion pipelines based on factors like types of data sources (on-premises data stores, files, SaaS applications, third-party data), and flow of data (unbounded streams or batch data). AWS provides different services for building data ingestion pipelines:

  • AWS Glue is a serverless data integration service that ingests data in batches from on-premises databases and data stores in the cloud. It connects to more than 70 data sources and helps you build extract, transform, and load (ETL) pipelines without having to manage pipeline infrastructure. AWS Glue Data Quality checks for and alerts on poor data, making it straightforward to spot and fix issues before they harm your business.
  • Amazon AppFlow ingests data from software as a service (SaaS) applications like Google Analytics, Salesforce, SAP, and Marketo, giving you the flexibility to ingest data from more than 50 SaaS applications.
  • AWS Data Exchange makes it straightforward to find, subscribe to, and use third-party data for analytics. You can subscribe to data products that help enrich customer profiles, for example demographics data, advertising data, and financial markets data.
  • Amazon Kinesis ingests streaming events in real time from point-of-sales systems, clickstream data from mobile apps and websites, and social media data. You could also consider using Amazon Managed Streaming for Apache Kafka (Amazon MSK) for streaming events in real time.

The following diagram illustrates the different pipelines to ingest data from various source systems using AWS services.

Data storage

Structured, semi-structured, or unstructured batch data is stored in object storage because it is cost-efficient and durable. Amazon Simple Storage Service (Amazon S3) is a managed storage service with archiving features that can store petabytes of data with eleven 9's of durability. Streaming data with low-latency needs is stored in Amazon Kinesis Data Streams for real-time consumption. This allows immediate analytics and actions for various downstream consumers, as seen with Riot Games' central Riot Event Bus.

Data processing

Raw data is often cluttered with duplicates and irregular formats. You need to process this to make it ready for analysis. If you are consuming batch data and streaming data, consider using a framework that can handle both. A pattern such as the Kappa architecture views everything as a stream, simplifying the processing pipelines. Consider using Amazon Managed Service for Apache Flink to handle the processing work. With Managed Service for Apache Flink, you can clean and transform the streaming data and direct it to the appropriate destination based on latency requirements. You can also implement batch data processing using Amazon EMR on open source frameworks such as Apache Spark at 3.5 times better performance than the self-managed version. The architecture decision of using a batch or streaming processing system will depend on various factors; however, if you want to enable real-time analytics on your customer data, we recommend using a Kappa architecture pattern.

Pillar 2: Unification

To link the diverse data arriving from various touchpoints to a unique customer, you need to build an identity processing solution that identifies anonymous logins, stores useful customer information, links them to external data for better insights, and groups customers in domains of interest. Although the identity processing solution helps build the unified customer profile, we recommend considering this as part of your data processing capabilities. The following diagram illustrates the components of such a solution.

The key components are as follows:

  • Identity resolution – Identity resolution is a deduplication solution, where records are matched to identify a unique customer and prospects by linking multiple identifiers such as cookies, device identifiers, IP addresses, email IDs, and internal enterprise IDs to a known person or anonymous profile using privacy-compliant methods. This can be achieved using AWS Entity Resolution, which enables using rules and machine learning (ML) techniques to match records and resolve identities. Alternatively, you can build identity graphs using Amazon Neptune for a single unified view of your customers.
  • Profile aggregation – When you’ve uniquely identified a customer, you can build applications in Managed Service for Apache Flink to consolidate all their metadata, from name to interaction history. Then, you transform this data into a concise format. Instead of showing every transaction detail, you can offer an aggregated spend value and a link to their Customer Relationship Management (CRM) record. For customer service interactions, provide an average CSAT score and a link to the call center system for a deeper dive into their communication history.
  • Profile enrichment – After you resolve a customer to a single identity, enhance their profile using various data sources. Enrichment typically involves adding demographic, behavioral, and geolocation data. You can use third-party data products from AWS Marketplace delivered through AWS Data Exchange to gain insights on income, consumption patterns, credit risk scores, and many more dimensions to further refine the customer experience.
  • Customer segmentation – After uniquely identifying and enriching a customer’s profile, you can segment them based on demographics like age, spend, income, and location using applications in Managed Service for Apache Flink. As you advance, you can incorporate AI services for more precise targeting techniques.

After you have done the identity processing and segmentation, you need a storage capability to store the unique customer profile and provide search and query capabilities on top of it for downstream consumers to use the enriched customer data.

The following diagram illustrates the unification pillar for a unified customer profile and single view of the customer for downstream applications.

Unified customer profile

Graph databases excel in modeling customer interactions and relationships, offering a comprehensive view of the customer journey. If you are dealing with billions of profiles and interactions, you can consider using Neptune, a managed graph database service on AWS. Organizations such as Zeta and Activision have successfully used Neptune to store and query billions of unique identifiers per month and millions of queries per second at millisecond response time.

Single customer view

Although graph databases provide in-depth insights, they can be complex for regular applications. It is prudent to consolidate this data into a single customer view, serving as a primary reference for downstream applications, ranging from ecommerce platforms to CRM systems. This consolidated view acts as a liaison between the data platform and customer-centric applications. For such purposes, we recommend using Amazon DynamoDB for its adaptability, scalability, and performance, resulting in an up-to-date and efficient customer database. This database also accepts a high volume of writes back from the activation systems as they learn new information about customers and feed it back.

Pillar 3: Analytics

The analytics pillar defines capabilities that help you generate insights on top of your customer data. Your analytics strategy applies to the wider organizational needs, not just C360. You can use the same capabilities to serve financial reporting, measure operational performance, or even monetize data assets. Strategize based on how your teams explore data, run analyses, wrangle data for downstream requirements, and visualize data at different levels. Plan on how you can enable your teams to use ML to move from descriptive to prescriptive analytics.

The AWS modern data architecture shows a way to build a purpose-built, secure, and scalable data platform in the cloud. Learn from this to build querying capabilities across your data lake and the data warehouse.

The following diagram breaks down the analytics capability into data exploration, visualization, data warehousing, and data collaboration. Let’s find out what role each of these components play in the context of C360.

Data exploration

Data exploration helps unearth inconsistencies, outliers, or errors. By spotting these early on, your teams can have cleaner data integration for C360, which in turn leads to more accurate analytics and predictions. Consider the personas exploring the data, their technical skills, and the time to insight. For instance, data analysts who know to write SQL can directly query the data residing in Amazon S3 using Amazon Athena. Users interested in visual exploration can do so using AWS Glue DataBrew. Data scientists or engineers can use Amazon EMR Studio or Amazon SageMaker Studio to explore data from the notebook, and for a low-code experience, you can use Amazon SageMaker Data Wrangler. Because these services directly query S3 buckets, you can explore the data as it lands in the data lake, reducing time to insight.
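For example, an analyst exploring raw engagement events landing in Amazon S3 might start with a simple aggregate in Athena. The following is an illustrative sketch only; the database and table names (c360_raw.web_clickstream) and columns are hypothetical and assume a Data Catalog table defined over the relevant S3 prefix:

-- Count raw events by type for the current period
SELECT event_type,
       COUNT(*) AS event_count
FROM c360_raw.web_clickstream
WHERE event_date >= DATE '2024-01-01'
GROUP BY event_type
ORDER BY event_count DESC;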

Visualization

Turning complex datasets into intuitive visuals unravels hidden patterns in the data, and is crucial for C360 use cases. With this capability, you can design reports for different levels catering to varying needs: executive reports offering strategic overviews, management reports highlighting operational metrics, and detailed reports diving into the specifics. Such visual clarity helps your organization make informed decisions across all tiers, centralizing the customer’s perspective.

Amazon QuickSight offers scalable, serverless visualization capabilities. You can benefit from its ML integrations for automated insights like forecasting and anomaly detection, natural language querying with Amazon Q in QuickSight, direct data connectivity from various sources, and pay-per-session pricing. With QuickSight, you can embed dashboards into external websites and applications, and the SPICE engine enables rapid, interactive data visualization at scale. The following screenshot shows an example C360 dashboard built on QuickSight.

Data warehouse

Data warehouses are efficient in consolidating structured data from multifarious sources and serving analytics queries from a large number of concurrent users. Data warehouses can provide a unified, consistent view of a vast amount of customer data for C360 use cases. Amazon Redshift addresses this need by adeptly handling large volumes of data and diverse workloads. It provides strong consistency across datasets, allowing organizations to derive reliable, comprehensive insights about their customers, which is essential for informed decision-making. Amazon Redshift offers real-time insights and predictive analytics capabilities for analyzing data from terabytes to petabytes. With Amazon Redshift ML, you can embed ML on top of the data stored in the data warehouse with minimum development overhead. Amazon Redshift Serverless simplifies application building and makes it straightforward for companies to embed rich data analytics capabilities.
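To give a sense of how little SQL this can take, the following is a hedged sketch of a Redshift ML model definition; the source table, columns, IAM role ARN, and S3 bucket are placeholders rather than a reference implementation:

-- Train a binary classification model on historical customer features
CREATE MODEL customer_churn_model
FROM (
    SELECT age, tenure_months, avg_monthly_spend, support_tickets, churned
    FROM c360.customer_features_history
)
TARGET churned
FUNCTION predict_customer_churn
IAM_ROLE 'arn:aws:iam::111122223333:role/RedshiftMLRole'  -- placeholder role ARN
SETTINGS (S3_BUCKET 'your-redshift-ml-artifacts-bucket');  -- placeholder bucket

After training completes, the generated predict_customer_churn function can be called inline in SQL to score customers directly in the warehouse.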

Data collaboration

You can securely collaborate and analyze collective datasets from your partners without sharing or copying one another’s underlying data using AWS Clean Rooms. You can bring together disparate data from across engagement channels and partner datasets to form a 360-degree view of your customers. AWS Clean Rooms can enhance C360 by enabling use cases like cross-channel marketing optimization, advanced customer segmentation, and privacy-compliant personalization. By safely merging datasets, it offers richer insights and robust data privacy, meeting business needs and regulatory standards.

Pillar 4: Activation

The value of data diminishes the older it gets, leading to higher opportunity costs over time. In a survey conducted by InterSystems, 75% of the organizations surveyed believed untimely data inhibited business opportunities. In another survey of 560 respondents from the HBR Advisory Council and readers, 58% of organizations stated they saw an increase in customer retention and loyalty when using real-time customer analytics.

You achieve maturity in C360 when you build the ability to act in real time on all the insights acquired from the previous pillars we discussed. For example, at this maturity level, you can act on customer sentiment based on the context you automatically derived with an enriched customer profile and integrated channels. For this, you need to implement prescriptive decision-making on how to address the customer's sentiment. To do this at scale, you have to use AI/ML services for decision-making. The following diagram illustrates the architecture to activate insights using ML for prescriptive analytics and AI services for targeting and segmentation.

Use ML for the decision-making engine

With ML, you can improve the overall customer experience—you can create predictive customer behavior models, design hyper-personalized offers, and target the right customer with the right incentive. You can build them using Amazon SageMaker, which features a suite of managed services mapped to the data science lifecycle, including data wrangling, model training, model hosting, model inference, model drift detection, and feature storage. SageMaker enables you to build and operationalize your ML models, infusing them back into your applications to produce the right insight to the right person at the right time.

Amazon Personalize supports contextual recommendations, through which you can improve the relevance of recommendations by generating them within a context—for instance, device type, location, or time of day. Your team can get started without any prior ML experience using APIs to build sophisticated personalization capabilities in a few clicks. For more information, see Customize your recommendations by promoting specific items using business rules with Amazon Personalize.

Activate channels across marketing, advertising, direct-to-consumer, and loyalty

Now that you know who your customers are and who to reach out to, you can build solutions to run targeting campaigns at scale. With Amazon Pinpoint, you can personalize and segment communications to engage customers across multiple channels. For example, you can use Amazon Pinpoint to build engaging customer experiences through various communication channels like email, SMS, push notifications, and in-app notifications.

Pillar 5: Data governance

Establishing the right governance that balances control and access gives users trust and confidence in data. Imagine offering promotions on products that a customer doesn’t need, or bombarding the wrong customers with notifications. Poor data quality can lead to such situations, and ultimately results in customer churn. You have to build processes that validate data quality and take corrective actions. AWS Glue Data Quality can help you build solutions that validate the quality of data at rest and in transit, based on predefined rules.

To set up a cross-functional governance structure for customer data, you need a capability for governing and sharing data across your organization. With Amazon DataZone, admins and data stewards can manage and govern access to data, and consumers such as data engineers, data scientists, product managers, analysts, and other business users can discover, use, and collaborate with that data to drive insights. It streamlines data access, letting you find and use customer data, promotes team collaboration with shared data assets, and provides personalized analytics either via a web app or API on a portal. AWS Lake Formation makes sure data is accessed securely, guaranteeing the right people see the right data for the right reasons, which is crucial for effective cross-functional governance in any organization. Business metadata is stored and managed by Amazon DataZone and is underpinned by technical metadata and schema information registered in the AWS Glue Data Catalog. This technical metadata is used both by governance services such as Lake Formation and Amazon DataZone, and by analytics services such as Amazon Redshift, Athena, and AWS Glue.

Bringing it all together

Using the following diagram as a reference, you can create projects and teams for building and operating different capabilities. For example, you can have a data integration team focus on the data collection pillar—you can then align functional roles, like data architects and data engineers. You can build your analytics and data science practices to focus on the analytics and activation pillars, respectively. Then you can create a specialized team for customer identity processing and for building the unified view of the customer. You can establish a data governance team with data stewards from different functions, security admins, and data governance policymakers to design and automate policies.

Conclusion

Building a robust C360 capability is fundamental for your organization to gain insights into your customer base. AWS Databases, Analytics, and AI/ML services can help streamline this process, providing scalability and efficiency. Following the five pillars to guide your thinking, you can build an end-to-end data strategy that defines the C360 view across the organization, makes sure data is accurate, and establishes cross-functional governance for customer data. You can categorize and prioritize the products and features you have to build within each pillar, select the right tool for the job, and build the skills you need in your teams.

Visit AWS for Data Customer Stories to learn how AWS is transforming customer journeys, from the world’s largest enterprises to growing startups.


About the Authors

Ismail Makhlouf is a Senior Specialist Solutions Architect for Data Analytics at AWS. Ismail focuses on architecting solutions for organizations across their end-to-end data analytics estate, including batch and real-time streaming, big data, data warehousing, and data lake workloads. He primarily works with organizations in retail, ecommerce, FinTech, HealthTech, and travel to achieve their business objectives with well architected data platforms.

Sandipan Bhaumik (Sandi) is a Senior Analytics Specialist Solutions Architect at AWS. He helps customers modernize their data platforms in the cloud to perform analytics securely at scale, reduce operational overhead, and optimize usage for cost-effectiveness and sustainability.

Enrich your customer data with geospatial insights using Amazon Redshift, AWS Data Exchange, and Amazon QuickSight

Post Syndicated from Tony Stricker original https://aws.amazon.com/blogs/big-data/enrich-your-customer-data-with-geospatial-insights-using-amazon-redshift-aws-data-exchange-and-amazon-quicksight/

It always pays to know more about your customers, and AWS Data Exchange makes it straightforward to use publicly available census data to enrich your customer dataset.

The United States Census Bureau conducts the US census every 10 years and gathers household survey data. This data is anonymized, aggregated, and made available for public use. The smallest geographic areas for which the Census Bureau collects and aggregates data are census blocks, which are formed by streets, roads, railroads, streams and other bodies of water, other visible physical and cultural features, and the legal boundaries shown on Census Bureau maps.

If you know the census block in which a customer lives, you are able to make general inferences about their demographic characteristics. With these new attributes, you are able to build a segmentation model to identify distinct groups of customers that you can target with personalized messaging. This data is available to subscribe to on AWS Data Exchange—and with data sharing, you don’t need to pay to store a copy of it in your account in order to query it.

In this post, we show how to use customer addresses to enrich a dataset with additional demographic details from the US Census Bureau dataset.

Solution overview

The solution includes the following high-level steps:

  1. Set up an Amazon Redshift Serverless endpoint and load customer data.
  2. Set up a place index in Amazon Location Service.
  3. Write an AWS Lambda user-defined function (UDF) to call Location Service from Amazon Redshift.
  4. Subscribe to census data on AWS Data Exchange.
  5. Use geospatial queries to tag addresses to census blocks.
  6. Create a new customer dataset in Amazon Redshift.
  7. Evaluate new customer data in Amazon QuickSight.

The following diagram illustrates the solution architecture.

architecture diagram

Prerequisites

You can use the following AWS CloudFormation template to deploy the required infrastructure. Before deployment, you need to sign up for QuickSight access through the AWS Management Console.

Load generic address data to Amazon Redshift

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. Redshift Serverless makes it straightforward to run analytics workloads of any size without having to manage data warehouse infrastructure.

To load our address data, we first create a Redshift Serverless workgroup. Then we use Amazon Redshift Query Editor v2 to load customer data from Amazon Simple Storage Service (Amazon S3).

Create a Redshift Serverless workgroup

There are two primary components of the Redshift Serverless architecture:

  • Namespace – A collection of database objects and users. Namespaces group together all of the resources you use in Redshift Serverless, such as schemas, tables, users, datashares, and snapshots.
  • Workgroup – A collection of compute resources. Workgroups have network and security settings that you can configure using the Redshift Serverless console, the AWS Command Line Interface (AWS CLI), or the Redshift Serverless APIs.

To create your namespace and workgroup, refer to Creating a data warehouse with Amazon Redshift Serverless. For this exercise, name your workgroup sandbox and your namespace adx-demo.

Use Query Editor v2 to load customer data from Amazon S3

You can use Query Editor v2 to submit queries and load data to your data warehouse through a web interface. To configure Query Editor v2 for your AWS account, refer to Data load made easy and secure in Amazon Redshift using Query Editor V2. After it’s configured, complete the following steps:

  • Use the following SQL to create the customer_data schema within the dev database in your data warehouse:
CREATE SCHEMA customer_data;
  • Use the following SQL DDL to create your target table into which you’ll load your customer address data:
CREATE TABLE customer_data.customer_addresses (
    address character varying(256) ENCODE lzo,
    unitnumber character varying(256) ENCODE lzo,
    municipality character varying(256) ENCODE lzo,
    region character varying(256) ENCODE lzo,
    postalcode character varying(256) ENCODE lzo,
    country character varying(256) ENCODE lzo,
    customer_id integer ENCODE az64
) DISTSTYLE AUTO;

The file has no column headers and is pipe delimited (|). For information on how to load data from either Amazon S3 or your local desktop, refer to Loading data into a database.
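For reference, a COPY command along the following lines could load the pipe-delimited file from Amazon S3; the bucket path is a placeholder, and it assumes the default IAM role associated with your namespace can read that location:

COPY customer_data.customer_addresses
FROM 's3://your-bucket/customer-addresses/'
IAM_ROLE default
DELIMITER '|';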

Use Location Service to geocode and enrich address data

Location Service lets you add location data and functionality to applications, which includes capabilities such as maps, points of interest, geocoding, routing, geofences, and tracking.

Our data is in Amazon Redshift, so we need to access the Location Service APIs using SQL statements. Each row of data contains an address that we want to enrich and geotag using the Location Service APIs. Amazon Redshift allows developers to create UDFs using a SQL SELECT clause, Python, or Lambda.

Lambda is a compute service that lets you run code without provisioning or managing servers. With Lambda UDFs, you can write custom functions with complex logic and integrate with third-party components. Scalar Lambda UDFs return one result per invocation of the function—in this case, the Lambda function runs one time for each row of data it receives.

For this post, we write a Lambda function that uses the Location Service API to geotag and validate our customer addresses. Then we register this Lambda function as a UDF with our Redshift instance, allowing us to call the function from a SQL command.

For instructions to create a Location Service place index and create your Lambda function and scalar UDF, refer to Access Amazon Location Service from Amazon Redshift. For this post, we use ESRI as a provider and name the place index placeindex.redshift.

Test your new function with the following code, which returns the coordinates of the White House in Washington, DC:

select public.f_geocode_address('1600 Pennsylvania Ave.','Washington','DC','20500','USA');

Subscribe to demographic data from AWS Data Exchange

AWS Data Exchange is a data marketplace with more than 3,500 products from over 300 providers delivered—through files, APIs, or Amazon Redshift queries—directly to the data lakes, applications, analytics, and machine learning models that use it.

First, we need to give our Redshift namespace permission via AWS Identity and Access Management (IAM) to access subscriptions on AWS Data Exchange. Then we can subscribe to our sample demographic data. Complete the following steps:

  1. On the IAM console, add the AWSDataExchangeSubscriberFullAccess managed policy to the Amazon Redshift commands access role that you assigned when creating the namespace.
  2. On the AWS Data Exchange console, navigate to the dataset ACS – Sociodemographics (USA, Census Block Groups, 2019), provided by CARTO.
  3. Choose Continue to subscribe, then choose Subscribe.

The subscription may take a few minutes to configure.

  1. When your subscription is in place, navigate back to the Redshift Serverless console.
  2. In the navigation pane, choose Datashares.
  3. On the Subscriptions tab, choose the datashare that you just subscribed to.
  4. On the datashare details page, choose Create database from datashare.
  5. Choose the namespace you created earlier and provide a name for the new database that will hold the shared objects from the dataset you subscribed to.

In Query Editor v2, you should see the new database you just created and two new tables: one that holds the block group polygons and another that holds the demographic information for each block group.

Query Editor v2 data source explorer

Join geocoded customer data to census data with geospatial queries

There are two primary types of spatial data: raster and vector data. Raster data is represented as a grid of pixels and is beyond the scope of this post. Vector data is comprised of vertices, edges, and polygons. With geospatial data, vertices are represented as latitude and longitude points and edges are the connections between pairs of vertices. Think of the road connecting two intersections on a map. A polygon is a set of vertices with a series of connecting edges that form a continuous shape. A simple rectangle is a polygon, just as the state border of Ohio can be represented as a polygon. The geography_usa_blockgroup_2019 dataset that you subscribed to has 220,134 rows, each representing a single census block group and its geographic shape.

Amazon Redshift supports the storage and querying of vector-based spatial data with the GEOMETRY and GEOGRAPHY data types. You can use Redshift SQL functions to perform queries such as a point in polygon operation to determine if a given latitude/longitude point falls within the boundaries of a given polygon (such as state or county boundary). In this dataset, you can observe that the geom column in geography_usa_blockgroup_2019 is of type GEOMETRY.
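As a toy illustration of a point in polygon test (not part of the walkthrough; the shapes are made up), the following query returns true because the unit square contains the point (0.5, 0.5):

SELECT ST_Contains(
    ST_GeomFromText('POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))'),
    ST_Point(0.5, 0.5)
) AS point_is_inside;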

Our goal is to determine which census block (polygon) each of our geotagged addresses falls within so we can enrich our customer records with details that we know about the census block. Complete the following steps:

  • Build a new table with the geocoding results from our UDF:
CREATE TABLE customer_data.customer_addresses_geocoded AS 
select address
    ,unitnumber
    ,municipality
    ,region
    ,postalcode
    ,country
    ,customer_id
    ,public.f_geocode_address(address||' '||unitnumber,municipality,region,postalcode,country) as geocode_result
FROM customer_data.customer_addresses;
  • Use the following code to extract the different address fields and latitude/longitude coordinates from the JSON column and create a new table with the results:
CREATE TABLE customer_data.customer_addresses_points AS
SELECT customer_id
    ,geo_address
    ,address
    ,unitnumber
    ,municipality
    ,region
    ,postalcode
    ,country
    ,longitude
    ,latitude
    ,ST_SetSRID(ST_MakePoint(Longitude, Latitude),4326) as address_point
            --create new geom column of type POINT, set new point SRID = 4326
FROM
(
select customer_id
    ,address
    ,unitnumber
    ,municipality
    ,region
    ,postalcode
    ,country
    ,cast(json_extract_path_text(geocode_result, 'Label', true) as VARCHAR) as geo_address
    ,cast(json_extract_path_text(geocode_result, 'Longitude', true) as float) as longitude
    ,cast(json_extract_path_text(geocode_result, 'Latitude', true) as float) as latitude
        --use json function to extract fields from geocode_result
from customer_data.customer_addresses_geocoded) a;

This code uses the ST_MakePoint function to create a new GEOMETRY column of subtype POINT, called address_point, from the latitude/longitude coordinates. It uses the ST_SetSRID geospatial function to set the spatial reference identifier (SRID) of the new column to 4326.

The SRID defines the spatial reference system to be used when evaluating the geometry data. It’s important when joining or comparing geospatial data that they have matching SRIDs. You can check the SRID of an existing geometry column by using the ST_SRID function. For more information on SRIDs and GEOMETRY data types, refer to Querying spatial data in Amazon Redshift.
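For example, a quick sanity check along these lines, using the tables created in this walkthrough, confirms the SRIDs before you join the datasets:

-- SRID of the customer address points (expected: 4326, as set above)
SELECT DISTINCT ST_SRID(address_point) AS point_srid
FROM customer_data.customer_addresses_points;

-- SRID of the census block group polygons
SELECT DISTINCT ST_SRID(geom) AS blockgroup_srid
FROM "carto_census_data"."carto".geography_usa_blockgroup_2019;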

  • Now that your customer addresses are geocoded as latitude/longitude points in a geometry column, you can use a join to identify which census block shape your new point falls within:
CREATE TABLE customer_data.customer_addresses_with_census AS
select c.*
    ,shapes.geoid as census_group_shape
    ,demo.*
from customer_data.customer_addresses_points c
inner join "carto_census_data"."carto".geography_usa_blockgroup_2019 shapes
on ST_Contains(shapes.geom, c.address_point)
    --join tables where the address point falls within the census block geometry
inner join carto_census_data.usa_acs.demographics_sociodemographics_usa_blockgroup_2019_yearly_2019 demo
on demo.geoid = shapes.geoid;

The preceding code creates a new table called customer_addresses_with_census, which joins the customer addresses to the census block in which they belong as well as the demographic data associated with that census block.

To do this, you used the ST_CONTAINS function, which accepts two geometry data types as an input and returns TRUE if the 2D projection of the first input geometry contains the second input geometry. In our case, we have census blocks represented as polygons and addresses represented as points. The join in the SQL statement succeeds when the point falls within the boundaries of the polygon.

Visualize the new demographic data with QuickSight

QuickSight is a cloud-scale business intelligence (BI) service that you can use to deliver easy-to-understand insights to the people who you work with, wherever they are. QuickSight connects to your data in the cloud and combines data from many different sources.

First, let’s build some new calculated fields that will help us better understand the demographics of our customer base. We can do this in QuickSight, or we can use SQL to build the columns in a Redshift view. The following is the code for a Redshift view:

CREATE VIEW customer_data.customer_features AS (
SELECT customer_id 
    ,postalcode
    ,region
    ,municipality
    ,geoid as census_geoid
    ,longitude
    ,latitude
    ,total_pop
    ,median_age
    ,white_pop/total_pop as perc_white
    ,black_pop/total_pop as perc_black
    ,asian_pop/total_pop as perc_asian
    ,hispanic_pop/total_pop as perc_hispanic
    ,amerindian_pop/total_pop as perc_amerindian
    ,median_income
    ,income_per_capita
    ,median_rent
    ,percent_income_spent_on_rent
    ,unemployed_pop/coalesce(pop_in_labor_force) as perc_unemployment
    ,(associates_degree + bachelors_degree + masters_degree + doctorate_degree)/total_pop as perc_college_ed
    ,(household_language_total - household_language_english)/coalesce(household_language_total) as perc_other_than_english
FROM "dev"."customer_data"."customer_addresses_with_census" t );

To get QuickSight to talk to your Redshift Serverless endpoint, you first need to configure a VPC connection in QuickSight so that it can reach the workgroup privately.

Now you can create a new dataset in QuickSight.

  • On the QuickSight console, choose Datasets in the navigation pane.
  • Choose New dataset.

create a new dataset in quicksight

  • We want to create a dataset from a new data source and use the Redshift: Manual connect option.

Redshift manual connection

  • Provide the connection information for your Redshift Serverless workgroup.

You will need the endpoint for your workgroup and the user name and password that you created when you set up your workgroup. You can find your workgroup's endpoint on the Redshift Serverless console by navigating to your workgroup configuration. The following screenshot is an example of the connection settings needed. Notice that the connection type is the name of the VPC connection that you previously configured in QuickSight. When you copy the endpoint from the Redshift console, be sure to remove the database and port number from the end of the URL before entering it in the field.

Redshift edit data source

  • Save the new data source configuration.

You’ll be prompted to choose the table you want to use for your dataset.

  • Choose the new view that you created that has your new derived fields.

Quicksight choose your table

  • Select Directly query your data.

This will connect your visualizations directly to the data in the database rather than ingesting data into the QuickSight in-memory data store.

Directly query your data

  • To create a histogram of median income level, choose the blank visual on Sheet1 and then choose the histogram visual icon under Visual types.
  • Choose median_income under Fields list and drag it to the Value field well.

This builds a histogram showing the distribution of median_income for our customers based on the census block group in which they live.

QuickSight histogram
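If you prefer to cross-check the distribution in SQL before building the visual, a rough equivalent against the customer_features view might look like the following; the $10,000 bucket width is an arbitrary choice:

SELECT FLOOR(median_income / 10000) * 10000 AS income_bucket,
       COUNT(*) AS customer_count
FROM customer_data.customer_features
GROUP BY 1
ORDER BY 1;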

Conclusion

In this post, we demonstrated how companies can use open census data available on AWS Data Exchange to effortlessly gain a high-level understanding of their customer base from a demographic standpoint. This basic understanding of customers based on where they live can serve as the foundation for more targeted marketing campaigns and even influence product development and service offerings.

As always, AWS welcomes your feedback. Please leave your thoughts and questions in the comments section.


About the Author

Tony Stricker is a Principal Technologist on the Data Strategy team at AWS, where he helps senior executives adopt a data-driven mindset and align their people/process/technology in ways that foster innovation and drive towards specific, tangible business outcomes. He has a background as a data warehouse architect and data scientist and has delivered solutions into production across multiple industries including oil and gas, financial services, public sector, and manufacturing. In his spare time, Tony likes to hang out with his dog and cat, work on home improvement projects, and restore vintage Airstream campers.

How the GoDaddy data platform achieved over 60% cost reduction and 50% performance boost by adopting Amazon EMR Serverless

Post Syndicated from Brandon Abear original https://aws.amazon.com/blogs/big-data/how-the-godaddy-data-platform-achieved-over-60-cost-reduction-and-50-performance-boost-by-adopting-amazon-emr-serverless/

This is a guest post co-written with Brandon Abear, Dinesh Sharma, John Bush, and Ozcan IIikhan from GoDaddy.

GoDaddy empowers everyday entrepreneurs by providing all the help and tools to succeed online. With more than 20 million customers worldwide, GoDaddy is the place people come to name their ideas, build a professional website, attract customers, and manage their work.

At GoDaddy, we take pride in being a data-driven company. Our relentless pursuit of valuable insights from data fuels our business decisions and ensures customer satisfaction. Our commitment to efficiency is unwavering, and we’ve undertaken an exciting initiative to optimize our batch processing jobs. In this journey, we have identified a structured approach that we refer to as the seven layers of improvement opportunities. This methodology has become our guide in the pursuit of efficiency.

In this post, we discuss how we enhanced operational efficiency with Amazon EMR Serverless. We share our benchmarking results and methodology, and insights into the cost-effectiveness of EMR Serverless vs. fixed capacity Amazon EMR on EC2 transient clusters on our data workflows orchestrated using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). We share our strategy for the adoption of EMR Serverless in areas where it excels. Our findings reveal significant benefits, including over 60% cost reduction, 50% faster Spark workloads, a remarkable five-times improvement in development and testing speed, and a significant reduction in our carbon footprint.

Background

In late 2020, GoDaddy’s data platform initiated its AWS Cloud journey, migrating an 800-node Hadoop cluster with 2.5 PB of data from its data center to EMR on EC2. This lift-and-shift approach facilitated a direct comparison between on-premises and cloud environments, ensuring a smooth transition to AWS pipelines, minimizing data validation issues and migration delays.

By early 2022, we successfully migrated our big data workloads to EMR on EC2. Using best practices learned from the AWS FinHack program, we fine-tuned resource-intensive jobs, converted Pig and Hive jobs to Spark, and reduced our batch workload spend by 22.75% in 2022. However, scalability challenges emerged due to the multitude of jobs. This prompted GoDaddy to embark on a systematic optimization journey, establishing a foundation for more sustainable and efficient big data processing.

Seven layers of improvement opportunities

In our quest for operational efficiency, we have identified seven distinct layers of opportunities for optimization within our batch processing jobs, as shown in the following figure. These layers range from precise code-level enhancements to more comprehensive platform improvements. This multi-layered approach has become our strategic blueprint in the ongoing pursuit of better performance and higher efficiency.

Seven layers of improvement opportunities

The layers are as follows:

  • Code optimization – Focuses on refining the code logic and how it can be optimized for better performance. This involves performance enhancements through selective caching, partition and projection pruning, join optimizations, and other job-specific tuning. Using AI coding solutions is also an integral part of this process.
  • Software updates – Updating to the latest versions of open source software (OSS) to capitalize on new features and improvements. For example, Adaptive Query Execution in Spark 3 brings significant performance and cost improvements.
  • Custom Spark configurations – Tuning of custom Spark configurations to maximize resource utilization, memory, and parallelism. We can achieve significant improvements by right-sizing tasks, such as through spark.sql.shuffle.partitions, spark.sql.files.maxPartitionBytes, spark.executor.cores, and spark.executor.memory (see the sketch after this list). However, these custom configurations might be counterproductive if they are not compatible with the specific Spark version.
  • Resource provisioning time – The time it takes to launch resources like ephemeral EMR clusters on Amazon Elastic Compute Cloud (Amazon EC2). Although some factors influencing this time are outside of an engineer's control, identifying and addressing the factors that can be optimized can help reduce overall provisioning time.
  • Fine-grained scaling at task level – Dynamically adjusting resources such as CPU, memory, disk, and network bandwidth based on each stage's needs within a task. The aim here is to avoid fixed cluster sizes that could result in resource waste.
  • Fine-grained scaling across multiple tasks in a workflow – Given that each task has unique resource requirements, maintaining a fixed resource size may result in under- or over-provisioning for certain tasks within the same workflow. Traditionally, the size of the largest task determines the cluster size for a multi-task workflow. However, dynamically adjusting resources across multiple tasks and steps within a workflow results in a more cost-effective implementation.
  • Platform-level enhancements – Enhancements at preceding layers can only optimize a given job or a workflow. Platform improvement aims to attain efficiency at the company level. We can achieve this through various means, such as updating or upgrading the core infrastructure, introducing new frameworks, allocating appropriate resources for each job profile, balancing service usage, optimizing the use of Savings Plans and Spot Instances, or implementing other comprehensive changes to boost efficiency across all tasks and workflows.
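As an aside on the custom Spark configurations layer, the following minimal Spark SQL sketch shows how the runtime-adjustable settings might be applied at the session level; the values are purely illustrative and not tuning recommendations:

-- Runtime-adjustable settings, applied per session (illustrative values only)
SET spark.sql.shuffle.partitions = 200;
SET spark.sql.files.maxPartitionBytes = 268435456;

-- Executor-level settings such as spark.executor.cores and spark.executor.memory
-- are fixed at launch time and are supplied with the job submission instead.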

Layers 1–3: Previous cost reductions

After we migrated from on premises to AWS Cloud, we primarily focused our cost-optimization efforts on the first three layers shown in the diagram. By transitioning our most costly legacy Pig and Hive pipelines to Spark and optimizing Spark configurations for Amazon EMR, we achieved significant cost savings.

For example, a legacy Pig job took 10 hours to complete and ranked among the top 10 most expensive EMR jobs. Upon reviewing TEZ logs and cluster metrics, we discovered that the cluster was vastly over-provisioned for the data volume being processed and remained under-utilized for most of the runtime. Transitioning from Pig to Spark was more efficient. Although no automated tools were available for the conversion, manual optimizations were made, including:

  • Reduced unnecessary disk writes, saving serialization and deserialization time (Layer 1)
  • Replaced Airflow task parallelization with Spark, simplifying the Airflow DAG (Layer 1)
  • Eliminated redundant Spark transformations (Layer 1)
  • Upgraded from Spark 2 to 3, using Adaptive Query Execution (Layer 2)
  • Addressed skewed joins and optimized smaller dimension tables (Layer 3)

As a result, job cost decreased by 95%, and job completion time was reduced to 1 hour. However, this approach was labor-intensive and not scalable for numerous jobs.

Layers 4–6: Find and adopt the right compute solution

In late 2022, following our significant accomplishments in optimization at the previous levels, our attention moved towards enhancing the remaining layers.

Understanding the state of our batch processing

We use Amazon MWAA to orchestrate our data workflows in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as workflows. In this post, the terms workflow and job are used interchangeably, referring to the Directed Acyclic Graphs (DAGs) consisting of tasks orchestrated by Amazon MWAA. For each workflow, the DAG contains sequential tasks, parallel tasks, or a combination of both between the create_emr and terminate_emr tasks, all running on a transient EMR cluster with fixed compute capacity throughout the workflow run. Even after optimizing a portion of our workload, we still had numerous non-optimized workflows that were under-utilized due to over-provisioning of compute resources based on the most resource-intensive task in the workflow, as shown in the following figure.

This highlighted the impracticality of static resource allocation and led us to recognize the necessity of a dynamic resource allocation (DRA) system. Before proposing a solution, we gathered extensive data to thoroughly understand our batch processing. Analyzing the cluster step time, excluding provisioning and idle time, revealed significant insights: a right-skewed distribution with over half of the workflows completing in 20 minutes or less and only 10% taking more than 60 minutes. This distribution guided our choice of a fast-provisioning compute solution, dramatically reducing workflow runtimes. The following diagram illustrates step times (excluding provisioning and idle time) of EMR on EC2 transient clusters in one of our batch processing accounts.

Furthermore, based on the step time (excluding provisioning and idle time) distribution of the workflows, we categorized our workflows into three groups:

  • Quick run – Lasting 20 minutes or less
  • Medium run – Lasting between 20 and 60 minutes
  • Long run – Exceeding 60 minutes, often spanning several hours or more

Another factor we needed to consider was the extensive use of transient clusters for reasons such as security, job and cost isolation, and purpose-built clusters. Additionally, there was a significant variation in resource needs between peak hours and periods of low utilization.

Instead of fixed-size clusters, we could potentially use managed scaling on EMR on EC2 to achieve some cost benefits. However, migrating to EMR Serverless appears to be a more strategic direction for our data platform. In addition to potential cost benefits, EMR Serverless offers further advantages such as a one-click upgrade to the newest Amazon EMR versions, a simplified operational and debugging experience, and automatic upgrades to the latest generations upon rollout. These features collectively simplify the process of operating a platform on a larger scale.

Evaluating EMR Serverless: A case study at GoDaddy

EMR Serverless is a serverless option in Amazon EMR that eliminates the complexities of configuring, managing, and scaling clusters when running big data frameworks like Apache Spark and Apache Hive. With EMR Serverless, businesses can enjoy numerous benefits, including cost-effectiveness, faster provisioning, simplified developer experience, and improved resilience to Availability Zone failures.

Recognizing the potential of EMR Serverless, we conducted an in-depth benchmark study using real production workflows. The study aimed to assess EMR Serverless performance and efficiency while also creating an adoption plan for large-scale implementation. The findings were highly encouraging, showing EMR Serverless can effectively handle our workloads.

Benchmarking methodology

We split our data workflows into three categories based on total step time (excluding provisioning and idle time): quick run (0–20 minutes), medium run (20–60 minutes), and long run (over 60 minutes). We analyzed the impact of the EMR deployment type (Amazon EC2 vs. EMR Serverless) on two key metrics: cost-efficiency and total runtime speedup, which served as our overall evaluation criteria. Although we did not formally measure ease of use and resiliency, these factors were considered throughout the evaluation process.

The high-level steps to assess the environment are as follows:

  1. Prepare the data and environment:
    1. Choose three to five random production jobs from each job category.
    2. Implement required adjustments to prevent interference with production.
  2. Run tests:
    1. Run scripts over several days or through multiple iterations to gather precise and consistent data points.
    2. Perform tests using EMR on EC2 and EMR Serverless.
  3. Validate data and test runs:
    1. Validate input and output datasets, partitions, and row counts to ensure identical data processing (a minimal validation sketch follows this list).
  4. Gather metrics and analyze results:
    1. Gather relevant metrics from the tests.
    2. Analyze results to draw insights and conclusions.
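
To illustrate the validation step, the following is a minimal PySpark sketch of the kind of check we mean, assuming Parquet outputs in two placeholder locations and a hypothetical dt partition column.

# Minimal validation sketch (illustrative): compare outputs of the EMR on EC2 and
# EMR Serverless runs of the same job. Paths and the partition column are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("benchmark-output-validation").getOrCreate()

ec2_df = spark.read.parquet("s3://example-bucket/benchmark/emr-on-ec2/output/")
serverless_df = spark.read.parquet("s3://example-bucket/benchmark/emr-serverless/output/")

# Row counts must match for identical data processing.
assert ec2_df.count() == serverless_df.count(), "Row counts differ between runs"

# Partition values must match as well (assuming a 'dt' partition column).
ec2_parts = {r["dt"] for r in ec2_df.select("dt").distinct().collect()}
serverless_parts = {r["dt"] for r in serverless_df.select("dt").distinct().collect()}
assert ec2_parts == serverless_parts, "Partition sets differ between runs"

# Optionally, compare full contents when data volumes allow.
assert ec2_df.exceptAll(serverless_df).count() == 0, "Row-level differences found"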

Benchmark results

Our benchmark results showed significant enhancements across all three job categories for both runtime speedup and cost-efficiency. The improvements were most pronounced for quick jobs, directly resulting from faster startup times. For instance, a 20-minute data workflow (including cluster provisioning and shutdown) running on an EMR on EC2 transient cluster of fixed compute capacity finishes in 10 minutes on EMR Serverless, providing a shorter runtime with cost benefits. Overall, the shift to EMR Serverless delivered substantial performance improvements and cost reductions at scale across job brackets, as seen in the following figure.

Historically, we devoted more time to tuning our long-run workflows. Interestingly, we discovered that the existing custom Spark configurations for these jobs did not always translate well to EMR Serverless. Where the initial results were underwhelming, a common remedy was to discard the previous Spark configurations related to executor cores and let EMR Serverless manage those settings autonomously, which often produced better outcomes. The following graph shows the average runtime and cost improvement per job when comparing EMR Serverless to EMR on EC2.

Per Job Improvement

The following table shows a sample comparison of results for the same workflow running on different deployment options of Amazon EMR (EMR on EC2 and EMR Serverless).

Metric | EMR on EC2 (Average) | EMR Serverless (Average) | EMR on EC2 vs. EMR Serverless
Total Run Cost ($) | $5.82 | $2.60 | 55%
Total Run Time (Minutes) | 53.40 | 39.40 | 26%
Provisioning Time (Minutes) | 10.20 | 0.05 | –
Provisioning Cost ($) | $1.19 | – | –
Steps Time (Minutes) | 38.20 | 39.16 | -3%
Steps Cost ($) | $4.30 | – | –
Idle Time (Minutes) | 4.80 | – | –
EMR Release Label | emr-6.9.0 | – | –
Hadoop Distribution | Amazon 3.3.3 | – | –
Spark Version | Spark 3.3.0 | – | –
Hive/HCatalog Version | Hive 3.1.3, HCatalog 3.1.3 | – | –
Job Type | Spark | – | –

AWS Graviton2 on EMR Serverless performance evaluation

After seeing compelling results with EMR Serverless for our workloads, we decided to further analyze the performance of the AWS Graviton2 (arm64) architecture within EMR Serverless. AWS had benchmarked Spark workloads on Graviton2 EMR Serverless using the TPC-DS 3TB scale, showing a 27% overall price-performance improvement.

To better understand the integration benefits, we ran our own study using GoDaddy’s production workloads on a daily schedule and observed an impressive 23.8% price-performance enhancement across a range of jobs when using Graviton2. For more details about this study, see GoDaddy benchmarking results in up to 24% better price-performance for their Spark workloads with AWS Graviton2 on Amazon EMR Serverless.

Adoption strategy for EMR Serverless

We strategically implemented a phased rollout of EMR Serverless via deployment rings, enabling systematic integration. This gradual approach let us validate improvements and halt further adoption of EMR Serverless if needed. It served both as a safety net to catch issues early and as a means to refine our infrastructure. The process mitigated the impact of change, kept operations running smoothly, and built expertise across our Data Engineering and DevOps teams. Additionally, it fostered tight feedback loops, allowing prompt adjustments and ensuring efficient EMR Serverless integration.

We divided our workflows into three main adoption groups, as shown in the following image:

  • Canaries – This group aids in detecting and resolving any potential problems early in the deployment stage.
  • Early adopters – This is the second batch of workflows that adopt the new compute solution after initial issues have been identified and rectified by the canaries group.
  • Broad deployment rings – The largest group, representing the wide-scale deployment of the solution. These rings are deployed after successful testing and implementation in the previous two groups.

Rings

We further broke down these workflows into granular deployment rings to adopt EMR Serverless, as shown in the following table.

Ring # | Name | Details
Ring 0 | Canary | Low adoption risk jobs that are expected to yield some cost saving benefits
Ring 1 | Early Adopters | Low-risk quick-run Spark jobs that are expected to yield high gains
Ring 2 | Quick-run | Rest of the quick-run (step_time <= 20 min) Spark jobs
Ring 3 | LargerJobs_EZ | High potential gain, easy move, medium-run and long-run Spark jobs
Ring 4 | LargerJobs | Rest of the medium-run and long-run Spark jobs with potential gains
Ring 5 | Hive | Hive jobs with potentially higher cost savings
Ring 6 | Redshift_EZ | Easy migration Redshift jobs that suit EMR Serverless
Ring 7 | Glue_EZ | Easy migration Glue jobs that suit EMR Serverless

Production adoption results summary

The encouraging benchmarking and canary adoption results generated considerable interest in wider EMR Serverless adoption at GoDaddy. The EMR Serverless rollout remains underway; thus far, it has reduced costs by 62.5% and accelerated total batch workflow completion by 50.4%.

Based on preliminary benchmarks, our team expected substantial gains for quick jobs. To our surprise, actual production deployments surpassed projections, averaging 64.4% faster vs. 42% projected, and 71.8% cheaper vs. 40% predicted.

Remarkably, long-running jobs also saw significant performance improvements due to the rapid provisioning of EMR Serverless and aggressive scaling enabled by dynamic resource allocation. We observed substantial parallelization during high-resource segments, resulting in a 40.5% faster total runtime compared to traditional approaches. The following chart illustrates the average enhancements per job category.

Prod Jobs Savings

Additionally, we observed the highest degree of dispersion for speed improvements within the long-run job category, as shown in the following box-and-whisker plot.

Whisker Plot

Sample workflows that adopted EMR Serverless

For a large workflow migrated to EMR Serverless, comparing 3-week averages before and after migration revealed impressive cost savings: a 75.30% decrease based on retail pricing, along with a 10% improvement in total runtime, boosting operational efficiency. The following graph illustrates the cost trend.

Although quick-run jobs realized small absolute cost reductions per run, they delivered the most significant percentage cost savings. With thousands of these workflows running daily, the accumulated savings are substantial. The following graph shows the cost trend for a small workload migrated from EMR on EC2 to EMR Serverless. Comparing 3-week averages before and after migration revealed a remarkable 92.43% cost savings on the retail on-demand pricing, alongside an 80.6% acceleration in total runtime.

Sample workflows adopted EMR Serverless 2

Layer 7: Platform-wide improvements

We aim to revolutionize compute operations at GoDaddy with our Intelligent Compute Platform, providing simplified yet powerful solutions for all users. Built on AWS compute solutions such as EMR Serverless and EMR on EC2, the platform provides optimized runs of data processing and machine learning (ML) workloads. An ML-powered job broker intelligently determines when and how to run jobs based on various parameters, while still allowing power users to customize. Additionally, an ML-powered compute resource manager pre-provisions resources based on load and historical data, providing efficient, fast provisioning at optimal cost. Intelligent compute empowers users with out-of-the-box optimization, catering to diverse personas without constraining power users.

The following diagram shows a high-level illustration of the intelligent compute architecture.

Insights and recommended best practices

The following section discusses the insights we’ve gathered and the recommended best practices we’ve developed during our preliminary and wider adoption stages.

Infrastructure preparation

Although EMR Serverless is a deployment method within EMR, it requires some infrastructure preparedness to optimize its potential. Consider the following requirements and practical guidance on implementation:

  • Use large subnets across multiple Availability Zones – When running EMR Serverless workloads within your VPC, make sure the subnets span across multiple Availability Zones and are not constrained by IP addresses. Refer to Configuring VPC access and Best practices for subnet planning for details.
  • Modify the maximum concurrent vCPU quota – For extensive compute requirements, it is recommended to increase your max concurrent vCPUs per account service quota.
  • Amazon MWAA version compatibility – When adopting EMR Serverless, GoDaddy’s decentralized Amazon MWAA ecosystem for data pipeline orchestration created compatibility issues due to disparate AWS provider package versions. Directly upgrading Amazon MWAA was more efficient than updating numerous DAGs. We facilitated adoption by upgrading Amazon MWAA instances ourselves, documenting issues, and sharing findings and effort estimates for accurate upgrade planning.
  • GoDaddy EMR operator – To streamline migrating numerous Airflow DAGs from EMR on EC2 to EMR Serverless, we developed custom operators that adapt the existing interfaces (a hedged sketch follows this list). This allowed seamless transitions while retaining familiar tuning options. Data engineers could migrate pipelines with simple find-and-replace imports and immediately use EMR Serverless.
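
The following is a hedged sketch of what such a wrapper could look like, built on the Amazon provider's EmrServerlessStartJobOperator; the class name, defaults, and log location are illustrative and not our actual operator.

# Illustrative sketch of a thin wrapper operator (not the actual GoDaddy implementation).
# It adapts a familiar interface (entry point + Spark args) on top of the Amazon
# provider's EmrServerlessStartJobOperator.
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator


class CompanyEmrServerlessOperator(EmrServerlessStartJobOperator):
    """Drop-in replacement used during DAG migration via find-and-replace imports."""

    def __init__(self, *, entry_point: str, spark_params: str = "", **kwargs):
        job_driver = {
            "sparkSubmit": {
                "entryPoint": entry_point,
                "sparkSubmitParameters": spark_params,
            }
        }
        # Sensible team-wide defaults (placeholders) that individual DAGs can override.
        kwargs.setdefault(
            "configuration_overrides",
            {
                "monitoringConfiguration": {
                    "s3MonitoringConfiguration": {
                        "logUri": "s3://example-bucket/emr-serverless-logs/"
                    }
                }
            },
        )
        super().__init__(job_driver=job_driver, **kwargs)

In a DAG, teams swap the import and keep the rest of the task definition, pointing at an EMR Serverless application ID instead of a transient cluster.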

Unexpected behavior mitigation

The following are unexpected behaviors we ran into and what we did to mitigate them:

  • Spark DRA aggressive scaling – For some jobs (8.33% of initial benchmarks, 13.6% of production), cost increased after migrating to EMR Serverless. This was due to Spark DRA briefly assigning an excessive number of new workers, prioritizing performance over cost. To counteract this, we set maximum executor thresholds by adjusting spark.dynamicAllocation.maxExecutors, effectively limiting how aggressively EMR Serverless scales. When migrating from EMR on EC2, we suggest observing the max core count in the Spark History UI to replicate similar compute limits in EMR Serverless, such as --conf spark.executor.cores and --conf spark.dynamicAllocation.maxExecutors.
  • Managing disk space for large-scale jobs – When transitioning jobs that process large data volumes with substantial shuffles and significant disk requirements to EMR Serverless, we recommend configuring spark.emr-serverless.executor.disk by referring to existing Spark job metrics. Furthermore, configurations like spark.executor.cores combined with spark.emr-serverless.executor.disk and spark.dynamicAllocation.maxExecutors allow control over the underlying worker size and total attached storage when advantageous. For example, a shuffle-heavy job with relatively low disk usage may benefit from using a larger worker to increase the likelihood of local shuffle fetches. A sketch of passing these properties to an EMR Serverless job run follows this list.
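
For reference, the following boto3 sketch shows where these Spark properties are passed when starting an EMR Serverless job run; the application ID, role ARN, entry point, and the specific values are placeholders, not tuning recommendations.

# Illustrative sketch: capping DRA scaling and sizing workers/disk for an EMR Serverless
# Spark job run. IDs, ARNs, paths, and values below are placeholders, not recommendations.
import boto3

emr_serverless = boto3.client("emr-serverless")

spark_submit_parameters = " ".join([
    "--conf spark.executor.cores=4",
    "--conf spark.executor.memory=16g",
    "--conf spark.emr-serverless.executor.disk=100g",
    "--conf spark.dynamicAllocation.maxExecutors=50",  # limit DRA scaling aggression
])

response = emr_serverless.start_job_run(
    applicationId="00example-application-id",
    executionRoleArn="arn:aws:iam::111122223333:role/example-emr-serverless-role",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://example-bucket/jobs/example_job.py",
            "sparkSubmitParameters": spark_submit_parameters,
        }
    },
)
print(response["jobRunId"])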

Conclusion

As discussed in this post, our experiences with adopting EMR Serverless on arm64 have been overwhelmingly positive. The impressive results we’ve achieved, including a 60% reduction in cost, 50% faster runs of batch Spark workloads, and an astounding five-times improvement in development and testing speed, speak volumes about the potential of this technology. Furthermore, our current results suggest that by widely adopting Graviton2 on EMR Serverless, we could potentially reduce the carbon footprint by up to 60% for our batch processing.

However, it’s crucial to understand that these results are not a one-size-fits-all scenario. The enhancements you can expect are subject to factors including, but not limited to, the specific nature of your workflows, cluster configurations, resource utilization levels, and fluctuations in computational capacity. Therefore, we strongly advocate for a data-driven, ring-based deployment strategy when considering the integration of EMR Serverless, which can help optimize its benefits to the fullest.

Special thanks to Mukul Sharma and Boris Berlin for their contributions to benchmarking. Many thanks to Travis Muhlestein (CDO), Abhijit Kundu (VP Eng), Vincent Yung (Sr. Director Eng.), and Wai Kin Lau (Sr. Director Data Eng.) for their continued support.


About the Authors

Brandon Abear is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He enjoys all things big data. In his spare time, he enjoys traveling, watching movies, and playing rhythm games.

Dinesh Sharma is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about user experience and developer productivity, always looking for ways to optimize engineering processes and saving cost. In his spare time, he loves reading and is an avid manga fan.

John Bush is a Principal Software Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about making it easier for organizations to manage data and use it to drive their businesses forward. In his spare time, he loves hiking, camping, and riding his ebike.

Ozcan Ilikhan is the Director of Engineering for the Data and ML Platform at GoDaddy. He has over two decades of multidisciplinary leadership experience, spanning startups to global enterprises. He has a passion for leveraging data and AI in creating solutions that delight customers, empower them to achieve more, and boost operational efficiency. Outside of his professional life, he enjoys reading, hiking, gardening, volunteering, and embarking on DIY projects.

Harsh Vardhan is an AWS Solutions Architect, specializing in big data and analytics. He has over 8 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Data governance in the age of generative AI

Post Syndicated from Krishna Rupanagunta original https://aws.amazon.com/blogs/big-data/data-governance-in-the-age-of-generative-ai/

Data is your generative AI differentiator, and a successful generative AI implementation depends on a robust data strategy incorporating a comprehensive data governance approach. Working with large language models (LLMs) for enterprise use cases requires the implementation of quality and privacy considerations to drive responsible AI. However, enterprise data generated from siloed sources combined with the lack of a data integration strategy creates challenges for provisioning the data for generative AI applications. The need for an end-to-end strategy for data management and data governance at every step of the journey—from ingesting, storing, and querying data to analyzing, visualizing, and running artificial intelligence (AI) and machine learning (ML) models—continues to be of paramount importance for enterprises.

In this post, we discuss the data governance needs of generative AI application data pipelines, a critical building block to govern data used by LLMs to improve the accuracy and relevance of their responses to user prompts in a safe, secure, and transparent manner. Enterprises are doing this by using proprietary data with approaches like Retrieval Augmented Generation (RAG), fine-tuning, and continued pre-training with foundation models.

Data governance is a critical building block across all these approaches, and we see two emerging areas of focus. First, many LLM use cases rely on enterprise knowledge that needs to be drawn from unstructured data such as documents, transcripts, and images, in addition to structured data from data warehouses. Unstructured data is typically stored across siloed systems in varying formats, and generally not managed or governed with the same level of rigor as structured data. Second, generative AI applications introduce a higher number of data interactions than conventional applications, which requires that the data security, privacy, and access control policies be implemented as part of the generative AI user workflows.

In this post, we cover data governance for building generative AI applications on AWS with a lens on structured and unstructured enterprise knowledge sources, and the role of data governance during the user request-response workflows.

Use case overview

Let’s explore an example of a customer support AI assistant. The following figure shows the typical conversational workflow that is initiated with a user prompt.

The workflow includes the following key data governance steps:

  1. Prompt user access control and security policies.
  2. Access policies to extract permissions based on relevant data and filter out results based on the prompt user role and permissions.
  3. Enforce data privacy policies such as personally identifiable information (PII) redactions.
  4. Enforce fine-grained access control.
  5. Grant the user role permissions for sensitive information and compliance policies.

To provide a response that includes the enterprise context, each user prompt needs to be augmented with a combination of insights from structured data from the data warehouse and unstructured data from the enterprise data lake. On the backend, the batch data engineering processes refreshing the enterprise data lake need to expand to ingest, transform, and manage unstructured data. As part of the transformation, the objects need to be treated to ensure data privacy (for example, PII redaction). Finally, access control policies also need to be extended to the unstructured data objects and to vector data stores.

Let’s look at how data governance can be applied to the enterprise knowledge source data pipelines and the user request-response workflows.

Enterprise knowledge: Data management

The following figure summarizes data governance considerations for data pipelines and the workflow for applying data governance.

Data governance steps in data pipelines

In the above figure, the data engineering pipelines include the following data governance steps:

  1. Create and update a catalog through data evolution.
  2. Implement data privacy policies.
  3. Implement data quality by data type and source.
  4. Link structured and unstructured datasets.
  5. Implement unified fine-grained access controls for structured and unstructured datasets.

Let’s look at some of the key changes in the data pipelines namely, data cataloging, data quality, and vector embedding security in more detail.

Data discoverability

Unlike structured data, which is managed in well-defined rows and columns, unstructured data is stored as objects. For users to be able to discover and comprehend the data, the first step is to build a comprehensive catalog using the metadata that is generated and captured in the source systems. This starts with the objects (such as documents and transcript files) being ingested from the relevant source systems into the raw zone in the data lake in Amazon Simple Storage Service (Amazon S3) in their respective native formats (as illustrated in the preceding figure). From here, object metadata (such as file owner, creation date, and confidentiality level) is extracted and queried using Amazon S3 capabilities. Metadata can vary by data source, and it’s important to examine the fields and, where required, derive the necessary fields to complete all the necessary metadata. For instance, if an attribute like content confidentiality is not tagged at a document level in the source application, this may need to be derived as part of the metadata extraction process and added as an attribute in the data catalog. The ingestion process needs to capture object updates (changes, deletions) in addition to new objects on an ongoing basis. For detailed implementation guidance, refer to Unstructured data management and governance using AWS AI/ML and analytics services. To further simplify the discovery and introspection between business glossaries and technical data catalogs, you can use Amazon DataZone for business users to discover and share data stored across data silos.
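
As a simple illustration of this metadata capture, the following boto3 sketch lists objects in a raw zone prefix and assembles basic catalog attributes; the bucket, prefix, and the derived confidentiality default are hypothetical.

# Illustrative sketch: extract basic object metadata from the raw zone for cataloging.
# Bucket, prefix, and the derived confidentiality rule are hypothetical.
import boto3

s3 = boto3.client("s3")
catalog_entries = []

paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="example-raw-zone-bucket", Prefix="transcripts/"):
    for obj in page.get("Contents", []):
        head = s3.head_object(Bucket="example-raw-zone-bucket", Key=obj["Key"])
        catalog_entries.append({
            "key": obj["Key"],
            "last_modified": obj["LastModified"].isoformat(),
            "size_bytes": obj["Size"],
            # User-defined object metadata set at ingestion time, if present.
            "owner": head.get("Metadata", {}).get("owner", "unknown"),
            # Derive a field the source system does not tag, as described above.
            "confidentiality": head.get("Metadata", {}).get("confidentiality", "internal"),
        })

# catalog_entries can now be written to the data catalog (for example, via AWS Glue).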

Data privacy

Enterprise knowledge sources often contain PII and other sensitive data (such as addresses and Social Security numbers). Based on your data privacy policies, these elements need to be treated (masked, tokenized, or redacted) from the sources before they can be used for downstream use cases. From the raw zone in Amazon S3, the objects need to be processed before they can be consumed by downstream generative AI models. A key requirement here is PII identification and redaction, which you can implement with Amazon Comprehend. It’s important to remember that it will not always be feasible to strip away all the sensitive data without impacting the context of the data. Semantic context is one of the key factors that drive the accuracy and relevance of generative AI model outputs, and it’s critical to work backward from the use case and strike the necessary balance between privacy controls and model performance.
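
For example, a minimal Amazon Comprehend sketch for identifying and masking PII in a short text might look like the following; masking by entity type is just one option alongside tokenization or full redaction, and production pipelines would typically use batch or asynchronous jobs.

# Illustrative sketch: detect PII with Amazon Comprehend and mask it before the text
# moves to the transformed zone. The sample text is made up.
import boto3

comprehend = boto3.client("comprehend")

text = "Customer Jane Doe called from 555-0100 about order 1234."
entities = comprehend.detect_pii_entities(Text=text, LanguageCode="en")["Entities"]

# Replace each detected span with its entity type, working from the end of the string
# so earlier offsets stay valid.
redacted = text
for entity in sorted(entities, key=lambda e: e["BeginOffset"], reverse=True):
    redacted = (
        redacted[: entity["BeginOffset"]]
        + f"[{entity['Type']}]"
        + redacted[entity["EndOffset"]:]
    )

print(redacted)  # for example: "Customer [NAME] called from [PHONE] about order 1234."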

Data enrichment

Additional metadata may also need to be extracted from the objects. Amazon Comprehend provides capabilities for entity recognition (for example, identifying domain-specific data like policy numbers and claim numbers) and custom classification (for example, categorizing a customer care chat transcript based on the issue description). Furthermore, you may need to combine the unstructured and structured data to create a holistic picture of key entities, like customers. For example, in an airline loyalty scenario, there would be significant value in linking unstructured data captured from customer interactions (such as customer chat transcripts and customer reviews) with structured data signals (such as ticket purchases and miles redemption) to create a more complete customer profile that can then enable the delivery of better and more relevant trip recommendations. AWS Entity Resolution is an ML service that helps in matching and linking records. This service helps link related sets of information to create deeper, more connected data about key entities like customers, products, and so on, which can further improve the quality and relevance of LLM outputs. This is available in the transformed zone in Amazon S3 and is ready to be consumed downstream for vector stores, fine-tuning, or training of LLMs. After these transformations, data can be made available in the curated zone in Amazon S3.

Data quality

Realizing the full potential of generative AI depends on the quality of the data used to train the models, as well as the data used to augment and enhance the model response to a user input. The accuracy, bias, and reliability of model outcomes are directly tied to the quality of the data used to build and train the models.

Amazon SageMaker Model Monitor provides a proactive detection of deviations in model data quality drift and model quality metrics drift. It also monitors bias drift in your model’s predictions and feature attribution. For more details, refer to Monitoring in-production ML models at large scale using Amazon SageMaker Model Monitor. Detecting bias in your model is a fundamental building block to responsible AI, and Amazon SageMaker Clarify helps detect potential bias that can produce a negative or a less accurate result. To learn more, see Learn how Amazon SageMaker Clarify helps detect bias.

A newer area of focus in generative AI is the use and quality of data in prompts from enterprise and proprietary data stores. An emerging best practice to consider here is shift-left, which puts a strong emphasis on early and proactive quality assurance mechanisms. In the context of data pipelines designed to process data for generative AI applications, this implies identifying and resolving data quality issues earlier upstream to mitigate the potential impact of data quality issues later. AWS Glue Data Quality not only measures and monitors the quality of your data at rest in your data lakes, data warehouses, and transactional databases, but also allows early detection and correction of quality issues for your extract, transform, and load (ETL) pipelines to ensure your data meets the quality standards before it is consumed. For more details, refer to Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog.
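
As a small illustration of the shift-left idea, the following sketch registers a DQDL ruleset against a hypothetical cataloged table with AWS Glue Data Quality; the database, table, and rules are placeholders.

# Illustrative sketch: define a DQDL ruleset for a cataloged table so quality issues
# are caught before downstream generative AI consumption. Names and rules are placeholders.
import boto3

glue = boto3.client("glue")

ruleset = """
Rules = [
    IsComplete "customer_id",
    ColumnValues "confidentiality" in ["public", "internal", "restricted"],
    RowCount > 0,
    Completeness "transcript_text" > 0.95
]
"""

glue.create_data_quality_ruleset(
    Name="customer-transcripts-quality",
    Ruleset=ruleset,
    TargetTable={
        "DatabaseName": "curated_zone",
        "TableName": "customer_transcripts",
    },
)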

Vector store governance

Embeddings in vector databases elevate the intelligence and capabilities of generative AI applications by enabling features such as semantic search and reducing hallucinations. Embeddings typically contain private and sensitive data, and encrypting the data is a recommended step in the user input workflow. Amazon OpenSearch Serverless stores and searches your vector embeddings, and encrypts your data at rest with AWS Key Management Service (AWS KMS). For more details, see Introducing the vector engine for Amazon OpenSearch Serverless, now in preview. Similarly, additional vector engine options on AWS, including Amazon Kendra and Amazon Aurora, encrypt your data at rest with AWS KMS. For more information, refer to Encryption at rest and Protecting data using encryption.

As embeddings are generated and stored in a vector store, controlling access to the data with role-based access control (RBAC) becomes a key requirement to maintaining overall security. Amazon OpenSearch Service provides fine-grained access controls (FGAC) features with AWS Identity and Access Management (IAM) rules that can be associated with Amazon Cognito users. Corresponding user access control mechanisms are also provided by OpenSearch Serverless, Amazon Kendra, and Aurora. To learn more, refer to Data access control for Amazon OpenSearch Serverless, Controlling user access to documents with tokens, and Identity and access management for Amazon Aurora, respectively.
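
To make the RBAC idea concrete, the following is a minimal sketch of an OpenSearch Serverless data access policy granting a role read access to a vector collection's indexes; the collection, index pattern, and role are placeholders.

# Illustrative sketch: grant a role read access to a vector collection's indexes in
# OpenSearch Serverless. Collection, index, and role names are placeholders.
import json

import boto3

aoss = boto3.client("opensearchserverless")

policy = [{
    "Rules": [
        {
            "ResourceType": "index",
            "Resource": ["index/example-vector-collection/*"],
            "Permission": ["aoss:ReadDocument", "aoss:DescribeIndex"],
        }
    ],
    "Principal": ["arn:aws:iam::111122223333:role/example-open-data-consumer"],
}]

aoss.create_access_policy(
    name="example-vector-read-policy",
    type="data",
    policy=json.dumps(policy),
)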

User request-response workflows

Controls in the data governance plane need to be integrated into the generative AI application as part of the overall solution deployment to ensure compliance with data security (based on role-based access controls) and data privacy (based on role-based access to sensitive data) policies. The following figure illustrates the workflow for applying data governance.

Data governance in user prompt workflow

The workflow includes the following key data governance steps:

  1. Provide a valid input prompt for alignment with compliance policies (for example, bias and toxicity).
  2. Generate a query by mapping prompt keywords with the data catalog.
  3. Apply FGAC policies based on user role.
  4. Apply RBAC policies based on user role.
  5. Apply data and content redaction to the response based on user role permissions and compliance policies.

As part of the prompt cycle, the user prompt must be parsed and keywords extracted to ensure alignment with compliance policies using a service like Amazon Comprehend (see New for Amazon Comprehend – Toxicity Detection) or Guardrails for Amazon Bedrock (preview). When that is validated, if the prompt requires structured data to be extracted, the keywords can be used against the data catalog (business or technical) to extract the relevant data tables and fields and construct a query from the data warehouse. The user permissions are evaluated using AWS Lake Formation to filter the relevant data. In the case of unstructured data, the search results are restricted based on the user permission policies implemented in the vector store. As a final step, the output response from the LLM needs to be evaluated against user permissions (to ensure data privacy and security) and compliance with safety (for example, bias and toxicity guidelines).

Although this process is specific to a RAG implementation and is applicable to other LLM implementation strategies, there are additional controls:

  • Prompt engineering – Access to the prompt templates needs to be restricted based on access controls augmented by business logic.
  • Fine-tuning models and training foundation models – In cases where objects from the curated zone in Amazon S3 are used as training data for fine-tuning the foundation models, the permissions policies need to be configured with Amazon S3 identity and access management at the bucket or object level based on the requirements.

Summary

Data governance is critical to enabling organizations to build enterprise generative AI applications. As enterprise use cases continue to evolve, there will be a need to expand the data infrastructure to govern and manage new, diverse, unstructured datasets to ensure alignment with privacy, security, and quality policies. These policies need to be implemented and managed as part of data ingestion, storage, and management of the enterprise knowledge base along with the user interaction workflows. This makes sure that the generative AI applications not only minimize the risk of sharing inaccurate or wrong information, but also protect from bias and toxicity that can lead to harmful or libelous outcomes. To learn more about data governance on AWS, see What is Data Governance?

In subsequent posts, we will provide implementation guidance on how to expand the governance of the data infrastructure to support generative AI use cases.


About the Authors

Krishna Rupanagunta leads a team of Data and AI Specialists at AWS. He and his team work with customers to help them innovate faster and make better decisions using Data, Analytics, and AI/ML. He can be reached via LinkedIn.

Imtiaz (Taz) Sayed is the WW Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Raghvender Arni (Arni) leads the Customer Acceleration Team (CAT) within AWS Industries. The CAT is a global cross-functional team of customer facing cloud architects, software engineers, data scientists, and AI/ML experts and designers that drives innovation via advanced prototyping, and drives cloud operational excellence via specialized technical expertise.

Reference guide to analyze transactional data in near-real time on AWS

Post Syndicated from Jason Dalba original https://aws.amazon.com/blogs/big-data/reference-guide-to-analyze-transactional-data-in-near-real-time-on-aws/

Business leaders and data analysts use near-real-time transaction data to understand buyer behavior to help evolve products. The primary challenge businesses face with near-real-time analytics is getting the data prepared for analytics in a timely manner, which can often take days. Companies commonly maintain entire teams to facilitate the flow of data from ingestion to analysis.

The consequence of delays in your organization’s analytics workflow can be costly. As online transactions have gained popularity with consumers, the volume and velocity of data ingestion has led to challenges in data processing. Consumers expect more fluid changes to services and products. Organizations that can’t quickly adapt their business strategy to align with consumer behavior may experience loss of opportunity and revenue in competitive markets.

To overcome these challenges, businesses need a solution that can provide near-real-time analytics on transactional data with services that don’t lead to latent processing and bloat from managing the pipeline. With a properly deployed architecture using the latest technologies in artificial intelligence (AI), data storage, streaming ingestions, and cloud computing, data will become more accurate, timely, and actionable. With such a solution, businesses can make actionable decisions in near-real time, allowing leaders to change strategic direction as soon as the market changes.

In this post, we discuss how to architect a near-real-time analytics solution with AWS managed analytics, AI and machine learning (ML), and database services.

Solution overview

The most common workloads, agnostic of industry, involve transactional data. Transactional data volumes and velocity have continued to rapidly expand as workloads have been pushed online. Near-real-time data is data stored, processed, and analyzed on a continual basis. It generates information that is available for use almost immediately after being generated. With the power of near-real-time analytics, business units across an organization, including sales, marketing, and operations, can make agile, strategic decisions. Without the proper architecture to support near real-time analytics, organizations will be dependent on delayed data and will not be able to capitalize on emerging opportunities. Missed opportunities could impact operational efficiency, customer satisfaction, or product innovation.

Managed AWS Analytics and Database services allow for each component of the solution, from ingestion to analysis, to be optimized for speed, with little management overhead. It is crucial for critical business solutions to follow the six pillars of the AWS Well-Architected Framework. The framework helps cloud architects build the most secure, high performing, resilient, and efficient infrastructure for critical workloads.

The following diagram illustrates the solution architecture.

Solution architecture

By combining the appropriate AWS services, your organization can run near-real-time analytics off a transactional data store. In the following sections, we discuss the key components of the solution.

Transactional data storage

In this solution, we use Amazon DynamoDB as our transactional data store. DynamoDB is a managed NoSQL database solution that acts as a key-value store for transactional data. As a NoSQL solution, DynamoDB is optimized for compute (as opposed to storage) and therefore the data needs to be modeled and served up to the application based on how the application needs it. This makes DynamoDB good for applications with known access patterns, which is a property of many transactional workloads.

In DynamoDB, you can create, read, update, or delete items in a table through a partition key. For example, if you want to keep track of how many fitness quests a user has completed in your application, you can query the partition key of the user ID to find the item with an attribute that holds data related to completed quests, then update the relevant attribute to reflect a specific quest’s completion. There are also some added benefits of DynamoDB by design, such as the ability to scale to support massive global internet-scale applications while maintaining consistent single-digit millisecond latency performance, because the data will be horizontally partitioned across the underlying storage nodes by the service itself through the partition keys. Modeling your data here is very important so DynamoDB can horizontally scale based on a partition key, which is again why it’s a good fit for a transactional store. In transactional workloads, when you know what the access patterns are, it will be easier to optimize a data model around those patterns as opposed to creating a data model to accept ad hoc requests. All that being said, DynamoDB doesn’t perform scans across many items as efficiently, so for this solution, we integrate DynamoDB with other services to help meet the data analysis requirements.
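
Continuing the fitness quest example, a minimal boto3 sketch of that access pattern might look like the following; the table and attribute names are illustrative.

# Illustrative sketch of the fitness quest access pattern. Table and attribute names
# are placeholders; the partition key is the user ID.
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("UserQuests")

# Read the item for a given user.
item = table.get_item(Key={"user_id": "user-123"}).get("Item", {})
print(item.get("completed_quests", 0))

# Record a newly completed quest by atomically incrementing the counter.
table.update_item(
    Key={"user_id": "user-123"},
    UpdateExpression="ADD completed_quests :inc",
    ExpressionAttributeValues={":inc": 1},
)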

Data streaming

Now that we have stored our workload’s transactional data in DynamoDB, we need to move that data to another service that will be better suited for analysis of said data. The time to insights on this data matters, so rather than send data off in batches, we stream the data into an analytics service, which helps us get the near-real time aspect of this solution.

We use Amazon Kinesis Data Streams to stream the data from DynamoDB to Amazon Redshift for this specific solution. Kinesis Data Streams captures item-level modifications in DynamoDB tables and replicates them to a Kinesis data stream. Your applications can access this stream and view item-level changes in near-real time. You can continuously capture and store terabytes of data per hour. Additionally, with the enhanced fan-out capability, you can simultaneously reach two or more downstream applications. Kinesis Data Streams also provides durability and elasticity. The delay between the time a record is put into the stream and the time it can be retrieved (put-to-get delay) is typically less than 1 second. In other words, a Kinesis Data Streams application can start consuming the data from the stream almost immediately after the data is added. The managed service aspect of Kinesis Data Streams relieves you of the operational burden of creating and running a data intake pipeline. The elasticity of Kinesis Data Streams enables you to scale the stream up or down, so you never lose data records before they expire.
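
Wiring this up is largely a configuration step; a minimal boto3 sketch could look like the following, with placeholder table and stream names.

# Illustrative sketch: create a Kinesis data stream and attach it to a DynamoDB table
# so item-level changes are replicated for downstream analytics. Names are placeholders.
import boto3

kinesis = boto3.client("kinesis")
dynamodb = boto3.client("dynamodb")

kinesis.create_stream(
    StreamName="transactions-cdc",
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)

# In practice, wait for the stream to become ACTIVE before attaching it.
stream_arn = kinesis.describe_stream_summary(StreamName="transactions-cdc")[
    "StreamDescriptionSummary"
]["StreamARN"]

dynamodb.enable_kinesis_streaming_destination(
    TableName="UserQuests",
    StreamArn=stream_arn,
)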

Analytical data storage

The next service in this solution is Amazon Redshift, a fully managed, petabyte-scale data warehouse service in the cloud. As opposed to DynamoDB, which is meant to update, delete, or read more specific pieces of data, Amazon Redshift is better suited for analytic queries where you are retrieving, comparing, and evaluating large amounts of data in multi-stage operations to produce a final result. Amazon Redshift achieves efficient storage and optimum query performance through a combination of massively parallel processing, columnar data storage, and very efficient, targeted data compression encoding schemes.

Beyond just the fact that Amazon Redshift is built for analytical queries, it can natively integrate with Amazon streaming engines. Amazon Redshift Streaming Ingestion ingests hundreds of megabytes of data per second, so you can query data in near-real time and drive your business forward with analytics. With this zero-ETL approach, Amazon Redshift Streaming Ingestion enables you to connect to multiple Kinesis data streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams and pull data directly to Amazon Redshift without staging data in Amazon Simple Storage Service (Amazon S3). You can define a schema or choose to ingest semi-structured data with the SUPER data type. With streaming ingestion, a materialized view is the landing area for the data read from the Kinesis data stream, and the data is processed as it arrives. When the view is refreshed, Redshift compute nodes allocate each data shard to a compute slice. We recommend you enable auto refresh for this materialized view so that your data is continuously updated.

Data analysis and visualization

After the data pipeline is set up, the last piece is data analysis with Amazon QuickSight to visualize the changes in consumer behavior. QuickSight is a cloud-scale business intelligence (BI) service that you can use to deliver easy-to-understand insights to the people who you work with, wherever they are.

QuickSight connects to your data in the cloud and combines data from many different sources. In a single data dashboard, QuickSight can include AWS data, third-party data, big data, spreadsheet data, SaaS data, B2B data, and more. As a fully managed cloud-based service, QuickSight provides enterprise-grade security, global availability, and built-in redundancy. It also provides the user-management tools that you need to scale from 10 users to 10,000, all with no infrastructure to deploy or manage.

QuickSight gives decision-makers the opportunity to explore and interpret information in an interactive visual environment. They have secure access to dashboards from any device on your network and from mobile devices. Connecting QuickSight to the rest of our solution will complete the flow of data from being initially ingested into DynamoDB to being streamed into Amazon Redshift. QuickSight can create a visual analysis of the data in near-real time because that data is relatively up to date, so this solution can support use cases for making quick decisions on transactional data.

Using AWS for data services allows for each component of the solution, from ingestion to storage to analysis, to be optimized for speed and with little management overhead. With these AWS services, business leaders and analysts can get near-real-time insights to drive immediate change based on customer behavior, enabling organizational agility and ultimately leading to customer satisfaction.

Next steps

The next step to building a solution to analyze transactional data in near-real time on AWS would be to go through the workshop Enable near real-time analytics on data stored in Amazon DynamoDB using Amazon Redshift. In the workshop, you will get hands-on with AWS managed analytics, AI/ML, and database services to dive deep into an end-to-end solution delivering near-real-time analytics on transactional data. By the end of the workshop, you will have gone through the configuration and deployment of the critical pieces that will enable users to perform analytics on transactional workloads.

Conclusion

Developing an architecture on AWS that serves transactional data for near-real-time analytics can help businesses become more agile in critical decisions. By ingesting and processing transactional data delivered directly from the application on AWS, businesses can optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction.

The end-to-end solution is designed for individuals in various roles, such as business users, data engineers, data scientists, and data analysts, who are responsible for comprehending, creating, and overseeing processes related to retail inventory forecasting. Overall, being able to analyze near-real-time transactional data on AWS can provide businesses with timely insights, allowing for quicker decision-making in fast-paced industries.


About the Authors

Jason D’Alba is an AWS Solutions Architect leader focused on database and enterprise applications, helping customers architect highly available and scalable database solutions.

Veerendra Nayak is a Principal Database Solutions Architect based in the Bay Area, California. He works with customers to share best practices on database migrations, resiliency, and integrating operational data with analytics and AI services.

Evan Day is a Database Solutions Architect at AWS, where he helps customers define technical solutions for business problems using the breadth of managed database services on AWS. He also focuses on building solutions that are reliable, performant, and cost efficient.

Design a data mesh on AWS that reflects the envisioned organization

Post Syndicated from Claudia Chitu original https://aws.amazon.com/blogs/big-data/design-a-data-mesh-on-aws-that-reflects-the-envisioned-organization/

This post is written in collaboration with Claudia Chitu and Spyridon Dosis from ACAST.

Founded in 2014, Acast is the world’s leading independent podcast company, elevating podcast creators and podcast advertisers for the ultimate listening experience. By championing an independent and open ecosystem for podcasting, Acast aims to fuel podcasting with the tools and monetization needed to thrive.

The company uses AWS Cloud services to build data-driven products and scale engineering best practices. To ensure a sustainable data platform amid growth and profitability phases, their tech teams adopted a decentralized data mesh architecture.

In this post, we discuss how Acast overcame the challenge of coupled dependencies between teams working with data at scale by employing the concept of a data mesh.

The problem

With an accelerated growth and expansion, Acast encountered a challenge that resonates globally. Acast found itself with diverse business units and a vast amount of data generated across the organization. The existing monolith and centralized architecture was struggling to meet the growing demands of data consumers. Data engineers were finding it increasingly challenging to maintain and scale the data infrastructure, resulting in data access, data silos, and inefficiencies in data management. A key objective was to enhance the end-to-end user experience, starting from the business needs.

Acast needed to address these challenges in order to get to an operational scale, meaning the maximum number of people who can independently operate and deliver value. In this case, Acast tried to tackle the challenge of this monolithic structure and the high time to value for product teams, tech teams, and end consumers. It’s worth mentioning that they also have other product and tech teams, including operational or business teams, without AWS accounts.

Acast has a variable number of product teams, continuously evolving by merging existing ones, splitting them, adding new people, or simply creating new teams. In the last 2 years, they have had between 10–20 teams, consisting of 4–10 people each. Each team owns at least two AWS accounts, up to 10 accounts, depending on the ownership. The majority of data produced by these accounts is used downstream for business intelligence (BI) purposes and in Amazon Athena, by hundreds of business users every day.

The solution Acast implemented is a data mesh, architected on AWS. The solution mirrors the organizational structure rather than an explicit architectural decision. As per the Inverse Conway Maneuver, Acast’s technology architecture displays isomorphism with the business architecture. In this case, the business users are enabled through the data mesh architecture to get faster time to insights and know directly who the domain specific owners are, speeding up collaboration. This will be further detailed when we discuss the AWS Identity and Access Management (IAM) roles used, because one of the roles is dedicated to the business group.

Parameters of success

Acast succeeded in bootstrapping and scaling a new team- and domain-oriented data product and its corresponding infrastructure and setup, resulting in less friction in gathering insights and happier users and consumers.

The success of the implementation meant assessing various aspects of the data infrastructure, data management, and business outcomes. They classified the metrics and indicators in the following categories:

  • Data usage – A clear understanding of who is consuming what data source, materialized with a mapping of consumers and producers. Discussions with users showed they were happier to have faster access to data in a simpler way, a more structured data organization, and a clear mapping of who the producer is. A lot of progress has been made to advance their data-driven culture (data literacy, data sharing, and collaboration across business units).
  • Data governance – With their service-level object stating when the data sources are available (among other details), teams know whom to notify and can do so in a shorter time when there is late data coming in or other issues with the data. With a data steward role in place, the ownership has been strengthened.
  • Data team productivity – Through engineering retrospectives, Acast found that their teams appreciate autonomy to make decisions regarding their data domains.
  • Cost and resource efficiency – This is an area where Acast observed a reduction in data duplication, and therefore cost reduction (in some accounts, removing the copy of data 100%), by reading data across accounts while enabling scaling.

Data mesh overview

A data mesh is a sociotechnical approach to build a decentralized data architecture by using a domain-oriented, self-serve design (in a software development perspective), and borrows Eric Evans’ theory of domain-driven design and Manuel Pais’ and Matthew Skelton’s theory of team topologies. It’s important to establish the context to understand what data mesh is because it sets the stage for the technical details that follow and can help you understand how the concepts discussed in this post fit into the broader framework of a data mesh.

To recap before diving deeper into Acast’s implementation, the data mesh concept is based on the following principles:

  • It’s domain driven, as opposed to pipelines as a first-class concern
  • It serves data as a product
  • It’s a good product that delights users (data is trustworthy, documentation is available, and it’s easily consumable)
  • It offers federated computational governance and decentralized ownership—a self-serve data platform

Domain-driven architecture

In Acast’s approach of owning the operational and analytical datasets, teams are structured with ownership based on domain, reading directly from the producer of the data, via an API or programmatically from Amazon S3 storage or using Athena as a SQL query engine. Some examples of Acast’s domains are presented in the following figure.

As illustrated in the preceding figure, some domains are loosely coupled to other domains’ operational or analytical endpoints, with a different ownership. Others might have a stronger dependency, which is expected for the business (some podcasters can also be advertisers, creating sponsorship creatives and running campaigns for their own shows, or transacting ads using Acast’s software as a service).

Data as a product

Treating data as a product entails three key components: the data itself, the metadata, and the associated code and infrastructure. In this approach, teams responsible for generating data are referred to as producers. These producer teams possess in-depth knowledge about their consumers, understanding how their data product is utilized. Any changes planned by the data producers are communicated in advance to all consumers. This proactive notification ensures that downstream processes are not disrupted. By providing consumers with advance notice, they have sufficient time to prepare for and adapt to the upcoming changes, maintaining a smooth and uninterrupted workflow. The producers run a new version of the initial dataset in parallel, notify the consumers individually, and discuss with them their necessary timeframe to start consuming the new version. When all consumers are using the new version, the producers make the initial version unavailable.

Data schemas are inferred from the common agreed-upon format to share files between teams, which is Parquet in the case of Acast. Data can be shared in files, batched or stream events, and more. Each team has its own AWS account acting as an independent and autonomous entity with its own infrastructure. For orchestration, they use the AWS Cloud Development Kit (AWS CDK) for infrastructure as code (IaC) and AWS Glue Data Catalogs for metadata management. Users can also raise requests to producers to improve the way the data is presented or to enrich the data with new data points for generating a higher business value.

With each team owning an AWS account and a data catalog ID from Athena, it’s straightforward to see this through the lenses of a distributed data lake on top of Amazon S3, with a common catalog mapping all the catalogs from all the accounts.

At the same time, each team can also map other catalogs to their own account and use their own data, which they produce along with the data from other accounts. Unless it is sensitive data, the data can be accessed programmatically or from the AWS Management Console in a self-service manner without being dependent on the data infrastructure engineers. This is a domain-agnostic, shared way to self-serve data. The product discovery happens through the catalog registration. Using only a few standards commonly agreed upon and adopted across the company, for the purpose of interoperability, Acast addressed the fragmented silos and friction to exchange data or consume domain-agnostic data.
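
A minimal sketch of that catalog mapping with boto3 could look like the following; the catalog name and producer account ID are placeholders, and cross-account access still depends on the resource policies granted by the producer.

# Illustrative sketch: register another team's AWS Glue Data Catalog in your own
# account so its datasets appear in Athena. Names and the account ID are placeholders.
import boto3

athena = boto3.client("athena")

athena.create_data_catalog(
    Name="podcast_domain_catalog",
    Type="GLUE",
    Description="Cross-account catalog owned by the podcast domain team",
    Parameters={"catalog-id": "111122223333"},  # producer account ID
)

# Datasets are then queryable, for example:
# SELECT * FROM "podcast_domain_catalog"."analytics_db"."episodes" LIMIT 10;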

With this principle, teams get assurance that the data is secure, trustworthy, and accurate, and appropriate access controls are managed at each domain level. Moreover, on the central account, roles are defined for different types of permissions and access, using AWS IAM Identity Center permissions. All datasets are discoverable from a single central account. The following figure illustrates how it’s instrumented, where two IAM roles are assumed by two types of user (consumer) groups: one that has access to a limited dataset, which is restricted data, and one that has access to non-restricted data. There is also a way to assume any of these roles, for service accounts, such as those used by data processing jobs in Amazon Managed Workflows for Apache Airflow (Amazon MWAA), for example.

How Acast solved for high alignment and a loosely coupled architecture

The following diagram shows a conceptual architecture of how Acast’s teams are organizing data and collaborating with each other.

Acast used the Well-Architected Framework for the central account to improve its practice running analytical workloads in the cloud. Through the lenses of the tool, Acast was able to address better monitoring, cost optimization, performance, and security. It helped them understand the areas where they could improve their workloads and how to address common issues, with automated solutions, as well as how to measure the success, defining KPIs. It saved them time to get the learnings that otherwise would have been taking longer to find. Spyridon Dosis, Acast’s Information Security Officer, shares, “We are happy AWS is always ahead with releasing tools that enable the configuration, assessment, and review of multi-account setup. This is a big plus for us, working in a decentralized organization.” Spyridon also adds, “A very important concept we value is the AWS security defaults (e.g. default encryption for S3 buckets).”

In the architecture diagram, we can see that each team can be a data producer, except the team owning the central account, which serves as the central data platform, modeling the logic from multiple domains to paint the full business picture. All other teams can be data producers or data consumers. They can connect to the central account and discover datasets via the cross-account AWS Glue Data Catalog, analyze them in the Athena query editor or with Athena notebooks, or map the catalog to their own AWS account. Access to the central Athena catalog is implemented with IAM Identity Center, with roles for open data and restricted data access.

For non-sensitive data (open data), Acast uses a template where the datasets are by default open for the entire organization to read, using a condition on the organization-assigned ID (aws:PrincipalOrgID), as shown in the following bucket policy snippet:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": "*",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET",
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:PrincipalOrgID": "ORG-ID-NUMBER"
                }
            }
        }
    ]
}

When handling sensitive data such as financials, the teams use a collaborative data steward model. The data steward works with the requester to evaluate the access justification for the intended use case. Together, they determine appropriate access methods to meet the need while maintaining security. These could include IAM roles, service accounts, or specific AWS services. This approach enables business users outside the tech organization (who don't have an AWS account) to independently access and analyze the information they need. By granting access through IAM policies on AWS Glue resources and S3 buckets, Acast provides self-service capabilities while still governing sensitive data through human review. The data steward role has been valuable for understanding use cases, assessing security risks, and ultimately facilitating access that accelerates the business through analytical insights.

For Acast's use case, granular row- or column-level access controls weren't needed, so this approach sufficed. However, other organizations may require more fine-grained governance over sensitive data fields. In those cases, a solution like AWS Lake Formation could implement the required permissions while still providing a self-service data access model. For more information, refer to Design a data mesh architecture using AWS Lake Formation and AWS Glue.
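For illustration, a Lake Formation grant for such fine-grained access could look like the following sketch; the database, table, column names, and principal ARN are made up for this example and are not part of Acast's setup.

import boto3

lakeformation = boto3.client("lakeformation")

# Grant SELECT on a subset of columns of a cataloged table to a consumer role.
# All identifiers below are hypothetical placeholders.
lakeformation.grant_permissions(
    Principal={
        "DataLakePrincipalIdentifier": "arn:aws:iam::111122223333:role/finance-analyst"
    },
    Resource={
        "TableWithColumns": {
            "DatabaseName": "curated_finance",
            "Name": "invoices",
            "ColumnNames": ["invoice_id", "amount", "currency"],
        }
    },
    Permissions=["SELECT"],
)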

At the same time, teams can read from other producers directly, from Amazon S3 or via an API, keeping dependencies to a minimum, which enhances the velocity of development and delivery. Therefore, an account can be a producer and a consumer in parallel. Each team is autonomous and accountable for its own tech stack.

Additional learnings

What did Acast learn? So far, we’ve discussed that the architectural design is an effect of the organizational structure. Although the tech organization consists of multiple cross-functional teams and it’s straightforward to bootstrap a new team following the common data mesh principles, Acast learned that this doesn’t go seamlessly every time. To set up a fully new AWS account, each team goes through the same journey, but slightly differently, given its own set of particularities.

This can create friction, and it’s difficult to get all data-producing teams to a high level of maturity as data producers. This is explained by the varying data competencies in those cross-functional teams, which are not dedicated data teams.

By implementing the decentralized solution, Acast effectively tackled the scalability challenge by adapting their teams to align with evolving business needs. This approach ensures high decoupling and alignment. Furthermore, they strengthened ownership, significantly reducing the time needed to identify and resolve issues because the upstream source is readily known and easily accessible with specified SLAs. The volume of data support inquiries has seen a reduction of over 50%, because business users are empowered to gain faster insights. Notably, they successfully eliminated tens of terabytes of redundant storage that were previously copied solely to fulfill downstream requests. This achievement was made possible through the implementation of cross-account reading, leading to the removal of associated development and maintenance costs for these pipelines.

Conclusion

Acast applied the Inverse Conway Maneuver and used AWS services, with each cross-functional product team owning its own AWS account, to build a data mesh architecture that allows for scalability, high ownership, and self-service data consumption. This has worked well for the company in terms of how data ownership and operations were approached to meet its engineering principles, with the data mesh emerging as an effect rather than a deliberate intent. For other organizations, the desired data mesh might look different and the approach might yield other learnings.

To conclude, a modern data architecture on AWS allows you to efficiently construct data products and data mesh infrastructure at a low cost without compromising on performance.

The following are some examples of AWS services you can use to design your desired data mesh on AWS:

  • Amazon S3
  • AWS Glue
  • Amazon Athena
  • AWS Lake Formation
  • AWS IAM Identity Center


About the Authors

Claudia Chitu is a Data strategist and an influential leader in the Analytics space. Focused on aligning data initiatives with the overall strategic goals of the organization, she employs data as a guiding force for long-term planning and sustainable growth.

Spyridon Dosis is an Information Security Professional at Acast. Spyridon supports the organization in designing, implementing, and operating its services in a secure manner, protecting the company and users’ data.

Srikant Das is an Acceleration Lab Solutions Architect at Amazon Web Services. He has over 13 years of experience in big data analytics and data engineering, where he enjoys building reliable, scalable, and efficient solutions. Outside of work, he enjoys traveling and blogging about his experiences on social media.

How smava makes loans transparent and affordable using Amazon Redshift Serverless

Post Syndicated from Alex Naumov original https://aws.amazon.com/blogs/big-data/how-smava-makes-loans-transparent-and-affordable-using-amazon-redshift-serverless/

This is a guest post co-written by Alex Naumov, Principal Data Architect at smava.

smava GmbH is one of the leading financial services companies in Germany, making personal loans transparent, fair, and affordable for consumers. Based on digital processes, smava compares loan offers from more than 20 banks. In this way, borrowers can choose the deals that are most favorable to them in a fast, digitalized, and efficient way.

smava believes in and takes advantage of data-driven decisions in order to become the market leader. The Data Platform team is responsible for supporting data-driven decisions at smava by providing data products across all departments and branches of the company. The departments include teams from engineering to sales and marketing. Branches are organized by product, namely B2C loans, B2B loans, and formerly also B2C mortgages. The data products used inside the company include insights from user journeys, operational reports, and marketing campaign results, among others. The data platform serves, on average, 60,000 queries per day. The data volume is in the double-digit TBs, with steady growth as the business and data sources evolve.

smava’s Data Platform team faced the challenge of delivering data to stakeholders with different SLAs while maintaining the flexibility to scale up and down and staying cost-efficient. It took up to 3 hours to generate daily reporting, which impacted business decision-making when recalculations needed to happen during the day. To speed up self-service analytics and foster innovation based on data, a solution was needed that would allow any team to create data products on their own in a decentralized manner. To create and manage the data products, smava uses Amazon Redshift, a cloud data warehouse.

In this post, we show how smava optimized their data platform by using Amazon Redshift Serverless and Amazon Redshift data sharing to overcome right-sizing challenges for unpredictable workloads and further improve price-performance. Through the optimizations, smava achieved up to 50% cost savings and up to three times faster report generation compared to the previous analytics infrastructure.

Overview of solution

As a data-driven company, smava relies on the AWS Cloud to power their analytics use cases. To bring their customers the best deals and user experience, smava follows the modern data architecture principles with a data lake as a scalable, durable data store and purpose-built data stores for analytical processing and data consumption.

smava ingests data from various external and internal data sources into a landing stage on the data lake based on Amazon Simple Storage Service (Amazon S3). To ingest the data, smava uses a set of popular third-party customer data platforms complemented by custom scripts.

After the data lands in Amazon S3, smava uses the AWS Glue Data Catalog and crawlers to automatically catalog the available data, capture the metadata, and provide an interface that allows querying all data assets.

Data analysts who require access to the raw assets on the data lake use Amazon Athena, a serverless, interactive analytics service for exploration with ad hoc queries. For the downstream consumption by all departments across the organization, smava’s Data Platform team prepares curated data products following the extract, load, and transform (ELT) pattern. smava uses Amazon Redshift as their cloud data warehouse to transform, store, and analyze data, and uses Amazon Redshift Spectrum to efficiently query and retrieve structured and semi-structured data from the data lake using SQL.

smava follows the data vault modeling methodology with the Raw Vault, Business Vault, and Data Mart stages to prepare the data products for end consumers. The Raw Vault describes objects loaded directly from the data sources and represents a copy of the landing stage in the data lake. The Business Vault is populated with data sourced from the Raw Vault and transformed according to the business rules. Finally, the data is aggregated into specific data products oriented to a specific business line. This is the Data Mart stage. The data products from the Business Vault and Data Mart stages are now available for consumers. smava decided to use Tableau for business intelligence, data visualization, and further analytics. The data transformations are managed with dbt to simplify the workflow governance and team collaboration.

The following diagram shows the high-level data platform architecture before the optimizations.

High-level Data Platform architecture before the optimizations

Evolution of the data platform requirements

smava started with a single Redshift cluster to host all three data stages. They chose provisioned cluster nodes of the RA3 type with Reserved Instances (RIs) for cost optimization. As data volumes grew 53% year over year, so did the complexity and requirements from various analytic workloads.

smava quickly addressed the growing data volumes by right-sizing the cluster and using Amazon Redshift Concurrency Scaling for peak workloads. Furthermore, smava wanted to give all teams the option to create their own data products in a self-service manner to increase the pace of innovation. To avoid any interference with the centrally managed data products, the decentralized product development environments needed to be strictly isolated. The same requirement was also applied for the isolation of different product stages curated by the Data Platform team.

Optimizing the architecture with data sharing and Redshift Serverless

To meet the evolved requirements, smava decided to separate the workload by splitting the single provisioned Redshift cluster into multiple data warehouses, with each warehouse serving a different stage. In addition, smava added new staging environments in the Business Vault to develop new data products without the risk of interfering with existing product pipelines. To avoid any interference with the centrally managed data products of the Data Platform team, smava introduced an additional Redshift cluster, isolating the decentralized workloads.

smava was looking for an out-of-the-box solution to achieve workload isolation without managing a complex data replication pipeline.

Right after the launch of Redshift data sharing capabilities in 2021, the Data Platform team recognized that this was the solution they had been looking for. smava adopted the data sharing feature to have the data from producer clusters available for read access on different consumer clusters, with each of those consumer clusters serving a different stage.

Redshift data sharing enables instant, granular, and fast data access across Redshift clusters without the need to copy data. It provides live access to data so that users always see the most up-to-date and consistent information as it’s updated in the data warehouse. With data sharing, you can securely share live data with Redshift clusters in the same or different AWS accounts and across Regions.
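To illustrate the mechanics, the following sketch shows the kind of SQL a producer and consumer exchange when setting up a datashare, issued here through the Amazon Redshift Data API. The datashare, schema, namespace GUIDs, cluster identifier, workgroup name, and database user are hypothetical placeholders rather than smava's actual objects.

import boto3

redshift_data = boto3.client("redshift-data")

# On the producer cluster: create a datashare, expose a schema, and grant it
# to the consumer namespace (all identifiers are placeholders).
producer_statements = [
    "CREATE DATASHARE raw_vault_share;",
    "ALTER DATASHARE raw_vault_share ADD SCHEMA raw_vault;",
    "ALTER DATASHARE raw_vault_share ADD ALL TABLES IN SCHEMA raw_vault;",
    "GRANT USAGE ON DATASHARE raw_vault_share TO NAMESPACE "
    "'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee';",  # consumer namespace GUID (placeholder)
]
for sql in producer_statements:
    redshift_data.execute_statement(
        ClusterIdentifier="producer-cluster",
        Database="dev",
        DbUser="admin",
        Sql=sql,
    )

# On the consumer side: create a local database from the datashare for read access.
redshift_data.execute_statement(
    WorkgroupName="business-vault-serverless",
    Database="dev",
    Sql=(
        "CREATE DATABASE raw_vault_db FROM DATASHARE raw_vault_share "
        "OF NAMESPACE 'ffffffff-1111-2222-3333-444444444444';"  # producer namespace GUID (placeholder)
    ),
)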

With Redshift data sharing, smava was able to optimize the data architecture by separating the data workloads to individual consumer clusters without having to replicate the data. The following diagram illustrates the high-level data platform architecture after splitting the single Redshift cluster into multiple clusters.

High-level Data Platform architecture after splitting the single Redshift cluster in multiple clusters

By providing a self-service data mart, smava increased data democratization by providing users with access to all aspects of the data. They also provided teams with a set of custom tools for data discovery, ad hoc analysis, prototyping, and operating the full lifecycle of mature data products.

After collecting operational data from the individual clusters, the Data Platform team identified further potential optimizations: the Raw Vault cluster was under steady load 24/7, but the Business Vault clusters were only updated nightly. To optimize for costs, smava used the pause and resume capabilities of Redshift provisioned clusters. These capabilities are useful for clusters that need to be available at specific times. While the cluster is paused, on-demand billing is suspended. Only the cluster’s storage incurs charges.
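A minimal sketch of how such scheduled pause and resume operations can be automated with the Amazon Redshift API is shown below; the cluster identifier is a placeholder, and in practice the calls would typically run from a scheduled AWS Lambda function or an Amazon EventBridge schedule.

import boto3

redshift = boto3.client("redshift")

CLUSTER_ID = "business-vault-cluster"  # hypothetical cluster identifier

def resume_before_nightly_load():
    # Bring the cluster back online before the nightly transformations start.
    redshift.resume_cluster(ClusterIdentifier=CLUSTER_ID)

def pause_after_nightly_load():
    # Suspend on-demand compute billing once the nightly update has finished;
    # only the cluster's storage continues to incur charges.
    redshift.pause_cluster(ClusterIdentifier=CLUSTER_ID)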

The pause and resume feature helped smava optimize for cost, but it required additional operational overhead to trigger the cluster operations. Additionally, the development clusters remained subject to idle times during working hours. These challenges were finally solved by adopting Redshift Serverless in 2022. The Data Platform team decided to move the Business Data Vault stage clusters to Redshift Serverless, which allows them to pay for the data warehouse only when in use, reliably and efficiently.

Redshift Serverless is ideal for cases when it’s difficult to predict compute needs such as variable workloads, periodic workloads with idle time, and steady-state workloads with spikes. Additionally, as usage demand evolves with new workloads and more concurrent users, Redshift Serverless automatically provisions the right compute resources, and the data warehouse scales seamlessly and automatically, without the need for manual intervention. Data sharing is supported in both directions between Redshift Serverless and provisioned Redshift clusters with RA3 nodes, so no changes to the smava architecture were needed. The following diagram shows the high-level architecture setup after the move to Redshift Serverless.

High-level Data Platform architecture after introducing Redshift Serverless for Business Vault clusters

smava combined the benefits of Redshift Serverless and dbt through a seamless CI/CD pipeline, adopting a trunk-based development methodology. Changes on the Git repository are automatically deployed to a test stage and validated using automated integration tests. This approach increased the efficiency of developers and decreased the average time to production from days to minutes.

smava adopted an architecture that utilizes both provisioned and serverless Redshift data warehouses, together with the data sharing capability to isolate the workloads. By choosing the right architectural patterns for their needs, smava was able to accomplish the following:

  • Simplify the data pipelines and reduce operational overhead
  • Reduce the feature release time from days to minutes
  • Increase price-performance by reducing idle times and right-sizing the workload
  • Achieve up to three times faster report generation (faster calculations and higher parallelization) at 50% of the original setup costs
  • Increase agility of all departments and support data-driven decision-making by democratizing access to data
  • Increase the speed of innovation by exposing self-service data capabilities for teams across all departments and strengthening the A/B test capabilities to cover the complete customer journey

Now, all departments at smava are using the available data products to make data-driven, accurate, and agile decisions.

Future vision

For the future, smava plans to continue to optimize the Data Platform based on operational metrics. They’re considering switching more provisioned clusters like the Self-Service Data Mart cluster to serverless. Additionally, smava is optimizing the ELT orchestration toolchain to increase the number of parallel data pipelines to be run. This will increase the utilization of provisioned Redshift resources and allow for cost reductions.

With the introduction of decentralized, self-service data product creation, smava took a step toward a data mesh architecture. In the future, the Data Platform team plans to further evaluate the needs of their service users and establish further data mesh principles like federated data governance.

Conclusion

In this post, we showed how smava optimized their data platform by isolating environments and workloads using Redshift Serverless and data sharing features. Those Redshift environments are well integrated with their infrastructure, flexible in scaling on demand, and highly available, and they require minimum administration efforts. Overall, smava has increased performance by three times while reducing the total platform costs by 50%. Additionally, they reduced operational overhead to a minimum while maintaining the existing SLAs for report generation times. Moreover, smava has strengthened the culture of innovation by providing self-service data product capabilities to speed up their time to market.

If you’re interested in learning more about Amazon Redshift capabilities, we recommend watching the most recent What’s new with Amazon Redshift session in the AWS Events channel to get an overview of the features recently added to the service. You can also explore the self-service, hands-on Amazon Redshift labs to experiment with key Amazon Redshift functionalities in a guided manner.

You can also dive deeper into Redshift Serverless use cases and data sharing use cases. Additionally, check out the data sharing best practices and discover how other customers optimized for cost and performance with Redshift data sharing to get inspired for your own workloads.

If you prefer books, check out Amazon Redshift: The Definitive Guide by O’Reilly, where the authors detail the capabilities of Amazon Redshift and provide you with insights on corresponding patterns and techniques.


About the Authors

Blog author: Alex NaumovAlex Naumov is a Principal Data Architect at smava GmbH, and leads the transformation projects at the Data department. Alex previously worked 10 years as a consultant and data/solution architect in a wide variety of domains, such as telecommunications, banking, energy, and finance, using various tech stacks, and in many different countries. He has a great passion for data and transforming organizations to become data-driven and the best in what they do.

Blog author: Lingli ZhengLingli Zheng works as a Business Development Manager in the AWS worldwide specialist organization, supporting customers in the DACH region to get the best value out of Amazon analytics services. With over 12 years of experience in energy, automation, and the software industry with a focus on data analytics, AI, and ML, she is dedicated to helping customers achieve tangible business results through digital transformation.

Blog author: Alexander SpivakAlexander Spivak is a Senior Startup Solutions Architect at AWS, focusing on B2B ISV customers across EMEA North. Prior to AWS, Alexander worked as a consultant in financial services engagements, including various roles in software development and architecture. He is passionate about data analytics, serverless architectures, and creating efficient organizations.


This post was reviewed for technical accuracy by David Greenshtein, Senior Analytics Solutions Architect.

Unlocking the value of data as your differentiator

Post Syndicated from G2 Krishnamoorthy original https://aws.amazon.com/blogs/big-data/unlocking-the-value-of-data-as-your-differentiator/

Today on the AWS re:Invent keynote stage, Swami Sivasubramanian, VP of Data and AI, AWS, spoke about the beneficial relationship among data, generative AI, and humans—all working together to unleash new possibilities in efficiency and creativity. There has never been a more exciting time in modern technology. Innovation is accelerating everywhere, and the future is rife with possibility. While Swami explored many facets of this beneficial relationship in the keynote today, one area that is especially critical for our customers to get right if they want to see success in generative AI is data. When you want to build generative AI applications that are unique to your business needs, data is the differentiator. This week, we launched many new tools to help you turn your data into your differentiator. This includes tools to help you customize your foundation models, and new services and features to build a strong data foundation to fuel your generative AI applications.

Customizing foundation models

The need for data is quite obvious if you are building your own foundation models (FMs). These models need vast amounts of data. But data is necessary even when you are building on top of FMs. If you think about it, everyone has access to the same models for building generative AI applications. It’s data that is the key to moving from generic applications to generative AI applications that create real value for your customers and your business. For instance, Intuit’s new generative AI-powered assistant, Intuit Assist, uses relevant contextual datasets spanning small business, consumer finance, and tax information to deliver personalized financial insights to their customers. With Amazon Bedrock, you can privately customize FMs for your specific use case using a small set of your own labeled data through a visual interface without writing any code. Today, we announced the ability to fine-tune Cohere Command and Meta Llama 2 in addition to Amazon Titan. In addition to fine-tuning, we’re also making it easier for you to provide models with up-to-date and contextually relevant information from your data sources using Retrieval Augmented Generation (RAG). Amazon Bedrock’s Knowledge Bases feature, which went to general availability today, supports the entire RAG workflow, from ingestion, to retrieval, and prompt augmentation. Knowledge Bases works with popular vector databases and engines including Amazon OpenSearch Serverless, Redis Enterprise Cloud, and Pinecone, with support for Amazon Aurora and MongoDB coming soon.
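As a rough sketch of the retrieval step in such a RAG workflow, the following snippet calls a knowledge base through the boto3 bedrock-agent-runtime client; the knowledge base ID and query text are hypothetical placeholders.

import boto3

bedrock_agent_runtime = boto3.client("bedrock-agent-runtime")

# Retrieve the passages most relevant to a user question from a knowledge base.
# The knowledge base ID below is a placeholder.
response = bedrock_agent_runtime.retrieve(
    knowledgeBaseId="KB123EXAMPLE",
    retrievalQuery={"text": "What financing options are available for small businesses?"},
)

# Each result carries the retrieved text, which can be used to augment the prompt.
for result in response["retrievalResults"]:
    print(result["content"]["text"])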

Building a strong data foundation

To produce the high-quality data that you need to build or customize FMs for generative AI, you need a strong data foundation. Of course, the value of a strong data foundation is not new and the need for one spans well beyond generative AI. Across all types of use cases, from generative AI to business intelligence (BI), we’ve found that a strong data foundation includes a comprehensive set of services to meet all your use case needs, integrations across those services to break down data silos, and tools to govern data across the end-to-end data workflow so you can innovate more quickly. These tools also need to be intelligent to remove the heavy lifting from data management.

Comprehensive

First, you need a comprehensive set of data services so you can get the price/performance, speed, flexibility, and capabilities for any use case. AWS offers a broad set of tools that enable you to store, organize, access, and act upon various types of data. We have the broadest selection of database services, including relational databases like Aurora and Amazon Relational Database Service (Amazon RDS)—and on Monday, we introduced the newest addition to the RDS family: Amazon RDS for Db2. Now Db2 customers can easily set up, operate, and scale highly available Db2 databases in the cloud. We also offer non-relational databases like Amazon DynamoDB, used by over 1 million customers for its serverless, single-digit millisecond performance at any scale. You also need services to store data for analysis and machine learning (ML) like Amazon Simple Storage Service (Amazon S3). Customers have created hundreds of thousands of data lakes on Amazon S3. It also includes our data warehouse, Amazon Redshift, which delivers more than 6 times better price/performance than other cloud data warehouses. We also have tools that enable you to act on your data, including Amazon QuickSight for BI, Amazon SageMaker for ML, and of course, Amazon Bedrock for generative AI.

Serverless enhancements

The dynamic nature of data makes it perfectly suited to serverless technologies, which is why AWS offers a broad range of serverless database and analytics offerings that help support our customers’ most demanding workloads. This week, we made even more improvements to our serverless options in this area, including a new Aurora capability that automatically scales to millions of write transactions per second and manages petabytes of data while maintaining the simplicity of operating a single database. We also released a new serverless option for Amazon ElastiCache, which makes it faster and easier to create highly available caches and instantly scales to meet application demand. Finally, we announced new AI-driven scaling and optimizations for Amazon Redshift Serverless that enable the service to learn from your patterns and proactively scale on multiple dimensions, including concurrent users, data variability, and query complexity. It does all of this while factoring in your price/performance targets so you can optimize between cost and performance.

Vector capabilities across more databases

Your data foundation also needs to include services to store, index, retrieve, and search vector data. As our customers need vector embeddings as part of their generative AI application workflows, they told us they want to use vector capabilities in their existing databases to eliminate the steep learning curve for new programming tools, APIs, and SDKs. They also feel more confident knowing their existing databases are proven in production and meet requirements for scalability, availability, and storage and compute. And when your vectors and business data are stored in the same place, your applications will run faster, and there’s no data sync or data movement to worry about.

For all of these reasons, we’ve invested in adding vector capabilities to some of our most popular data services, including Amazon OpenSearch Service and OpenSearch Serverless, Aurora, and Amazon RDS. Today, we added four more to that list, with the addition of vector support in Amazon MemoryDB for Redis, Amazon DocumentDB (with MongoDB compatibility), DynamoDB, and Amazon Neptune. Now you can use vectors and generative AI with your database of choice.

Integrated

Another key to your data foundation is integrating data across your data sources for a more complete view of your business. Typically, connecting data across different data sources requires complex extract, transform, and load (ETL) pipelines, which can take hours—if not days—to build. These pipelines also have to be continuously maintained and can be brittle. AWS is investing in a zero-ETL future so you can quickly and easily connect and act on all your data, no matter where it lives. We’re delivering on this vision in a number of ways, including zero-ETL integrations between our most popular data stores. Earlier this year, we brought you our fully managed zero-ETL integration between Amazon Aurora MySQL-Compatible Edition and Amazon Redshift. Within seconds of data being written into Aurora, you can use Amazon Redshift to do near-real-time analytics and ML on petabytes of data. Woolworths, a pioneer in retail who helped build the retail model of today, was able to reduce development time for analysis of promotions and other events from 2 months to 1 day using the Aurora zero-ETL integration with Amazon Redshift.

More zero-ETL options

At re:Invent, we announced three more zero-ETL integrations with Amazon Redshift, including Amazon Aurora PostgreSQL-Compatible Edition, Amazon RDS for MySQL, and DynamoDB, to make it easier for you to take advantage of near-real-time analytics to improve your business outcomes. In addition to Amazon Redshift, we’ve also expanded our zero ETL support to OpenSearch Service, which tens of thousands of customers use for real-time search, monitoring, and analysis of business and operational data. This includes zero-ETL integrations with DynamoDB and Amazon S3. With all of these zero-ETL integrations, we’re making it even easier to leverage relevant data for your applications, including generative AI.

Governed

Finally, your data foundation needs to be secure and governed to ensure the data that’s used throughout the development cycle of your generative AI applications is high quality and compliant. To help with this, we launched Amazon DataZone last year. Amazon DataZone is being used by companies like Guardant Health and Bristol Myers Squibb to catalog, discover, share, and govern data across their organization. Amazon DataZone uses ML to automatically add metadata to your data catalog, making all of your data more discoverable. This week, we added a new feature to Amazon DataZone that uses generative AI to automatically create business descriptions and context for your datasets with just a few clicks, making data even easier to understand and apply. While Amazon DataZone helps you share data in a governed way within your organization, many customers also want to securely share data with their partners.

Infusing intelligence across the data foundation

Not only have we added generative AI to Amazon DataZone, but we’re leveraging intelligent technology across our data services to make data easier to use, more intuitive to work with, and more accessible. Amazon Q, our new generative AI assistant, helps you in QuickSight to author dashboards and create compelling visual stories from your dashboard data using natural language. We also announced that Amazon Q can help you create data integration pipelines using natural language. For example, you can ask Q to “read JSON files from S3, join on ‘accountid’, and load into DynamoDB,” and Q will return an end-to-end data integration job to perform this action. Amazon Q is also making it easier to query data in your data warehouse with generative AI SQL in Amazon Redshift Query Editor (in preview). Now data analysts, scientists, and engineers can be more productive using generative AI text-to-code functionality. You can also improve accuracy by enabling query history access to specific users—without compromising data privacy.

These new innovations are going to make it easy for you to leverage data to differentiate your generative AI applications and create new value for your customers and your business. We look forward to seeing what you create!


About the authors

G2 Krishnamoorthy is VP of Analytics, leading AWS data lake services, data integration, Amazon OpenSearch Service, and Amazon QuickSight. Prior to his current role, G2 built and ran the Analytics and ML Platform at Facebook/Meta, and built various parts of the SQL Server database, Azure Analytics, and Azure ML at Microsoft.

Rahul Pathak is VP of Relational Database Engines, leading Amazon Aurora, Amazon Redshift, and Amazon QLDB. Prior to his current role, he was VP of Analytics at AWS, where he worked across the entire AWS database portfolio. He has co-founded two companies, one focused on digital media analytics and the other on IP-geolocation.

BMW Cloud Efficiency Analytics powered by Amazon QuickSight and Amazon Athena

Post Syndicated from Phillip Karg original https://aws.amazon.com/blogs/big-data/bmw-cloud-efficiency-analytics-powered-by-amazon-quicksight-and-amazon-athena/

This post is written in collaboration with Philipp Karg and Alex Gutfreund from BMW Group.

Bayerische Motoren Werke AG (BMW) is a motor vehicle manufacturer headquartered in Germany with 149,475 employees worldwide. Its profit before tax in the 2022 financial year was €23.5 billion on revenues of €142.6 billion. BMW Group is one of the world’s leading premium manufacturers of automobiles and motorcycles, and also provides premium financial and mobility services.

BMW Group uses 4,500 AWS Cloud accounts across the entire organization but is faced with the challenge of reducing unnecessary costs, optimizing spend, and having a central place to monitor costs. BMW Cloud Efficiency Analytics (CLEA) is a homegrown tool developed within the BMW FinOps CoE (Center of Excellence) aiming to optimize and reduce costs across all these accounts.

In this post, we explore how the BMW Group FinOps CoE implemented their Cloud Efficiency Analytics tool (CLEA), powered by Amazon QuickSight and Amazon Athena. With this tool, they effectively reduced costs and optimized spend across all their AWS Cloud accounts, utilizing a centralized cost monitoring system and using key AWS services. The CLEA dashboards were built on the foundation of the Well-Architected Lab. For more information on this foundation, refer to A Detailed Overview of the Cost Intelligence Dashboard.

CLEA gives full transparency into cloud costs, usage, and efficiency from a high-level overview to granular service, resource, and operational levels. It seamlessly consolidates data from various data sources within AWS, including AWS Cost Explorer (and forecasting with Cost Explorer), AWS Trusted Advisor, and AWS Compute Optimizer. Additionally, it incorporates BMW Group’s internal system to integrate essential metadata, offering a comprehensive view of the data across various dimensions, such as group, department, product, and applications.

The ultimate goal is to raise awareness of cloud efficiency and optimize cloud utilization in a cost-effective and sustainable manner. The dashboards, which offer a holistic view together with a variety of cost and BMW Group-related dimensions, were successfully launched in May 2023 and became accessible to users within the BMW Group.

Overview of the BMW Cloud Data Hub

At the BMW Group, Cloud Data Hub (CDH) is the central platform for managing company-wide data and data solutions. It works as a bundle for resources that are bound to a specific staging environment and Region to store data on Amazon Simple Storage Service (Amazon S3), which is renowned for its industry-leading scalability, data availability, security, and performance. Additionally, it manages table definitions in the AWS Glue Data Catalog, containing references to data sources and targets of extract, transform, and load (ETL) jobs in AWS Glue.

Data providers and consumers are the two fundamental users of a CDH dataset. Providers create datasets within their assigned domain and, as the owners of a dataset, are responsible for the actual content and for providing appropriate metadata. They can use their own toolsets or rely on provided blueprints to ingest the data from source systems. Once released, consumers use datasets from different providers for analysis, machine learning (ML) workloads, and visualization.

Each CDH dataset has three processing layers: source (raw data), prepared (transformed data in Parquet), and semantic (combined datasets). It is possible to define stages (DEV, INT, PROD) in each layer to allow structured release and test without affecting PROD. Within each stage, it’s possible to create resources for storing actual data. Two resource types are associated with each database in a layer:

  • File store – S3 buckets for data storage
  • Database – AWS Glue databases for metadata sharing

Overview of the CLEA Landscape

The following diagram is a high-level overview of some of the technologies used for the extract, load, and transform (ELT) stages, as well as the final visualization and analysis layer. You might notice that this differs slightly from traditional ETL. The difference lies in when and where data transformation takes place. In ETL, data is transformed before it’s loaded into the data warehouse. In ELT, raw data is loaded into the data warehouse first, then it’s transformed directly within the warehouse. The ELT process has gained popularity with the rise of cloud-based, high-performance data warehouses, where transformation can be done more efficiently after loading.

Regardless of the method used, the goal is to provide high-quality, reliable data that can be used to drive business decisions.

CLEA Architecture

In this section, we take a closer look at the three essential stages mentioned previously: extract, load and transform.

Extract

The extract stage plays a pivotal role in CLEA, serving as the initial step where data related to cost, usage, and optimization is collected from a diverse range of sources within AWS. These sources encompass the AWS Cost and Usage Reports, Cost Explorer (and forecasting with Cost Explorer), Trusted Advisor, and Compute Optimizer. Furthermore, it fetches essential metadata from BMW Group’s internal system, offering a comprehensive view of the data across various dimensions, such as group, department, product, and applications, in the later stages of data transformation.

The following diagram illustrates one of the data collection architectures that we use to collect Trusted Advisor data from nearly 4,500 AWS accounts and subsequently load that into Cloud Data Hub.

Let’s go through each numbered step as outlined in the architecture (a code sketch of steps 2 and 3 follows the list):

  1. A time-based rule in Amazon EventBridge triggers the CLEA Shared Workflow AWS Step Functions state machine.
  2. Based on the inputs, the Shared Workflow state machine invokes the Account Collector AWS Lambda function to retrieve AWS account details from AWS Organizations.
  3. The Account Collector Lambda function assumes an AWS Identity and Access Management (IAM) role to access linked account details via the Organizations API and writes them to Amazon Simple Queue Service (Amazon SQS) queues.
  4. The SQS queues trigger the Data Collector Lambda function using SQS Lambda triggers.
  5. The Data Collector Lambda function assumes an IAM role in each linked account to retrieve the relevant data and load it into the CDH source S3 bucket.
  6. When all linked accounts data is collected, the Shared Workflow state machine triggers an AWS Glue job for further data transformation.
  7. The AWS Glue job reads raw data from the CDH source bucket and transforms it into a compact Parquet format.
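The following is a simplified sketch of what the Account Collector function in steps 2 and 3 could look like; the queue URL is a hypothetical placeholder, and the cross-account role assumption and error handling are omitted for brevity.

import json
import boto3

organizations = boto3.client("organizations")
sqs = boto3.client("sqs")

QUEUE_URL = "https://sqs.eu-central-1.amazonaws.com/111122223333/clea-accounts"  # placeholder

def handler(event, context):
    # Retrieve the linked accounts from AWS Organizations (paginated API).
    paginator = organizations.get_paginator("list_accounts")
    for page in paginator.paginate():
        for account in page["Accounts"]:
            if account["Status"] != "ACTIVE":
                continue
            # Queue each account so the Data Collector function can fan out.
            sqs.send_message(
                QueueUrl=QUEUE_URL,
                MessageBody=json.dumps(
                    {"accountId": account["Id"], "accountName": account["Name"]}
                ),
            )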

Load and transform

For the data transformations, we used an open-source data transformation tool called dbt (Data Build Tool), modifying and preprocessing the data through a number of abstract data layers:

  • Source – This layer contains the raw data the data source provides. The preferred data format is Parquet, but JSON, CSV, or plain text file are also allowed.
  • Prepared – The source layer is transformed and stored as the prepared layer in Parquet format for optimized columnar access. Initial cleaning, filtering, and basic transformations are performed in this layer.
  • Semantic – A semantic layer combines several prepared layer datasets to a single dataset that contains transformations, calculations, and business logic to deliver business-friendly insights.
  • QuickSight – The final presentation layer. Data from the semantic layer is ingested directly into QuickSight SPICE from Athena via incremental daily ingestion queries, and these ingested datasets are used as sources in CLEA dashboards.

Overall, using dbt’s data modeling and the pay-as-you-go pricing of Athena, BMW Group can control costs by running efficient queries on demand. Furthermore, with the serverless architecture of Athena and dbt’s structured transformations, you can scale data processing without worrying about infrastructure management. In CLEA, there are currently more than 120 dbt models implemented with complex transformations. The semantic layer is incrementally materialized and partially ingested into QuickSight, with up to 4 TB of SPICE capacity. For dbt deployment and scheduling, we use GitHub Actions, which allows us to introduce new dbt models and changes easily with automatic deployments and tests.
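As an illustrative sketch of how the daily incremental SPICE ingestion described above could be triggered programmatically, the snippet below uses the QuickSight CreateIngestion API; the account ID and dataset ID are placeholders rather than CLEA's real identifiers, and the dataset is assumed to have incremental refresh configured.

import uuid
import boto3

quicksight = boto3.client("quicksight")

# Trigger an incremental SPICE refresh for one dataset (IDs are placeholders).
quicksight.create_ingestion(
    AwsAccountId="111122223333",
    DataSetId="clea-semantic-cost-usage",
    IngestionId=f"daily-{uuid.uuid4()}",
    IngestionType="INCREMENTAL_REFRESH",
)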

CLEA Access control

In this section, we explain how we implemented access control using row-level security in QuickSight and QuickSight embedding for authentication and authorization.

RLS for QuickSight

Row-level security (RLS) is a key feature that governs data access and privacy, which we implemented for CLEA. RLS is a mechanism that allows us to control the visibility of data at the row level based on user attributes. In essence, it ensures that users can only access the data that they are authorized to view, adding an additional layer of data protection within the QuickSight environment.

Understanding the importance of RLS requires a broader view of the data landscape. In organizations where multiple users interact with the same datasets but require different access levels due to their roles, RLS becomes a pivotal tool. It ensures data security and compliance with privacy regulations, preventing unauthorized access to sensitive data. Additionally, it offers a tailored user experience by displaying only relevant data to the user, thereby enhancing the effectiveness of data analysis.

For CLEA, we collected BMW Group metadata such as department, application, and organization, which are quite important to allow users to only see the accounts within their department, application, organization, and so on. This is achieved using both a user name and group name for access control. We use the user name for user-specific access control and the group name for adding some users to a specific group to extend their permissions for different use cases.

Lastly, because CLEA provides many dashboards, we also control which dashboards a given user can see, as well as the data shown within each dashboard. This is done at the group level. By default, all users are assigned to CLEA-READER, which is granted access to the core dashboards that we want to share with all users, and there are additional groups that allow users to see further dashboards once they’re assigned to that group.

The RLS dataset is refreshed daily to catch recent changes regarding new user additions, group changes, or any other user access changes. This dataset is also ingested to SPICE daily, which automatically updates all datasets restricted via this RLS dataset.

QuickSight embedding

CLEA is a cross-platform application that provides secure access to QuickSight embedded content with custom-built authentication and authorization logic that sits on top of BMW Group identity and role management services (referred to as BMW IAM).

CLEA provides access to sensitive data to multiple users with different permissions, which is why it is designed with fine-grained access control rules. It enforces access control using role-based access control (RBAC) and attribute-based access control (ABAC) models at two different levels:

  • At the dashboard level via QuickSight user groups (RBAC)
  • At the dashboard data level via QuickSight RLS (RBAC and ABAC)

Dashboard-level permissions define the list of dashboards users are able to visualize.

Dashboard data-level permissions define the subsets of dashboard data shown to the user and are applied using RLS with the user attributes mentioned earlier. Although the majority of roles defined in CLEA are used for dashboard-level permissions, some specific roles are strategically defined to grant permissions at the dashboard data level, taking priority over the ABAC model.

BMW has a defined set of guidelines suggesting the usage of their IAM services as the single source of truth for identity and access control, which the team took into careful consideration when designing the authentication and authorization processes for CLEA.

Upon their first login, users are automatically registered in CLEA and assigned a base role that grants them access to a basic set of dashboards.

The process of registering users in CLEA consists of mapping a user’s identity as retrieved from BMW’s identity provider (IdP) to a QuickSight user, then assigning the newly created user to the respective QuickSight user group.
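A simplified sketch of that registration step, using the QuickSight API, might look like the following; the account ID, namespace, and user details are hypothetical, and the real CLEA flow maps identities retrieved from BMW's IdP.

import boto3

quicksight = boto3.client("quicksight")

ACCOUNT_ID = "111122223333"  # placeholder AWS account ID
NAMESPACE = "default"

def register_clea_user(user_name: str, email: str) -> None:
    # Create the QuickSight user for the identity retrieved from the IdP.
    quicksight.register_user(
        AwsAccountId=ACCOUNT_ID,
        Namespace=NAMESPACE,
        IdentityType="QUICKSIGHT",
        UserName=user_name,
        Email=email,
        UserRole="READER",
    )
    # Assign the new user to the base group that grants the core dashboards.
    quicksight.create_group_membership(
        AwsAccountId=ACCOUNT_ID,
        Namespace=NAMESPACE,
        GroupName="CLEA-READER",
        MemberName=user_name,
    )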

For users that require more extensive permissions (at one of the levels mentioned before), it is possible to order additional role assignments via BMW’s self-service portal for role management. Authorized reviewers will then review it and either accept or reject the role assignments.

Role assignments take effect the next time the user logs in, at which time the user’s assigned roles in BMW Group IAM are synced to the user’s QuickSight groups, internally referred to as the identity and permissions sync. As shown in the following diagram, the sync groups step calculates which of the user’s group memberships should be kept, created, or deleted according to this logic.

Usage Insights

Amazon CloudWatch plays an indispensable role in enhancing the efficiency and usability of CLEA dashboards. Not only does CloudWatch offer real-time monitoring of AWS resources, but it also allows us to track user activity and dashboard utilization. By analyzing usage metrics and logs, we can see who has logged in to the CLEA dashboards, what features are most frequently accessed, and how long users interact with various elements. These insights are invaluable for making data-driven decisions on how to improve the dashboards for a better user experience. Through the intuitive interface of CloudWatch, it’s possible to set up alarms that alert on abnormal activity or performance issues. Ultimately, employing CloudWatch for monitoring offers a comprehensive view of both system health and user engagement, helping us refine and enhance our dashboards continually.

Conclusion

BMW Group’s CLEA platform offers a comprehensive and effective solution to manage and optimize cloud resources. By providing full transparency into cloud costs, usage, and efficiency, CLEA offers insights from high-level overviews to granular details at the service, resource, and operational level.

CLEA aggregates data from various sources, enabling a detailed roadmap of the cloud operations, tracking footprints across primes, departments, products, applications, resources, and tags. This dynamic vision helps identify trends, anticipate future needs, and make strategic decisions.

Future plans for CLEA include enhancing capabilities with data consistency and accuracy, integrating additional sources like Amazon S3 Storage Lens for deeper insights, and introducing Amazon QuickSight Q for intelligent recommendations powered by machine learning, further streamlining cloud operations.

By following the practices here, you can unlock the potential of efficient cloud resource management by implementing Cloud Intelligence Dashboards, providing you with precise insights into costs, savings, and operational effectiveness.


About the Authors

Philipp Karg is Lead FinOps Engineer at BMW Group and founder of the CLEA platform. He focuses on boosting cloud efficiency initiatives and establishing a cost-aware culture within the company to ultimately leverage the cloud in a sustainable way.

Alex Gutfreund is Head of Product and Technology Integration at the BMW Group. He spearheads the digital transformation with a particular focus on platform ecosystems and efficiencies. With extensive experience at the interface of business and IT, he drives change and makes an impact in various organizations. His industry knowledge spans automotive, semiconductors, public transportation, and renewable energy.

Cizer Pereira is a Senior DevOps Architect at AWS Professional Services. He works closely with AWS customers to accelerate their journey to the cloud. He has a deep passion for Cloud Native and DevOps, and in his free time, he also enjoys contributing to open-source projects.

Selman Ay is a Data Architect in the AWS Professional Services team. He has worked with customers from various industries such as e-commerce, pharma, automotive and finance to build scalable data architectures and generate insights from the data. Outside of work, he enjoys playing tennis and engaging in outdoor activities.

Nick McCarthy is a Senior Machine Learning Engineer in the AWS Professional Services team. He has worked with AWS clients across various industries including healthcare, finance, sports, telecoms and energy to accelerate their business outcomes through the use of AI/ML. Outside of work Nick loves to travel, exploring new cuisines and cultures in the process.

Miguel Henriques is a Cloud Application Architect in the AWS Professional Services team with 4 years of experience in the automotive industry delivering cloud native solutions. In his free time, he is constantly looking for advancements in the web development space and searching for the next great pastel de nata.

Clean up your Excel and CSV files without writing code using AWS Glue DataBrew

Post Syndicated from Ismail Makhlouf original https://aws.amazon.com/blogs/big-data/clean-up-your-excel-and-csv-files-without-writing-code-using-aws-glue-databrew/

Managing data within an organization is complex. Handling data from outside the organization adds even more complexity. As the organization receives data from multiple external vendors, it often arrives in different formats, typically Excel or CSV files, with each vendor using their own unique data layout and structure. In this blog post, we’ll explore a solution that streamlines this process by leveraging the capabilities of AWS Glue DataBrew.

DataBrew is an excellent tool for data quality and preprocessing. You can use its built-in transformations and recipes, as well as its integrations with the AWS Glue Data Catalog and Amazon Simple Storage Service (Amazon S3), to preprocess the data in your landing zone, clean it up, and send it downstream for analytical processing.

In this post, we demonstrate the following:

  • Extracting non-transactional metadata from the top rows of a file and merging it with transactional data
  • Combining multi-line rows into single-line rows
  • Extracting unique identifiers from within strings or text

Solution overview

For this use case, imagine you’re a data analyst working at your organization. The sales leadership has requested a consolidated view of the net sales they are making from each of the organization’s suppliers. Unfortunately, this information is not available in a database. The sales data comes from each supplier in layouts like the following example.

However, with hundreds of resellers, manually extracting the information at the top is not feasible. Your goal is to clean up and flatten the data into the following output layout.


To achieve this, you can use pre-built transformations in DataBrew to quickly get the data in the layout you want.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • An IAM role that grants DataBrew access to read from and write to Amazon S3 (referenced later when you create the project)

Connect to the dataset

The first thing we need to do is upload the input dataset to Amazon S3. Create an S3 bucket for the project and create a folder to upload the raw input data. The output data will be stored in another folder in a later step.

Next, we need to connect DataBrew to our CSV file. We create what we call a dataset, which is an artifact that points to whatever data source we will be using. Navigate to Datasets on the left-hand menu.

Ensure the Column header values field is set to Add default header. The input CSV has an irregular format, so the first row will not have the needed column values.

Create a project

To create a new project, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose Create project.
  3. For Project name, enter FoodMartSales-AllUpProject.
  4. For Attached recipe, choose Create new recipe.
  5. For Recipe name, enter FoodMartSales-AllUpProject-recipe.
  6. For Select a dataset, select My datasets.
  7. Select the FoodMartSales-AllUp dataset.
  8. Under Permissions, for Role name, choose the IAM role you created as a prerequisite or create a new role.
  9. Choose Create project.

After the project is opened, an interactive session is created where you can author transformations on a sample of the data.

Extract non-transactional metadata from within the contents of the file and merge it with transactional data

In this section, we consider data that has metadata on the first few rows of the file, followed by transactional data. We walk through how to extract data relevant to the whole file from the top of the document and combine it with the transactional data into one flat table.

Extract metadata from the header and remove invalid rows

Complete the following steps to extract metadata from the header:

  1. Choose Conditions and then choose IF.
  2. For Matching conditions, choose Match all conditions.
  3. For Source, choose Value of and Column_1.
  4. For Logical condition, choose Is exactly.
  5. For Enter a value, choose Enter custom value and enter RESELLER NAME.
  6. For Flag result value as, choose Custom value.
  7. For Value if true, choose Select source column and set Value of to Column_2.
  8. For Value if false, choose Enter custom value and enter INVALID.
  9. Choose Apply.

Your dataset should now look like the following screenshot, with the Reseller Name value extracted to a column by itself.

Next, you remove invalid rows and fill the rows with the Reseller Name value.

  1. Choose Clean and then choose Custom values.
  2. For Source column, choose ResellerName.
  3. For Specify values to remove, choose Custom value.
  4. For Values to remove, choose Invalid.
  5. For Apply transform to, choose All rows.
  6. Choose Apply.
  7. Choose Missing and then choose Fill with most frequent value.
  8. For Source column, choose FirstTransactionDate.
  9. For Missing value action, choose Fill with most frequent value.
  10. For Apply transform to, choose All rows.
  11. Choose Apply.

Your dataset should now look like the following screenshot, with the invalid values removed and the metadata filled in for every row.

Repeat the same steps in this section for the rest of the metadata, including Reseller Email Address, Reseller ID, and First Transaction Date.

Promote column headers and clean up data

To promote column headers, complete the following steps:

  1. Reorder the columns to put the metadata columns to the left of the dataset by choosing Column, Move column, and Start of the table.
  2. Rename the columns with the appropriate names.

Now you can clean up some columns and rows.

  1. Delete unnecessary columns, such as Column_7.

You can also delete invalid rows by filtering out records that don’t have a transaction date value.

  1. Choose the ABC icon on the menu of the Transaction_Date column and choose date.

  2. For Handle invalid values, select Delete rows, then choose Apply.

The dataset should now have the metadata extracted and the column headers promoted.

Combine multi-line rows into single-line rows

The next issue to address is transactions pertaining to the same row that are split across multiple lines. In the following steps, we extract the needed data from the rows and merge it into single-line transactions. For this example specifically, the Reseller Margin data is split across two lines.


Complete the following steps to get the Reseller Margin value on the same line as the corresponding transaction. First, we identify the Reseller Margin rows and store them in a temporary column.

  1. Choose Conditions and then choose IF.
  2. For Matching conditions, choose Match all conditions.
  3. For Source, choose Value of and Transaction_ID.
  4. For Logical condition, choose Contains.
  5. For Enter a value, choose Enter custom value and enter Reseller Margin.
  6. For Flag result value as, choose Custom value.
  7. For Value if true, choose Select source column and set Value of to TransactionAmount.
  8. For Value if false, choose Enter custom value and enter Invalid.
  9. For Destination column, choose ResellerMargin_Temp.
  10. Choose Apply.

Next, you shift the Reseller Margin value up one row.

  1. Choose Functions and then choose NEXT.
  2. For Source column, choose ResellerMargin_Temp.
  3. For Number of rows, enter 1.
  4. For Destination column, choose ResellerMargin.
  5. For Apply transform to, choose All rows.
  6. Choose Apply.

Next, delete the invalid rows.

  1. Choose Missing and then choose Remove missing rows.
  2. For Source column, choose TransactionDate.
  3. For Missing value action, choose Delete rows with missing values.
  4. For Apply transform to, choose All rows.
  5. Choose Apply.

Your dataset should now look like the following screenshot, with the Reseller Margin value extracted to a column by itself.
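For reference, the NEXT function is essentially a one-row upward shift. The following pandas sketch mirrors this section's steps, continuing the hypothetical DataFrame from the earlier sketch and assuming the column names shown above.

# Continuing the hypothetical DataFrame df from the earlier sketch, assuming the
# Transaction_ID, TransactionAmount, and TransactionDate columns shown in this section.
is_margin_row = df["Transaction_ID"].astype(str).str.contains("Reseller Margin", na=False)

# IF condition: stash the amount of each Reseller Margin row in a temporary column
df["ResellerMargin_Temp"] = df["TransactionAmount"].where(is_margin_row)

# NEXT: shift the stashed value up one row onto the corresponding transaction
df["ResellerMargin"] = df["ResellerMargin_Temp"].shift(-1)

# Remove the margin-only rows, which have no transaction date, and drop the temp column
df = df.dropna(subset=["TransactionDate"]).drop(columns=["ResellerMargin_Temp"])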

With the data structured properly, we can move on to mining the cleaned data.

Extract unique identifiers from within strings and text

Many types of data contain important information stored as unstructured text in a cell. In this section, we look at how to extract this data. Within the sample dataset, the BankTransferText column contains valuable information about each reseller's registered bank account and the currency of the transaction, namely the IBAN, SWIFT code, and currency.

Complete the following steps to extract IBAN, SWIFT code, and Currency into separate columns. First, you extract the IBAN number from the text using a regular expression (regex).

  1. Choose Extract and then choose Custom value or pattern.
  2. For Create column options, choose Extract values.
  3. For Source column, choose BankTransferText.
  4. For Extract options, choose Custom value or pattern.
  5. For Values to extract, enter [a-zA-Z][a-zA-Z][0-9]{2}[A-Z0-9]{1,30}.
  6. For Destination column, choose IBAN.
  7. For Apply transform to, choose All rows.
  8. Choose Apply.
  9. Extract the SWIFT code from the text using a regex following the same steps used to extract the IBAN number, but using the following regex instead: (?!^)(SWIFT Code: )([A-Z]{2}[A-Z0-9]+).

Next, remove the SWIFT Code: label from the extracted text.

  1. Choose Remove and then choose Custom values.
  2. For Source column, choose SWIFT Code.
  3. For Specify values to remove, choose Custom value.
  4. For Values to remove, enter the SWIFT Code: label.
  5. For Apply transform to, choose All rows.
  6. Choose Apply.
  7. Extract the currency from the text using a regex following the same steps used to extract the IBAN number, but using the following regex instead: (?!^)(Currency: )([A-Z]{3}).
  8. Remove the Currency: label from the extracted text following the same steps used to remove the SWIFT Code: label.

You can clean up by deleting any unnecessary columns.

  1. Choose Column and then choose Delete.
  2. For Source columns, choose BankTransferText.
  3. Choose Apply.
  4. Repeat for any remaining columns.

Your dataset should now look like the following screenshot, with IBAN, SWIFT Code, and Currency extracted to separate columns.
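To sanity-check the patterns before adding them to the recipe, you can try them with Python's re module. The sample string below is hypothetical; the regular expressions are the same ones used in the steps above.

import re

# Hypothetical example of a BankTransferText cell value (not from the actual dataset)
text = "IBAN: GB29NWBK60161331926819 SWIFT Code: NWBKGB2L Currency: GBP"

# Same patterns used in the Extract steps above
iban_pattern = r"[a-zA-Z][a-zA-Z][0-9]{2}[A-Z0-9]{1,30}"
swift_pattern = r"(?!^)(SWIFT Code: )([A-Z]{2}[A-Z0-9]+)"
currency_pattern = r"(?!^)(Currency: )([A-Z]{3})"

iban = re.search(iban_pattern, text).group(0)
swift = re.search(swift_pattern, text).group(2)        # group 2 drops the "SWIFT Code: " label
currency = re.search(currency_pattern, text).group(2)  # group 2 drops the "Currency: " label

print(iban, swift, currency)  # GB29NWBK60161331926819 NWBKGB2L GBP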

Write the transformed data to Amazon S3

With all the steps captured in the recipe, the last step is to write the transformed data to Amazon S3.

  1. On the DataBrew console, choose Run job.
  2. For Job name, enter FoodMartSalesToDataLake.
  3. For Output to, choose Amazon S3.
  4. For File type, choose CSV.
  5. For Delimiter, choose Comma (,).
  6. For Compression, choose None.
  7. For S3 bucket owners’ account, select Current AWS account.
  8. For S3 location, enter s3://{name of S3 bucket}/clean/.
  9. For Role name, choose the IAM role created as a prerequisite or create a new role.
  10. Choose Create and run job.
  11. Go to the Jobs tab and wait for the job to complete.
  12. Navigate to the job output folder on the Amazon S3 console.
  13. Download the CSV file and view the transformed output.

Your dataset should look similar to the following screenshot.
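If you prefer to script the job instead of using the console, the following boto3 sketch mirrors the settings above. The project name, role ARN, and bucket name are placeholders you should adjust to your environment.

import boto3

# A sketch of creating and starting the same recipe job with boto3. The project name
# and role ARN are hypothetical; the output settings mirror the console steps above.
databrew = boto3.client("databrew")

databrew.create_recipe_job(
    Name="FoodMartSalesToDataLake",
    ProjectName="food-mart-sales",                          # hypothetical project name
    RoleArn="arn:aws:iam::123456789012:role/DataBrewRole",  # hypothetical role ARN
    Outputs=[{
        "Location": {"Bucket": "your-s3-bucket", "Key": "clean/"},
        "Format": "CSV",
        "FormatOptions": {"Csv": {"Delimiter": ","}},
    }],
)

databrew.start_job_run(Name="FoodMartSalesToDataLake")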

Clean up

To optimize cost, make sure to clean up the resources deployed for this project by completing the following steps:

  1. Delete every DataBrew project along with their linked recipes.
  2. Delete all the DataBrew datasets.
  3. Delete the contents in your S3 bucket.
  4. Delete the S3 bucket.

Conclusion

The reality of exchanging data with suppliers is that we can’t always control the shape of the input data. With DataBrew, we can use a list of pre-built transformations and repeatable steps to transform incoming data into a desired layout and extract relevant data and insights from Excel or CSV files. Start using DataBrew today and transform third-party files into structured datasets ready for consumption by your business.


About the Author

Ismail Makhlouf is a Senior Specialist Solutions Architect for Data Analytics at AWS. Ismail focuses on architecting solutions for organizations across their end-to-end data analytics estate, including batch and real-time streaming, big data, data warehousing, and data lake workloads. He primarily works with direct-to-consumer platform companies in the ecommerce, FinTech, PropTech, and HealthTech space to achieve their business objectives with well-architected data platforms.

Connect your data for faster decisions with AWS

Post Syndicated from Rahul Pathak original https://aws.amazon.com/blogs/big-data/connect-your-data-for-faster-decisions-with-aws/

The most impactful data-driven insights come from connecting the dots between all your data sources—across departments, services, on-premises tools, and third-party applications. But typically, connecting data requires complex extract, transform, and load (ETL) pipelines, taking hours or days. That’s too slow for decision-making speed. ETL needs to be easier and sometimes eliminated.

AWS is investing in addressing this in several ways. First, for common use cases where ETL is repeated with little value-add, we’re integrating services to decrease or eliminate the need for ETL. Second, organizations still need transformations like cleansing, deduplication, and combining datasets for analysis and machine learning (ML). For these, AWS Glue provides fast, scalable data transformation. Third, AWS continues adding support for more data sources including connections to software as a service (SaaS) applications, on-premises applications, and other clouds so organizations can act on their data.

In this post, we discuss how we’re delivering on these investments with a number of data integration innovations that span AWS databases, analytics, business intelligence (BI), and ML services.

Amazon Aurora MySQL zero-ETL integration with Amazon Redshift is now generally available

In June 2023, we announced the public preview of Amazon Aurora MySQL-Compatible Edition zero-ETL integration with Amazon Redshift. We are thrilled to announce that this zero-ETL integration is now generally available. Amazon Aurora MySQL zero-ETL integration with Amazon Redshift processes over 1 million transactions per minute, enabling near-real-time analytics. Within seconds of new data coming into Amazon Aurora MySQL, the data is replicated to Amazon Redshift. Updates in Amazon Aurora MySQL are automatically and continuously propagated to Amazon Redshift. Customers and partners can derive tremendous time savings by reducing traditional ETL bottlenecks. They can now analyze business metrics in near-real time and make data-driven decisions faster than ever before.

In the retail industry, for example, Infosys wanted to gain faster insights about their business, such as best-selling products and high-revenue stores, based on transactions in a store management system. They used Amazon Aurora MySQL zero-ETL integration with Amazon Redshift to achieve this. With this integration, Infosys replicated Aurora data to Amazon Redshift and created Amazon QuickSight dashboards for product managers and channel leaders in just a few seconds, instead of several hours. Now, as part of Infosys Cobalt and Infosys Topaz blueprints, enterprises can have near real-time analytics on transactional data, which can help them make informed decisions related to store management.

– Sunil Senan, SVP and Global Head of Data, Analytics, and AI, Infosys

Amazon SageMaker Canvas integration with Amazon QuickSight

We are empowering business analysts to create predictive, interactive dashboards by connecting Amazon SageMaker Canvas, our no-code ML service, with Amazon QuickSight, our BI service. Business analysts use SageMaker Canvas to build ML models and generate predictions without needing to write code. They can then seamlessly integrate these predictions in QuickSight to create interactive dashboards that can be shared across their organization. This enables democratization of predictive insights for better decision-making.

Moreover, we have enabled deep, bidirectional integration between SageMaker Canvas and QuickSight. Business analysts can send ML models from SageMaker Canvas to QuickSight and run predictions from within QuickSight. Analysts can now also directly send data from QuickSight to SageMaker Canvas with just a few clicks to rapidly build ML models using a simple point-and-click interface, without needing to create or maintain complex data pipelines between the two services. This integration empowers users to go from data to predictions and visualizations faster than ever.

Connecting to SaaS applications

AWS services already connect to hundreds of AWS and third-party data sources. Data engineers can use services such as Amazon AppFlow and AWS Glue to make data quickly accessible from diverse sources. This enables organizations to derive unified insights across siloed datasets. We recently added new Amazon AppFlow and AWS Glue integrations to our existing portfolio.

Amazon AppFlow now supports concurrent processing for data transfers from SAP applications

Amazon AppFlow, a fully managed integration service that helps you securely transfer data between AWS services and SaaS applications, now supports concurrent processing and configurable page sizes for faster data transfers from SAP. This reduces the time taken to move SAP data into AWS data and artificial intelligence (AI) services.

Google BigQuery connectivity for AWS Glue for Apache Spark is now generally available

AWS Glue for Apache Spark has added native connectivity to Google BigQuery, enabling reading and writing of BigQuery data directly without the need to install or manage libraries. You can now add BigQuery as a source or target in AWS Glue Studio’s visual interface or directly in an AWS Glue ETL script.

Summary

The data integration innovations we have highlighted show our commitment to empowering organizations to easily connect their data. Whether it’s achieving near-real-time insights, democratizing predictive analytics, or connecting diverse data sources, we are focused on helping you derive more value from your data. With the new capabilities of Amazon Aurora MySQL, Amazon Redshift, SageMaker Canvas, QuickSight, Amazon AppFlow, and AWS Glue, data engineers and business analysts can break down data silos to uncover insights.

Visit Data integration with AWS to learn more.


About the authors

Rahul Pathak is VP of Relational Database Engines, leading Amazon Aurora, Amazon Redshift, and Amazon QLDB. Prior to his current role, he was VP of Analytics at AWS, where he worked across the entire AWS database portfolio. He has co-founded two companies, one focused on digital media analytics and the other on IP-geolocation.

G2 Krishnamoorthy is VP of Analytics, leading AWS data lake services, data integration, Amazon OpenSearch Service, and Amazon QuickSight. Prior to his current role, G2 built and ran the Analytics and ML Platform at Facebook/Meta, and built various parts of the SQL Server database, Azure Analytics, and Azure ML at Microsoft.

Unlock scalable analytics with AWS Glue and Google BigQuery

Post Syndicated from Kartikay Khator original https://aws.amazon.com/blogs/big-data/unlock-scalable-analytics-with-aws-glue-and-google-bigquery/

Data integration is the foundation of robust data analytics. It encompasses the discovery, preparation, and composition of data from diverse sources. In the modern data landscape, accessing, integrating, and transforming data from diverse sources is a vital process for data-driven decision-making. AWS Glue, a serverless data integration and extract, transform, and load (ETL) service, has revolutionized this process, making it more accessible and efficient. AWS Glue eliminates complexities and costs, allowing organizations to perform data integration tasks in minutes, boosting efficiency.

This blog post explores the newly announced managed connector for Google BigQuery and demonstrates how to build a modern ETL pipeline with AWS Glue Studio without writing code.

Overview of AWS Glue

AWS Glue is a serverless data integration service that makes it easier to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all the capabilities needed for data integration, so you can start analyzing your data and putting it to use in minutes instead of months. AWS Glue provides both visual and code-based interfaces to make data integration easier. Users can more easily find and access data using the AWS Glue Data Catalog. Data engineers and ETL (extract, transform, and load) developers can visually create, run, and monitor ETL workflows in a few steps in AWS Glue Studio. Data analysts and data scientists can use AWS Glue DataBrew to visually enrich, clean, and normalize data without writing code.

Introducing Google BigQuery Spark connector

To meet the demands of diverse data integration use cases, AWS Glue now offers a native Spark connector for Google BigQuery. Customers can now use AWS Glue 4.0 for Spark to read from and write to tables in Google BigQuery. Additionally, you can read an entire table or run a custom query and write your data using direct and indirect writing methods. You connect to BigQuery using service account credentials stored securely in AWS Secrets Manager.

Benefits of Google BigQuery Spark connector

  • Seamless integration: The native connector offers an intuitive and streamlined interface for data integration, reducing the learning curve.
  • Cost efficiency: Building and maintaining custom connectors can be expensive. The native connector provided by AWS Glue is a cost-effective alternative.
  • Efficiency: Data transformation tasks that previously took weeks or months can now be accomplished within minutes.

Solution overview

In this example, you create two ETL jobs using AWS Glue with the native Google BigQuery connector.

  1. Query a BigQuery table and save the data into Amazon Simple Storage Service (Amazon S3) in Parquet format.
  2. Use the data extracted from the first job to transform and create an aggregated result to be stored in Google BigQuery.

solution architecture

Prerequisites

The dataset used in this solution is the NCEI/WDS Global Significant Earthquake Database, with a global listing of over 5,700 earthquakes from 2150 BC to the present. Copy this public data into your Google BigQuery project or use your existing dataset.

Configure BigQuery connections

To connect to Google BigQuery from AWS Glue, see Configuring BigQuery connections. You must create and store your Google Cloud Platform credentials in a Secrets Manager secret, then associate that secret with a Google BigQuery AWS Glue connection.
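The following boto3 sketch shows one way to store the credentials. The secret name, local file name, and the credentials key are assumptions; follow Configuring BigQuery connections for the exact secret format the AWS Glue connection expects.

import base64
import json
import boto3

# Store the Google Cloud service account key in Secrets Manager. The secret name,
# local file name, and the "credentials" key are assumptions.
secrets = boto3.client("secretsmanager")

with open("service-account.json", "rb") as f:
    encoded = base64.b64encode(f.read()).decode("utf-8")

secrets.create_secret(
    Name="bigquery-credentials",
    SecretString=json.dumps({"credentials": encoded}),
)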

Set up Amazon S3

Every object in Amazon S3 is stored in a bucket. Before you can store data in Amazon S3, you must create an S3 bucket to store the results.

To create an S3 bucket:

  1. On the AWS Management Console for Amazon S3, choose Create bucket.
  2. Enter a globally unique Name for your bucket; for example, awsglue-demo.
  3. Choose Create bucket.
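You can also create the bucket programmatically. The following sketch uses the example bucket name from the steps above; bucket names must be globally unique, so change it to something you own.

import boto3

# Create the results bucket using the example name from the steps above.
s3 = boto3.client("s3")
s3.create_bucket(Bucket="awsglue-demo")
# Outside us-east-1, also pass CreateBucketConfiguration={"LocationConstraint": "<your-region>"}.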

Create an IAM role for the AWS Glue ETL job

When you create the AWS Glue ETL job, you specify an AWS Identity and Access Management (IAM) role for the job to use. The role must grant access to all resources used by the job, including Amazon S3 (for any sources, targets, scripts, driver files, and temporary directories), and Secrets Manager.

For instructions, see Configure an IAM role for your ETL job.
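As a reference, the following boto3 sketch creates a role with the example name AWSGlueRole used later in this post. The attached managed policies are intentionally broad for the demo; scope them down for production use.

import json
import boto3

# Minimal sketch of an IAM role the AWS Glue jobs can assume.
iam = boto3.client("iam")

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "glue.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(RoleName="AWSGlueRole", AssumeRolePolicyDocument=json.dumps(trust_policy))

# Managed Glue service policy, plus broad S3 and Secrets Manager access for the demo
for policy_arn in [
    "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/SecretsManagerReadWrite",
]:
    iam.attach_role_policy(RoleName="AWSGlueRole", PolicyArn=policy_arn)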

Solution walkthrough

Create a visual ETL job in AWS Glue Studio to transfer data from Google BigQuery to Amazon S3

  1. Open the AWS Glue console.
  2. In AWS Glue, navigate to Visual ETL under the ETL jobs section and create a new ETL job using Visual with a blank canvas.
  3. Enter a Name for your AWS Glue job, for example, bq-s3-dataflow.
  4. Select Google BigQuery as the data source.
    1. Enter a name for your Google BigQuery source node, for example, noaa_significant_earthquakes.
    2. Select a Google BigQuery connection, for example, bq-connection.
    3. Enter a Parent project, for example, bigquery-public-datasources.
    4. Select Choose a single table for the BigQuery Source.
    5. Enter the table you want to migrate in the form [dataset].[table], for example, noaa_significant_earthquakes.earthquakes.
      big query data source for bq to amazon s3 dataflow
  5. Next, choose the data target as Amazon S3.
    1. Enter a Name for the target Amazon S3 node, for example, earthquakes.
    2. Select the output data Format as Parquet.
    3. Select the Compression Type as Snappy.
    4. For the S3 Target Location, enter the bucket created in the prerequisites, for example, s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/.
    5. You should replace <YourBucketName> with the name of your bucket.
      s3 target node for bq to amazon s3 dataflow
  6. Next go to the Job details. In the IAM Role, select the IAM role from the prerequisites, for example, AWSGlueRole.
    IAM role for bq to amazon s3 dataflow
  7. Choose Save.

Run and monitor the job

  1. After your ETL job is configured, you can run the job. AWS Glue will run the ETL process, extracting data from Google BigQuery and loading it into your specified S3 location.
  2. Monitor the job’s progress in the AWS Glue console. You can see logs and job run history to ensure everything is running smoothly.

run and monitor bq to amazon s3 dataflow
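You can also start and poll the job programmatically. The following sketch assumes the job name bq-s3-dataflow used above.

import boto3

# Start the job and check its status.
glue = boto3.client("glue")

run_id = glue.start_job_run(JobName="bq-s3-dataflow")["JobRunId"]
job_run = glue.get_job_run(JobName="bq-s3-dataflow", RunId=run_id)["JobRun"]
print(run_id, job_run["JobRunState"])  # e.g., RUNNING, then SUCCEEDED or FAILED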

Data validation

  1. After the job has run successfully, validate the data in your S3 bucket to ensure it matches your expectations. You can see the results using Amazon S3 Select.

review results in amazon s3 from the bq to s3 dataflow run

Automate and schedule

  1. If needed, set up job scheduling to run the ETL process regularly. You can use AWS Glue to automate your ETL jobs, ensuring your S3 bucket is always up to date with the latest data from Google BigQuery, as shown in the sketch that follows.
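For example, a scheduled AWS Glue trigger can run the job on a recurring basis. The trigger name below is hypothetical.

import boto3

# Schedule the job with an AWS Glue scheduled trigger (runs daily at 12:00 UTC;
# adjust the cron expression as needed).
glue = boto3.client("glue")

glue.create_trigger(
    Name="bq-s3-dataflow-daily",
    Type="SCHEDULED",
    Schedule="cron(0 12 * * ? *)",
    Actions=[{"JobName": "bq-s3-dataflow"}],
    StartOnCreation=True,
)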

You’ve successfully configured an AWS Glue ETL job to transfer data from Google BigQuery to Amazon S3. Next, you create the ETL job to aggregate this data and transfer it to Google BigQuery.

Finding earthquake hotspots with AWS Glue Studio Visual ETL

  1. Open AWS Glue console.
  2. In AWS Glue navigate to Visual ETL under the ETL jobs section and create a new ETL job using Visual with a blank canvas.
  3. Provide a name for your AWS Glue job, for example, s3-bq-dataflow.
  4. Choose Amazon S3 as the data source.
    1. Enter a Name for the source Amazon S3 node, for example, earthquakes.
    2. Select S3 location as the S3 source type.
    3. Enter the S3 bucket created in the prerequisites as the S3 URL, for example, s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/.
    4. You should replace <YourBucketName> with the name of your bucket.
    5. Select the Data format as Parquet.
    6. Select Infer schema.
      amazon s3 source node for s3 to bq dataflow
  5. Next, choose Select Fields transformation.
    1. Select earthquakes as Node parents.
    2. Select fields: id, eq_primary, and country.
      select field node for amazon s3 to bq dataflow
  6. Next, choose Aggregate transformation.
    1. Enter a Name, for example Aggregate.
    2. Choose Select Fields as Node parents.
    3. Choose eq_primary and country as the group by columns.
    4. Add id as the aggregate column and count as the aggregation function.
      aggregate node for amazon s3 to bq dataflow
  7. Next, choose RenameField transformation.
    1. Enter a name for the source Amazon S3 node, for example, Rename eq_primary.
    2. Choose Aggregate as Node parents.
    3. Choose eq_primary as the Current field name and enter earthquake_magnitude as the New field name.
      rename eq_primary field for amazon s3 to bq dataflow
  8. Next, choose RenameField transformation
    1. Enter a name for the source Amazon S3 node, for example, Rename count(id).
    2. Choose Rename eq_primary as Node parents.
    3. Choose count(id) as the Current field name and enter number_of_earthquakes as the New field name.
      rename cound(id) field for amazon s3 to bq dataflow
  9. Next, choose the data target as Google BigQuery.
    1. Provide a name for your Google BigQuery source node, for example, most_powerful_earthquakes.
    2. Select a Google BigQuery connection, for example, bq-connection.
    3. Select Parent project, for example, bigquery-public-datasources.
    4. Enter the name of the Table you want to create in the form [dataset].[table], for example, noaa_significant_earthquakes.most_powerful_earthquakes.
    5. Choose Direct as the Write method.
      bq destination for amazon s3 to bq dataflow
  10. Next go to the Job details tab and in the IAM Role, select the IAM role from the prerequisites, for example, AWSGlueRole.
    IAM role for amazon s3 to bq dataflow
  11. Choose Save.

Run and monitor the job

  1. After your ETL job is configured, you can run the job. AWS Glue runs the ETL process, extracting data from Amazon S3, aggregating it, and loading the results into your specified Google BigQuery table.
  2. Monitor the job’s progress in the AWS Glue console. You can see logs and job run history to ensure everything is running smoothly.

monitor and run for amazon s3 to bq dataflow

Data validation

  1. After the job has run successfully, validate the data in your Google BigQuery dataset. This ETL job returns a list of countries where the most powerful earthquakes have occurred. It provides these by counting the number of earthquakes for a given magnitude by country.

aggregated results for amazon s3 to bq dataflow

Automate and schedule

  1. You can set up job scheduling to run the ETL process regularly. AWS Glue allows you to automate your ETL jobs, ensuring your Google BigQuery dataset is always up to date with the latest data from your S3 bucket.

That’s it! You’ve successfully set up an AWS Glue ETL job to transfer data from Amazon S3 to Google BigQuery. You can use this integration to automate the process of data extraction, transformation, and loading between these two platforms, making your data readily available for analysis and other applications.

Clean up

To avoid incurring charges, clean up the resources used in this blog post from your AWS account by completing the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. From the list of jobs, select the job bq-s3-dataflow and delete it.
  3. From the list of jobs, select the job s3-bq-dataflow and delete it.
  4. On the AWS Glue console, choose Connections in the navigation pane under Data Catalog.
  5. Choose the BigQuery connection you created and delete it.
  6. On the Secrets Manager console, choose the secret you created and delete it.
  7. On the IAM console, choose Roles in the navigation pane, then select the role you created for the AWS Glue ETL job and delete it.
  8. On the Amazon S3 console, search for the S3 bucket you created, choose Empty to delete the objects, then delete the bucket.
  9. Clean up resources in your Google account by deleting the project that contains the Google BigQuery resources. Follow the documentation to clean up the Google resources.
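If you scripted the setup, you can script most of the cleanup too. The following sketch assumes the example resource names used in this post (the secret name is hypothetical); the IAM role and the Google Cloud project still need to be removed separately.

import boto3

# A sketch of the cleanup steps, using the example resource names from this post.
glue = boto3.client("glue")
secrets = boto3.client("secretsmanager")
s3 = boto3.resource("s3")

for job_name in ["bq-s3-dataflow", "s3-bq-dataflow"]:
    glue.delete_job(JobName=job_name)

glue.delete_connection(ConnectionName="bq-connection")
secrets.delete_secret(SecretId="bigquery-credentials", ForceDeleteWithoutRecovery=True)

bucket = s3.Bucket("awsglue-demo")
bucket.objects.all().delete()
bucket.delete()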

Conclusion

The integration of AWS Glue with Google BigQuery simplifies the analytics pipeline, reduces time-to-insight, and facilitates data-driven decision-making. It empowers organizations to streamline data integration and analytics. The serverless nature of AWS Glue means no infrastructure management, and you pay only for the resources consumed while your jobs are running. As organizations increasingly rely on data for decision-making, this native Spark connector provides an efficient, cost-effective, and agile solution to swiftly meet data analytics needs.

If you’re interested in seeing how to read from and write to tables in Google BigQuery in AWS Glue, take a look at the step-by-step video tutorial. In this tutorial, we walk through the entire process, from setting up the connection to running the data transfer flow. For more information on AWS Glue, visit AWS Glue.

Appendix

If you are looking to implement this example, using code instead of the AWS Glue console, use the following code snippets.

Reading data from Google BigQuery and writing data into Amazon S3

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# STEP-1 Read the data from Big Query Table 
noaa_significant_earthquakes_node1697123333266 = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="bigquery",
        connection_options={
            "connectionName": "bq-connection",
            "parentProject": "bigquery-public-datasources",
            "sourceType": "table",
            "table": "noaa_significant_earthquakes.earthquakes",
        },
        transformation_ctx="noaa_significant_earthquakes_node1697123333266",
    )
)
# STEP-2 Write the data read from Big Query Table into S3
# You should replace <YourBucketName> with the name of your bucket.
earthquakes_node1697157772747 = glueContext.write_dynamic_frame.from_options(
    frame=noaa_significant_earthquakes_node1697123333266,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/",
        "partitionKeys": [],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="earthquakes_node1697157772747",
)

job.commit()

Reading and aggregating data from Amazon S3 and writing into Google BigQuery

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame
from pyspark.sql import functions as SqlFuncs

def sparkAggregate(
    glueContext, parentFrame, groups, aggs, transformation_ctx
) -> DynamicFrame:
    aggsFuncs = []
    for column, func in aggs:
        aggsFuncs.append(getattr(SqlFuncs, func)(column))
    result = (
        parentFrame.toDF().groupBy(*groups).agg(*aggsFuncs)
        if len(groups) > 0
        else parentFrame.toDF().agg(*aggsFuncs)
    )
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# STEP-1 Read the data from Amazon S3 bucket
# You should replace <YourBucketName> with the name of your bucket.
earthquakes_node1697218776818 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/"
        ],
        "recurse": True,
    },
    transformation_ctx="earthquakes_node1697218776818",
)

# STEP-2 Select fields
SelectFields_node1697218800361 = SelectFields.apply(
    frame=earthquakes_node1697218776818,
    paths=["id", "eq_primary", "country"],
    transformation_ctx="SelectFields_node1697218800361",
)

# STEP-3 Aggregate data
Aggregate_node1697218823404 = sparkAggregate(
    glueContext,
    parentFrame=SelectFields_node1697218800361,
    groups=["eq_primary", "country"],
    aggs=[["id", "count"]],
    transformation_ctx="Aggregate_node1697218823404",
)

Renameeq_primary_node1697219483114 = RenameField.apply(
    frame=Aggregate_node1697218823404,
    old_name="eq_primary",
    new_name="earthquake_magnitude",
    transformation_ctx="Renameeq_primary_node1697219483114",
)

Renamecountid_node1697220511786 = RenameField.apply(
    frame=Renameeq_primary_node1697219483114,
    old_name="`count(id)`",
    new_name="number_of_earthquakes",
    transformation_ctx="Renamecountid_node1697220511786",
)

# STEP-1 Write the aggregated data in Google BigQuery
most_powerful_earthquakes_node1697220563923 = (
    glueContext.write_dynamic_frame.from_options(
        frame=Renamecountid_node1697220511786,
        connection_type="bigquery",
        connection_options={
            "connectionName": "bq-connection",
            "parentProject": "bigquery-public-datasources",
            "writeMethod": "direct",
            "table": "noaa_significant_earthquakes.most_powerful_earthquakes",
        },
        transformation_ctx="most_powerful_earthquakes_node1697220563923",
    )
)

job.commit()


About the authors

Kartikay Khator is a Solutions Architect in Global Life Sciences at Amazon Web Services (AWS). He is passionate about building innovative and scalable solutions to meet the needs of customers, focusing on AWS Analytics services. Beyond the tech world, he is an avid runner and enjoys hiking.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect and Amazon AppFlow expert. He’s on a mission to make life easier for customers who are facing complex data integration challenges. His secret weapon? Fully managed, low-code AWS services that can get the job done with minimal effort and no coding.

Anshul Sharma is a Software Development Engineer on the AWS Glue team. He is driving the connectivity charter, which provides AWS Glue customers a native way of connecting any data source (data warehouses, data lakes, NoSQL stores, and so on) to AWS Glue ETL jobs. Beyond the tech world, he is a cricket and soccer lover.

Create, train, and deploy Amazon Redshift ML model integrating features from Amazon SageMaker Feature Store

Post Syndicated from Anirban Sinha original https://aws.amazon.com/blogs/big-data/create-train-and-deploy-amazon-redshift-ml-model-integrating-features-from-amazon-sagemaker-feature-store/

Amazon Redshift is a fast, petabyte-scale, cloud data warehouse that tens of thousands of customers rely on to power their analytics workloads. Data analysts and database developers want to use this data to train machine learning (ML) models, which can then be used to generate insights on new data for use cases such as forecasting revenue, predicting customer churn, and detecting anomalies. Amazon Redshift ML makes it easy for SQL users to create, train, and deploy ML models using SQL commands familiar to many roles such as executives, business analysts, and data analysts. We covered in a previous post how you can use data in Amazon Redshift to train models in Amazon SageMaker, a fully managed ML service, and then make predictions within your Redshift data warehouse.

Redshift ML currently supports ML algorithms such as XGBoost, multilayer perceptron (MLP), KMEANS, and Linear Learner. Additionally, you can import existing SageMaker models into Amazon Redshift for in-database inference or remotely invoke a SageMaker endpoint.

Amazon SageMaker Feature Store is a fully managed, purpose-built repository to store, share, and manage features for ML models. However, one challenge in training a production-ready ML model using SageMaker Feature Store is access to a diverse set of features that aren’t always owned and maintained by the team that is building the model. For example, an ML model to identify fraudulent financial transactions needs access to both identifying (device type, browser) and transaction (amount, credit or debit, and so on) related features. As a data scientist building an ML model, you may have access to the identifying information but not the transaction information, and having access to a feature store solves this.

In this post, we discuss the combined feature store pattern, which allows teams to maintain their own local feature stores using a local Redshift table while still being able to access shared features from the centralized feature store. In a local feature store, you can store sensitive data that can’t be shared across the organization for regulatory and compliance reasons.

We also show you how to use familiar SQL statements to create and train ML models by combining shared features from the centralized store with local features and use these models to make in-database predictions on new data for use cases such as fraud risk scoring.

Overview of solution

For this post, we create an ML model to predict if a transaction is fraudulent or not, given the transaction record. To build this, we need to engineer features that describe an individual credit card’s spending pattern, such as the number of transactions or the average transaction amount, and also information about the merchant, the cardholder, the device used to make the payment, and any other data that may be relevant to detecting fraud.

To get started, we need an Amazon Redshift Serverless data warehouse with the Redshift ML feature enabled and an Amazon SageMaker Studio environment with access to SageMaker Feature Store. For an introduction to Redshift ML and instructions on setting it up, see Create, train, and deploy machine learning models in Amazon Redshift using SQL with Amazon Redshift ML.

We also need an offline feature store to store features in feature groups. The offline store uses an Amazon Simple Storage Service (Amazon S3) bucket for storage and can also fetch data using Amazon Athena queries. For an introduction to SageMaker Feature Store and instructions on setting it up, see Getting started with Amazon SageMaker Feature Store.

The following diagram illustrates the solution architecture.

The workflow contains the following steps:

  1. Create the offline feature group in SageMaker Feature Store and ingest data into the feature group.
  2. Create a Redshift table and load local feature data into the table.
  3. Create an external schema for Amazon Redshift Spectrum to access the offline store data stored in Amazon S3 using the AWS Glue Data Catalog.
  4. Train and validate a fraud risk scoring ML model using local feature data and external offline feature store data.
  5. Use the offline feature store and local store for inference.

Dataset

To demonstrate this use case, we use a synthetic dataset with two tables: identity and transactions. They can both be joined by the TransactionID column. The transaction table contains information about a particular transaction, such as amount, credit or debit card, and so on, and the identity table contains information about the user, such as device type and browser. The transaction must exist in the transaction table, but might not always be available in the identity table.

The following is an example of the transactions dataset.

The following is an example of the identity dataset.

Let’s assume that across the organization, data science teams centrally manage the identity data and process it to extract features in a centralized offline feature store. The data warehouse team ingests and analyzes transaction data in a Redshift table, owned by them.

We work through this use case to understand how the data warehouse team can securely retrieve the latest features from the identity feature group and join it with transaction data in Amazon Redshift to create a feature set for training and inferencing a fraud detection model.

Create the offline feature group and ingest data

To start, we set up SageMaker Feature Store, create a feature group for the identity dataset, inspect and process the dataset, and ingest some sample data. We then prepare the transaction features from the transaction data and store it in Amazon S3 for further loading into the Redshift table.

Alternatively, you can author features using Amazon SageMaker Data Wrangler, create feature groups in SageMaker Feature Store, and ingest features in batches using an Amazon SageMaker Processing job with a notebook exported from SageMaker Data Wrangler. This mode allows for batch ingestion into the offline store.

Let’s explore some of the key steps in this section.

  1. Download the sample notebook.
  2. On the SageMaker console, under Notebook in the navigation pane, choose Notebook instances.
  3. Locate your notebook instance and choose Open Jupyter.
  4. Choose Upload and upload the notebook you just downloaded.
  5. Open the notebook sagemaker_featurestore_fraud_redshiftml_python_sdk.ipynb.
  6. Follow the instructions and run all the cells up to the Cleanup Resources section.

The following are key steps from the notebook:

  1. We create a Pandas DataFrame with the initial CSV data. We apply feature transformations for this dataset.
    identity_data = pd.read_csv(io.BytesIO(identity_data_object["Body"].read()))
    transaction_data = pd.read_csv(io.BytesIO(transaction_data_object["Body"].read()))
    
    identity_data = identity_data.round(5)
    transaction_data = transaction_data.round(5)
    
    identity_data = identity_data.fillna(0)
    transaction_data = transaction_data.fillna(0)
    
    # Feature transformations for this dataset are applied 
    # One hot encode card4, card6
    encoded_card_bank = pd.get_dummies(transaction_data["card4"], prefix="card_bank")
    encoded_card_type = pd.get_dummies(transaction_data["card6"], prefix="card_type")
    
    transformed_transaction_data = pd.concat(
        [transaction_data, encoded_card_type, encoded_card_bank], axis=1
    )

  2. We store the processed and transformed transaction dataset in an S3 bucket. This transaction data will be loaded later in the Redshift table for building the local feature store.
    transformed_transaction_data.to_csv("transformed_transaction_data.csv", header=False, index=False)
    s3_client.upload_file("transformed_transaction_data.csv", default_s3_bucket_name, prefix + "/training_input/transformed_transaction_data.csv")

  3. Next, we need a record identifier name and an event time feature name. In our fraud detection example, the column of interest is TransactionID. EventTime can be appended to your data when no timestamp is available. In the following code, you can see how these variables are set, and then EventTime is appended to both features’ data.
    # record identifier and event time feature names
    record_identifier_feature_name = "TransactionID"
    event_time_feature_name = "EventTime"
    
    # append EventTime feature
    identity_data[event_time_feature_name] = pd.Series(
        [current_time_sec] * len(identity_data), dtype="float64"
    )

  4. We then create and ingest the data into the feature group using the SageMaker SDK FeatureGroup.ingest API. This is a small dataset and therefore can be loaded into a Pandas DataFrame. When we work with large amounts of data and millions of rows, there are other scalable mechanisms to ingest data into SageMaker Feature Store, such as batch ingestion with Apache Spark.
    identity_feature_group.create(
        s3_uri=<S3_Path_Feature_Store>,
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=<role_arn>,
        enable_online_store=False,
    )
    
    identity_feature_group_name = "identity-feature-group"
    
    # load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
    identity_feature_group.load_feature_definitions(data_frame=identity_data)
    identity_feature_group.ingest(data_frame=identity_data, max_workers=3, wait=True)
    

  5. We can verify that data has been ingested into the feature group by running Athena queries in the notebook or running queries on the Athena console.

At this point, the identity feature group is created in an offline feature store with historical data persisted in Amazon S3. SageMaker Feature Store automatically creates an AWS Glue Data Catalog for the offline store, which enables us to run SQL queries against the offline data using Athena or Redshift Spectrum.
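As a quick check from the notebook, you can also query the offline store through Athena using the SageMaker Feature Store SDK. The following sketch reuses the identity_feature_group and default_s3_bucket_name variables from the notebook.

# Query the offline store through Athena with the SageMaker Feature Store SDK,
# reusing the notebook's identity_feature_group and default_s3_bucket_name variables.
query = identity_feature_group.athena_query()
table_name = query.table_name  # Glue Data Catalog table backing the offline store

query.run(
    query_string=f'SELECT * FROM "{table_name}" LIMIT 10',
    output_location=f"s3://{default_s3_bucket_name}/athena-query-results/",
)
query.wait()
print(query.as_dataframe().head())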

Create a Redshift table and load local feature data

To build a Redshift ML model, we create a training dataset by joining the identity data and transaction data using SQL queries. The identity data is in a centralized feature store where the historical set of records are persisted in Amazon S3. The transaction data is a local feature for training, and it needs to be made available in the Redshift table.

Let’s explore how to create the schema and load the processed transaction data from Amazon S3 into a Redshift table.

  1. Create the customer_transaction table and load daily transaction data into the table, which you’ll use to train the ML model:
    DROP TABLE customer_transaction;
    CREATE TABLE customer_transaction (
      TransactionID INT,    
      isFraud INT,  
      TransactionDT INT,    
      TransactionAmt decimal(10,2), 
      card1 INT,    
      card2 decimal(10,2),card3 decimal(10,2),  
      card4 VARCHAR(20),card5 decimal(10,2),    
      card6 VARCHAR(20),    
      B1 INT,B2 INT,B3 INT,B4 INT,B5 INT,B6 INT,
      B7 INT,B8 INT,B9 INT,B10 INT,B11 INT,B12 INT,
      F1 INT,F2 INT,F3 INT,F4 INT,F5 INT,F6 INT,
      F7 INT,F8 INT,F9 INT,F10 INT,F11 INT,F12 INT,
      F13 INT,F14 INT,F15 INT,F16 INT,F17 INT,  
      N1 VARCHAR(20),N2 VARCHAR(20),N3 VARCHAR(20), 
      N4 VARCHAR(20),N5 VARCHAR(20),N6 VARCHAR(20), 
      N7 VARCHAR(20),N8 VARCHAR(20),N9 VARCHAR(20), 
      card_type_0  boolean,
      card_type_credit boolean,
      card_type_debit  boolean,
      card_bank_0  boolean,
      card_bank_american_express boolean,
      card_bank_discover  boolean,
      card_bank_mastercard  boolean,
      card_bank_visa boolean  
    );

  2. Load the sample data by using the following command. Replace your Region and S3 path as appropriate. You will find the S3 path in the S3 Bucket Setup For The OfflineStore section in the notebook or by checking the dataset_uri_prefix in the notebook.
    COPY customer_transaction
    FROM '<s3path>/transformed_transaction_data.csv' 
    IAM_ROLE default delimiter ',' 
    region 'your-region';

Now that we have created a local feature store for the transaction data, we focus on integrating a centralized feature store with Amazon Redshift to access the identity data.

Create an external schema for Redshift Spectrum to access the offline store data

We have created a centralized feature store for identity features, and we can access this offline feature store using services such as Redshift Spectrum. When the identity data is available through the Redshift Spectrum table, we can create a training dataset with feature values from the identity feature group and customer_transaction, joining on the TransactionId column.

This section provides an overview of how to enable Redshift Spectrum to query data directly from files on Amazon S3 through an external database in an AWS Glue Data Catalog.

  1. First, check that the identity-feature-group table is present in the Data Catalog under the sagemaker_featurestore database.
  2. Using Redshift Query Editor V2, create an external schema using the following command:
    CREATE EXTERNAL SCHEMA sagemaker_featurestore
    FROM DATA CATALOG
    DATABASE 'sagemaker_featurestore'
    IAM_ROLE default
    create external database if not exists;

All the tables, including identity-feature-group external tables, are visible under the sagemaker_featurestore external schema. In Redshift Query Editor v2, you can check the contents of the external schema.

  1. Run the following query to sample a few records—note that your table name may be different:
    Select * from sagemaker_featurestore.identity_feature_group_1680208535 limit 10;

  2. Create a view to join the latest data from identity-feature-group and customer_transaction on the TransactionId column. Be sure to change the external table name to match your external table name:
    create or replace view public.credit_fraud_detection_v
    AS select  "isfraud",
            "transactiondt",
            "transactionamt",
            "card1","card2","card3","card5",
             case when "card_type_credit" = 'False' then 0 else 1 end as card_type_credit,
             case when "card_type_debit" = 'False' then 0 else 1 end as card_type_debit,
             case when "card_bank_american_express" = 'False' then 0 else 1 end as card_bank_american_express,
             case when "card_bank_discover" = 'False' then 0 else 1 end as card_bank_discover,
             case when "card_bank_mastercard" = 'False' then 0 else 1 end as card_bank_mastercard,
             case when "card_bank_visa" = 'False' then 0 else 1 end as card_bank_visa,
            "id_01","id_02","id_03","id_04","id_05"
    from public.customer_transaction ct left join sagemaker_featurestore.identity_feature_group_1680208535 id
    on id.transactionid = ct.transactionid with no schema binding;

Train and validate the fraud risk scoring ML model

Redshift ML gives you the flexibility to specify your own algorithms and model types and also to provide your own advanced parameters, which can include preprocessors, problem type, and hyperparameters. In this post, we create a custom model by specifying AUTO OFF and the model type of XGBOOST. By turning AUTO OFF and using XGBoost, we are providing the necessary inputs for SageMaker to train the model. A benefit of this can be faster training times. XGBoost is an open-source implementation of the gradient boosted trees algorithm. For more details on XGBoost, refer to Build XGBoost models with Amazon Redshift ML.

We train the model using 80% of the dataset by filtering on transactiondt < 12517618. The other 20% will be used for inference. A centralized feature store is useful in providing the latest supplemental data for training requests. Note that you will need to provide an S3 bucket name in the create model statement. It will take approximately 10 minutes to create the model.

CREATE MODEL frauddetection_xgboost
FROM (select  "isfraud",
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05"
from credit_fraud_detection_v where transactiondt < 12517618
)
TARGET isfraud
FUNCTION ml_fn_frauddetection_xgboost
IAM_ROLE default
AUTO OFF
MODEL_TYPE XGBOOST
OBJECTIVE 'binary:logistic'
PREPROCESSORS 'none'
HYPERPARAMETERS DEFAULT EXCEPT(NUM_ROUND '100')
SETTINGS (S3_BUCKET <s3_bucket>);

When you run the create model command, it will complete quickly in Amazon Redshift while the model training is happening in the background using SageMaker. You can check the status of the model by running a show model command:

show model frauddetection_xgboost;

The output of the show model command shows that the model state is TRAINING. It also shows other information such as the model type and the training job name that SageMaker assigned.
After a few minutes, we run the show model command again:

show model frauddetection_xgboost;

Now the output shows the model state is READY. We can also see the train:error score here, which at 0 tells us we have a good model. Now that the model is trained, we can use it for running inference queries.

Use the offline feature store and local store for inference

We can use the SQL function to apply the ML model to data in queries, reports, and dashboards. Let’s use the function ml_fn_frauddetection_xgboost created by our model against our test dataset by filtering where transactiondt >=12517618, to predict whether a transaction is fraudulent or not. SageMaker Feature Store can be useful in supplementing data for inference requests.

Run the following query to predict whether transactions are fraudulent or not:

select  "isfraud" as "Actual",
        ml_fn_frauddetection_xgboost(
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05") as "Predicted"
from credit_fraud_detection_v where transactiondt >= 12517618;

For binary and multi-class classification problems, we compute the accuracy as the model metric. Accuracy can be calculated based on the following:

accuracy = (sum(actual == predicted) / total) * 100

Let’s apply the preceding code to our use case to find the accuracy of the model. We use the test data (transactiondt >= 12517618) to test the accuracy, and use the newly created function ml_fn_frauddetection_xgboost to predict and take the columns other than the target and label as the input:

-- check accuracy 
WITH infer_data AS (
SELECT "isfraud" AS label,
ml_fn_frauddetection_xgboost(
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05") AS predicted,
CASE 
   WHEN label IS NULL
       THEN 0
   ELSE label
   END AS actual,
CASE 
   WHEN actual = predicted
       THEN 1::INT
   ELSE 0::INT
   END AS correct
FROM credit_fraud_detection_v where transactiondt >= 12517618),
aggr_data AS (
SELECT SUM(correct) AS num_correct,
COUNT(*) AS total
FROM infer_data) 

SELECT (num_correct::FLOAT / total::FLOAT) AS accuracy FROM aggr_data;

Clean up

As a final step, clean up the resources:

  1. Delete the Redshift cluster.
  2. Run the Cleanup Resources section of your notebook.

Conclusion

Redshift ML enables you to bring machine learning to your data, powering fast and informed decision-making. SageMaker Feature Store provides a purpose-built feature management solution to help organizations scale ML development across business units and data science teams.

In this post, we showed how you can train an XGBoost model using Redshift ML with data spread across SageMaker Feature Store and a Redshift table. Additionally, we showed how you can make inferences on a trained model to detect fraud using Amazon Redshift SQL commands.


About the authors

Anirban Sinha is a Senior Technical Account Manager at AWS. He is passionate about building scalable data warehouses and big data solutions, working closely with customers. He works with large ISV customers, helping them build and operate secure, resilient, scalable, and high-performance SaaS applications in the cloud.

Phil Bates is a Senior Analytics Specialist Solutions Architect at AWS. He has more than 25 years of experience implementing large-scale data warehouse solutions. He is passionate about helping customers through their cloud journey and using the power of ML within their data warehouse.

Gaurav Singh is a Senior Solutions Architect at AWS, specializing in AI/ML and Generative AI. Based in Pune, India, he focuses on helping customers build, deploy, and migrate ML production workloads to SageMaker at scale. In his spare time, Gaurav loves to explore nature, read, and run.

Unstructured data management and governance using AWS AI/ML and analytics services

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/unstructured-data-management-and-governance-using-aws-ai-ml-and-analytics-services/

Unstructured data is information that doesn’t conform to a predefined schema or isn’t organized according to a preset data model. Unstructured information may have a little or a lot of structure but in ways that are unexpected or inconsistent. Text, images, audio, and videos are common examples of unstructured data. Most companies produce and consume unstructured data such as documents, emails, web pages, engagement center phone calls, and social media. By some estimates, unstructured data can make up to 80–90% of all new enterprise data and is growing many times faster than structured data. After decades of digitizing everything in your enterprise, you may have an enormous amount of data, but with dormant value. However, with the help of AI and machine learning (ML), new software tools are now available to unearth the value of unstructured data.

In this post, we discuss how AWS can help you successfully address the challenges of extracting insights from unstructured data. We discuss various design patterns and architectures for extracting and cataloging valuable insights from unstructured data using AWS. Additionally, we show how to use AWS AI/ML services for analyzing unstructured data.

Why it’s challenging to process and manage unstructured data

Unstructured data makes up a large proportion of the data in the enterprise that can’t be stored in a traditional relational database management system (RDBMS). Understanding the data, categorizing it, storing it, and extracting insights from it can be challenging. In addition, identifying incremental changes requires specialized patterns, and detecting sensitive data and meeting compliance requirements calls for sophisticated functions. It can be difficult to integrate unstructured data with structured data from existing information systems. Some view structured and unstructured data as apples and oranges, instead of being complementary. But most important of all, the assumed dormant value in the unstructured data is a question mark, which can only be answered after these sophisticated techniques have been applied. Therefore, there is a need to be able to analyze and extract value from the data economically and flexibly.

Solution overview

Data and metadata discovery is one of the primary requirements in data analytics, where data consumers explore what data is available and in what format, and then consume or query it for analysis. If you can apply a schema on top of the dataset, then it’s straightforward to query because you can load the data into a database or impose a virtual table schema for querying. But in the case of unstructured data, metadata discovery is challenging because the raw data isn’t easily readable.

You can integrate different technologies or tools to build a solution. In this post, we explain how to integrate different AWS services to provide an end-to-end solution that includes data extraction, management, and governance.

The solution integrates data in three tiers. The first is the raw input data that gets ingested by source systems, the second is the output data that gets extracted from input data using AI, and the third is the metadata layer that maintains a relationship between them for data discovery.

The following is a high-level architecture of the solution we can build to process the unstructured data, assuming the input data is being ingested to the raw input object store.

Unstructured Data Management - Block Level Architecture Diagram

The steps of the workflow are as follows:

  1. Integrated AI services extract data from the unstructured data.
  2. These services write the output to a data lake.
  3. A metadata layer helps build the relationship between the raw data and AI extracted output. When the data and metadata are available for end-users, we can break the user access pattern into additional steps.
  4. In the metadata catalog discovery step, we can use query engines to access the metadata for discovery and apply filters as per our analytics needs. Then we move to the next stage of accessing the actual data extracted from the raw unstructured data.
  5. The end-user accesses the output of the AI services and uses the query engines to query the structured data available in the data lake. We can optionally integrate additional tools that help control access and provide governance.
  6. There might be scenarios where, after accessing the AI extracted output, the end-user wants to access the original raw object (such as media files) for further analysis. Additionally, we need to make sure we have access control policies so the end-user has access only to the respective raw data they want to access.

Now that we understand the high-level architecture, let’s discuss what AWS services we can integrate in each step of the architecture to provide an end-to-end solution.

The following diagram is the enhanced version of our solution architecture, where we have integrated AWS services.

Unstructured Data Management - AWS Native Architecture

Let’s understand how these AWS services are integrated in detail. We have divided the steps into two broad user flows: data processing and metadata enrichment (Steps 1–3) and end-users accessing the data and metadata with fine-grained access control (Steps 4–6).

  1. Various AI services (which we discuss in the next section) extract data from the unstructured datasets.
  2. The output is written to an Amazon Simple Storage Service (Amazon S3) bucket (labeled Extracted JSON in the preceding diagram). Optionally, we can restructure the input raw objects for better partitioning, which can help while implementing fine-grained access control on the raw input data (labeled as the Partitioned bucket in the diagram).
  3. After the initial data extraction phase, we can apply additional transformations to enrich the datasets using AWS Glue. We also build an additional metadata layer, which maintains a relationship between the raw S3 object path, the AI extracted output path, the optional enriched version S3 path, and any other metadata that will help the end-user discover the data.
  4. In the metadata catalog discovery step, we use the AWS Glue Data Catalog as the technical catalog, Amazon Athena and Amazon Redshift Spectrum as query engines, AWS Lake Formation for fine-grained access control, and Amazon DataZone for additional governance.
  5. The AI extracted output is expected to be available as a delimited file or in JSON format. We can create an AWS Glue Data Catalog table for querying using Athena or Redshift Spectrum (see the sketch after this list). Like the previous step, we can use Lake Formation policies for fine-grained access control.
  6. Lastly, the end-user accesses the raw unstructured data available in Amazon S3 for further analysis. We have proposed integrating Amazon S3 Access Points for access control at this layer. We explain this in detail later in this post.
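
To make this concrete, the following is a minimal sketch of running such a discovery query through Athena with Boto3. The database name, table name, columns, and result location are hypothetical placeholders; your catalog will contain whatever names your AWS Glue crawlers or ETL jobs created, and the Lake Formation permissions granted to the calling principal determine what the query can return.

import boto3

athena = boto3.client('athena', region_name='us-east-1')

# Hypothetical database, table, columns, and result location for illustration only
response = athena.start_query_execution(
    QueryString=(
        "SELECT raw_s3_path, extracted_s3_path, sentiment "
        "FROM unstructured_metadata.extracted_output "
        "WHERE sentiment = 'NEGATIVE' LIMIT 10"
    ),
    QueryExecutionContext={'Database': 'unstructured_metadata'},
    ResultConfiguration={'OutputLocation': 's3://my-athena-results-bucket/'}
)
print(response['QueryExecutionId'])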

Now let’s expand the following parts of the architecture to understand the implementation better:

  • Using AWS AI services to process unstructured data
  • Using S3 Access Points to integrate access control on raw S3 unstructured data

Process unstructured data with AWS AI services

As we discussed earlier, unstructured data can come in a variety of formats, such as text, audio, video, and images, and each type of data requires a different approach for extracting metadata. AWS AI services are designed to extract metadata from different types of unstructured data. The following are the most commonly used services for unstructured data processing:

  • Amazon Comprehend – This natural language processing (NLP) service uses ML to extract metadata from text data. It can analyze text in multiple languages, detect entities, extract key phrases, determine sentiment, and more. With Amazon Comprehend, you can easily gain insights from large volumes of text data, such as extracting product entities, customer names, and sentiment from social media posts.
  • Amazon Transcribe – This speech-to-text service uses ML to convert speech to text and extract metadata from audio data. It can recognize multiple speakers, transcribe conversations, identify keywords, and more. With Amazon Transcribe, you can convert unstructured data such as customer support recordings into text and further derive insights from it.
  • Amazon Rekognition – This image and video analysis service uses ML to extract metadata from visual data. It can recognize objects, people, faces, and text, detect inappropriate content, and more. With Amazon Rekognition, you can easily analyze images and videos to gain insights, such as identifying the entity type (human or other) and whether a person in an image is a known celebrity.
  • Amazon Textract – You can use this ML service to extract metadata from scanned documents and images. It can extract text, tables, and forms from images, PDFs, and scanned documents. With Amazon Textract, you can digitize documents and extract data such as customer name, product name, product price, and date from an invoice.
  • Amazon SageMaker – This service enables you to build and deploy custom ML models for a wide range of use cases, including extracting metadata from unstructured data. With SageMaker, you can build custom models that are tailored to your specific needs, which can be particularly useful for extracting metadata from unstructured data that requires a high degree of accuracy or domain-specific knowledge.
  • Amazon Bedrock – This fully managed service offers a choice of high-performing foundation models (FMs) from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Stability AI, and Amazon with a single API. It also offers a broad set of capabilities to build generative AI applications, simplifying development while maintaining privacy and security.

With these specialized AI services, you can efficiently extract metadata from unstructured data and use it for further analysis and insights. It’s important to note that each service has its own strengths and limitations, and choosing the right service for your specific use case is critical for achieving accurate and reliable results.

AWS AI services are available via various APIs, which enable you to integrate AI capabilities into your applications and workflows. AWS Step Functions is a serverless workflow service that allows you to coordinate and orchestrate multiple AWS services, including AI services, into a single workflow. This can be particularly useful when you need to process large amounts of unstructured data and perform multiple AI-related tasks, such as text analysis, image recognition, and NLP.

With Step Functions and AWS Lambda functions, you can create sophisticated workflows that include AI services and other AWS services. For instance, you can use Amazon S3 to store input data, invoke a Lambda function to trigger an Amazon Transcribe job to transcribe an audio file, and use the output to trigger an Amazon Comprehend analysis job to generate sentiment metadata for the transcribed text. This enables you to create complex, multi-step workflows that are straightforward to manage, scalable, and cost-effective.
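
As an illustration of the second task in that chain, the following is a minimal sketch of a Lambda function that reads a completed Amazon Transcribe transcript from Amazon S3 and generates sentiment metadata with Amazon Comprehend. The event fields bucket and transcript_key are hypothetical inputs that the orchestrating Step Functions state would pass in.

import json
import boto3

s3 = boto3.client('s3')
comprehend = boto3.client('comprehend')

def lambda_handler(event, context):
    # Hypothetical input passed by the Step Functions state that started the Transcribe job
    bucket = event['bucket']
    key = event['transcript_key']

    # Amazon Transcribe writes its transcript as a JSON object in Amazon S3
    transcript = json.loads(s3.get_object(Bucket=bucket, Key=key)['Body'].read())
    text = transcript['results']['transcripts'][0]['transcript']

    # detect_sentiment accepts a limited amount of UTF-8 text per call, so a long
    # transcript would need to be truncated or split into segments
    sentiment = comprehend.detect_sentiment(Text=text[:4500], LanguageCode='en')
    return {'transcript_key': key, 'sentiment': sentiment['Sentiment']}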

The following is an example architecture that shows how Step Functions can help invoke AWS AI services using Lambda functions.

AWS AI Services - Lambda Event Workflow -Unstructured Data

The workflow steps are as follows:

  1. Unstructured data, such as text files, audio files, and video files, are ingested into the S3 raw bucket.
  2. A Lambda function is triggered to read the data from the S3 bucket and call Step Functions to orchestrate the workflow required to extract the metadata.
  3. The Step Functions workflow checks the type of file, calls the corresponding AWS AI service APIs, checks the job status, and performs any postprocessing required on the output.
  4. AWS AI services can be accessed via APIs and invoked as batch jobs. To extract metadata from different types of unstructured data, you can use multiple AI services in sequence, with each service processing the corresponding file type (see the routing sketch after this list).
  5. After the Step Functions workflow completes the metadata extraction process and performs any required postprocessing, the resulting output is stored in an S3 bucket for cataloging.
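
The following is a minimal sketch of the routing logic a Lambda task in this workflow might implement, mapping a file extension to an AI service call. The extension-to-service mapping and the output bucket name are assumptions for illustration; a production workflow would typically express this branching as a Choice state in Step Functions.

import boto3

rekognition = boto3.client('rekognition')
transcribe = boto3.client('transcribe')
textract = boto3.client('textract')

def route_by_file_type(bucket, key):
    # Call the AI service that matches the object's file extension (illustrative mapping)
    suffix = key.lower().rsplit('.', 1)[-1]
    if suffix in ('jpg', 'jpeg', 'png'):
        return rekognition.detect_labels(
            Image={'S3Object': {'Bucket': bucket, 'Name': key}})
    if suffix in ('mp3', 'mp4', 'wav'):
        return transcribe.start_transcription_job(
            TranscriptionJobName=key.replace('/', '-'),
            Media={'MediaFileUri': f's3://{bucket}/{key}'},
            LanguageCode='en-US',
            OutputBucketName='extracted-json-bucket')  # hypothetical output bucket
    if suffix == 'pdf':
        return textract.start_document_text_detection(
            DocumentLocation={'S3Object': {'Bucket': bucket, 'Name': key}})
    raise ValueError(f'No AI service mapping for file type: {suffix}')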

Next, let’s understand how we can implement security or access control on both the extracted output and the raw input objects.

Implement access control on raw and processed data in Amazon S3

When managing unstructured data, we need to consider access controls for three types of data: the AI-extracted semi-structured output, the metadata, and the raw unstructured original files. The AI-extracted output is in JSON format, and access to it can be restricted via Lake Formation and Amazon DataZone. We recommend keeping the metadata (information that captures which unstructured datasets are already processed by the pipeline and available for analysis) open to your organization, which will enable metadata discovery across the organization.

To control access to raw unstructured data, you can integrate S3 Access Points, and explore additional options as AWS services evolve. S3 Access Points simplify data access for any AWS service or customer application that stores data in Amazon S3. Access points are named network endpoints that are attached to buckets and that you can use to perform S3 object operations. Each access point has distinct permissions and network controls that Amazon S3 applies for any request that is made through that access point. Each access point enforces a customized access point policy that works in conjunction with the bucket policy that is attached to the underlying bucket. With S3 Access Points, you can create unique access control policies for each access point to easily control access to specific datasets within an S3 bucket. This works well in multi-tenant or shared bucket scenarios where users or teams are assigned to unique prefixes within one S3 bucket.

An access point can support a single user or application, or groups of users or applications within and across accounts, allowing separate management of each access point. Every access point is associated with a single bucket and contains a network origin control and a Block Public Access control. For example, you can create an access point with a network origin control that only permits storage access from your virtual private cloud (VPC), a logically isolated section of the AWS Cloud. You can also create an access point with the access point policy configured to only allow access to objects with a defined prefix or to objects with specific tags. You can also configure custom Block Public Access settings for each access point.
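
As a minimal sketch of this setup, the following Boto3 calls create a VPC-restricted access point and attach a prefix-scoped policy to it. The account ID, bucket, VPC, IAM role, access point name, and prefix are hypothetical placeholders. Requests made through the access point are then evaluated against both this policy and the policy of the underlying bucket.

import json
import boto3

s3control = boto3.client('s3control', region_name='us-east-1')
account_id = '111122223333'  # hypothetical account ID

# Create an access point that only accepts requests from a specific VPC
s3control.create_access_point(
    AccountId=account_id,
    Name='team-a-ap',                           # hypothetical access point name
    Bucket='unstructured-raw-bucket',           # hypothetical bucket
    VpcConfiguration={'VpcId': 'vpc-0abc1234'}  # hypothetical VPC
)

# Attach a policy that limits the access point to a single prefix
policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'AWS': f'arn:aws:iam::{account_id}:role/TeamARole'},
        'Action': 's3:GetObject',
        'Resource': f'arn:aws:s3:us-east-1:{account_id}:accesspoint/team-a-ap/object/team-a/*'
    }]
}
s3control.put_access_point_policy(
    AccountId=account_id, Name='team-a-ap', Policy=json.dumps(policy)
)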

The following architecture provides an overview of how an end-user can get access to specific S3 objects by assuming a specific AWS Identity and Access Management (IAM) role. If you have a large number of S3 objects to control access for, consider grouping the S3 objects, assigning them tags, and then defining access control by tags.

S3 Access Points - Unstructured Data Management - Access Control

If you are implementing a solution that integrates S3 data available in multiple AWS accounts, you can take advantage of cross-account support for S3 Access Points.

Conclusion

This post explained how you can use AWS AI services to extract readable data from unstructured datasets, build a metadata layer on top of them to allow data discovery, and build an access control mechanism on top of the raw S3 objects and extracted data using Lake Formation, Amazon DataZone, and S3 Access Points.

In addition to AWS AI services, you can also integrate large language models with vector databases to enable semantic or similarity search on top of unstructured datasets. To learn more about how to enable semantic search on unstructured data by integrating Amazon OpenSearch Service as a vector database, refer to Try semantic search with the Amazon OpenSearch Service vector engine.

As of writing this post, S3 Access Points is one of the best solutions to implement access control on raw S3 objects using tagging, but as AWS service features evolve in the future, you can explore alternative options as well.


About the Authors

Sakti Mishra is a Principal Solutions Architect at AWS, where he helps customers modernize their data architecture and define their end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

Bhavana Chirumamilla is a Senior Resident Architect at AWS with a strong passion for data and machine learning operations. She brings a wealth of experience and enthusiasm to help enterprises build effective data and ML strategies. In her spare time, Bhavana enjoys spending time with her family and engaging in various activities such as traveling, hiking, gardening, and watching documentaries.

Sheela Sonone is a Senior Resident Architect at AWS. She helps AWS customers make informed choices and trade-offs about accelerating their data, analytics, and AI/ML workloads and implementations. In her spare time, she enjoys spending time with her family—usually on tennis courts.

Daniel Bruno is a Principal Resident Architect at AWS. He has been building analytics and machine learning solutions for over 20 years and splits his time helping customers build data science programs and designing impactful ML products.

Processing large records with Amazon Kinesis Data Streams

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/processing-large-records-with-amazon-kinesis-data-streams/

In today’s digital era, data is abundant and constantly flowing. Businesses across industries are seeking ways to harness this wealth of information to gain valuable insights and make real-time decisions. To meet this need, AWS offers Amazon Kinesis Data Streams, a powerful and scalable real-time data streaming service. With Kinesis Data Streams, you can effortlessly collect, process, and analyze streaming data in real time at any scale. This service seamlessly integrates into your data architecture, allowing you to tap into the full potential of your data for informed decision-making.

Data streaming technologies like Kinesis Data Streams are designed to efficiently process and manage continuous streams of data in real time at large scale. The individual pieces of data within these streams are often referred to as records. In scenarios like large file processing or performing image, audio, or video analytics, your record may exceed 1 MB. You may struggle to ingest such a large record with Kinesis Data Streams because, as of this writing, the service has a 1 MB upper limit for maximum data record size.

In this post, we show you some different options for handling large records within Kinesis Data Streams and the benefits and disadvantages of each approach. We provide some sample code for each option to help you get started with any of these approaches with your own workloads.

Understanding the default behavior of Kinesis Data Streams

You can send records to Kinesis Data Streams using the PutRecord or PutRecords API calls. These APIs include a mandatory field known as PartitionKey, where you must provide a specific value. This partition key is used by the service to map records with the same partition keys to the same shard to ensure ordering and locality for consumption. Locality means that you want the same consumer to process all records for a given partition key. This helps ensure that data with the same partition key stays together within the same shard, maintaining data order.

Each shard, which holds your data, can handle writing up to 1 MB per second. Let’s consider a scenario where you define a partition key and attempt to send a data record that exceeds 1 MB in size. Based on the explanation so far, the service will reject this request because the record size is over 1 MB. To help you understand better, we experimented by trying to send a record of 1.5 MB to a stream, and the outcome was the following exception message:

import boto3

client = boto3.client('kinesis', region_name='ap-southeast-2')

def lambda_handler(event, context):
    try:
        response = client.put_record(
            StreamName='test',
            Data=b'x' * 1572864,  # a 1.5 MB payload, above the 1 MB record size limit
            PartitionKey='string'
            #StreamARN='string'
        )
        print(response)
    except Exception as e:
        print(e)

START RequestId: 84b3ab0c-3f30-4267-aec1-549c2d59dfdb Version: $LATEST An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value at 'data' failed to satisfy constraint: Member must have length less than or equal to 1048576 END RequestId: 84b3ab0c-3f30-4267-aec1-549c2d59dfdb

Strategies for handling large records

Now that we understand the behavior of the PutRecord and PutRecords APIs, let’s discuss strategies you can use to overcome this situation. One thing to keep in mind is that there is no single best solution; in the following sections, we discuss some of the approaches that you can evaluate based on your use case:

  • Store large records in Amazon Simple Storage Service (Amazon S3) with a reference in Kinesis Data Streams
  • Split one large record into multiple records
  • Compress your large records

Let’s discuss these points one by one.

Store large records in Amazon S3 with a reference in Kinesis Data Streams

A useful approach for storing large records involves utilizing an alternative storage solution while employing a reference within Kinesis Data Streams. In this context, Amazon S3 stands out as an excellent choice due to its exceptional durability and cost-effectiveness. The procedure involves uploading the record as an object to an S3 bucket and subsequently writing a reference entry in Kinesis Data Streams. This entry incorporates an attribute that serves as a pointer, indicating the location of the object within Amazon S3.

With this approach, you can generate a pre-signed URL associated with the S3 object’s location. This link can be shared with the requester, offering them direct access to the object without the need for intermediary server-side data transfers.

The following diagram illustrates the architecture of this solution.

The following is the sample code to write data to Kinesis Data Streams using this approach:

import json
import boto3
import random

def lambda_handler(event, context):
    try:
        s3 = boto3.client('s3', region_name='ap-southeast-2')
        kds = boto3.client('kinesis', region_name='ap-southeast-2')
        expiration = 3600
        pk = str(random.randint(100, 100000000))
        bucket_name = 'MY_BUCKET'
        object_key = 'air/' + pk + '.txt'
        file_content = b'LARGE OBJECT'

        # Upload the large payload to Amazon S3
        response = s3.put_object(Bucket=bucket_name, Key=object_key, Body=file_content)

        # Generate a pre-signed URL so consumers can retrieve the object directly
        presigned_url = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': object_key},
            ExpiresIn=expiration
        )

        # Write only the reference (the pre-signed URL) to Kinesis Data Streams
        kdata = {'message': presigned_url}
        response = kds.put_record(
            StreamName='test',
            Data=json.dumps(kdata),
            PartitionKey=pk
        )
        print(response)
    except Exception as e:
        print(e)

If you are using an AWS Lambda consumer to process this data, you can now decode the record to get the S3 pre-signed URL to efficiently retrieve the object from Amazon S3. Then you can implement your business logic to effectively process the data. The following is sample code for reference:

import base64
import json

def lambda_handler(event, context):
    # Decode the Base64-encoded Kinesis records and deserialize the JSON payloads
    decoded_record_data = [base64.b64decode(record['kinesis']['data']).decode().replace('\n','') for record in event['Records']]
    deserialized_data = [json.loads(decoded_record) for decoded_record in decoded_record_data]

    for item in deserialized_data:
        LOB = item['message']  # the S3 pre-signed URL pointing to the large object
        # process LOB implementing your business logic

An inherent benefit of adopting this technique is the capability to store data in Amazon S3, accommodating an extensive range of sizes per individual object. This method helps you reduce the costs of using Kinesis Data Streams because it uses less storage space and requires less read and write throughput to access the item. This optimization is achieved by storing just the URL within Kinesis Data Streams. However, it’s important to acknowledge that accessing the sizable object necessitates an additional call to Amazon S3, thereby introducing higher latency for clients as they manage the additional request.

Split one large record into multiple records

Splitting large records into smaller ones in Kinesis Data Streams brings advantages like faster processing, improved throughput, efficient resource use, and more straightforward error handling. Let’s say you have a large record that you want to split into smaller chunks before sending them to a Kinesis data stream. First, you need to set up a Kinesis producer. Suppose you have a large record as a string. You can split it into smaller chunks of a predefined size. For this example, let’s say you’re splitting the record into chunks of 100 characters each. After you split that, loop through the record chunks and send each chunk as a separate message to a Kinesis data stream. The following is the sample code:

import boto3
kinesis = boto3.client('kinesis', region_name='ap-southeast-2')  

def split_record(record, chunk_size):
    chunks = [record[i:i + chunk_size] for i in range(0, len(record), chunk_size)]
    return chunks

def send_to_kinesis(stream_name, record):
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=record,
        PartitionKey= '100'
    )
    return response

def main():
    stream_name = 'test'  
    large_record = 'Your large record'  # Replace with your actual record
    chunk_size = 100  

    record_chunks = split_record(large_record, chunk_size)

    for chunk in record_chunks:
        response = send_to_kinesis(stream_name, chunk)
        print(f"Record sent: {response['SequenceNumber']}")

if __name__ == "__main__":
    main()

Ensure that all chunks of a given message are directed to a single partition, thereby guaranteeing the preservation of their order. In the final chunk, include metadata within the header indicating the conclusion of the message during production. This enables consumers to identify the ultimate chunk and facilitates seamless message reconstruction. The drawback of this method is that it adds complexity to the client-side tasks of dividing and putting back together the different parts. Therefore, these functions need thorough testing to prevent any loss of data.
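
The following is a minimal sketch of that pattern, assuming all chunks for a message share one partition key so they arrive in order on the same shard. The message_id, chunk_index, is_final, and payload fields are hypothetical envelope attributes, not part of the Kinesis API, and a production consumer would persist partial messages durably (for example, in Amazon DynamoDB) instead of in memory.

import base64
import json
from collections import defaultdict

def wrap_chunks(message_id, chunks):
    # Producer side: wrap each chunk with enough metadata to reassemble it later
    return [
        json.dumps({
            'message_id': message_id,
            'chunk_index': i,
            'is_final': i == len(chunks) - 1,
            'payload': chunk
        })
        for i, chunk in enumerate(chunks)
    ]

buffers = defaultdict(dict)

def reassemble(kinesis_records):
    # Consumer side: buffer chunks per message_id and rebuild the record
    # once the final chunk arrives
    completed = []
    for record in kinesis_records:
        envelope = json.loads(base64.b64decode(record['kinesis']['data']))
        buffers[envelope['message_id']][envelope['chunk_index']] = envelope['payload']
        if envelope['is_final']:
            parts = buffers.pop(envelope['message_id'])
            completed.append(''.join(parts[i] for i in sorted(parts)))
    return completed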

Compress your large records

Applying data compression prior to transmitting it to Kinesis Data Streams has numerous advantages. This approach not only reduces the data’s size, enabling swifter travel and more efficient utilization of network resources, but also leads to cost savings in terms of storage expenses while optimizing overall resource consumption. Additionally, this practice simplifies storage and data retention. By using compression algorithms such as GZIP, Snappy, or LZ4, you can achieve substantial reduction in the size of large records. Compression brings the benefit of simplicity because it’s implemented seamlessly without requiring the caller to make changes to the item or use extra AWS services to support storage. However, compression introduces additional CPU overhead and latency on the producer side, and its impact on the compression ratio and efficiency can vary depending on the data type and format. Also, compression can enhance consumer throughput at the expense of some decompression overhead.
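
The following is a minimal sketch of this approach using Python’s built-in gzip module. The compressed payload must still stay under the 1 MB record size limit, and the consumer decompresses the record after Base64-decoding it.

import base64
import gzip
import boto3

kinesis = boto3.client('kinesis', region_name='ap-southeast-2')

def send_compressed(stream_name, partition_key, payload):
    # Compress before writing; the compressed size must still be under 1 MB
    compressed = gzip.compress(payload)
    return kinesis.put_record(
        StreamName=stream_name,
        Data=compressed,
        PartitionKey=partition_key
    )

def decode_record(kinesis_record):
    # Consumer side (for example, a Lambda event record): Base64-decode, then decompress
    raw = base64.b64decode(kinesis_record['kinesis']['data'])
    return gzip.decompress(raw).decode('utf-8')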

Conclusion

For real-time data streaming use cases, it’s essential to carefully consider the handling of large records when using Kinesis Data Streams. In this post, we discussed the challenges associated with managing large records and explored strategies such as utilizing Amazon S3 references, record splitting, and compression. Each approach has its own set of benefits and drawbacks, so it’s crucial to evaluate the nature of your data and the tasks you need to perform. Select the most suitable approach based on your data’s characteristics and your processing task requirements.

We encourage you to try out the approaches discussed in this post and share your thoughts in the comments section.


About the author

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Orchestrate Amazon EMR Serverless jobs with AWS Step Functions

Post Syndicated from Naveen Balaraman original https://aws.amazon.com/blogs/big-data/orchestrate-amazon-emr-serverless-jobs-with-aws-step-functions/

Amazon EMR Serverless provides a serverless runtime environment that simplifies the operation of analytics applications that use the latest open source frameworks, such as Apache Spark and Apache Hive. With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks. You can run analytics workloads at any scale with automatic scaling that resizes resources in seconds to meet changing data volumes and processing requirements. EMR Serverless automatically scales resources up and down to provide just the right amount of capacity for your application, and you only pay for what you use.

AWS Step Functions is a serverless orchestration service that enables developers to build visual workflows for applications as a series of event-driven steps. Step Functions ensures that the steps in the serverless workflow are followed reliably, that information is passed between stages, and that errors are handled automatically.

The integration between AWS Step Functions and Amazon EMR Serverless makes it easier to manage and orchestrate big data workflows. Before this integration, you had to manually poll for job statuses or implement waiting mechanisms through API calls. Now, with the support for “Run a Job (.sync)” integration, you can more efficiently manage your EMR Serverless jobs. Using .sync allows your Step Functions workflow to wait for the EMR Serverless job to complete before moving on to the next step, effectively making job execution part of your state machine. Similarly, the “Request Response” pattern can be useful for triggering a job and immediately getting a response back, all within the confines of your Step Functions workflow. This integration simplifies your architecture by eliminating the need for additional steps to monitor job status, making the whole system more efficient and easier to manage.

In this post, we explain how you can orchestrate a PySpark application using Amazon EMR Serverless and AWS Step Functions. We run a Spark job on EMR Serverless that processes Citi Bike dataset data in an Amazon Simple Storage Service (Amazon S3) bucket and stores the aggregated results in Amazon S3.

Solution overview

We demonstrate this solution with an example using the Citi Bike dataset. This dataset includes numerous parameters such as Rideable type, Start station, Started at, End station, Ended at, and various other attributes of each Citi Bike ride. Our objective is to find the minimum, maximum, and average bike trip duration in a given month.

In this solution, the input data is read from the S3 input path, transformations and aggregations are applied with the PySpark code, and the summarized output is written to the S3 output path s3://<bucket-name>/serverlessout/.
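
The actual bikeaggregator.py script is downloaded from the blog artifacts in the deployment steps that follow; purely as an illustration of the kind of aggregation described above, the following PySpark sketch computes the minimum, maximum, and average trip duration per month, assuming started_at and ended_at timestamp columns and taking the input and output S3 paths as job arguments.

import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Input and output S3 paths are passed as job arguments (see EntryPointArguments later in this post)
input_path, output_path = sys.argv[1], sys.argv[2]

spark = SparkSession.builder.appName("ServerlessBikeAggr").getOrCreate()

# Column names are assumptions based on the dataset description; the published script may differ
rides = (spark.read.option("header", True).csv(input_path)
         .withColumn("started_at", F.to_timestamp("started_at"))
         .withColumn("ended_at", F.to_timestamp("ended_at"))
         .withColumn("duration_min",
                     (F.unix_timestamp("ended_at") - F.unix_timestamp("started_at")) / 60))

summary = (rides.groupBy(F.date_format("started_at", "yyyy-MM").alias("month"))
           .agg(F.min("duration_min").alias("min_duration"),
                F.max("duration_min").alias("max_duration"),
                F.avg("duration_min").alias("avg_duration")))

summary.write.mode("overwrite").option("header", True).csv(output_path)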

The solution is implemented as follows:

  • Creates an EMR Serverless application with Spark runtime. After the application is created, you can submit the data-processing jobs to that application. This API step waits for Application creation to complete.
  • Submits the PySpark job and waits for its completion with the StartJobRun (.sync) API. This allows you to submit a job to an Amazon EMR Serverless application and wait until the job completes.
  • After the PySpark job completes, the summarized output is available in the S3 output directory.
  • If the job encounters an error, the state machine workflow will indicate a failure. You can inspect the specific error within the state machine. For a more detailed analysis, you can also check the EMR job failure logs in the EMR Studio console.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • An AWS account
  • An IAM user with administrator access
  • An S3 bucket

Solution architecture

To automate the complete process, we use the following architecture, which integrates Step Functions for orchestration and Amazon EMR Serverless for data transformations. The summarized output is then written to an Amazon S3 bucket.

The following diagram illustrates the architecture for this use case.

Deployment steps

Before beginning this tutorial, ensure that the role being used to deploy has all the relevant permissions to create the required resources as part of the solution. The roles with the appropriate permissions will be created through a CloudFormation template using the following steps.

Step 1: Create a Step Functions state machine

You can create a Step Functions state machine workflow in two ways: either directly through code or through the Step Functions Workflow Studio graphical interface. To create a state machine, you can follow the steps from either option 1 or option 2 below.

Option 1: Create the state machine through code directly

To create a Step Functions state machine along with the necessary IAM roles, complete the following steps:

  1. Launch the CloudFormation stack using this link. On the CloudFormation console, provide a stack name and accept the defaults to create the stack. Once the CloudFormation deployment completes, the following resources are created. In addition, an EMR service-linked role is automatically created by this CloudFormation stack to access EMR Serverless:
    • S3 bucket to upload the PySpark script and write output data from EMR Serverless job. We recommend enabling default encryption on your S3 bucket to encrypt new objects, as well as enabling access logging to log all requests made to the bucket. Following these recommendations will improve security and provide visibility into access of the bucket.
    • EMR Serverless Runtime role that provides granular permissions to specific resources that are required when EMR Serverless jobs run.
    • Step Functions Role to grant AWS Step Functions permissions to access the AWS resources that will be used by its state machines.
    • State Machine with EMR Serverless steps.

  2. To prepare the S3 bucket with the PySpark script, open AWS CloudShell from the toolbar on the top right corner of the AWS console and run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/bikeaggregator.py s3://serverless-<<ACCOUNT-ID>>-blog/scripts/

  3. To prepare the S3 bucket with input data, run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/201306-citibike-tripdata.csv s3://serverless-<<ACCOUNT-ID>>-blog/data/ --copy-props none

Option 2: Create the Step Functions state machine through Workflow Studio

Prerequisites

Before creating the state machine through Workflow Studio, please ensure that all the relevant roles and resources are created as part of the solution.

  1. To deploy the necessary IAM roles and S3 bucket into your AWS account, launch the CloudFormation stack using this link. Once the CloudFormation deployment completes, the following resources are created:
    • S3 bucket to upload the PySpark script and write output data. We recommend enabling default encryption on your S3 bucket to encrypt new objects, as well as enabling access logging to log all requests made to the bucket. Following these recommendations will improve security and provide visibility into access of the bucket.
    • EMR Serverless Runtime role that provides granular permissions to specific resources that are required when EMR Serverless jobs run.
    • Step Functions Role to grant AWS Step Functions permissions to access the AWS resources that will be used by its state machines.

  2. To prepare the S3 bucket with the PySpark script, open AWS CloudShell from the toolbar on the top right of the AWS console and run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/bikeaggregator.py s3://serverless-<<ACCOUNT-ID>>-blog/scripts/

  3. To prepare the S3 bucket with input data, run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/201306-citibike-tripdata.csv s3://serverless-<<ACCOUNT-ID>>-blog/data/ --copy-props none

To create a Step Functions state machine, complete the following steps:

  1. On the Step Functions console, choose Create state machine.
  2. Keep the Blank template selected, and click Select.
  3. In the Actions menu on the left, Step Functions provides a list of AWS service APIs that you can drag and drop into your workflow graph in the design canvas. Type EMR Serverless in the search and drag the Amazon EMR Serverless CreateApplication state to the workflow graph:

  4. In the canvas, select the Amazon EMR Serverless CreateApplication state to configure its properties. The Inspector panel on the right shows configuration options. Provide the following Configuration values:
    • Change the State name to Create EMR Serverless Application
    • Provide the following values to the API Parameters. This creates an EMR Serverless Application with Apache Spark based on Amazon EMR release 6.12.0 using default configuration settings.
      {
          "Name": "ServerlessBikeAggr",
          "ReleaseLabel": "emr-6.12.0",
          "Type": "SPARK"
      }

    • Click the Wait for task to complete – optional check box to wait for EMR Serverless Application creation state to complete before executing the next state.
    • Under Next state, select the Add new state option from the drop-down.
  5. Drag the EMR Serverless StartJobRun state from the left browser to the next state in the workflow.
    • Rename State name to Submit PySpark Job
    • Provide the following values in the API parameters and click Wait for task to complete – optional (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID).
{
    "ApplicationId.$": "$.ApplicationId",
    "ExecutionRoleArn": "arn:aws:iam::<<ACCOUNT-ID>>:role/EMR-Serverless-Role-<<ACCOUNT-ID>>",
    "JobDriver": {
        "SparkSubmit": {
            "EntryPoint": "s3://serverless-<<ACCOUNT-ID>>-blog/scripts/bikeaggregator.py",
            "EntryPointArguments": [
                "s3://serverless-<<ACCOUNT-ID>>-blog/data/",
                "s3://serverless-<<ACCOUNT-ID>>-blog/serverlessout/"
            ],
            "SparkSubmitParameters": "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
    }
}

  6. Select the Config tab for the state machine from the top and change the following configurations:
    • Change the State machine name to EMRServerless-BikeAggr in the Details section.
    • In the Permissions section, select StateMachine-Role-<<ACCOUNT-ID>> from the dropdown for Execution role. (Make sure that you replace <<ACCOUNT-ID>> with your AWS Account ID).
  7. Continue to add steps for Check Job Success from the studio as shown in the following diagram.

  8. Click Create to create the Step Functions state machine for orchestrating the EMR Serverless jobs.

Step 2: Invoke the Step Functions

Now that the Step Functions state machine is created, we can invoke it by clicking the Start execution button:

When the state machine is invoked, it presents its run flow as shown in the following screenshot. Because we have selected the Wait for task to complete configuration (.sync API) for this step, the next step doesn’t start until the EMR Serverless application is created (blue represents the Amazon EMR Serverless application being created).

After successfully creating the EMR Serverless Application, we submit a PySpark Job to that Application.

When the EMR Serverless job completes, the Submit PySpark Job step changes to green. This is because we have selected the Wait for task to complete configuration (using the .sync API) for this step.

You can find the EMR Serverless application ID as well as the PySpark job run ID on the Output tab for the Submit PySpark Job step.

Step 3: Validation

To confirm the successful completion of the job, navigate to the EMR Serverless console and find the EMR Serverless application ID. Click the application ID to find the execution details for the PySpark job run submitted from the Step Functions state machine.

To verify the output of the job execution, you can check the S3 bucket where the output will be stored in a .csv file as shown in the following graphic.

Cleanup

Log in to the AWS Management Console and delete any S3 buckets created by this deployment to avoid unwanted charges to your AWS account. For example: s3://serverless-<<ACCOUNT-ID>>-blog/

Then clean up your environment by deleting the CloudFormation stack you created in the solution configuration steps.

Finally, delete the Step Functions state machine you created as part of this solution.

Conclusion

In this post, we explained how to launch an Amazon EMR Serverless Spark job with Step Functions using Workflow Studio to implement a simple ETL pipeline that creates aggregated output from the Citi Bike dataset and generates reports.

We hope this gives you a great starting point for using this solution with your datasets and applying more complex business rules to solve your transient cluster use cases.

Do you have follow-up questions or feedback? Leave a comment. We’d love to hear your thoughts and suggestions.


About the Authors

Naveen Balaraman is a Sr Cloud Application Architect at Amazon Web Services. He is passionate about Containers, Serverless, Architecting Microservices and helping customers leverage the power of AWS cloud.

Karthik Prabhakar is a Senior Big Data Solutions Architect for Amazon EMR at AWS. He is an experienced analytics engineer working with AWS customers to provide best practices and technical advice in order to assist their success in their data journey.

Parul Saxena is a Big Data Specialist Solutions Architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue and AWS Lake Formation, where she provides architectural guidance to customers for running complex big data workloads over AWS platform. In her spare time, she enjoys traveling and spending time with her family and friends.