
Achieve low-latency data processing with Amazon EMR on AWS Local Zones

Post Syndicated from Gagan Brahmi original https://aws.amazon.com/blogs/big-data/achieve-low-latency-data-processing-with-amazon-emr-on-aws-local-zones/

Many enterprise applications require both single-digit millisecond data processing latency and data residency compliance. By deploying Amazon EMR on AWS Local Zones, organizations can meet both requirements. This post demonstrates how to use AWS Local Zones to deploy EMR clusters closer to your users, enabling millisecond-level response times. We use a Secure Web Gateway as an example and implement Amazon EMR with Apache Flink on AWS Local Zones to process network traffic with ultra-low latency. We also walk through creating an EMR cluster in a Local Zone and highlight performance optimizations and architecture considerations specific to edge deployments. This approach brings the data processing capabilities of Amazon EMR closer to your users and data sources, which makes it well suited to security applications and other latency-sensitive workloads.

Solution overview

The following diagram illustrates the solution architecture.

Sample architecture for a Secure Web Gateway on AWS Local Zones

The solution consists of several key components:

  • AWS Local Zones deployment – Positioned close to corporate offices to minimize latency
  • Network traffic interception – Using AWS Transit Gateway and virtual private cloud (VPC) endpoints
  • Request queuing and rules streaming – Using Apache Kafka on Amazon Elastic Kubernetes Service (Amazon EKS) to queue the incoming and outgoing network requests as well as stream rules as they are updated by the security administrator
  • EMR cluster – Running Flink for real-time stream processing and functions to combine rules
  • Policy management system – For defining and updating security rules
  • Logging – Using Amazon Simple Storage Service (Amazon S3) for visibility, compliance, and data analytics

In this scenario, the Secure Web Gateway is designed to inspect and make decisions on network traffic within single-digit milliseconds. The workflow consists of the following steps:

  1. The corporate office uses AWS Direct Connect to connect to AWS Local Zones.
  2. The security administrator defines the rules in a rules interface running on Kubernetes pods on Amazon EKS. As rules are added or modified, they are sent to the swg_rules Kafka topic running on Amazon EKS. Flink running on the EMR cluster consumes, stores, and processes these rules.
  3. A corporate user requests access to a software as a service (SaaS) application from the corporate office. The request is routed through Direct Connect to the Local Zone.
  4. The Secure Web Gateway proxy service running on Kubernetes pods on Amazon EKS receives the access request, which is sent to the swg_requests Kafka topic.
  5. Flink running on Amazon EMR consumes and evaluates the messages from the swg_requests Kafka topic, determines the routing decision, and writes it to the swg_decisions Kafka topic.
  6. The Secure Web Gateway proxy service consumes the swg_decisions topic and, if the access request is allowed, routes the traffic to the SaaS application. If the request is denied, the proxy responds to the user with the reason or violation details, if any.
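To make steps 2 through 6 concrete, the following minimal Python sketch models the decision logic in memory: events from swg_rules update a rule set, and each event from swg_requests yields a decision destined for swg_decisions. It is not Flink or Kafka code. The message fields (op, rule_id, action, domain, user_group, request_id), the exact-match rule semantics, and the deny-by-default, deny-overrides-allow policy are all hypothetical assumptions chosen for illustration.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Rule:
    # Hypothetical rule shape: allow or deny one destination domain for one user group.
    rule_id: str
    action: str  # "allow" or "deny"
    domain: str
    user_group: str


class DecisionEngine:
    """In-memory stand-in for the Flink job: rules are state, requests are events."""

    def __init__(self) -> None:
        self.rules: Dict[str, Rule] = {}

    def on_rule_event(self, event: dict) -> None:
        # Hypothetical "op" field: "delete" removes a rule, anything else upserts it.
        if event.get("op") == "delete":
            self.rules.pop(event["rule_id"], None)
        else:
            self.rules[event["rule_id"]] = Rule(
                event["rule_id"], event["action"], event["domain"], event["user_group"]
            )

    def on_request(self, request: dict) -> dict:
        # Find every rule that matches the request's destination and user group.
        matches = [
            r
            for r in self.rules.values()
            if r.domain == request["domain"] and r.user_group == request["user_group"]
        ]
        denied_by = sorted(r.rule_id for r in matches if r.action == "deny")
        if not matches:
            allowed, reason = False, "no matching rule"  # deny by default
        elif denied_by:
            allowed, reason = False, "denied by " + ", ".join(denied_by)  # deny overrides allow
        else:
            allowed, reason = True, "allowed"
        # In the real pipeline, this record would be written to swg_decisions.
        return {"request_id": request["request_id"], "allowed": allowed, "reason": reason}


if __name__ == "__main__":
    engine = DecisionEngine()
    engine.on_rule_event({"op": "upsert", "rule_id": "r1", "action": "allow",
                          "domain": "app.example.com", "user_group": "sales"})
    print(engine.on_request({"request_id": "q1", "domain": "app.example.com",
                             "user_group": "sales"}))
```

Because rule events and requests pass through the same engine, a rule change applies to the very next request, which mirrors how the solution picks up rule updates without restarting the job.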

Because Flink continuously consumes and evaluates the swg_rules topic, the security administrator can add, modify, or remove rules in real time. In the following sections, we discuss the key components of the solution in more detail.

AWS Local Zones: The foundation

AWS Local Zones provide low-latency extensions of AWS Regions positioned near large population and industry centers. For our Secure Web Gateway use case, deploying in a Local Zone offers several advantages:

  • Proximity to corporate offices – Reduced round-trip latency for traffic inspection. AWS Local Zones are designed to provide applications with single-digit millisecond latency.
  • AWS-native security controls – Use of familiar AWS security features, such as IAM policies and security groups.
  • Consistent connectivity – Reliable connection between corporate networks and AWS resources.

The Local Zone hosts our EMR cluster and networking components, so traffic inspection through the Secure Web Gateway happens with single-digit millisecond latency. For scenarios where traffic inspection doesn’t require single-digit millisecond latency, hosting the solution on an EMR cluster in the parent Region is a viable option.

Amazon EMR with Apache Flink: The decision engine

The core intelligence of our Secure Web Gateway solution is Flink running on Amazon EMR, which provides optimized real-time stream processing. Running Amazon EMR in AWS Local Zones lets you perform complex data processing close to your data centers or corporate locations without incurring the latency of moving data to another Region. In this solution, we use Flink’s stateful processing to maintain session context across multiple network requests or packets. The solution also includes a dynamic rules engine whose rules are combined with the real-time stream of network access requests.

Architectural component choice considerations

Amazon EMR offers several deployment options for different workloads and use cases, including Amazon EMR on EKS. AWS also provides Amazon Managed Service for Apache Flink, a fully managed service that simplifies building and managing Flink applications. As of this writing, neither the EMR on EKS deployment option nor Amazon Managed Service for Apache Flink is available in AWS Local Zones, which is why this solution runs Flink on an EMR cluster.

Prerequisites

Before proceeding with this deployment, ensure you have:

  • An AWS account with AWS Identity and Access Management (IAM) permissions to manage Amazon VPC, Amazon EMR, and Local Zones
  • Basic familiarity with the AWS Management Console

Deploy Amazon EMR on a Local Zone

To deploy Amazon EMR on a Local Zone, you first need to enable the Local Zone for the AWS account. For instructions, refer to Step 1 and Step 2 in Getting started with AWS Local Zones.

After you have enabled a Local Zone and created a Local Zone subnet, create your EMR cluster. For instructions, refer to Step 1: Configure data resources and launch an Amazon EMR cluster. You can follow the instructions provided for the AWS Management Console. Make sure you select an Amazon EMR release that supports Local Zones (5.28.0 or later). Select the applications you need, which in this case are Hadoop and Flink.

A crucial step in launching an EMR cluster in a Local Zone is selecting the Local Zone network configuration. Choose the VPC that contains your Local Zone subnet, and choose the subnet that you created in the Local Zone.

Review all other configurations and settings for your cluster and make any final adjustments as needed, then choose Create cluster to launch your EMR cluster in the Local Zone.
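If you prefer to script the launch instead of using the console, the following sketch builds the keyword arguments for the boto3 EMR `run_job_flow` call. The cluster name, subnet ID, instance type, node count, and role names are placeholder assumptions you must replace. In particular, Local Zones offer a limited set of EC2 instance types, so check which types your Local Zone supports. The release check simply encodes the 5.28.0 minimum stated in this walkthrough.

```python
import re
from typing import Tuple

# Minimum Amazon EMR release with Local Zone support, per this walkthrough.
MIN_LOCAL_ZONE_RELEASE = (5, 28, 0)


def parse_release(label: str) -> Tuple[int, int, int]:
    """Parse an EMR release label such as 'emr-7.1.0' into (7, 1, 0)."""
    match = re.fullmatch(r"emr-(\d+)\.(\d+)\.(\d+)", label)
    if not match:
        raise ValueError(f"unrecognized release label: {label!r}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch


def local_zone_cluster_request(
    name: str,
    release_label: str,
    subnet_id: str,
    instance_type: str,
    core_count: int,
    service_role: str = "EMR_DefaultRole",  # assumed default service role name
    instance_profile: str = "EMR_EC2_DefaultRole",  # assumed default instance profile
) -> dict:
    """Build keyword arguments for boto3's EMR run_job_flow call."""
    if parse_release(release_label) < MIN_LOCAL_ZONE_RELEASE:
        raise ValueError(f"{release_label} is older than emr-5.28.0")
    return {
        "Name": name,
        "ReleaseLabel": release_label,
        "Applications": [{"Name": "Hadoop"}, {"Name": "Flink"}],
        "Instances": {
            # Launching into the Local Zone subnet places the cluster in the Local Zone.
            "Ec2SubnetId": subnet_id,
            "KeepJobFlowAliveWhenNoSteps": True,  # long-running streaming cluster
            "InstanceGroups": [
                {"Name": "Primary", "InstanceRole": "MASTER",
                 "InstanceType": instance_type, "InstanceCount": 1},
                {"Name": "Core", "InstanceRole": "CORE",
                 "InstanceType": instance_type, "InstanceCount": core_count},
            ],
        },
        "ServiceRole": service_role,
        "JobFlowRole": instance_profile,
    }
```

You would then pass the result to `boto3.client("emr").run_job_flow(**request)`, which returns the new cluster’s `JobFlowId`.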

Performance and scaling considerations

The Local Zone EMR deployment can be scaled based on traffic patterns. To maintain low-latency performance, you can scale the EMR cluster horizontally by manually resizing it to add worker nodes, for example during peak traffic or after the number of users accessing the Secure Web Gateway grows. Alternatively, you can set up a scheduled action to scale the EMR cluster at predetermined times based on known workload patterns. You can also scale vertically by using Amazon Elastic Compute Cloud (Amazon EC2) instance types with more compute capacity.
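As a rough way to choose a node count for a manual or scheduled resize, the following sketch sizes the core instance group from an expected peak request rate and builds the arguments for the boto3 EMR `modify_instance_groups` call. The per-node capacity, headroom, and minimum and maximum limits are hypothetical inputs; measure your own per-node throughput under load.

```python
import math


def target_core_nodes(
    peak_requests_per_sec: float,
    per_node_capacity: float,
    headroom: float = 0.3,
    min_nodes: int = 2,
    max_nodes: int = 20,
) -> int:
    """Nodes needed to serve the peak rate plus headroom, clamped to [min_nodes, max_nodes]."""
    needed = math.ceil(peak_requests_per_sec * (1 + headroom) / per_node_capacity)
    return max(min_nodes, min(max_nodes, needed))


def resize_request(cluster_id: str, instance_group_id: str, count: int) -> dict:
    """Keyword arguments for boto3's EMR modify_instance_groups call."""
    return {
        "ClusterId": cluster_id,
        "InstanceGroups": [{"InstanceGroupId": instance_group_id, "InstanceCount": count}],
    }
```

For example, a peak of 10,000 requests per second with an assumed capacity of 2,000 requests per node and 30 percent headroom yields ceil(13,000 / 2,000) = 7 core nodes.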

Another important performance consideration is to optimize Flink checkpointing for fault tolerance. To learn more, see Optimizing job restart times for task recovery and scaling operations.
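On Amazon EMR, Flink settings such as checkpointing are typically supplied through the `flink-conf` configuration classification. The following sketch builds a `Configurations` entry that you could pass to `run_job_flow`. The checkpoint interval and S3 path are placeholder assumptions; tune the interval against your latency and recovery-time goals, and see the linked documentation for the recovery options Amazon EMR recommends.

```python
from typing import Dict, List


def flink_checkpoint_configurations(checkpoint_dir: str, interval: str = "30s") -> List[Dict]:
    """Build an EMR Configurations entry for the flink-conf classification."""
    if not checkpoint_dir.startswith("s3://"):
        raise ValueError("checkpoint_dir should be an s3:// URI")
    return [
        {
            "Classification": "flink-conf",
            "Properties": {
                # How often Flink snapshots operator state (session context and rules).
                "execution.checkpointing.interval": interval,
                # Durable location for completed checkpoints.
                "state.checkpoints.dir": checkpoint_dir,
                # Upload only changed state files; takes effect with the RocksDB state backend.
                "state.backend.incremental": "true",
                # Keep a local copy of state so restarted tasks can recover faster.
                "state.backend.local-recovery": "true",
            },
        }
    ]
```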

Security considerations

Although this architecture prioritizes low-latency performance, implementing proper security controls is essential for production deployments. The solution handles sensitive corporate network traffic that requires protection through encryption, access controls, and monitoring. For comprehensive security guidance specific to EMR deployments, refer to Security in Amazon EMR. Consider the following key areas:

  • Data protection – Enable encryption at rest and in transit using Amazon EMR security configurations, including Amazon S3 encryption and TLS certificates for inter-node communication
  • Access control – Implement AWS Identity and Access Management (IAM) roles with least privilege for Amazon EMR service roles, EC2 instance profiles, and runtime roles to isolate job access
  • Network security – Deploy EMR clusters in private subnets with security groups following least privilege, and enable the Amazon EMR block public access feature
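As a starting point for the data protection controls above, the following sketch builds an Amazon EMR security configuration document that enables in-transit encryption with PEM certificates stored in Amazon S3 and at-rest encryption with an AWS KMS key. The S3 URI and key ARN are placeholders, and using one KMS key for both S3 and local disks is a simplifying assumption.

```python
import json


def emr_security_configuration(tls_certs_s3_uri: str, kms_key_arn: str) -> str:
    """JSON document for boto3's EMR create_security_configuration call."""
    config = {
        "EncryptionConfiguration": {
            "EnableInTransitEncryption": True,
            "EnableAtRestEncryption": True,
            "InTransitEncryptionConfiguration": {
                "TLSCertificateConfiguration": {
                    # A zip file of PEM certificates uploaded to S3 (placeholder location).
                    "CertificateProviderType": "PEM",
                    "S3Object": tls_certs_s3_uri,
                }
            },
            "AtRestEncryptionConfiguration": {
                # Server-side encryption for data written to Amazon S3 through EMRFS.
                "S3EncryptionConfiguration": {
                    "EncryptionMode": "SSE-KMS",
                    "AwsKmsKey": kms_key_arn,
                },
                # Encrypt instance-local disks with the same KMS key.
                "LocalDiskEncryptionConfiguration": {
                    "EncryptionKeyProviderType": "AwsKms",
                    "AwsKmsKey": kms_key_arn,
                },
            },
        }
    }
    return json.dumps(config)
```

You could register the document with `boto3.client("emr").create_security_configuration(Name="swg-security", SecurityConfiguration=doc)` and then reference it by name through the `SecurityConfiguration` parameter of `run_job_flow`.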

Benefits of Amazon EMR

Using Amazon EMR on AWS Local Zones in this architecture offers several key benefits:

  • Low latency – Providing the compute in AWS Local Zones close to corporate offices helps you achieve low-latency processing.
  • Real-time inspection – Flink’s streaming capabilities enable real-time inspection of network requests.
  • Complex policy application – With Flink on Amazon EMR, you can build a complex policy application that, for instance, can detect sophisticated access patterns across multiple events and time windows, which are difficult to express in traditional stateless rule-based systems.
  • Scalability – Amazon EMR provides the flexibility to automatically scale the cluster with a custom policy. Moreover, Amazon EMR release 6.15.0 and higher supports Flink autoscaler, which automatically scales the individual Flink job vertexes based on the job metrics.
  • Compliance – Logging all the events to a durable storage like Amazon S3 helps users improve their security and audit posture.
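To illustrate the kind of cross-event pattern described under Complex policy application, the following plain-Python sketch flags a user whose denied requests exceed a threshold within a sliding time window. In Flink, this logic would typically live in keyed state or a window operator; here a per-user deque plays that role. The threshold, the window length, and in-order timestamps are assumptions.

```python
from collections import defaultdict, deque
from typing import Deque, Dict


class DenialBurstDetector:
    """Flags a user whose denied requests exceed max_denials within window_seconds."""

    def __init__(self, max_denials: int, window_seconds: float) -> None:
        self.max_denials = max_denials
        self.window_seconds = window_seconds
        # Per-user timestamps of recent denials: the equivalent of Flink keyed state.
        self.denials: Dict[str, Deque[float]] = defaultdict(deque)

    def observe(self, user_id: str, timestamp: float, allowed: bool) -> bool:
        """Record one decision (timestamps assumed in order); True means raise an alert."""
        if allowed:
            return False
        window = self.denials[user_id]
        window.append(timestamp)
        # Evict denials that fell out of the window (timestamp - window_seconds, timestamp].
        while window and window[0] <= timestamp - self.window_seconds:
            window.popleft()
        return len(window) > self.max_denials
```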

Clean up

To avoid incurring unnecessary charges, clean up the resources you created during this walkthrough. Follow these steps in order:

Step 1: Terminate the EMR cluster

  • Open the Amazon EMR console
  • Select your EMR cluster from the list
  • Choose Terminate
  • Confirm the termination when prompted
  • Wait for the cluster status to change to “TERMINATED”

Step 2: Clean up VPC resources

  • In the Amazon VPC console, delete the Local Zone subnet you created
  • If you created a custom VPC specifically for this demo, delete any associated:
    • Route tables
    • Internet gateways
    • Security groups (other than default)
    • The VPC itself

Step 3: Disable the Local Zone (optional)

  • In the EC2 console, go to Zones under “Settings”
  • Find your enabled Local Zone
  • Choose Manage and disable the zone if you no longer need it for other workloads

Step 4: Review additional resources

Check for and clean up any other resources you may have created:

  • S3 buckets used for logging or EMR storage
  • CloudWatch log groups
  • Any custom IAM roles or policies created specifically for this architecture

Conclusion

This implementation of Amazon EMR on AWS Local Zones demonstrates how enterprises can bring powerful data processing capabilities to the edge while maintaining single-digit millisecond latency. By showcasing a Secure Web Gateway application, we have illustrated just one of many possible use cases where performance-sensitive workloads can benefit from this architecture. As the edge computing landscape evolves, we anticipate organizations will increasingly use this pattern for additional use cases, including:

  • Real-time fraud detection for financial transactions requiring immediate decision-making
  • Connected vehicle applications where processing telemetry data with minimal latency is critical
  • Internet of Things (IoT) sensor analytics that require immediate insights from operational technology environments
  • Augmented reality experiences where processing must happen close to end-users

We encourage you to evaluate your latency-sensitive workloads and consider how AWS Local Zones with Amazon EMR might help you implement architectures previously considered highly challenging. Start small with a proof of concept like the one outlined here, measure the performance gains, and expand to production use cases with confidence. Implementing a Secure Web Gateway in AWS Local Zones with Amazon EMR and Flink offers enterprises a powerful solution for securing corporate traffic. By combining the proximity of Local Zones with the real-time processing capabilities of Flink, organizations can implement sophisticated security policies without the latency penalties traditionally associated with traffic inspection.


About the authors

Gagan Brahmi is a Specialist Senior Solutions Architect at Amazon Web Services (AWS), specializing in Data Analytics and AI/ML solutions. With over 20 years in information technology, he helps customers architect scalable, high-performance analytics platforms using distributed data processing, real-time streaming technologies, and machine learning services on AWS. When not designing cloud solutions, Gagan enjoys exploring new places with his family.

Arun Shanmugam is a Senior Analytics Solutions Architect at AWS, with a focus on building modern data architecture. He has been successfully delivering scalable data analytics solutions for customers across diverse industries. Outside of work, Arun is an avid outdoor enthusiast who actively engages in CrossFit, road biking, and cricket.

George Oakes is a Senior Hybrid Solutions Architect at AWS, with a focus on edge, on-premises, and low-latency architectures. He has been successfully delivering scalable hybrid AWS solutions for customers across diverse industries. Outside of work, George is an avid outdoor enthusiast who enjoys hiking and visiting parks and UNESCO sites around the world.

Transform your data to Amazon S3 Tables with Amazon Athena

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/transform-your-data-to-amazon-s3-tables-with-amazon-athena/

Organizations today manage vast amounts of data, with much of it stored based on initial use cases and business needs. As requirements for this data evolve—whether for real-time reporting, advanced machine learning (ML), or cross-team data sharing—the original storage formats and structures often become a bottleneck. When this happens, data teams frequently find that datasets that worked well for their original purpose now require complex transformations; custom extract, transform, and load (ETL) pipelines; and extensive redesign to unblock new analytical workflows. This creates a significant barrier between valuable data and actionable insights.

Amazon Athena offers a solution through its serverless, SQL-based approach to data transformation. With the CREATE TABLE AS SELECT (CTAS) functionality in Athena, you can transform existing data and create new tables in the process, using standard SQL statements to help reduce the need for custom ETL pipeline development.

This CTAS experience now supports Amazon S3 Tables, which provide built-in optimization, Apache Iceberg support, automatic table maintenance, and ACID transaction capabilities. This combination can help organizations modernize their data infrastructure, achieve improved performance, and reduce operational overhead.

You can use this approach to transform data from commonly used tabular formats, including CSV, TSV, JSON, Avro, Parquet, and ORC. The resulting tables are immediately accessible for querying across Athena, Amazon Redshift, Amazon EMR, and supported third-party applications, including Apache Spark, Trino, DuckDB, and PyIceberg.

This post demonstrates how Athena CTAS simplifies the data transformation process through a practical example: migrating an existing Parquet dataset into S3 Tables.

Solution overview

Consider a global apparel ecommerce retailer processing thousands of daily customer reviews across marketplaces. Their dataset, currently stored in Parquet format in Amazon Simple Storage Service (Amazon S3), requires updates whenever customers modify ratings and review content. The business needs a solution that supports ACID transactions—the ability to atomically insert, update, and delete records while maintaining data consistency—because review data changes frequently as customers edit their feedback.

Additionally, the data team faces operational challenges: manual table maintenance tasks like compaction and metadata management, no built-in support for time travel queries to analyze historical changes, and the need for custom processes to handle concurrent data modifications safely.

These requirements point to a need for an analytics-friendly solution that can handle transactional workloads while providing automated table maintenance, reducing the operational overhead that currently burdens their analysts and engineers.

S3 Tables and Athena provide an ideal solution for these requirements. S3 Tables provide storage optimized for analytics workloads, offering Iceberg support with automatic table maintenance and continuous optimization. Athena is a serverless, interactive query service you can use to analyze data using standard SQL without managing infrastructure. When combined, S3 Tables handle the storage optimization and maintenance automatically, and Athena provides the SQL interface for data transformation and querying. This can help reduce the operational overhead of manual table maintenance while providing efficient data management and optimal performance across supported data processing and query engines.

In the following sections, we show how to use the CTAS functionality in Athena to transform the Parquet-formatted review data into S3 Tables with a single SQL statement. We then demonstrate how to manage dynamic data using INSERT, UPDATE, and DELETE operations, showcasing the ACID transaction capabilities and metadata query features in S3 Tables.

Prerequisites

In this walkthrough, we will be working with synthetic customer review data that we’ve made publicly available at s3://aws-bigdata-blog/generated_synthetic_reviews/data/. To follow along, you must have the following prerequisites:

You will create an S3 table bucket named athena-ctas-s3table-demo as part of this walkthrough. Make sure this name is available in your chosen AWS Region.

Set up a database and tables in Athena

Let’s start by creating a database and source table to hold our Parquet data. This table will serve as the data source for our CTAS operation.

Navigate to the Athena query editor to run the following queries:

CREATE DATABASE IF NOT EXISTS `awsdatacatalog`.`reviewsdb`;

CREATE EXTERNAL TABLE IF NOT EXISTS `awsdatacatalog`.`reviewsdb`.`customer_reviews`(
  `marketplace` string, 
  `customer_id` string, 
  `review_id` string, 
  `product_id` string, 
  `product_title` string, 
  `star_rating` bigint, 
  `helpful_votes` bigint, 
  `total_votes` bigint, 
  `insight` string, 
  `review_headline` string, 
  `review_body` string, 
  `review_date` timestamp, 
  `review_year` bigint)
PARTITIONED BY ( 
  `product_category` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://aws-bigdata-blog/generated_synthetic_reviews/data/'

Because the data is partitioned by product category, you must add the partition information to the table metadata using MSCK REPAIR TABLE:

MSCK REPAIR TABLE `awsdatacatalog`.`reviewsdb`.`customer_reviews`
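MSCK REPAIR TABLE works because the data is laid out in Hive-style `key=value` prefixes under the table’s LOCATION. The following sketch shows how partition values can be read from such object keys. The object keys in the example are hypothetical, and the code illustrates the layout convention rather than how Athena implements the command.

```python
from typing import Iterable, List, Set, Tuple


def hive_partitions(
    object_keys: Iterable[str], table_prefix: str, partition_keys: List[str]
) -> Set[Tuple[str, ...]]:
    """Collect partition value tuples from Hive-style key=value object paths."""
    found = set()
    for key in object_keys:
        if not key.startswith(table_prefix):
            continue  # the object lives outside the table LOCATION
        # Directory segments between the table prefix and the file name.
        segments = key[len(table_prefix):].strip("/").split("/")[:-1]
        pairs = dict(seg.split("=", 1) for seg in segments if "=" in seg)
        if all(k in pairs for k in partition_keys):
            found.add(tuple(pairs[k] for k in partition_keys))
    return found
```

For a hypothetical key such as `generated_synthetic_reviews/data/product_category=Apparel/part-0.parquet`, the function returns `{("Apparel",)}`, one partition value per distinct directory.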

Run the following preview query. It should return sample review data, confirming the table is ready for transformation:

SELECT * FROM "awsdatacatalog"."reviewsdb"."customer_reviews" limit 10

Create a table bucket

Table buckets are designed to store tabular data and metadata as objects for analytics workloads. Follow these steps to create a table bucket:

  1. Sign in to the console in your preferred Region and open the Amazon S3 console.
  2. In the navigation pane, choose Table buckets.
  3. Choose Create table bucket.
  4. For Table bucket name, enter athena-ctas-s3table-demo.
  5. Select Enable integration for Integration with AWS analytics services if not already enabled.
  6. Leave the encryption option at its default setting.
  7. Choose Create table bucket.

You can now see athena-ctas-s3table-demo listed under Table buckets.

Create a namespace

Namespaces provide logical organization for tables within your S3 table bucket, facilitating scalable table management. In this step, we create a reviews_namespace to organize our customer review tables. Follow these steps to create the table namespace:

  1. In the navigation pane under Table buckets, choose your newly created bucket athena-ctas-s3table-demo.
  2. On the bucket details page, choose Create table with Athena.
  3. Choose Create a namespace for Namespace configuration.
  4. Enter reviews_namespace for Namespace name.
  5. Choose Create namespace.
  6. Choose Create table with Athena to navigate to the Athena query editor.

You should now see your S3 Tables configuration automatically selected under Data, as shown in the following screenshot.

When you enable Integration with AWS analytics services while creating an S3 table bucket, AWS Glue creates a new catalog named s3tablescatalog in your account’s default Data Catalog for that Region. The integration maps the S3 table bucket resources in your account and Region into this catalog.

This configuration makes sure subsequent queries will target your S3 Tables namespace. You’re now ready to create tables using the CTAS functionality.

Create a new S3 table using the customer_reviews table

A table represents a structured dataset consisting of underlying table data and related metadata stored in the Iceberg table format. In the following steps, we transform the customer_reviews table that we created earlier on the Parquet dataset into an S3 table using the Athena CTAS statement. We partition by date using the Iceberg day() partition transform.

Run the following CTAS query:

CREATE TABLE "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table" WITH (
    format = 'parquet',
    partitioning = ARRAY [ 'day(review_date)' ]
) as
select *
from "awsdatacatalog"."reviewsdb"."customer_reviews"
where review_year >= 2016

This query creates an S3 table with the following optimizations:

  • Parquet format – Efficient columnar storage for analytics
  • Day-level partitioning – Uses Iceberg’s day() transform on review_date for fast queries when filtering on dates
  • Filtered data – Includes only reviews from 2016 onwards to demonstrate selective transformation
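With the day() transform, Iceberg stores a partition value derived from review_date instead of a separate partition column (Iceberg calls this hidden partitioning). The following sketch computes the day and month transform values as the Apache Iceberg table specification describes them for timestamps without a time zone. It is an illustration, not engine code.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def iceberg_day(ts: datetime) -> int:
    """Iceberg day transform: whole days from 1970-01-01."""
    return (ts.date() - EPOCH).days


def iceberg_month(ts: datetime) -> int:
    """Iceberg month transform: whole months from 1970-01."""
    return (ts.year - 1970) * 12 + (ts.month - 1)
```

Reviews written on 2017-01-01 at 08:00 and at 23:59 both map to day 17167, so they land in the same partition. A filter such as `review_date >= DATE('2017-01-01')` therefore lets the engine skip partitions whose day value is below 17167.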

You have successfully transformed your Parquet dataset to S3 Tables using a single CTAS statement.

After you create the table, customer_reviews_s3table will appear under Tables in the Athena console. You can also view the table on the Amazon S3 console by choosing the options menu (three vertical dots) next to the table name and choosing View in S3.

Run a preview query to confirm the data transformation:

SELECT * FROM "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table" limit 10;

Next, let’s analyze monthly review trends:

SELECT review_year,
    month(review_date) as review_month,
    COUNT(*) as review_count,
    ROUND(AVG(star_rating), 2) as avg_rating
FROM "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
WHERE review_date >= DATE('2017-01-01')
    and review_date < DATE('2018-01-01')
GROUP BY 1,2
ORDER BY 1,2

The following screenshot shows our output.

ACID operations on S3 Tables

Athena supports standard SQL DML operations (INSERT, UPDATE, DELETE and MERGE INTO) on S3 Tables with full ACID transaction guarantees. Let’s demonstrate these capabilities by adding historical data and performing data quality checks.

Insert more data into the table using INSERT

Use the following query to insert review data from 2014 and 2015 that wasn’t included in the initial CTAS operation:

INSERT INTO "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
select *
from "awsdatacatalog"."reviewsdb"."customer_reviews"
where review_year IN (2014, 2015)

Check which years are now present in the table:

SELECT distinct(review_year)
from "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
ORDER BY 1

The following screenshot shows our output.

The results show that you have successfully added 2014 and 2015 data. However, you might also notice some invalid years like 2101 and 2202, which appear to be data quality issues in the source dataset.

Clean invalid data using DELETE

Remove the records with incorrect years using the S3 Tables DELETE capability:

DELETE from "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
WHERE review_year IN (2101, 2202)

Confirm the invalid records have been removed.

Update product categories using UPDATE

Let’s demonstrate the UPDATE operation with a business scenario. Imagine the company decides to rebrand the Movies_TV product category to Entertainment_Media to better reflect customer preferences.

First, examine the current product categories and their record counts:

select product_category,
    count(*) review_count
from "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
group by 1
order by 1

You should see a record with product_category as Movies_TV with approximately 5,690,101 reviews. Use the following query to update all Movies_TV records to the new category name:

UPDATE "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
SET product_category = 'Entertainment_Media'
WHERE product_category = 'Movies_TV'

Verify the category name change while confirming the record count remains the same:

select product_category,
    count(*) review_count
from "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
group by 1
order by 1

The results now show Entertainment_Media with the same record count (5,690,101), confirming that the UPDATE operation successfully modified the category name while preserving data integrity.

These examples demonstrate transactional support in S3 Tables through Athena. Combined with automated table maintenance, this helps you build scalable, transactional data lakes more efficiently with minimal operational overhead.
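If you want to script the DML steps above instead of running them in the query editor, the following sketch submits each statement with the boto3 Athena `start_query_execution` call and polls `get_query_execution` until the statement finishes. The results location is a placeholder. Running the statements strictly one after another is a deliberate choice, because concurrent writes to the same Iceberg table can conflict and fail.

```python
import time

TABLE = '"s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"'

# The DELETE and UPDATE statements from this walkthrough, in order.
DML_STEPS = [
    f"DELETE FROM {TABLE} WHERE review_year IN (2101, 2202)",
    f"UPDATE {TABLE} SET product_category = 'Entertainment_Media' "
    "WHERE product_category = 'Movies_TV'",
]

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def run_statement(athena, sql: str, output_location: str, poll_seconds: float = 1.0) -> str:
    """Submit one statement, wait for a terminal state, and return the query execution ID."""
    query_id = athena.start_query_execution(
        QueryString=sql,
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]
        if status["State"] in TERMINAL_STATES:
            break
        time.sleep(poll_seconds)
    if status["State"] != "SUCCEEDED":
        raise RuntimeError(f"{query_id} {status['State']}: {status.get('StateChangeReason', '')}")
    return query_id
```

With a real client, you would create `athena = boto3.client("athena")` and call `run_statement(athena, sql, "s3://<your-results-bucket>/")` for each entry in `DML_STEPS`.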

Additional transformation scenarios using CTAS

The Athena CTAS functionality supports multiple transformation paths to S3 Tables. The following scenarios demonstrate how organizations can use this capability for various data modernization needs:

  • Convert from various data formats – Athena can query data in a wide range of formats as well as federated data sources, and you can convert these queryable sources to an S3 table using CTAS. For example, to create an S3 table from a federated data source, use the following query:
CREATE TABLE "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."<s3table-name>" WITH (
    format = 'parquet'
) AS
SELECT *
FROM <federated-data-source>.<database>.<table>
  • Transform between S3 tables for optimized analytics – Organizations often need to create derived tables from existing S3 tables optimized for specific query patterns. For example, consider a table containing detailed customer reviews that’s partitioned by product category. If your analytics team frequently queries by date ranges, you can use CTAS to create a new S3 table partitioned by date, which can significantly improve performance on time-based queries. For example, the following query creates a copy of recent reviews partitioned by month:
CREATE TABLE "s3tablescatalog/destination-bucket"."namespace"."reviews_by_date" WITH (
    format = 'parquet',
    partitioning = ARRAY [ 'month(review_date)' ]
) AS
SELECT *
FROM "s3tablescatalog/source-bucket"."namespace"."reviews_by_category"
WHERE review_date >= DATE('2023-01-01')
  • Transform from self-managed open table formats – Organizations maintaining their own Iceberg tables can transform them into S3 tables to take advantage of automatic optimization and reduce operational overhead:
CREATE TABLE "s3tablescatalog/destination-bucket"."namespace"."managed_reviews" WITH (
    format = 'parquet',
    partitioning = ARRAY [ 'day(review_date)' ]
) AS
SELECT *
FROM "icebergdb"."self_managed_reviews_iceberg"
  • Combine multiple source tables – Organizations often need to consolidate data from multiple tables into a single table for simplified analytics. This approach can help reduce query complexity and improve performance by pre-joining related datasets. The following query joins multiple tables using CTAS to create an S3 table:
CREATE TABLE "s3tablescatalog/destination-bucket"."namespace"."enriched_reviews" WITH (
    format = 'parquet',
    partitioning = ARRAY [ 'day(review_date)' ]
) AS
SELECT 
    r.*,
    p.product_category AS catalog_product_category,
    p.product_price,
    p.product_brand
FROM "catalog"."database"."reviews" r
JOIN "catalog"."database"."products" p
    ON r.product_id = p.product_id

These scenarios demonstrate the flexibility of Athena CTAS for various data modernization needs, from simple format conversions to complex data consolidation projects.
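When you apply these patterns to many tables, generating the statements can reduce copy-and-paste errors in the quoted catalog, namespace, and table names. The following sketch is a hypothetical helper, not part of any AWS SDK, that assembles a CTAS statement in the same shape as the examples in this post. It does not validate the SELECT text.

```python
from typing import List, Optional


def quote_ident(name: str) -> str:
    """Double-quote a SQL identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_ctas(
    table_bucket: str,
    namespace: str,
    table: str,
    select_sql: str,
    partitioning: Optional[List[str]] = None,
    fmt: str = "parquet",
) -> str:
    """Build an Athena CTAS statement that writes its result to an S3 table."""
    # S3 table buckets appear under the s3tablescatalog/<bucket> catalog.
    target = ".".join(
        quote_ident(part) for part in (f"s3tablescatalog/{table_bucket}", namespace, table)
    )
    props = [f"format = '{fmt}'"]
    if partitioning:
        transforms = ", ".join(f"'{p}'" for p in partitioning)
        props.append(f"partitioning = ARRAY[{transforms}]")
    return f"CREATE TABLE {target} WITH (\n    " + ",\n    ".join(props) + f"\n) AS\n{select_sql}"
```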

Clean up

To avoid ongoing charges, clean up the resources created during this walkthrough. Complete these steps in the specified order to facilitate proper resource deletion. You might need to add respective delete permissions for databases, table buckets, and tables if your IAM user or role doesn’t already have them.

  1. Delete the S3 table created through CTAS:
    DROP TABLE IF EXISTS `reviews_namespace`.`customer_reviews_s3table`

  2. Remove the namespace from the table bucket:
    DROP DATABASE `reviews_namespace`

  3. Delete the table bucket.
  4. Remove the database and table created for the synthetic dataset:
    DROP TABLE `reviewsdb`.`customer_reviews`

    DROP DATABASE `reviewsdb`

  5. Delete any created IAM roles or policies.
  6. Delete the Athena query result location in Amazon S3 if you stored results in an S3 location.

Conclusion

This post demonstrated how the CTAS functionality in Athena simplifies data transformation to S3 Tables using standard SQL statements. We covered the complete transformation process, including format conversions, ACID operations, and various data transformation scenarios. The solution delivers simplified data transformation through single SQL statements, automatic maintenance, and seamless integration of S3 Tables with AWS analytics services and third-party tools. Organizations can modernize their data infrastructure while achieving enterprise-grade performance.

To get started, begin by identifying datasets that could benefit from optimization or transformation, then refer to Working with Amazon S3 Tables and table buckets and Register S3 table bucket catalogs and query Tables from Athena to implement the transformation patterns demonstrated in this walkthrough. The combination of the serverless capabilities of Athena with the automatic optimizations in S3 Tables can provide a powerful foundation for modern data analytics.


About the authors

Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has focused on the big data analytics space since then, helping customers build scalable and robust solutions using AWS Analytics services.

Aritra Gupta is a Senior Technical Product Manager on the Amazon S3 team at Amazon Web Services. He helps customers build and scale data lakes. Based in Seattle, he likes to play chess and badminton in his spare time.

Export JMX metrics from Kafka connectors in Amazon Managed Streaming for Apache Kafka Connect with a custom plugin

Post Syndicated from Jaydev Nath original https://aws.amazon.com/blogs/big-data/export-jmx-metrics-from-kafka-connectors-in-amazon-managed-streaming-for-apache-kafka-connect-with-a-custom-plugin/

Organizations use streaming applications to process and analyze data in real time and adopt the Amazon MSK Connect feature of Amazon Managed Streaming for Apache Kafka (Amazon MSK) to run fully managed Kafka Connect workloads on AWS. Message brokers like Apache Kafka allow applications to handle large volumes and diverse types of data efficiently and enable timely decision-making and instant insights. It’s crucial to monitor the performance and health of each component to help ensure the seamless operation of data streaming pipelines.

Amazon MSK is a fully managed service that simplifies the deployment and operation of Apache Kafka clusters on AWS, making it easier to build and run applications that use Apache Kafka to process streaming data. Amazon MSK Connect simplifies the deployment, monitoring, and automatic scaling of connectors that transfer data between Apache Kafka clusters and external systems such as databases, file systems, and search indices. Amazon MSK Connect is fully compatible with Kafka Connect and supports Amazon MSK, Apache Kafka, and Apache Kafka-compatible clusters. It uses a custom plugin as the container for connector implementation logic.

Custom MSK Connect plugins use Java Management Extensions (JMX) to expose runtime metrics. Although Amazon MSK Connect sends a set of Connect metrics to Amazon CloudWatch, it currently doesn’t natively export the JMX metrics emitted by connector plugins. You can export these metrics by modifying the custom plugin code directly, but that adds maintenance overhead because the modification must be reapplied every time the plugin is updated. In this post, we demonstrate an alternative approach: extending a custom connect plugin with additional modules that export JMX metrics and publish them to CloudWatch as custom metrics. These additional JMX metrics provide richer insight into the performance and health of the connectors. Specifically, we show how you can export the JMX metrics of the Debezium connector when it’s used with MSK Connect.

Understanding JMX

Before we dive deep into exporting JMX metrics, let’s understand how JMX works. JMX is a technology that you can use to monitor and manage Java applications. Key components involved in JMX monitoring are:

  • Managed beans (MBeans) are Java objects that represent the metrics of the Java application being monitored. They contain the actual data points of the resources being monitored.
  • The JMX server creates the MBeans and registers them with the PlatformMBeanServer. The Java application being monitored acts as the JMX server and exposes the MBeans.
  • MBeanServer or JMX registry is the central registry that keeps track of all the registered MBeans in the JMX server. It is the access point for all the MBeans within the Java virtual machine (JVM).
  • JMXConnectorServer acts as a bridge between the JMX client and the JMX server and enables remote access to the exposed MBeans. JMXConnectorServerFactory creates and manages the JMXConnectorServer. It allows for the customization of the server’s properties and uses the JMXServiceURL to define the endpoint where the JMX client can connect to the JMX server.
  • JMXServiceURL provides the necessary information such as the protocol, host, and port for the client to connect to the JMX server and access the desired MBeans.
  • JMX client is an external application or tool that connects to the JMX server to access and monitor the exposed metrics.
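To make the JMXServiceURL component concrete, the following minimal Java sketch parses the URL format used later in this post with the JDK’s javax.management.remote.JMXServiceURL class. The port 7098 is only an example value (it matches the connect.jmx.port setting in the connector configuration later in the post). Note that the RMI registry port sits inside the URL path; the outer address specifies no port, so getPort() reports 0.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

public class JmxServiceUrlParts {

    // Wraps the checked MalformedURLException so callers can use the result directly.
    public static JMXServiceURL parse(String address) {
        try {
            return new JMXServiceURL(address);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Not a valid JMX service URL: " + address, e);
        }
    }

    public static void main(String[] args) {
        JMXServiceURL url = parse("service:jmx:rmi://localhost/jndi/rmi://localhost:7098/jmxrmi");
        System.out.println("protocol: " + url.getProtocol()); // rmi
        System.out.println("host:     " + url.getHost());     // localhost
        System.out.println("port:     " + url.getPort());     // 0, because the outer address gives no port
        System.out.println("urlPath:  " + url.getURLPath());  // /jndi/rmi://localhost:7098/jmxrmi
    }
}
```

The registry lookup details (rmi://localhost:7098/jmxrmi) travel in the URL path, which is why the same template can be reused by both the connector server and the client.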

JMX monitoring involves the steps shown in the following figure:

JMX architecture diagram showing connection flow from client to server with MBeans

JMX monitoring steps include:

  1. The Java application acting as the JMX server creates and configures MBeans for the desired metrics.
  2. JMX server registers the MBeans with the JMX registry.
  3. JMXConnectorServerFactory creates the JMXConnectorServer and defines the JMXServiceURL, which provides the entry point details for the JMX client.
  4. The JMX client connects to the JMX registry in the JMX server through the JMXConnectorServer, using the JMXServiceURL.
  5. The JMX server handles client requests, interacting with the JMX registry to retrieve the MBean data.
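The following minimal, in-process Java sketch walks through steps 1, 2, and 5 using only the JDK’s javax.management APIs: it defines a standard MBean, registers it with the platform MBeanServer, and reads the attribute back through an MBeanServerConnection. The LagStats MBean and the example.metrics:type=LagStats object name are hypothetical; the remote connector from steps 3 and 4 is covered later in this post.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

public class JmxBasicsDemo {

    // Standard MBean contract: a public interface named <ImplementationClass>MBean.
    // Each getter is exposed as a read-only attribute (getLagMillis -> "LagMillis").
    public interface LagStatsMBean {
        long getLagMillis();
    }

    // Hypothetical MBean that holds one lag value, for illustration only.
    public static class LagStats implements LagStatsMBean {
        private volatile long lagMillis;

        public void setLagMillis(long value) { lagMillis = value; }

        @Override
        public long getLagMillis() { return lagMillis; }
    }

    // Steps 1-2: create the MBean and register it with the platform MBeanServer (the JMX registry).
    public static ObjectName register(LagStats stats) {
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("example.metrics:type=LagStats");
            if (mbs.isRegistered(name)) {
                mbs.unregisterMBean(name); // replace any earlier instance
            }
            mbs.registerMBean(stats, name);
            return name;
        } catch (JMException e) {
            throw new IllegalStateException("MBean registration failed", e);
        }
    }

    // Step 5: a client reads the attribute through an MBeanServerConnection. MBeanServer
    // extends this interface, so the same call works in-process or over a JMXConnector.
    public static long readLag(MBeanServerConnection conn, ObjectName name) {
        try {
            return (Long) conn.getAttribute(name, "LagMillis");
        } catch (JMException | IOException e) {
            throw new IllegalStateException("Attribute read failed", e);
        }
    }

    public static void main(String[] args) {
        LagStats stats = new LagStats();
        stats.setLagMillis(250);
        ObjectName name = register(stats);
        System.out.println(readLag(ManagementFactory.getPlatformMBeanServer(), name)); // prints 250
    }
}
```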

Solution overview

This method of wrapping supported Kafka connectors with custom code that exposes connector-specific operational metrics gives teams better insight by correlating connector metrics with cloud-centered metrics in monitoring systems such as Amazon CloudWatch. The approach enables consistent monitoring across the components of the change data capture (CDC) pipeline, feeding metrics into unified dashboards while respecting each connector’s architectural philosophy. You can deliver the consolidated metrics to CloudWatch or to the monitoring tool of your choice, including partner application performance management (APM) tools such as Datadog and New Relic.

We have working implementations of this approach for two popular connectors: the Debezium source connector and the MongoDB sink connector. The GitHub sample repository includes ready-to-use plugins built for each. Review the README file of this custom implementation for more details.

For example, our custom implementation for the MongoDB Sink Connector adds a metrics export layer that calculates critical performance indicators such as latest-kafka-time-difference-ms, which measures the latency between Kafka message timestamps and connector processing time by subtracting the last received record’s timestamp from the connector’s current clock time. This custom wrapper around the MongoDB Sink Connector exports the relevant JMX metrics and publishes them to CloudWatch as custom metrics. We’ve open sourced this solution on GitHub, along with a ready-to-use plugin and detailed configuration guidance in the README.
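The calculation behind a metric like latest-kafka-time-difference-ms fits in a few lines of Java. This is a sketch, not the repository’s implementation: it assumes the metric is the connector’s current clock time minus the last received record’s timestamp, so a positive value means the connector is behind. In a Kafka Connect sink task the timestamp would typically come from SinkRecord.timestamp(), which can be null, so the sketch returns an empty OptionalLong in that case. Injecting a java.time.Clock keeps the calculation deterministic in tests.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.OptionalLong;

public class KafkaTimeDifference {

    // Current clock time minus the timestamp of the last record the connector received.
    // Kafka record timestamps can be absent (null), in which case no value is reported.
    public static OptionalLong latestKafkaTimeDifferenceMs(Long lastRecordTimestampMs, Clock clock) {
        if (lastRecordTimestampMs == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(clock.millis() - lastRecordTimestampMs);
    }

    public static void main(String[] args) {
        // Fixed clock so the result is deterministic: "now" is 1.5 s after the record timestamp.
        Clock clock = Clock.fixed(Instant.ofEpochMilli(1_700_000_001_500L), ZoneOffset.UTC);
        System.out.println(latestKafkaTimeDifferenceMs(1_700_000_000_000L, clock).getAsLong()); // prints 1500
    }
}
```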

CDC is the process of identifying and capturing changes made in a database and delivering those changes in real time to a downstream system. Debezium is an open source distributed platform built on top of Apache Kafka that provides CDC functionality. It provides a set of connectors to track and stream changes from databases to Kafka.

In the next section, we dive deep into the implementation details of how to export JMX metrics from Debezium MySQL Connector deployed as a custom plugin in Amazon MSK Connect. The connector plugin takes care of creating and configuring the MBeans and registering them with the JMX registry.

The following diagram shows the workflow of using Debezium MySQL Connector as a custom plugin in Amazon MSK Connect for CDC from an Amazon Aurora MySQL-Compatible Edition data source.

Data flow diagram illustrating custom Amazon MSK Connect plugin integrating Aurora, Kafka, and CloudWatch metrics

  1. MySQL binary logging (binlog) is enabled in Amazon Aurora MySQL to record all operations in the order in which they are committed to the database.
  2. The Debezium connector plugin component of the MSK Connect custom plugin continuously monitors the MySQL database, captures row-level changes by reading the MySQL binlogs, and streams them as change events to Kafka topics in Amazon MSK.
  3. We’ll build a custom module to enable JMX monitoring on the Debezium connector. This module will act as a JMX client to retrieve the JMX metrics from the connector and publish them as custom metrics to CloudWatch.

The Debezium connector provides three types of metrics in addition to the built-in support for default Kafka and Kafka Connect JMX metrics.

  • Snapshot metrics provide information about connector operation while performing a snapshot.
  • Streaming metrics provide information about connector operation when the connector is reading the binlog.
  • Schema history metrics provide information about the status of the connector’s schema history.

In this solution, we export the MilliSecondsBehindSource streaming metric emitted by the Debezium MySQL connector. This metric reports the number of milliseconds that the connector is lagging behind the change events in the database.

Prerequisites

Following are the prerequisites you need:

  • Access to the AWS account where you want to set up this solution.
  • A source database and MSK cluster, set up by following these setup instructions in the MSK Connect workshop.

Create a custom plugin

Creating a custom plugin for Amazon MSK Connect for the solution involves the following steps:

  1. Create a custom module: Create a new Maven module or project that will contain your custom code to:
    1. Enable JMX monitoring in the connector application by starting the JMX server.
    2. Create a Remote Method Invocation (RMI) registry to give clients access to the JMX metrics.
    3. Create a JMX metrics exporter that queries the JMX metrics by connecting to the JMX server and pushes them to CloudWatch as custom metrics.
    4. Schedule the JMX metrics exporter to run at a configured interval.
  2. Package and deploy the custom module as an MSK Connect custom plugin.
  3. Create a connector using the custom plugin to capture CDC events from the source and stream them, and then validate the metrics in Amazon CloudWatch.

This custom module extends the connector functionality to export JMX metrics without requiring any changes to the underlying connector implementation. As a result, upgrading the connector requires only updating the connector version in the pom.xml of the custom module.

Let’s dive deep into the implementation of each of these steps.

1. Create a custom module

Create a new Maven project with dependencies on Debezium MySQL Connector to enable JMX monitoring, Kafka Connect API for configuration, and CloudWatch AWS SDK to push the metrics to CloudWatch.
Set up a JMX connector server to enable JMX monitoring: The JMX server needs to be started when the connector initializes. This is usually done by setting environment variables with JMX options, as described in Monitoring Debezium. With an Amazon MSK Connect custom plugin, JMX monitoring is instead enabled programmatically when the connector plugin initializes. To achieve this:

  • Extend the MySqlConnector class and override the start method, which is the connector’s entry point, to execute custom code.
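public class DebeziumMySqlMetricsConnector extends MySqlConnector {
    @Override
    public void start(Map<String, String> props) {
        super.start(props); // after the custom setup described below, delegate to Debezium's MySqlConnector
    }
}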
  • In the start method of the custom connector class (DebeziumMySqlMetricsConnector), read the following parameters from the connector configuration so that you can customize the JMX server properties.

connect.jmx.port – The port number on which the RMI registry is created. JMX clients reach the JMXConnectorServer through the registry on this port.

database.server.name – Name of the database that is the source for the CDC.

The start method also retrieves the CloudWatch-related properties that are used when pushing the JMX metrics to CloudWatch.

cloudwatch.namespace.name – CloudWatch namespace to which the metrics are pushed as custom metrics

cloudwatch.region – CloudWatch Region where the custom namespace is created in your AWS account

connectJMXPort = Integer.parseInt(props.getOrDefault(CONNECT_JMX_PORT_KEY, String.valueOf(DEFAULT_JMX_PORT)));
databaseServerName = props.getOrDefault(DATABASE_SERVER_NAME_KEY, "");
cwNameSpace = props.getOrDefault(CW_NAMESPACE_KEY, DEFAULT_CW_NAMESPACE);
cwRegion = props.getOrDefault(CW_REGION_KEY, null);
  • Create an RMI registry on the specified port (connectJMXPort). This registry is used by the JMXConnectorServer to store the RMI objects corresponding to the MBeans in the JMX registry. This allows the JMX clients to look up and access the MBeans on the PlatformMBeanServer.

LocateRegistry.createRegistry(connectJMXPort);

  • Retrieve the PlatformMBeanServer and construct the JMXServiceURL which is in the format service:jmx:rmi://localhost/jndi/rmi://localhost:<<jmx.port>>/jmxrmi. Create a new JMXConnectorServer instance using the JMXConnectorServerFactory and the JMXServiceURL and start the JMXConnectorServer instance.
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
String jmxServiceURL = String.format(JMX_URL_TEMPLATE, connectJMXPort);
JMXServiceURL url = new JMXServiceURL(jmxServiceURL);
JMXConnectorServer svr = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
svr.start();
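Putting the preceding steps together, the following self-contained Java sketch creates the RMI registry, starts a JMXConnectorServer on the JMX service URL format described above, and then connects as a JMX client to read one attribute. It is a sketch under stated assumptions, not the sample repository’s code: the constant names mirror the snippets above, but the 7098 default is an assumption; it reads the JVM’s built-in java.lang:type=Runtime Uptime attribute instead of a Debezium metric so that it runs without a connector; freePort() is a demo-only helper; and it sets java.rmi.server.hostname to 127.0.0.1 so that it works on hosts where the local host name doesn’t resolve.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.net.ServerSocket;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;

public class JmxConnectorServerSketch {

    // Property name from the post; DEFAULT_JMX_PORT is an assumed default for this sketch.
    static final String CONNECT_JMX_PORT_KEY = "connect.jmx.port";
    static final int DEFAULT_JMX_PORT = 7098;
    // Same URL format as described in the post; %d is the RMI registry port.
    static final String JMX_URL_TEMPLATE = "service:jmx:rmi://localhost/jndi/rmi://localhost:%d/jmxrmi";

    static int jmxPort(Map<String, String> props) {
        return Integer.parseInt(props.getOrDefault(CONNECT_JMX_PORT_KEY, String.valueOf(DEFAULT_JMX_PORT)));
    }

    /** Starts a registry and connector server, reads one attribute through a JMX client, then shuts both down. */
    public static long readUptimeOverJmx(Map<String, String> props) {
        try {
            return roundTrip(jmxPort(props));
        } catch (Exception e) {
            throw new IllegalStateException("JMX round trip failed", e);
        }
    }

    private static long roundTrip(int port) throws Exception {
        // Put loopback in RMI stubs so the sketch does not depend on host-name resolution.
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");
        Registry registry = LocateRegistry.createRegistry(port);
        JMXConnectorServer server = null;
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            JMXServiceURL url = new JMXServiceURL(String.format(JMX_URL_TEMPLATE, port));
            server = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
            server.start(); // exports the RMI server object and binds its stub as "jmxrmi" in the registry
            // Client side: JMXConnectorFactory.connect returns an already-connected JMXConnector.
            try (JMXConnector client = JMXConnectorFactory.connect(url, null)) {
                MBeanServerConnection conn = client.getMBeanServerConnection();
                return (Long) conn.getAttribute(new ObjectName("java.lang:type=Runtime"), "Uptime");
            }
        } finally {
            if (server != null) {
                server.stop();
            }
            // Unexport the registry so its RMI threads do not keep the JVM alive.
            UnicastRemoteObject.unexportObject(registry, true);
        }
    }

    /** Asks the OS for a currently free TCP port (demo helper only). */
    static int freePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(CONNECT_JMX_PORT_KEY, String.valueOf(freePort()));
        System.out.println("JVM uptime read over JMX: " + readUptimeOverJmx(props) + " ms");
    }
}
```

Inside a connector, the server would stay running for the connector’s lifetime rather than being stopped right after one read; the shutdown here only keeps the demo self-contained.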

Implement JMX metrics exporter: Create a JMX client that connects to the JMX server, queries the MilliSecondsBehindSource metric, converts it into the required format, and exports it to CloudWatch.

  • Connect to the JMX Server using the JMXConnectorFactory and JMXServiceURL
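JMXServiceURL jmxUrl = new JMXServiceURL(String.format(JMX_URL_TEMPLATE, DebeziumMySqlMetricsConnector.getConnectJMXPort()));
// JMXConnectorFactory.connect returns an already-connected JMXConnector; no separate connect() call is needed
JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl, null);
MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();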
  • Query the MBean object that holds the corresponding metric (for example, MilliSecondsBehindSource) and retrieve the metric value, using the sample code provided in msk-connect-custom-plugin-jmx. You can choose one or more metrics.
  • Schedule the execution of your JMX metrics exporter at regular intervals.

getScheduler().schedule(new JMXMetricsExporter(), SCHEDULER_INITIAL_DELAY, SCHEDULER_PERIOD);
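The following Java sketch shows one way the query and scheduling steps could fit together. It assumes that getScheduler() in the snippet above returns a java.util.Timer (whose schedule(TimerTask, long delay, long period) method matches that call) and that JMXMetricsExporter is a TimerTask; both are assumptions, as are the FakeStreamingMetrics stand-in MBean and the LongConsumer sink that takes the place of the CloudWatch call. The object name follows the pattern Debezium documents for MySQL streaming metrics (debezium.mysql:type=connector-metrics,context=streaming,server=<topic.prefix>); verify it against your Debezium version.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

public class MetricsExporterSketch {

    // Streaming-metrics MBean name pattern documented by Debezium for the MySQL connector.
    static ObjectName streamingMetricsName(String topicPrefix) throws JMException {
        return new ObjectName("debezium.mysql:type=connector-metrics,context=streaming,server=" + topicPrefix);
    }

    // Hypothetical exporter task: reads MilliSecondsBehindSource and hands it to a sink
    // (in the post, the sink is the CloudWatch PutMetricData call).
    static class JMXMetricsExporter extends TimerTask {
        private final MBeanServerConnection conn;
        private final ObjectName name;
        private final LongConsumer sink;

        JMXMetricsExporter(MBeanServerConnection conn, ObjectName name, LongConsumer sink) {
            this.conn = conn;
            this.name = name;
            this.sink = sink;
        }

        @Override
        public void run() {
            try {
                Number value = (Number) conn.getAttribute(name, "MilliSecondsBehindSource");
                sink.accept(value.longValue());
            } catch (JMException | IOException e) {
                // An exception escaping run() would kill the Timer thread, so log and retry next period.
                System.err.println("Skipping metric export: " + e);
            }
        }
    }

    // Stand-in for the Debezium MBean so the sketch runs without a connector.
    public interface FakeStreamingMetricsMBean { long getMilliSecondsBehindSource(); }

    public static class FakeStreamingMetrics implements FakeStreamingMetricsMBean {
        private final long lag;
        public FakeStreamingMetrics(long lag) { this.lag = lag; }
        @Override public long getMilliSecondsBehindSource() { return lag; }
    }

    /** Registers the fake MBean, runs the exporter every 100 ms until three samples arrive, returns the last one. */
    public static long runDemo(long fakeLag) {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        Timer scheduler = new Timer("jmx-metrics-exporter", true); // daemon thread
        ObjectName name = null;
        try {
            name = streamingMetricsName("salesdb");
            if (mbs.isRegistered(name)) mbs.unregisterMBean(name);
            mbs.registerMBean(new FakeStreamingMetrics(fakeLag), name);
            CountDownLatch samples = new CountDownLatch(3);
            AtomicLong last = new AtomicLong(-1);
            scheduler.schedule(new JMXMetricsExporter(mbs, name, v -> { last.set(v); samples.countDown(); }), 0L, 100L);
            if (!samples.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("exporter did not run");
            return last.get();
        } catch (JMException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.cancel();
            try {
                if (name != null && mbs.isRegistered(name)) mbs.unregisterMBean(name);
            } catch (JMException ignored) {
                // best-effort cleanup
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("MilliSecondsBehindSource = " + runDemo(1234L)); // prints 1234
    }
}
```

Catching exceptions inside run() matters with java.util.Timer: an exception that escapes a TimerTask terminates the timer thread, and no further exports run.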

Export metrics to CloudWatch: Implement the logic to push relevant JMX metrics to CloudWatch. You can use the AWS SDK for Java to interact with the CloudWatch PutMetricData API or use the CloudWatch Logs subscription filter to ingest the metrics from a dedicated Kafka topic.

// cw is a CloudWatchClient (AWS SDK for Java 2.x) built for the configured cloudwatch.region;
// msBehindSource is the value read from the MBean and instant is the sample timestamp.
Dimension dimension = Dimension.builder()
        .name("DBServerName")
        .value(DebeziumMySqlMetricsConnector.getDatabaseServerName())
        .build();
MetricDatum datum = MetricDatum.builder()
        .metricName("MilliSecondsBehindSource")
        .unit(StandardUnit.NONE)
        .value(Double.valueOf(msBehindSource))
        .timestamp(instant)
        .dimensions(dimension)
        .build();
PutMetricDataRequest request = PutMetricDataRequest.builder()
        .namespace(DebeziumMySqlMetricsConnector.getCWNameSpace())
        .metricData(datum)
        .build();
cw.putMetricData(request);

For more information, see the sample implementation of the custom module in aws-samples on GitHub. This sample also provides custom plugins packaged with two versions of the Debezium MySQL connector (debezium-connector-mysql-2.5.2.Final-plugin and debezium-connector-mysql-2.7.3.Final-plugin). The following steps explain how to build a custom plugin using your custom code.

2. Package the custom module and Debezium MySQL connector as a custom plugin

Build and package the Maven project with the custom code as a JAR file, and include the JAR file in the debezium-connector-mysql-2.5.2.Final-plugin folder downloaded from the Maven repository. Package the updated debezium-connector-mysql-2.5.2.Final-plugin folder as a ZIP file (Amazon MSK Connect accepts custom plugins in ZIP or JAR format). Alternatively, you can use the prebuilt custom-debezium-mysql-connector-plugin.zip available on GitHub.

Choose the Debezium connector version (2.5 or 2.7) that fits your requirement.

When you need to upgrade to a new version of the Debezium MySQL connector, update the version of the dependency, then build and deploy the custom module. This way, you can maintain the custom plugin without modifying the original connector code. The GitHub samples provide ready-to-use plugins for two Debezium connector versions, but you can follow the same approach to upgrade to the latest connector version.

Create a custom plugin in Amazon MSK

  1. If you have set up your AWS resources by following the Getting Started lab, open the Amazon S3 console and locate the bucket msk-lab-${ACCOUNT_ID}-plugins-bucket/debezium.
  2. Upload the custom plugin that you created in the previous section (custom-debezium-mysql-connector-plugin.zip) to msk-lab-${ACCOUNT_ID}-plugins-bucket/debezium, as shown in the following figure.

msk-lab-s3-plugin-bucket

  3. Switch to the Amazon MSK console and choose Custom plugins in the navigation pane. Choose Create custom plugin, browse to the S3 bucket from the previous step, and select the custom plugin ZIP file that you just uploaded.

custom-connector-plugin-s3-object

  4. Enter custom-debezium-mysql-connector-plugin for the plugin name, optionally enter a description, and choose Create custom plugin.

msk-connect-create-custom-plugin-console

  5. After a few seconds, the plugin is created and its status is Active.
  6. Customize the worker configuration for the connector by following the instructions in the Customize worker configuration lab.

3. Create an Amazon MSK connector

The next step is to create an MSK connector.

  1. From the MSK section choose Connectors, then choose Create connector. Choose custom-debezium-mysql-connector-plugin from the list of Custom plugins, then choose Next.

msk-plugin-create

  2. Enter custom-debezium-mysql-connector in the Name text box, and enter a description for the connector.

connector-properties-console-in-MSK-connect

  3. Select MSKCluster-msk-connect-lab from the listed MSK clusters. From the Authentication dropdown, select IAM.
  4. Copy the following configuration and paste it in the connector configuration text box.
  • Replace the <Your Aurora MySQL database endpoint>, <Your Database Password>, <Your MSK Bootstrap Server Address>, and <Your CloudWatch Region> placeholders with the corresponding details for the resources in your account.
  • Review the topic.prefix, database.user, database.server.id, database.server.name, database.port, and database.include.list parameters in the configuration. These parameters are set to the values used in the workshop. Update them with your own details if you customized the setup in your account.
  • Note that the connector.class parameter is set to the fully qualified name of the MySqlConnector subclass that you created in the custom module.
  • The connect.jmx.port parameter specifies the port used to start the JMX server. You can set it to any available port.
connector.class=com.amazonaws.msk.debezium.mysql.connect.DebeziumMySqlMetricsConnector
tasks.max=1
include.schema.changes=true
topic.prefix=salesdb
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
database.user=master
database.server.id=123456
database.server.name=salesdb
database.port=3306
key.converter.schemas.enable=false
database.hostname=<Your Aurora MySQL database endpoint>
database.password=<Your Database Password>
value.converter.schemas.enable=false
database.include.list=salesdb
schema.history.internal.kafka.topic=internal.dbhistory.salesdb
schema.history.internal.kafka.bootstrap.servers=<Your MSK Bootstrap Server Address>
schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM
schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
schema.history.internal.consumer.security.protocol=SASL_SSL
schema.history.internal.producer.security.protocol=SASL_SSL
connect.jmx.port=7098
cloudwatch.namespace.name=MSK_Connect
cloudwatch.region=<Your CloudWatch Region>

connector-properties-configuration-settings

5. Follow the remaining instructions from the Create MSK Connector lab and create the connector. Verify that the connector status changes to Running.

The custom plugin built on Debezium MySQL connector version 2.7.3 adds optional properties that you can include in your MSK connector configuration to selectively include or exclude the metrics emitted to CloudWatch. The following configuration properties can be used with version 2.7.3:

  • cloudwatch.debezium.streaming.metrics.include – A comma-separated list of streaming metric types to export to CloudWatch as custom metrics.
  • cloudwatch.debezium.streaming.metrics.exclude – A comma-separated list of streaming metric types to exclude from being sent to CloudWatch as custom metrics.
  • Similarly, the include and exclude properties for the snapshot metrics type are cloudwatch.debezium.snapshot.metrics.include and cloudwatch.debezium.snapshot.metrics.exclude.
  • The include and exclude properties for the schema history metrics type are cloudwatch.debezium.schema.history.metrics.include and cloudwatch.debezium.schema.history.metrics.exclude.

The following is a sample configuration excerpt.

  "cloudwatch.debezium.streaming.metrics.include": "LastTransactionId, TotalNumberOfEventsSeen, MilliSecondsBehindSource,CapturedTables",
  "cloudwatch.debezium.streaming.metrics.exclude": "LastTransactionId",
  "cloudwatch.debezium.schema.history.metrics.exclude": "MilliSecondsSinceLastAppliedChange",

Review the GitHub README file for more details on the use of these properties with MSK connector configurations.
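The exact semantics of these properties are defined in the README. The following Java sketch shows one plausible interpretation, and every rule in it is an assumption: an empty include list selects every available metric, names that the MBean doesn’t expose are ignored, and an exclude entry overrides an include entry. It also trims the stray spaces that appear in the sample values above. The list of available metric names in main() is illustrative only.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class MetricSelectionSketch {

    // Splits a comma-separated property value, trimming spaces such as "Seen, MilliSeconds...".
    static Set<String> parseList(String csv) {
        if (csv == null || csv.isBlank()) {
            return new LinkedHashSet<>();
        }
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    // Assumed semantics: empty include = all available; unknown names ignored; exclude wins.
    public static Set<String> selectMetrics(List<String> available, String include, String exclude) {
        Set<String> included = parseList(include);
        Collection<String> base = included.isEmpty() ? available : included;
        Set<String> selected = new LinkedHashSet<>(base);
        selected.retainAll(available);
        selected.removeAll(parseList(exclude));
        return selected;
    }

    public static void main(String[] args) {
        List<String> available = List.of("LastTransactionId", "TotalNumberOfEventsSeen",
                "MilliSecondsBehindSource", "CapturedTables", "Connected");
        Set<String> selected = selectMetrics(available,
                "LastTransactionId, TotalNumberOfEventsSeen, MilliSecondsBehindSource,CapturedTables",
                "LastTransactionId");
        System.out.println(selected); // prints [TotalNumberOfEventsSeen, MilliSecondsBehindSource, CapturedTables]
    }
}
```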

Verify the replication in the Kafka cluster and CloudWatch metrics

Follow the instructions in the Verify the replication in the Kafka cluster lab to set up a client and make changes to the source DB and verify that the changes are captured and sent to Kafka topics by the connector.

To verify that the connector has published the JMX metrics to CloudWatch, open the CloudWatch console, choose Metrics in the navigation pane, and then choose All metrics. Under Custom namespaces, you can see MSK_Connect with the database server name (DBServerName) as the dimension. Choose the database server name to view the metrics.

Amazon CloudWatch interface with time series graph and MSK Connect metric details

Select the MilliSecondsBehindSource metric with Average as the statistic on the Graphed metrics tab to plot the graph. The MilliSecondsBehindSource value should be greater than zero while operations are being performed on the source database and return to zero when the database is idle.

 Amazon CloudWatch console showing custom metric visualization with detailed controls and timeline analysis

Clean up

Delete the resources that you created such as the Aurora DB, Amazon MSK Cluster and connectors by following the instructions at Cleanup in the Amazon MSK Connect lab if you have been following along to set up the solution on your account.

Conclusion

In this post, we showed you how to extend the Debezium MySQL connector plugin with an additional module that exports JMX metrics to CloudWatch as custom metrics. As a next step, you can create a CloudWatch alarm to monitor the metrics and take remediation actions when the alarm is triggered. In addition to exporting the JMX metrics to CloudWatch, you can export these metrics to third-party applications such as Prometheus or Datadog using CloudWatch Metric Streams. You can follow a similar approach to export the JMX metrics of other connectors from MSK Connect. To learn more, see the Connector Developer Guide for creating your own connectors and the MSK Connect documentation for deploying them as custom plugins.


About the authors

Jaydev Nath is a Solutions Architect at AWS, where he works with ISV customers to build secure, scalable, reliable, and cost-efficient cloud solutions. He brings strong expertise in building SaaS architecture on AWS with a focus on Generative AI and data analytics technologies to help deliver practical, valuable business outcomes for customers.

David John Chakram is a Principal Solutions Architect at AWS. He specializes in building data platforms and architecting seamless data ecosystems. With a profound passion for databases, data analytics, and machine learning, he excels at transforming complex data challenges into innovative solutions and driving businesses forward with data-driven insights.

Sharmila Shanmugam is a Solutions Architect at Amazon Web Services. She is passionate about solving customers’ business challenges with technology and automation and reducing operational overhead. In her current role, she helps customers across industries in their digital transformation journeys and helps them build secure, scalable, performant, and optimized workloads on AWS.

Enhance Amazon EMR observability with automated incident mitigation using Amazon Bedrock and Amazon Managed Grafana

Post Syndicated from Yu-Ting Su original https://aws.amazon.com/blogs/big-data/enhance-amazon-emr-observability-with-automated-incident-mitigation-using-amazon-bedrock-and-amazon-managed-grafana/

Maintaining high availability and quick incident response for Amazon EMR clusters is important in data analytics environments. In this post, we show you how to build an automated observability system that combines Amazon Managed Grafana with Amazon Bedrock to detect and remediate EMR cluster issues. We demonstrate how to integrate real-time monitoring with AI-powered remediation suggestions, combining Amazon Managed Grafana for visualization, Amazon Bedrock for intelligent response recommendations, and AWS Systems Manager for automated remediation actions on Amazon Web Services (AWS).

Solution overview

This solution helps you improve EMR cluster observability through a comprehensive four-layer architecture—comprising monitoring, notification, remediation, and knowledge management—to provide the following features:

  • Real-time monitoring of EMR clusters using Amazon Managed Service for Prometheus and Amazon Managed Grafana
  • Automated first-aid remediation through Systems Manager
  • AI-powered incident response suggestions using Amazon Bedrock
  • Integration with the AWS Premium Support knowledge base
  • Historical incident data archival and analysis

The implementation of this architecture delivers the following key benefits:

  • Reduced mean time to resolution (MTTR)
  • Proactive incident prevention
  • Automated first-response actions
  • Knowledge base enrichment through machine learning

The following diagram illustrates the solution architecture.

End-to-end AWS monitoring solution diagram integrating Knowledge Center, Support, CloudWatch metrics with EventBridge rules and Lambda processing

The architecture comprises the following core components:

  • Monitoring layer – The monitoring layer uses Amazon Managed Service for Prometheus and Amazon CloudWatch to capture real-time metrics from EMR clusters. Amazon Managed Grafana serves as the visualization layer, offering comprehensive dashboards for Apache YARN, HDFS, Apache HBase, and Apache Hudi performance monitoring. Advanced alerting mechanisms trigger notifications based on predefined query results.
  • Notification layer – To provide timely and reliable alert delivery, the notification layer uses Amazon Simple Notification Service (Amazon SNS) for distribution and Amazon Simple Queue Service (Amazon SQS) for message queuing. This architecture prevents message delays and provides a robust trigger mechanism for AWS Lambda functions.
  • Remediation layer – The remediation layer enables automatic issue resolution through:
    • Lambda functions for orchestration
    • Systems Manager for script execution
    • Amazon Bedrock (amazon.nova-lite-v1:0) for generating intelligent response recommendations
  • Knowledge management layer – To maintain an up-to-date knowledge base, the solution:

We provide an AWS CloudFormation template to deploy the solution resources.

Prerequisites

Before starting this walkthrough, make sure you have access to the following AWS resources and configurations:

  • An AWS account
  • Access to the US East (N. Virginia) AWS Region
    • Add access to Amazon Bedrock foundation models (amazon.nova-lite-v1:0)

  • Amazon EMR version 6.15.0 (used in this demo)
  • Archived technical or troubleshooting articles
  • AWS IAM Identity Center enabled with at least one role that can become a Grafana administrator
  • (Optional) AWS Premium Support with a business support plan or higher for enhanced troubleshooting capabilities

Throughout this walkthrough, we provide detailed instructions to set up and configure these prerequisites if you haven’t already done so.

Configure resources using AWS CloudFormation

Complete the following steps to configure your resources:

  1. Launch the CloudFormation stack:

launch stack

  2. Provide emrobservability as the stack name.
  3. Select a virtual private cloud (VPC) and assign a public subnet.
  4. For EMRClusterName, enter a name for your cluster (default: emrObservability).
  5. For the Apache HBase root directory, enter an existing Amazon S3 location (for example, s3://mybucket/my/hbase/rootdir/).
  6. For MasterInstanceType and CoreInstanceType, enter your instance types (default: m5.xlarge for both).
  7. For CoreInstanceCount, enter your instance count (default: 2).
  8. For SSHIPRange, use CheckIp to find your IP address and enter it in CIDR notation (for example, 10.1.1.10/32).
  9. Choose the release label (default: 6.15.0).
  10. For KeyName, enter the name of the key pair to use for SSH access to Amazon Elastic Compute Cloud (Amazon EC2) instances.
  11. For LatestAmiId, enter your AMI (default: /aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2).
  12. For KBS3Bucket, enter a name for your S3 bucket (for example, mykbbucket).
  13. For SubscriptionEndpoint, enter an email address to receive notifications and responses.

Accept subscription confirmation

Accept the subscription confirmation sent to the email address you specified in the CloudFormation stack parameters. The following screenshot shows an example of the email you receive.

AWS email confirmation for SNS topic subscription to QA Lambda function responses with opt-out instructions

Prepare the knowledge base

Complete the following steps to populate the S3 bucket with archived technical articles and cases:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose the function CustomFunctionCopyKCArticlesToS3Bucket.

AWS Lambda console displaying Functions page with CustomFunctionCopyKCArticlesToS3Bucket function details

  3. Manually invoke the function by choosing Test on the Test tab.

AWS Lambda Test tab interface with event configuration options

  4. Verify successful execution by checking the CloudWatch logs.

AWS Lambda successful function execution result with null output

  5. Repeat the process for the Lambda function CustomFunctionCopyCasesToS3Bucket.

Lambda function interface displaying CustomFunctionCopyCasesToS3Bucket configuration with CloudFormation ID and description panel

AWS Lambda test interface showing Test event configuration options and action buttons

AWS Lambda function execution success message with null response and SHA-256 code

  6. Confirm that the S3 bucket has been populated with archived technical articles and cases.

Amazon S3 bucket interface showing two folders with action buttons and search functionality

Sync data to the Amazon Bedrock knowledge base

Complete the following steps to sync the data to your knowledge base:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose the function KBDataSourceSync.

AWS Lambda console displaying filtered functions with CloudFormation tags, Python runtime versions, and modification timestamps

  3. Manually invoke the function by choosing Test on the Test tab.

This task might take 10–15 minutes to complete.

AWS Lambda console test configuration panel with CloudWatch integration and event creation controls

  4. Verify successful execution by checking the CloudWatch logs.

Lambda function execution results showing successful completion status and details

Configure your Amazon Managed Grafana workspace

Complete the following steps to configure your Amazon Managed Grafana workspace:

  1. On the Amazon Managed Grafana console, choose Workspaces in the navigation pane.
  2. Open your workspace.
  3. Choose Assign new user or group.

Amazon Grafana workspace showing IAM configuration notice and user assignment button

  4. Select your IAM Identity Center role and choose Assign users and groups.

Amazon Grafana IAM Identity Center user assignment panel with search and selection controls

  5. On the Admin dropdown menu, choose Make admin.

Amazon Grafana user list showing assigned viewer with admin action options

  6. Enable Grafana alerting, then choose Save changes.

Amazon Grafana alerting configuration panel showing disabled status with navigation tabs and edit button

Amazon Grafana configuration panel showing enabled alerting and plugin management settings

  7. Wait 10 minutes for the workspace to become active.
  8. When it’s active, sign in to the Grafana workspace. (For more information, refer to Connect to your workspace.)

Configure data sources

Add and configure the following data sources:

  1. For Service, choose CloudWatch, then select your Region and add CloudWatch as a data source.

  2. Choose Amazon Managed Service for Prometheus as a second data source and select your Region.

  3. Validate CloudWatch connectivity:
    1. Run test queries (for example, Namespace: AWS/EC2, Metric name: CPUUtilization, Statistic: Maximum).
      Amazon Managed Grafana interface showing CPU utilization query setup for EC2 instance.
    2. Verify CloudWatch metric retrieval.
      Line graph showing CPU utilization over time with peak at 40%.
  4. Validate Amazon Managed Service for Prometheus connectivity:
    1. Run test queries (for example, Metric: hadoop_hbase_numregionservers, Label filters: cluster_id = <Amazon EMR cluster ID>).
      Amazon Managed Grafana query interface showing Hadoop HBase metric configuration.
    2. Verify Prometheus metric retrieval.
      Amazon Managed Grafana monitoring dashboard showing a graph with HBase Region Server amount from 0 to 2

Confirm SNS notification channels

Complete the following steps to confirm your SNS notification is set up:

  1. On the Amazon SNS console, choose Topics in the navigation pane.
  2. Locate the two topics whose names end in -LambdaFunctionTopic and -QALambdaFunctionTopic, and note their ARNs.

AWS SNS Topics list showing 4 topics with names, types, and ARNs

AWS SNS Topics console showing filtered search results for "LambdaFunctionTopic"

AWS SNS Topics console showing filtered search results for "QALambdaFunctionTopic"

  3. In your Grafana workspace, choose Contact points under Alerting.

  4. Create the first contact point:
    1. For Name, enter SNS_SSM.
    2. For Integration, choose AWS SNS.
    3. For Topic, enter the ARN for LambdaFunctionTopic.
    4. For Auth Provider, choose Workspace IAM role.
    5. For Alert Message format, choose JSON.

  5. Create the second contact point:
    1. For Name, enter SNS_QA.
    2. For Integration, choose AWS SNS.
    3. For Topic, enter the ARN for QALambdaFunctionTopic.
    4. For Auth Provider, choose Workspace IAM role.
    5. For Alert Message format, choose JSON.

Create alert rules

Complete the following steps to set up two critical alert rules:

  1. Choose Alert rules under Alerting.

  2. Set up alerting if the Apache HBase region server status is abnormal:
    1. For Alert name, enter HBase region server down.
    2. For Data source, choose Amazon Managed Service for Prometheus.
    3. For Metric, choose hadoop_hbase_numregionservers.
      Alert rule configuration interface for HBase region server monitoring
    4. For Threshold, configure the rule to alert when the region server count is less than 2 for 3 minutes.
      Amazon Managed Grafana alert rule configuration interface with expressions setup
    5. For Evaluation interval, set to 1 minute.
      New evaluation group creation modal showing P0_RegionServer name input and 1m interval setting
      HBase alert configuration panel showing P0_RegionServer group and 3m pending period
    6. For Contact point, choose SNS_SSM.
      Amazon Managed Grafana alert configuration interface showing labels and notifications setup with AWS SNS integration
  3. Create a second alert for when Amazon EC2 CPU utilization is abnormal:
    1. For Alert name, enter EC2 CPU utilization too high.
    2. For Data source, choose Amazon CloudWatch.
    3. For Namespace, choose AWS/EC2.
    4. For Metric name, choose CPUUtilization.
    5. For Statistic, choose Maximum.
      Amazon CloudWatch query interface for setting up EC2 CPU utilization alert conditions
    6. For Threshold, configure the rule to alert when CPU utilization is more than 95% for 3 minutes.
      Amazon Managed Grafana alert interface with Reduce and Threshold expressions for alert condition management
    7. For Evaluation interval, set 1 minute.
      New evaluation group configuration modal showing CPU utilization monitoring setup with 1-minute interval
      Amazon Managed Grafana alert rule configuration screen showing evaluation behavior settings
    8. For Contact point, choose SNS_QA.
      Amazon Managed Grafana alert configuration showing customizable labels and contact point selection for SNS_QA integration
  4. On the alert rule creation page, scroll to section 5, Add annotations, and for Summary, add a clear description of the alert (for example, CPU utilization on EC2 instance is too high).

Alert configuration summary field with "CPU utilization on EC2 instance is too high" warning message
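Both rules pair a threshold with a pending period: a breach first moves the rule to Pending, and the rule fires only if the breach persists through the pending period across the 1-minute evaluations. The following Python sketch is a simplified model of that behavior for illustration only. It ignores Grafana's NoData and Error states, and AlertRule and evaluate are hypothetical names, not Grafana APIs.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class AlertRule:
    """Hypothetical, simplified model of a Grafana threshold rule."""
    condition: Callable[[float], bool]  # returns True when the threshold is breached
    pending_minutes: int                # how long the breach must last before firing
    interval_minutes: int = 1           # evaluation interval


def evaluate(rule: AlertRule, samples: List[float]) -> List[str]:
    """Return the rule state after each evaluation (one sample per interval)."""
    states: List[str] = []
    pending_since: Optional[int] = None  # minute when the current breach started
    for i, value in enumerate(samples):
        now = i * rule.interval_minutes
        if rule.condition(value):
            if pending_since is None:
                pending_since = now
            # Fire only after the condition has held for the full pending period
            elapsed = now - pending_since
            states.append("Firing" if elapsed >= rule.pending_minutes else "Pending")
        else:
            # Any healthy evaluation resets the rule
            pending_since = None
            states.append("Normal")
    return states


# HBase rule: fewer than 2 region servers for 3 minutes
hbase_rule = AlertRule(condition=lambda count: count < 2, pending_minutes=3)
print(evaluate(hbase_rule, [2, 1, 1, 1, 1, 2]))
# ['Normal', 'Pending', 'Pending', 'Pending', 'Firing', 'Normal']
```

In this model a single healthy sample resets the rule, so a metric that flaps around the threshold may never fire. A longer pending period trades slower detection for fewer false alarms.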

Apache HBase region server incident test

To confirm the system is working as expected, complete the following Apache HBase region server incident test:

  1. SSH into an EMR core instance.
  2. Stop the Apache HBase region server using systemctl:
 # Stop HBase region server service 
 sudo systemctl stop hbase-regionserver.service 

  3. Verify the service status:
 # Check the current state of HBase region server service 
 sudo systemctl status hbase-regionserver.service
  4. Observe Amazon Managed Grafana alert progression:
    1. Monitor alert status changes.
      Alert dashboard showing HBase region server alert status in pending state
      Alert dashboard showing HBase region server alert in firing state
    2. Verify SNS message generation.
    3. Confirm SQS message queuing.
    4. Track the Lambda function triggered for remediation.

Terminal output showing HBase RegionServer service status and daemon processes

HBase monitoring interface displaying region server status with health indicators and action buttons

CPU utilization stress test

Complete the following CPU utilization stress test:

  1. SSH into the EMR primary instance.
  2. Install stress testing tools:
 sudo amazon-linux-extras install epel -y
 sudo yum install stress -y 

  3. Verify the installation:
 stress --version 

  4. Generate high CPU load with stress, which uses the following general syntax:
 sudo stress [options] 

For our Amazon EMR test, use the following command:

 # For m5.xlarge instances (4 vCPUs)
 sudo stress --cpu 4

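--cpu 4 (short form -c 4) starts 4 CPU-bound worker processes, one for each vCPU. stress runs until you stop it with Ctrl+C; alternatively, add --timeout followed by a number of seconds longer than the alert's 3-minute pending period so that the load stops on its own. The following are the vCPU counts for common instance types: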

  • m5.xlarge: 4 vCPUs
  • m5.2xlarge: 8 vCPUs
  • m5.4xlarge: 16 vCPUs
  5. Monitor system response:
    1. Observe Amazon Managed Grafana alert status changes.
      Amazon Managed Grafana dashboard header showing rules status
    2. Verify Amazon Bedrock recommendation generation.
    3. Check SNS email notification delivery.
      AWS SNS notification email showing troubleshooting steps for high CPU usage
      Code snippet showing CPU usage troubleshooting steps in red text

Best practices and considerations

Monitoring infrastructure requires careful alert prioritization and threshold configuration. Alert aggregation prevents notification overload by consolidating related events and suppressing redundant alerts. Keep dashboards current by updating them and adding new metrics as workloads change, so they continue to provide real-time visibility into system performance and health.
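Alert aggregation can be sketched in a few lines: group alerts that share key labels, collapse exact duplicates, and send one notification per group. This Python example assumes alerts are label dictionaries with alertname, cluster_id, and instance keys (hypothetical shapes chosen for illustration). In Amazon Managed Grafana, the equivalent behavior is configured through notification policies and their group-by labels rather than in code.

```python
from typing import Dict, List, Tuple

Alert = Dict[str, str]


def aggregate_alerts(
    alerts: List[Alert],
    group_by: Tuple[str, ...] = ("alertname", "cluster_id"),
) -> Dict[Tuple[str, ...], List[Alert]]:
    """Group alerts by selected labels and drop exact duplicates within each group."""
    groups: Dict[Tuple[str, ...], Dict[Tuple[Tuple[str, str], ...], Alert]] = {}
    for alert in alerts:
        key = tuple(alert.get(label, "") for label in group_by)
        # An alert's full, sorted label set acts as its fingerprint
        fingerprint = tuple(sorted(alert.items()))
        groups.setdefault(key, {})[fingerprint] = alert
    return {key: list(members.values()) for key, members in groups.items()}


def summarize(groups: Dict[Tuple[str, ...], List[Alert]]) -> List[str]:
    """Render one notification line per group instead of one per alert."""
    lines = []
    for key, members in groups.items():
        instances = sorted({a.get("instance", "unknown") for a in members})
        lines.append(f"{' / '.join(key)}: {len(members)} alert(s) on {', '.join(instances)}")
    return lines


alerts = [
    {"alertname": "EC2 CPU utilization too high", "cluster_id": "j-EXAMPLE", "instance": "i-1"},
    {"alertname": "EC2 CPU utilization too high", "cluster_id": "j-EXAMPLE", "instance": "i-2"},
    {"alertname": "EC2 CPU utilization too high", "cluster_id": "j-EXAMPLE", "instance": "i-1"},
    {"alertname": "HBase region server down", "cluster_id": "j-EXAMPLE", "instance": "i-3"},
]
for line in summarize(aggregate_alerts(alerts)):
    print(line)
# EC2 CPU utilization too high / j-EXAMPLE: 2 alert(s) on i-1, i-2
# HBase region server down / j-EXAMPLE: 1 alert(s) on i-3
```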

Security implementations should use least-privilege AWS Identity and Access Management (IAM) roles, restricting access to critical resources and reducing potential breach vectors. Protect data by encrypting it at rest (for example, with AES-256) and in transit (with TLS). Automated security audits should scan the automation scripts, identifying potential vulnerabilities through code analysis and runtime inspection.

For performance, you can use Lambda extensions to cache knowledge base content, reducing latency and improving response times. Retry API calls with exponential backoff to ride out transient network errors and make the system more resilient. Monitoring Lambda function execution times and applying statistical analysis to them helps detect anomalies that can point to system-wide incidents or performance degradation.
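The retry advice can be sketched as a small wrapper. The following Python example retries only the exception types you mark as transient, doubles the delay ceiling on each attempt, and sleeps a random fraction of it (full jitter) so that many clients don't retry in lockstep. call_with_backoff and its defaults are illustrative assumptions, not part of the solution's code; the sleep parameter exists so the behavior can be tested without waiting.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying transient errors with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delay ceiling doubles each attempt (0.5s, 1s, 2s, ...) up to max_delay
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Full jitter: sleep a random time below the ceiling
            sleep(random.uniform(0, ceiling))


# Example: a call that fails twice with a transient error, then succeeds
attempts = {"count": 0}


def flaky_call() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient network error")
    return "ok"


print(call_with_backoff(flaky_call))  # prints: ok
```

For calls made through boto3, botocore already implements retries, so configuring botocore.config.Config(retries={"mode": "standard", "max_attempts": 10}) on the client may be simpler than wrapping each call.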

Clean up

To avoid incurring future charges, delete the resources by deleting the parent stack on the AWS CloudFormation console.

Conclusion

This solution provides a robust framework for automated EMR cluster monitoring and incident response. By combining real-time monitoring with AI-powered remediation suggestions and automated execution, organizations can significantly reduce MTTR for common Amazon EMR issues while building a knowledge base for future incident response.

Try out this solution for your own use case, and leave your feedback in the comments section.


About the authors

Yu-Ting Su is a Sr. Hadoop Systems Engineer in AWS Support Engineering at Amazon Web Services (AWS). Her expertise is in Amazon EMR and Amazon OpenSearch Service. She’s passionate about distributed computing and helping people bring their ideas to life.

Build data pipelines with dbt in Amazon Redshift using Amazon MWAA and Cosmos

Post Syndicated from Cindy Li original https://aws.amazon.com/blogs/big-data/build-data-pipelines-with-dbt-in-amazon-redshift-using-amazon-mwaa-and-cosmos/

Effective collaboration and scalability are essential for building efficient data pipelines. However, data modeling teams often face challenges with complex extract, transform, and load (ETL) tools, requiring programming expertise and a deep understanding of infrastructure. This complexity can lead to operational inefficiencies and challenges in maintaining data quality at scale.

dbt addresses these challenges with a simpler approach: data teams build robust data models in SQL, a language they already know. Combined with modern development practices, dbt projects can use version control for collaboration, tests for data quality, and macros for reusable logic. dbt also infers the dependencies between models from their ref() calls and runs transformations in the correct order.
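To illustrate how dependency ordering works, the following Python sketch finds {{ ref('...') }} calls in each model's SQL, builds a dependency graph, and orders it topologically with the standard library's graphlib. It is a simplification: it handles only single-argument ref() calls and ignores sources, macros, and the other node types that dbt's own parser tracks. run_order is a hypothetical helper, not a dbt API.

```python
import re
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, List

# Matches the simple form {{ ref('model_name') }}; dbt's real parser handles much more
REF_PATTERN = re.compile(r"""\{\{\s*ref\(\s*['"](\w+)['"]\s*\)\s*\}\}""")


def run_order(models: Dict[str, str]) -> List[str]:
    """Order models so each one runs after every model it references."""
    # Map each model to the set of models it depends on (its predecessors)
    graph = {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}
    # static_order() yields predecessors first and raises graphlib.CycleError on cycles
    return list(TopologicalSorter(graph).static_order())


models = {
    "model3": "select * from {{ ref('model1') }} join {{ ref('model2') }} using (id)",
    "model2": "select * from {{ ref('model1') }}",
    "model1": "select * from {{ source('sample_source', 'sample_table') }}",
}
print(run_order(models))  # ['model1', 'model2', 'model3']
```

A circular reference between two models makes static_order() raise graphlib.CycleError, which mirrors why dbt rejects projects whose models depend on each other in a loop.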

In this post, we explore a streamlined, configuration-driven approach to orchestrate dbt Core jobs using Amazon Managed Workflows for Apache Airflow (Amazon MWAA) and Cosmos, an open source package. These jobs run transformations on Amazon Redshift, a fully managed data warehouse that enables fast, scalable analytics using standard SQL. With this setup, teams can collaborate effectively while maintaining data quality, operational efficiency, and observability. Key steps covered include:

  • Creating a sample dbt project
  • Enabling auditing within the dbt project to capture runtime metrics for each model
  • Creating a GitHub Actions workflow to automate deployments
  • Setting up Amazon Simple Notification Service (Amazon SNS) to proactively alert on failures

These enhancements enable model-level auditing, automated deployments, and real-time failure alerts. By the end of this post, you will have a practical and scalable framework for running dbt Core jobs with Cosmos on Amazon MWAA, so your team can ship reliable data workflows faster.

Solution overview

The following diagram illustrates the solution architecture.

The workflow contains the following steps:

  1. Analytics engineers manage their dbt project in their version control tool. In this post, we use GitHub as an example.
  2. We configure an Apache Airflow Directed Acyclic Graph (DAG) to use the Cosmos library to create an Airflow task group that contains all the dbt models as part of the dbt project.
  3. We use a GitHub Actions workflow to sync the dbt project files and the DAG to an Amazon Simple Storage Service (Amazon S3) bucket.
  4. During the DAG run, dbt compiles the models, tests, and macros into Amazon Redshift SQL statements, which run directly on the Redshift cluster.
  5. If a task in the DAG fails, the DAG invokes an AWS Lambda function to send out a notification using Amazon SNS.

Prerequisites

You must have the following prerequisites:

Create a dbt project

A dbt project is structured to facilitate modular, scalable, and maintainable data transformations. The following code is a sample dbt project structure that this post will follow:

MY_SAMPLE_DBT_PROJECT
├── .github
│   └── workflows
│       └── publish_assets.yml
└── src
    ├── dags
    │   └── dbt_sample_dag.py
    └── my_sample_dbt_project
        ├── macros
        ├── models
        └── dbt_project.yml

dbt uses the following YAML files:

  • dbt_project.yml – Serves as the main configuration for your project. Objects in this project inherit settings defined here unless they are overridden at the model level. For example:
# Name your project! Project names should contain only lowercase characters
# and underscores. 
name: 'my_sample_dbt_project'
version: '1.0.0'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. 
model-paths: ["models"]
macro-paths: ["macros"]

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
# In this example config, we tell dbt to build models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
  my_sample_dbt_project:
    # Config indicated by + and applies to files under models/example/
    example:
      +materialized: view
      
on-run-end:
# add run results to audit table 
  - "{{ log_audit_table(results) }}" 
  • sources.yml – Defines the external data sources that your dbt models will reference. For example:
sources:
  - name: sample_source
    database: sample_database
    schema: sample_schema
    tables:
      - name: sample_table
  • schema.yml – Outlines the schema of your models and their data quality tests. In the following example, we define the column full_name for model1 and sales_id for model2, declare each as the primary key, and add data quality tests that check that the column is unique and not null. Amazon Redshift treats primary key constraints as informational and does not enforce them, so these tests are what actually catch duplicate or missing keys.
version: 2

models:
  - name: model1
    config: 
      contract: {enforced: true}

    columns:
      - name: full_name
        data_type: varchar(100)
        constraints:
          - type: primary_key
        tests:
          - unique
          - not_null

  - name: model2
    config: 
      contract: {enforced: true}

    columns:
      - name: sales_id
        data_type: varchar(100)
        constraints:
          - type: primary_key
        tests:
          - unique
          - not_null
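Each generic test compiles to a query that selects offending rows, and the test fails if any rows come back. The following Python sketch shows what the two tests treat as failures, applied to in-memory rows instead of a table; the rows and function names are hypothetical. Note that unique ignores NULLs, which is why not_null is still needed alongside it.

```python
from collections import Counter
from typing import Any, Dict, List

Row = Dict[str, Any]


def not_null_failures(rows: List[Row], column: str) -> int:
    """Count rows where the column is NULL (None), which dbt's not_null test flags."""
    return sum(1 for row in rows if row.get(column) is None)


def unique_failures(rows: List[Row], column: str) -> List[Any]:
    """Return values that occur more than once; NULLs are ignored, as in dbt's unique test."""
    counts = Counter(row[column] for row in rows if row.get(column) is not None)
    return [value for value, n in counts.items() if n > 1]


sales = [
    {"sales_id": "s-1"},
    {"sales_id": "s-2"},
    {"sales_id": "s-1"},  # duplicate key
    {"sales_id": None},   # missing key
]
print(unique_failures(sales, "sales_id"))    # ['s-1']
print(not_null_failures(sales, "sales_id"))  # 1
```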

Enable auditing within dbt project

Enabling auditing within your dbt project is crucial for facilitating transparency, traceability, and operational oversight across your data pipeline. You can capture run metrics at the model level for each execution in an audit table. By capturing detailed run metrics such as load identifier, runtime, and number of rows affected, teams can systematically monitor the health and performance of each load, quickly identify issues, and trace changes back to specific runs.

The audit table consists of the following attributes:

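  • load_id – A unique identifier for each model run in a load, formed from the dbt invocation ID and the node's unique ID
  • invocation_id – The ID that dbt assigns to each invocation, shared by every model run in that invocation
  • database_name – The name of the database within which data is being loaded
  • schema_name – The name of the schema within which data is being loaded
  • name – The name of the object into which data is being loaded
  • resource_type – The type of dbt resource (for example, model or test)
  • status – The status dbt reports for the node (for example, success or error)
  • execution_time – The time, in seconds, each dbt model took to complete as part of the load
  • rows_affected – The number of rows affected in the dbt model as part of the load
  • model_execution_date – The date of the run (CURRENT_DATE)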

Complete the following steps to enable auditing within your dbt project:

  1. Navigate to the models directory (src/my_sample_dbt_project/models) and create the audit_table.sql model file:
{%- set run_date = "CURRENT_DATE" -%}
{{
    config(
        materialized='incremental',
        incremental_strategy='append',
        tags=["audit"]
    )
}}

with empty_table as (
    select
        'test_load_id'::varchar(200) as load_id,
        'test_invocation_id'::varchar(200) as invocation_id,
        'test_database_name'::varchar(200) as database_name,
        'test_schema_name'::varchar(200) as schema_name,
        'test_model_name'::varchar(200) as name,
        'test_resource_type'::varchar(200) as resource_type,
        'test_status'::varchar(200) as status,
        cast('12122012' as float) as execution_time,
        cast('100' as int) as rows_affected,
        {{run_date}} as model_execution_date
)

select * from empty_table
-- This is a filter so we will never actually insert these values
where 1 = 0
  2. Navigate to the macros directory (src/my_sample_dbt_project/macros) and create the parse_dbt_results.sql macro file:
{% macro parse_dbt_results(results) %}
    -- Create a list of parsed results
    {%- set parsed_results = [] %}
    -- Flatten results and add to list
    {% for run_result in results %}
        -- Convert the run result object to a simple dictionary
        {% set run_result_dict = run_result.to_dict() %}
        -- Get the underlying dbt graph node that was executed
        {% set node = run_result_dict.get('node') %}
        {% set rows_affected = run_result_dict.get(
        'adapter_response', {}).get('rows_affected', 0) %}
        {%- if not rows_affected -%}
            {% set rows_affected = 0 %}
        {%- endif -%}
        {% set parsed_result_dict = {
                'load_id': invocation_id ~ '.' ~ node.get('unique_id'),
                'invocation_id': invocation_id,
                'database_name': node.get('database'),
                'schema_name': node.get('schema'),
                'name': node.get('name'),
                'resource_type': node.get('resource_type'),
                'status': run_result_dict.get('status'),
                'execution_time': run_result_dict.get('execution_time'),
                'rows_affected': rows_affected
                }%}
        {% do parsed_results.append(parsed_result_dict) %}
    {% endfor %}
    {{ return(parsed_results) }}
{% endmacro %}
  3. Navigate to the macros directory (src/my_sample_dbt_project/macros) and create the log_audit_table.sql macro file:
{% macro log_audit_table(results) %}
    -- depends_on: {{ ref('audit_table') }}
    {%- if execute -%}
        {{ print("Running log_audit_table Macro") }}
        {%- set run_date = "CURRENT_DATE" -%}
        {%- set parsed_results = parse_dbt_results(results) -%}
        {%- if parsed_results | length  > 0 -%}
            {% set allowed_columns = ['load_id', 'invocation_id', 'database_name', 
            'schema_name', 'name', 'resource_type', 'status', 'execution_time', 
            'rows_affected', 'model_execution_date'] -%}
            {% set insert_dbt_results_query -%}
                insert into {{ ref('audit_table') }}
                    (
                        load_id,
                        invocation_id,
                        database_name,
                        schema_name,
                        name,
                        resource_type,
                        status,
                        execution_time,
                        rows_affected,
                        model_execution_date
                ) values
                    {%- for parsed_result_dict in parsed_results -%}
                        (
                            {%- for column, value in parsed_result_dict.items() %}
                                {% if column not in allowed_columns %}
                                    {{ exceptions.raise_compiler_error("Invalid column: " ~ column) }}
                                {% endif %}
                                {% set sanitized_value = value | replace("'", "''") %}
                                '{{ sanitized_value }}'
                                {%- if not loop.last %}, {% endif %}
                            {%- endfor -%}
                            {#- model_execution_date is not in parsed_results, so add it here -#}
                            , {{ run_date }}
                        )
                        {%- if not loop.last %}, {% endif %}
                    {%- endfor -%}
            {%- endset -%}
            {%- do run_query(insert_dbt_results_query) -%}
        {%- endif -%}
    {%- endif -%}
    {{ return ('') }}
{% endmacro %}
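  4. Make sure the dbt_project.yml file contains the following on-run-end hook, which calls log_audit_table at the end of each dbt invocation (the sample dbt_project.yml shown earlier already includes it):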
on-run-end:
  - "{{ log_audit_table(results) }}" 
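To check the shape of the audit rows before running dbt, the following Python sketch mirrors the logic of parse_dbt_results and log_audit_table: it flattens each run result into a row, escapes single quotes by doubling them, and supplies CURRENT_DATE for the model_execution_date column so that the number of values matches the 10-column list. It is an illustration only, not something dbt executes; the sample run result is hypothetical and includes only the keys the macros read.

```python
from typing import Any, Dict, List

# Columns filled from each run result, in insert order
AUDIT_COLUMNS = [
    "load_id", "invocation_id", "database_name", "schema_name", "name",
    "resource_type", "status", "execution_time", "rows_affected",
]


def parse_result(invocation_id: str, result: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one run result (shaped like run_result.to_dict()) into an audit row."""
    node = result.get("node") or {}
    # Missing or empty rows_affected is recorded as 0, as in the macro
    rows_affected = (result.get("adapter_response") or {}).get("rows_affected") or 0
    return {
        "load_id": f"{invocation_id}.{node.get('unique_id')}",
        "invocation_id": invocation_id,
        "database_name": node.get("database"),
        "schema_name": node.get("schema"),
        "name": node.get("name"),
        "resource_type": node.get("resource_type"),
        "status": result.get("status"),
        "execution_time": result.get("execution_time"),
        "rows_affected": rows_affected,
    }


def quote(value: Any) -> str:
    """Render any value as a SQL string literal, doubling embedded single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def build_insert(table: str, rows: List[Dict[str, Any]]) -> str:
    """Build one multi-row INSERT; model_execution_date is set to CURRENT_DATE."""
    columns = ", ".join(AUDIT_COLUMNS + ["model_execution_date"])
    values = ",\n".join(
        "(" + ", ".join(quote(row[col]) for col in AUDIT_COLUMNS) + ", CURRENT_DATE)"
        for row in rows
    )
    return f"insert into {table} ({columns}) values\n{values}"


# Hypothetical run result for one model
result = {
    "node": {
        "unique_id": "model.my_sample_dbt_project.model1",
        "database": "sample_database",
        "schema": "sample_schema",
        "name": "model1",
        "resource_type": "model",
    },
    "status": "success",
    "execution_time": 1.5,
    "adapter_response": {"rows_affected": 10},
}
print(build_insert("sample_database.sample_schema.audit_table",
                   [parse_result("abc123", result)]))
```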

Create a GitHub Actions workflow

This step is optional. If you prefer, you can skip it and instead upload your files directly to your S3 bucket.

The following GitHub Actions workflow automates the deployment of dbt project files and DAG file to Amazon S3. Replace the placeholders {s3_bucket_name}, {account_id}, {role_name}, and {region} with your S3 bucket name, account ID, IAM role name, and AWS Region in the workflow file.

To enhance security, it’s recommended to use OpenID Connect (OIDC) for authentication with IAM roles in GitHub Actions instead of relying on long-lived access keys.

name: Sync dbt Project with S3

on:
  workflow_dispatch:
  push:
    branches: [ main ]
    paths:
      - "src/**"

permissions:
  id-token: write   # This is required for requesting the JWT
  contents: read    # This is required for actions/checkout
  pull-requests: write

jobs:
  sync-dev:
    runs-on: ubuntu-latest
    environment: dev
    defaults:
      run:
        shell: bash
    steps:
      - uses: actions/checkout@v4
      - name: Assume AWS IAM Role
        uses: aws-actions/configure-aws-credentials@v4  # version tag assumed; pin the release you have validated
        with:
          aws-region: {region}
          role-to-assume: arn:aws:iam::{account_id}:role/{role_name}
          role-session-name: my_sample_dbt_project_${{ github.run_id }}
          role-duration-seconds: 3600 # 1 hour

      - run: aws sts get-caller-identity

      - name: Sync dbt Model files
        id: dbt_project_files
        working-directory: src/my_sample_dbt_project
        run: aws s3 sync . s3://{s3_bucket_name}/dags/dbt/my_sample_dbt_project --delete
        continue-on-error: false

      - name: Sync DAG files
        id: dag_file
        working-directory: src/dags
        run: aws s3 sync . s3://{s3_bucket_name}/dags

We recommend the following GitHub security practices:

  • Branch protection rules – Before proceeding with the GitHub Actions workflow, make sure branch protection rules are in place. These rules enforce required status checks before merging code into protected branches (such as main).
  • Code review guidelines – Implement code review processes to make sure changes undergo review. This can include requiring at least one approving review before code is merged into the protected branch.
  • Incorporate security scanning tools – This can help detect vulnerabilities in your repository.

Make sure you are also adhering to dbt-specific security best practices:

  • Pay attention to dbt macros with variables and validate their inputs.
  • When adding new packages to your dbt project, evaluate their security, compatibility, and maintenance status to make sure they don’t introduce vulnerabilities or conflicts into your project.
  • Review dynamically generated SQL to safeguard against issues like SQL injection.
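Input validation can be made concrete with an allowlist. The following Python sketch accepts a schema or table name only if it matches a conservative, ASCII-only subset of Amazon Redshift's rules for standard (unquoted) identifiers, and rejects anything else before it is interpolated into SQL. It does not check reserved words. The same check could be written as a Jinja macro; safe_identifier and the sample query are hypothetical.

```python
import re

# Conservative allowlist for unquoted identifiers: a letter or underscore, then
# letters, digits, underscores, or dollar signs, 1-127 characters in total
IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]{0,126}")


def safe_identifier(name: str) -> str:
    """Return name unchanged if it is a plain identifier; otherwise raise ValueError."""
    if not IDENTIFIER.fullmatch(name):
        raise ValueError(f"refusing to use unsafe identifier: {name!r}")
    return name


# A schema name passed in from outside (for example, through dbt --vars)
target_schema = safe_identifier("sample_schema")
query = f"select count(*) from {target_schema}.audit_table"
print(query)  # select count(*) from sample_schema.audit_table
```

Identifiers can't be passed as bind parameters, which is why an allowlist suits them; for values, parameters or the quote escaping used in the audit macro are the usual options.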

Update the Amazon MWAA instance

Complete the following steps to update the Amazon MWAA instance:

  1. Install the Cosmos library on Amazon MWAA by adding astronomer-cosmos in the requirements.txt file. Make sure to check for version compatibility for Amazon MWAA and the Cosmos library.
  2. Add the following entries in your startup.sh script:
    1. In the following code, DBT_VENV_PATH specifies the location where the Python virtual environment for dbt will be created. DBT_PROJECT_PATH points to the location of your dbt project inside Amazon MWAA.
      #!/bin/sh
      export DBT_VENV_PATH="${AIRFLOW_HOME}/dbt_venv"
      export DBT_PROJECT_PATH="${AIRFLOW_HOME}/dags/dbt"

    2. The following code creates a Python virtual environment at the path ${DBT_VENV_PATH} and installs the dbt-redshift adapter to run dbt transformations on Amazon Redshift:
      python3 -m venv "${DBT_VENV_PATH}"
      ${DBT_VENV_PATH}/bin/pip install dbt-redshift

Create a dbt user in Amazon Redshift and store credentials

To create dbt models in Amazon Redshift, you must set up a native Redshift user with the necessary permissions to access source tables and create new tables. To follow the principle of least privilege, create separate database users with minimal permissions. Don't grant the dbt user admin privileges; instead, give it access only to the specific schemas required for its tasks.

Complete the following steps:

  1. Open the Amazon Redshift console and connect as an admin (for more details, refer to Connecting to an Amazon Redshift database).
  2. Run the following command in the query editor v2 to create a native user, and note down the values for dbt_user_name and password_value:
    create user {dbt_user_name} password 'sha256|{password_value}';

  3. Run the following commands in the query editor v2 to grant permissions to the native user:
    1. Connect to the database where you want to source tables from and run the following commands:
      grant usage on schema {schema_name} to {dbt_user_name};
      grant select on all tables in schema {schema_name} to {dbt_user_name};

    2. To allow the user to create tables within a schema, run the following command:
      grant create on schema {schema_name} to {dbt_user_name};

  4. Optionally, create a secret in AWS Secrets Manager and store the values for dbt_user_name and password_value from the previous step as plaintext:
{
    "username":"dbt_user_name",
    "password":"password_value"
}

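Creating a Secrets Manager entry is optional, but recommended for securely storing your credentials instead of hardcoding them. The sample DAG in this post reads the credentials from this secret, so if you skip this step, change how the DAG obtains them. To learn more, refer to AWS Secrets Manager best practices.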

Create a Redshift connection in Amazon MWAA

We create one Redshift connection in Amazon MWAA for each Redshift database, making sure that each data pipeline (DAG) can only access one database. This approach provides distinct access controls for each pipeline, helping prevent unauthorized access to data. Complete the following steps:

  1. Log in to the Amazon MWAA UI.
  2. On the Admin menu, choose Connections.
  3. Choose Add a new record.
  4. For Connection Id, enter a name for this connection.
  5. For Connection Type, choose Amazon Redshift.
  6. For Host, enter the endpoint of the Redshift cluster without the port and database name (for example, redshift-cluster-1.xxxxxx.us-east-1.redshift.amazonaws.com).
  7. For Database, enter the database of the Redshift cluster.
  8. For Port, enter the port of the Redshift cluster.

Set up an SNS notification

Setting up SNS notifications is optional, but they can be a useful enhancement to receive alerts on failures. Complete the following steps:

  1. Create an SNS topic.
  2. Create a subscription to the SNS topic.
  3. Create a Lambda function with the Python runtime.
  4. Modify the function code in your Lambda function, and replace {topic_arn} with your SNS topic Amazon Resource Name (ARN):
import json
import logging

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sns_client = boto3.client('sns')

# Replace {topic_arn} with the ARN of your SNS topic
TOPIC_ARN = "{topic_arn}"

def lambda_handler(event, context):
    try:
        # Extract the DAG name sent in the invocation payload
        failed_dag = event['dag_name']
    except KeyError:
        # Handle a payload that is missing 'dag_name'
        logger.error("KeyError: invalid payload - dag_name not present")
        return

    message = (f"Data modelling DAG - {failed_dag} has failed, "
               "please inform the data modelling team")

    # With MessageStructure='json', 'default' must map to a plain string
    sns_client.publish(
        TopicArn=TOPIC_ARN,
        Subject="Data modelling dags - WARNING",
        Message=json.dumps({'default': message}),
        MessageStructure='json'
    )
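When MessageStructure='json' is set, SNS expects Message to be a JSON object whose "default" value is a plain string (protocol-specific keys such as "email" are optional). The following Python sketch builds that body by encoding the text only once. Encoding the text a second time before placing it under "default" makes the delivered message arrive wrapped in literal double quotes. build_sns_message is a hypothetical helper.

```python
import json


def build_sns_message(dag_name: str) -> str:
    """Build the Message argument for sns.publish(..., MessageStructure='json').

    With MessageStructure='json', Message must be a JSON object whose values
    are plain strings; 'default' is used for any protocol without its own key.
    """
    text = (f"Data modelling DAG - {dag_name} has failed, "
            "please inform the data modelling team")
    return json.dumps({"default": text})


message = build_sns_message("my_sample_dbt_project_dag")
print(message)
# {"default": "Data modelling DAG - my_sample_dbt_project_dag has failed, please inform the data modelling team"}
```

If you only need the default message, you can also omit MessageStructure and pass the text directly as Message.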

Configure a DAG

The following sample DAG orchestrates a dbt workflow for processing and auditing data models in Amazon Redshift. It retrieves credentials from Secrets Manager, runs dbt tasks in a virtual environment, and sends an SNS notification if a failure occurs. The workflow consists of the following steps:

  1. It starts with the audit_dbt_task task group, which creates the audit model.
  2. The transform_data task group executes the other dbt models, excluding the audit-tagged one. Inside the transform_data group, there are two dbt models, model1 and model2, and each is followed by a corresponding test task that runs data quality tests defined in the schema.yml file.
  3. To properly detect and handle failures, the DAG includes a dbt_check Python task that runs a custom function, check_dbt_failures. This is important because when using DbtTaskGroup, individual model-level failures inside the group don’t automatically propagate to the task group level. As a result, downstream tasks (such as the Lambda operator sns_notification_for_failure) configured with trigger_rule='one_failed' will not be triggered unless a failure is explicitly raised.

The check_dbt_failures function addresses this by inspecting the results of each dbt model and test, and raising an AirflowException if a failure is found. When an AirflowException is raised, the sns_notification_for_failure task is triggered.

  4. If a failure occurs, the sns_notification_for_failure task invokes a Lambda function to send an SNS notification. If no failures are detected, this task is skipped.
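As an illustration of the idea behind such a check (not the check_dbt_failures function itself), the following Python sketch scans task states for tasks inside the transform_data task group and raises if any failed. The task IDs and states are hypothetical, and DbtFailureError stands in for AirflowException so the sketch runs without Airflow installed.

```python
from typing import List, Mapping, Optional

# Airflow task states that mean a dbt model or test did not complete successfully
FAILED_STATES = {"failed", "upstream_failed"}


class DbtFailureError(Exception):
    """Stand-in for airflow.exceptions.AirflowException so this sketch runs without Airflow."""


def find_failed_tasks(states: Mapping[str, Optional[str]], group_prefix: str) -> List[str]:
    """Return IDs of tasks inside the task group whose state indicates failure."""
    return sorted(
        task_id
        for task_id, state in states.items()
        if task_id.startswith(group_prefix) and state in FAILED_STATES
    )


def raise_if_dbt_failed(
    states: Mapping[str, Optional[str]], group_prefix: str = "transform_data."
) -> None:
    """Raise so that a downstream task with trigger_rule='one_failed' runs."""
    failed = find_failed_tasks(states, group_prefix)
    if failed:
        raise DbtFailureError("dbt tasks failed: " + ", ".join(failed))


# Hypothetical task states for one DAG run
states = {
    "transform_data.model1_run": "success",
    "transform_data.model1_test": "failed",
    "transform_data.model2_run": "upstream_failed",
    "dbt_check": "running",
}
print(find_failed_tasks(states, "transform_data."))
# ['transform_data.model1_test', 'transform_data.model2_run']
```

In a PythonOperator callable, the states might be gathered from the current run's task instances (for example, through context["dag_run"].get_task_instances()), with AirflowException raised instead. The checking task would also need a trigger rule such as all_done so that it runs even when upstream dbt tasks fail.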

The following diagram illustrates this workflow.

Configure DAG variables

To customize this DAG for your environment, configure the following variables:

  • project_name – Make sure the project_name matches the S3 prefix of your dbt project
  • secret_name – Provide the name of the secret that stores dbt user credentials
  • target_database and target_schema – Update these variables to reflect where you want to land your dbt models in Amazon Redshift
  • redshift_connection_id – Set this to match the connection configured in Amazon MWAA for this Redshift database
  • sns_lambda_function_name – Provide the Lambda function name to send SNS notifications
  • dag_name – Provide the DAG name that will be passed to the SNS notification Lambda function
import os
import json
import boto3
from airflow import DAG
from cosmos import (
    DbtTaskGroup, ProfileConfig, ProjectConfig,
    ExecutionConfig, RenderConfig
)
from cosmos.constants import ExecutionMode, LoadMode
from cosmos.profiles import RedshiftUserPasswordProfileMapping
from pendulum import datetime
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.lambda_function import (
    LambdaInvokeFunctionOperator
)
from airflow.exceptions import AirflowException

# project name - should match the s3 prefix of your dbt project
project_name = "my_sample_dbt_project"
# name of the secret that stores dbt user credentials 
secret_name = "dbt_user_credentials_secret"
# target database to land dbt models
target_database = "sample_database"
# target schema to land dbt models
target_schema = "sample_schema"
# Redshift connection name from MWAA
redshift_connection_id = "my_sample_dbt_project_connection"
# sns lambda function name
sns_lambda_function_name = "sns_notification"
# dag name - this will be passed to SNS for notification
payload = json.dumps({
    "dag_name": "my_sample_dbt_project_dag"
})

Incorporate DAG components

After setting the variables, you can now incorporate the following components to complete the DAG.

Secrets Manager

The DAG retrieves dbt user credentials from Secrets Manager:

sm_client = boto3.client('secretsmanager')

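def get_secret(secret_name):
    # Fetch the secret and parse its JSON SecretString into a dict. Errors such as
    # a missing secret or denied access propagate and fail DAG parsing, so the
    # problem shows up as a DAG import error in the Airflow UI.
    response = sm_client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])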

secret_value = get_secret(secret_name)
username = secret_value["username"]
password = secret_value["password"]
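You can check the secret format before you deploy the DAG. The following minimal sketch parses the same JSON secret outside Airflow. It assumes the secret string is a JSON object with username and password keys, which is what the DAG code expects. parse_dbt_credentials and FakeSecretsClient are hypothetical helpers. The fake client only stands in for boto3.client('secretsmanager') so that the parsing logic can run without AWS access.

```python
import json


def parse_dbt_credentials(client, secret_name):
    """Return (username, password) from a JSON secret.

    `client` is any object with a boto3-style get_secret_value(SecretId=...)
    method; the SecretString is assumed to look like
    {"username": "...", "password": "..."}.
    """
    response = client.get_secret_value(SecretId=secret_name)
    secret = json.loads(response["SecretString"])
    # Fail with a clear message instead of a bare KeyError later in the DAG.
    missing = sorted({"username", "password"} - set(secret))
    if missing:
        raise KeyError(f"Secret {secret_name!r} is missing keys: {missing}")
    return secret["username"], secret["password"]


class FakeSecretsClient:
    """Hypothetical in-memory stand-in for the Secrets Manager client."""

    def __init__(self, secrets):
        self._secrets = secrets

    def get_secret_value(self, SecretId):
        return {"SecretString": self._secrets[SecretId]}


fake_client = FakeSecretsClient(
    {"dbt_user_credentials_secret": json.dumps({"username": "dbt_user", "password": "example"})}
)
username, password = parse_dbt_credentials(fake_client, "dbt_user_credentials_secret")
```

In the DAG itself, you would pass sm_client in place of the fake client.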

Redshift connection configuration

It uses RedshiftUserPasswordProfileMapping to authenticate:

profile_config = ProfileConfig(
    profile_name="redshift",
    target_name=target_database,
    profile_mapping=RedshiftUserPasswordProfileMapping(
        conn_id=redshift_connection_id,
        profile_args={"schema": target_schema,
                      "user": username, "password": password}
    ),
)

dbt execution setup

This code contains the following variables:

  • dbt executable path – Points to the dbt binary in the virtual environment at DBT_VENV_PATH
  • dbt project path – Combines the DBT_PROJECT_PATH environment variable with your project_name
execution_config = ExecutionConfig(
    dbt_executable_path=f"{os.environ['DBT_VENV_PATH']}/bin/dbt",
    execution_mode=ExecutionMode.VIRTUALENV,
)

project_config = ProjectConfig(
    dbt_project_path=f"{os.environ['DBT_PROJECT_PATH']}/{project_name}",
)

Tasks and execution flow

This step includes the following components:

  • Audit dbt task group (audit_dbt_task) – Runs the dbt model tagged with audit
  • dbt task group (transform_data) – Runs the dbt models tagged with operations, excluding the audit model

In dbt, tags are labels that you can assign to models, tests, seeds, and other dbt resources to organize and selectively run subsets of your dbt project. The render_config for transform_data sets exclude=["tag:audit"], so dbt skips models tagged audit in that task group, because the audit model runs separately in audit_dbt_task.

  • Failure check (dbt_check) – Checks for dbt model failures, raises an AirflowException if upstream dbt tasks fail
  • SNS notification on failure (sns_notification_for_failure) – Invokes a Lambda function to send an SNS notification upon a dbt task failure (for example, a dbt model in the task group)
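The following simplified sketch makes the tag-based split concrete. It shows how select=["tag:audit"] and exclude=["tag:audit"] divide a project between the two task groups. It models only the tag: selector method with union semantics, whereas real dbt selection supports many more methods and graph operators. The model names and tags are hypothetical.

```python
def matches(model_tags, selectors):
    """True if the model carries any tag named by a "tag:<name>" selector."""
    wanted = {s.split(":", 1)[1] for s in selectors if s.startswith("tag:")}
    return bool(wanted & set(model_tags))


def render(models, select=None, exclude=None):
    """Return model names kept after applying select, then exclude."""
    chosen = [name for name, tags in models.items() if select is None or matches(tags, select)]
    return [name for name in chosen if not (exclude and matches(models[name], exclude))]


# Hypothetical project: model names and tags are illustrative only.
project = {
    "audit_log": ["audit"],
    "stg_orders": ["operations"],
    "fct_orders": ["operations"],
}

audit_models = render(project, select=["tag:audit"])       # like audit_dbt_task
transform_models = render(project, exclude=["tag:audit"])  # like transform_data
```

Because the two selectors are complements, every model lands in exactly one task group.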
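def check_dbt_failures(**context):
    # Model-level failures inside a DbtTaskGroup are recorded on the individual
    # Cosmos tasks (for example, "transform_data.<model>.run"), not on the group,
    # so inspect those task instances in the current DAG run.
    dag_run = context["dag_run"]
    failed_tasks = [
        ti.task_id
        for ti in dag_run.get_task_instances(state=["failed", "upstream_failed"])
        if ti.task_id.startswith(("audit_dbt_task.", "transform_data."))
    ]
    if failed_tasks:
        raise AirflowException(f"Failure in dbt task group: {failed_tasks}")

with DAG(
    dag_id="my_sample_dbt_project_dag",
    start_date=datetime(2025, 4, 2),
    schedule_interval="@daily",
    catchup=False,
    tags=["dbt"]
):

    audit_dbt_task = DbtTaskGroup(
        group_id="audit_dbt_task",
        execution_config=execution_config,
        profile_config=profile_config,
        project_config=project_config,
        operator_args={
            "install_deps": True,
        },
        render_config=RenderConfig(
            select=["tag:audit"],
            load_method=LoadMode.DBT_LS
        )
    )

    transform_data = DbtTaskGroup(
        group_id="transform_data",
        execution_config=execution_config,
        profile_config=profile_config,
        project_config=project_config,
        operator_args={
            # install necessary dependencies before running dbt command
            "install_deps": True,
        },
        render_config=RenderConfig(
            exclude=["tag:audit"],
            load_method=LoadMode.DBT_LS
        )
    )

    dbt_check = PythonOperator(
        task_id='dbt_check',
        python_callable=check_dbt_failures,
        # Run once the dbt task groups finish, even if some of their tasks failed,
        # so the check can inspect them; the default all_success rule would not run it.
        trigger_rule='all_done',
    )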

    sns_notification_for_failure = LambdaInvokeFunctionOperator(
        task_id="sns_notification_for_failure",
        function_name=sns_lambda_function_name,
        payload=payload,
        trigger_rule='one_failed'
    )

    audit_dbt_task >> transform_data >> dbt_check >> sns_notification_for_failure

The sample DAG orchestrates a dbt workflow in Amazon Redshift. It starts with an audit task group, followed by a task group that processes the data models. It includes a failure handling mechanism that checks for failures and raises an exception so that a Lambda function sends an SNS notification if a failure occurs. If no failures are detected, the SNS notification task is skipped.
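The following simplified sketch models the three Airflow trigger rules involved, which helps trace when sns_notification_for_failure runs. It assumes every upstream task has already finished and models only the success, skipped, failed, and upstream_failed states. resolve is a hypothetical helper and is not part of the Airflow API.

```python
# Upstream states that count as a failure for these trigger rules.
FAILED_STATES = {"failed", "upstream_failed"}


def resolve(trigger_rule, upstream_states):
    """Return "run", "skipped", or "upstream_failed" for a downstream task."""
    if trigger_rule == "all_success":
        # Default rule: any failure upstream propagates as upstream_failed.
        if any(state in FAILED_STATES for state in upstream_states):
            return "upstream_failed"
        if "skipped" in upstream_states:
            return "skipped"
        return "run"
    if trigger_rule == "all_done":
        # Runs once everything upstream has finished, whatever the outcome.
        return "run"
    if trigger_rule == "one_failed":
        # Runs only if at least one upstream task failed; otherwise skipped.
        if any(state in FAILED_STATES for state in upstream_states):
            return "run"
        return "skipped"
    raise ValueError(f"Unsupported trigger rule: {trigger_rule}")


# sns_notification_for_failure (one_failed) reacting to dbt_check's final state
print(resolve("one_failed", ["success"]))  # skipped: no notification
print(resolve("one_failed", ["failed"]))   # run: notification sent
```

Tracing resolve("all_success", ...) shows that a task using the default rule becomes upstream_failed when an upstream dbt task fails. one_failed treats upstream_failed as a failure too.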

Clean up

If you no longer need the resources you created, delete them to avoid additional charges. This includes the following:

  • Amazon MWAA environment
  • S3 bucket
  • IAM role
  • Redshift cluster or serverless workgroup
  • Secrets Manager secret
  • SNS topic
  • Lambda function

Conclusion

By integrating dbt with Amazon Redshift and orchestrating workflows using Amazon MWAA and the Cosmos library, you can simplify data transformation workflows while maintaining robust engineering practices. The sample dbt project structure, combined with automated deployments through GitHub Actions and proactive monitoring using Amazon SNS, provides a foundation for building reliable data pipelines. The addition of audit logging facilitates transparency across your transformations, so teams can maintain high data quality standards.

You can use this solution as a starting point for your own dbt implementation on Amazon MWAA. The approach we outlined emphasizes SQL-based transformations while incorporating essential operational capabilities like deployment automation and failure alerting. Get started by adapting the configuration to your environment, and build upon these practices as your data needs evolve.

For more resources, refer to Manage data transformations with dbt in Amazon Redshift and Redshift setup.


About the authors

Cindy Li is an Associate Cloud Architect at AWS Professional Services, specialising in Data Analytics. Cindy works with customers to design and implement scalable data analytics solutions on AWS. When Cindy is not diving into tech, you can find her out on walks with her playful toy poodle Mocha.

Akhil B is a Data Analytics Consultant at AWS Professional Services, specializing in cloud-based data solutions. He partners with customers to design and implement scalable data analytics platforms, helping organizations transform their traditional data infrastructure into modern, cloud-based solutions on AWS. His expertise helps organizations optimize their data ecosystems and maximize business value through modern analytics capabilities.

Joao Palma is a Senior Data Architect at Amazon Web Services, where he partners with enterprise customers to design and implement comprehensive data platform solutions. He specializes in helping organizations transform their data into strategic business assets and enabling data-driven decision making.

Harshana Nanayakkara is a Delivery Consultant at AWS Professional Services, where he helps customers tackle complex business challenges using AWS Cloud technology. He specializes in data and analytics, data governance, and AI/ML implementations.

The Amazon SageMaker lakehouse architecture now automates optimization configuration of Apache Iceberg tables on Amazon S3

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/the-amazon-sagemaker-lakehouse-architecture-now-automates-optimization-configuration-of-apache-iceberg-tables-on-amazon-s3/

As organizations increasingly adopt Apache Iceberg tables for their data lake architectures on Amazon Web Services (AWS), maintaining these tables becomes crucial for long-term success. Without proper maintenance, Iceberg tables can face several challenges: degraded query performance, unnecessary retention of old data that should be removed, and declining storage cost efficiency. These issues can significantly impact the effectiveness and economics of your data lake. Regular table maintenance operations help keep your Iceberg tables high performing, compliant with data retention policies, and cost-effective for production workloads. To help you manage Iceberg tables at scale, AWS Glue automates these maintenance operations: compaction (including sort and z-order strategies), snapshot expiration, and orphan file deletion. Since this feature launched, many customers have enabled automated table optimization through the AWS Glue Data Catalog to reduce operational burden.

The Amazon SageMaker lakehouse architecture now automates optimization of Iceberg tables stored in Amazon S3 through catalog-level configuration, which optimizes storage in your Iceberg tables and improves query performance. Previously, optimizing Iceberg tables in the AWS Glue Data Catalog required updating the configuration of each table individually. Now, you can enable automatic optimization for new Iceberg tables with a one-time Data Catalog configuration. After you enable it, the Data Catalog continuously optimizes any new or updated table. It compacts small files and removes expired snapshots and unreferenced files that are no longer needed.
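Orphan file deletion applies a simple rule. A file under the table location is removed only if no Iceberg metadata references it and it is older than a retention threshold. The following minimal sketch expresses that rule over an in-memory file listing. find_orphan_files and the S3 paths are hypothetical, and the sketch is not the managed optimizer's implementation.

```python
from datetime import datetime, timedelta, timezone


def find_orphan_files(listed_files, referenced_paths, older_than_days, now):
    """Return sorted paths that are unreferenced and older than the threshold.

    listed_files: {path: creation_time} for files found under the table location.
    referenced_paths: paths reachable from the table's current metadata.
    """
    cutoff = now - timedelta(days=older_than_days)
    return sorted(
        path
        for path, created in listed_files.items()
        if path not in referenced_paths and created < cutoff
    )


now = datetime(2025, 6, 1, tzinfo=timezone.utc)
listed = {
    "s3://example-bucket/tbl/data/a.parquet": now - timedelta(days=3),   # referenced
    "s3://example-bucket/tbl/data/b.parquet": now - timedelta(days=3),   # orphan, old
    "s3://example-bucket/tbl/data/c.parquet": now - timedelta(hours=6),  # orphan, recent
}
referenced = {"s3://example-bucket/tbl/data/a.parquet"}
orphans = find_orphan_files(listed, referenced, older_than_days=1, now=now)
```

The age threshold matters because files from a write that hasn't committed yet are also unreferenced. Keeping recent files avoids deleting them.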

This post demonstrates the end-to-end flow to enable the catalog-level table optimization setting.

Prerequisites

The following prerequisites are required to use the new catalog-level table optimizations:

Enable table optimizations at the catalog level

The data lake administrator can enable the catalog-level table optimization on the AWS Lake Formation console. Complete the following steps:

  1. On the AWS Lake Formation console, choose Catalogs in the navigation pane.
  2. Select the catalog to be enabled with catalog-level table optimizations.
  3. Choose the Table optimizations tab, and choose Edit in Table optimizations, as shown in the following screenshot.

setup-catalog-level-optimizations

  4. In Optimization options, select Compaction, Snapshot retention, and Orphan file deletion, as shown in the following screenshot.

enable-optimizations

  5. Select an IAM role. Refer to Table optimization prerequisites for permissions.
  6. Choose Grant required permissions.
  7. Choose I acknowledge that expired data will be deleted as part of the optimizers.

After you enable the table optimizations at the catalog level, the configuration is displayed on the AWS Lake Formation console, as shown in the following screenshot.

optimizations-configuration

When you select an Iceberg table registered in the catalog, the table view confirms that the table inherits its table optimizations configuration from the catalog. Configuration source shows catalog, as shown in the following screenshot.

catalog-level-optimizations

The table optimizations history is displayed on the table view. The following result shows one of the compaction runs by the table optimizations.

binpack-compaction-result

The catalog-level table optimizations for all databases and Iceberg tables are now enabled.

Customize table optimization settings at both the catalog and table level

Although the catalog-level optimization applies common settings across all databases and Iceberg tables in your catalog, you might want to apply different strategies for specific Iceberg tables. You can use AWS Glue Data Catalog to enable both catalog-level and table-level optimizations based on specific table characteristics and access patterns. For example, in addition to configuring catalog-level compaction with the bin-pack strategy for general-purpose Iceberg tables, you can apply the sort strategy at the table level to tables with frequent range queries on timestamp columns.
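Bin-pack compaction groups small files into larger output files by size alone. Sort also orders rows by the table's sort order as it rewrites them. The following sketch plans bin-pack groups with a first-fit-decreasing heuristic under an assumed target file size. It is a simplified model, not the algorithm AWS Glue uses, and plan_binpack is a hypothetical helper.

```python
def plan_binpack(file_sizes_mb, target_mb):
    """Group files smaller than target_mb into bins of at most target_mb.

    Uses first-fit decreasing: place each file, largest first, into the first
    bin with enough remaining capacity, opening a new bin when none fits.
    Files already at or above the target size are left alone.
    """
    small = [(name, size) for name, size in file_sizes_mb.items() if size < target_mb]
    small.sort(key=lambda item: item[1], reverse=True)
    bins = []  # each bin: [remaining_capacity_mb, [file names]]
    for name, size in small:
        for current in bins:
            if size <= current[0]:
                current[0] -= size
                current[1].append(name)
                break
        else:
            bins.append([target_mb - size, [name]])
    return [names for _, names in bins]


groups = plan_binpack(
    {"f1.parquet": 60, "f2.parquet": 50, "f3.parquet": 40, "f4.parquet": 30, "big.parquet": 200},
    target_mb=128,
)
```

Each group would be rewritten as one larger file. A sort strategy would additionally reorder the rows inside the rewritten files.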

This section shows how to configure catalog-level and table-specific optimizations through a practical scenario. Imagine a real-time analytics table with frequent write operations, which generate more orphan files because of constant metadata updates. Users also run selective queries that filter on specific columns, which makes the sort strategy preferable. Complete the following steps:

  1. Select another Iceberg table in the same catalog as before to configure the table-level optimizations on the AWS Lake Formation console. At this point, the catalog-level table optimizations are configured for this table.
  2. Choose Edit in Optimization configuration, as shown in the following screenshot.

new-optimizations-configuration

  3. In Optimization options, choose Compaction, Snapshot retention, and Orphan file deletion.
  4. In Optimization configuration, choose Customize settings.
  5. Select the same IAM role.
  6. In Compaction configuration, select Sort, as shown in the following screenshot. Also set Minimum input files to 80. This is the minimum number of files needed to trigger compaction. To use Sort, your Iceberg table must have a sort order defined. You can define the sort order with Spark SQL, for example ALTER TABLE db.tbl WRITE ORDERED BY <columns>.

sort-config

  7. In Snapshot retention configuration, for Snapshot deletion run rate, select Specify a custom value in hours. Then set the interval between two deletion job runs to 12 hours, as shown in the following screenshot.

snapshot-retention

  8. In Orphan file deletion configuration, set the following option to 1 day: Files under the provided Table Location with a creation time older than this number of days will be deleted if they are no longer referenced by the Apache Iceberg Table metadata.

orphan-deletion

  9. Choose Grant required permissions.
  10. Choose I acknowledge that expired data will be deleted as part of the optimizers.
  11. Choose Save.
  12. The Table optimization tab on the AWS Lake Formation console displays the custom settings of the table optimizers. In Compaction, Compaction strategy is set to sort and Minimum input files is set to 80 files. In Snapshot retention, Snapshot deletion run rate is set to 12 hours. In Orphan file deletion, Orphan files will be deleted after is set to 1 day, as shown in the following screenshot.

new-table-level-optimizations
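You can also script these table-level settings. The AWS Glue API exposes table optimizers through create_table_optimizer, and update_table_optimizer changes an existing one. The following sketch only builds request arguments that mirror the console choices above. The nested field names (strategy, minInputFiles, runRateInHours, and orphanFileRetentionPeriodInDays) reflect our understanding of the current API and are assumptions to verify against the AWS Glue API reference. The account ID, names, and role ARN are placeholders.

```python
def table_optimizer_requests(catalog_id, database, table, role_arn):
    """Build create_table_optimizer keyword arguments for the three optimizer types.

    Values mirror the console settings: sort compaction with 80 minimum input
    files, snapshot deletion every 12 hours, and orphan files deleted after
    1 day. Nested field names are assumptions to check against the API reference.
    """
    base = {"CatalogId": catalog_id, "DatabaseName": database, "TableName": table}
    common = {"roleArn": role_arn, "enabled": True}
    return [
        {
            **base,
            "Type": "compaction",
            "TableOptimizerConfiguration": {
                **common,
                "compactionConfiguration": {
                    "icebergConfiguration": {"strategy": "sort", "minInputFiles": 80}
                },
            },
        },
        {
            **base,
            "Type": "retention",
            "TableOptimizerConfiguration": {
                **common,
                "retentionConfiguration": {"icebergConfiguration": {"runRateInHours": 12}},
            },
        },
        {
            **base,
            "Type": "orphan_file_deletion",
            "TableOptimizerConfiguration": {
                **common,
                "orphanFileDeletionConfiguration": {
                    "icebergConfiguration": {"orphanFileRetentionPeriodInDays": 1}
                },
            },
        },
    ]


# Placeholder account ID, names, and role ARN.
optimizer_requests = table_optimizer_requests(
    "111122223333", "sample_database", "realtime_events",
    "arn:aws:iam::111122223333:role/GlueTableOptimizerRole",
)

# To apply (requires AWS credentials; use update_table_optimizer if one exists):
# import boto3
# glue = boto3.client("glue")
# for request in optimizer_requests:
#     glue.create_table_optimizer(**request)
```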

The compaction history shows sort as the table-level compaction strategy, even though the catalog-level strategy is configured as binpack, as shown in the following screenshot.

sort-compaction-result

In this scenario, the table-specific optimizations are configured along with the catalog-level optimizations. Combining the table and catalog-level optimizations means you can more flexibly manage your Iceberg table data deletions and compactions.

Conclusion

In this post, we demonstrated how to enable and manage the catalog-level table optimization feature of the AWS Glue Data Catalog for Iceberg tables in the Amazon SageMaker lakehouse architecture. This enhancement significantly simplifies the management of Iceberg tables because you can enable automated maintenance operations across all tables with a single setting. Instead of configuring optimization settings for individual tables, you can now maintain your entire data lake more efficiently, reducing operational overhead while applying consistent optimization policies. We recommend enabling catalog-level table optimization to help you maintain a well-organized, high-performing, and cost-effective data lake while freeing up your teams to focus on deriving value from your data.

Try out this feature for your own use case and share your feedback and questions in the comments. To learn more about AWS Glue Data Catalog table optimizer, visit Optimizing Iceberg tables.

Acknowledgment: A special thanks to everyone who contributed to the development and launch of catalog level optimization: Siddharth Padmanabhan Ramanarayanan, Dhrithi Chidananda, Noella Jiang, Sangeet Lohariwala, Shyam Rathi, Anuj Jigneshkumar Vakil, and Jeremy Song.


About the authors

Tomohiro Tanaka is a Senior Cloud Support Engineer at Amazon Web Services (AWS). He’s passionate about helping customers use Apache Iceberg for their data lakes on AWS. In his free time, he enjoys a coffee break with his colleagues and making coffee at home.

Noritaka Sekiyama is a Principal Big Data Architect with AWS Analytics services. He’s responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Sandeep Adwankar is a Senior Product Manager at Amazon Web Services (AWS). Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products customers can use to improve how they manage, secure, and access data.

Siddharth Padmanabhan Ramanarayanan is a Senior Software Engineer on the AWS Glue and AWS Lake Formation team, where he focuses on building scalable distributed systems for data analytics workloads. He is passionate about helping customers optimize their cloud infrastructure for performance and cost efficiency.

Boosting search relevance: Automatic semantic enrichment in Amazon OpenSearch Serverless

Post Syndicated from Jon Handler original https://aws.amazon.com/blogs/big-data/boosting-search-relevance-automatic-semantic-enrichment-in-amazon-opensearch-serverless/

Traditional search engines rely on word-to-word matching (referred to as lexical search) to find results for queries. Although this works well for specific queries such as television model numbers, it struggles with more abstract searches. For example, when searching for “shoes for the beach,” a lexical search merely matches individual words “shoes,” “beach,” “for,” and “the” in catalog items, potentially missing relevant products like “water-resistant sandals” or “surf footwear” that don’t contain the exact search terms.

Large language models (LLMs) create dense vector embeddings for text that expand retrieval beyond individual word boundaries to include the context in which words are used. Dense vector embeddings capture the relationship between shoes and beaches by learning how often they occur together, enabling better retrieval for more abstract queries through what is called semantic search.

Sparse vectors combine the benefits of lexical and semantic search. The process starts with a WordPiece tokenizer to create a limited set of tokens from text. A transformer model then assigns weights to these tokens. During search, the system calculates the dot-product of the weights on the tokens (from the reduced set) from the query with tokens from the target document. You get a blended score from the terms (tokens) whose weights are high for both the query and the target. Sparse vectors encode semantic information, like dense vectors, and supply word-to-word matching through the dot-product, giving you a hybrid lexical-semantic match. For a detailed understanding of sparse and dense vector embeddings, visit Improving document retrieval with sparse semantic encoders in the OpenSearch blog.
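The following minimal sketch illustrates the scoring step. It represents each sparse vector as a token-to-weight dictionary and computes their dot product, so only tokens present in both vectors contribute. The token weights are invented for illustration and do not come from the service's model.

```python
def sparse_dot(query_weights, doc_weights):
    """Dot product of two sparse vectors stored as {token: weight} dicts."""
    # Iterate over the smaller vector; tokens missing from either side add 0.
    if len(query_weights) > len(doc_weights):
        query_weights, doc_weights = doc_weights, query_weights
    return sum(w * doc_weights[t] for t, w in query_weights.items() if t in doc_weights)


# Hypothetical weights: the document expansion includes "footwear" even though
# the raw title is only "Red shoes".
doc = {"red": 1.2, "shoes": 1.5, "footwear": 0.9, "feet": 0.8}
query = {"crimson": 1.0, "footwear": 1.0}
score = sparse_dot(query, doc)
```

A document scores well when tokens that matter for the query carry high weight on both sides. Those tokens can appear literally in the text or come from the learned expansion.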

Automatic semantic enrichment for Amazon OpenSearch Serverless makes implementing semantic search with sparse vectors effortless. You can now experiment with search relevance improvements and deploy to production with only a few clicks, requiring no long-term commitment or upfront investment. In this post, we show how automatic semantic enrichment removes friction and makes the implementation of semantic search for text data seamless, with step-by-step instructions to enhance your search functionality.

Automatic semantic enrichment

You could already improve search relevance beyond OpenSearch's default lexical scoring, the Okapi BM25 algorithm, by integrating dense vector and sparse vector models for semantic search through OpenSearch's connector framework. However, implementing semantic search in OpenSearch Serverless has been complex and costly, requiring model selection, hosting, and integration with an OpenSearch Serverless collection.

Automatic semantic enrichment lets you automatically encode your text fields in your OpenSearch Serverless collections as sparse vectors by just setting the field type. During ingestion, OpenSearch Serverless automatically processes the data through a service-managed machine learning (ML) model, converting text to sparse vectors in native Lucene format.

Automatic semantic enrichment supports both English-only and multilingual options. The multilingual variant supports the following languages: Arabic, Bengali, Chinese, English, Finnish, French, Hindi, Indonesian, Japanese, Korean, Persian, Russian, Spanish, Swahili, and Telugu.

Model details and performance

Automatic semantic enrichment uses a service-managed, pre-trained sparse model that works effectively without requiring custom fine-tuning. The model analyzes the fields you specify, expanding them into sparse vectors based on learned associations from diverse training data. The expanded terms and their significance weights are stored in native Lucene index format for efficient retrieval. We’ve optimized this process using document-only mode, where encoding happens only during data ingestion. Search queries are merely tokenized rather than processed through the sparse model, making the solution both cost-effective and performant.

Our performance validation during feature development used the MS MARCO passage retrieval dataset, featuring passages averaging 334 characters. For relevance scoring, we measured average normalized discounted cumulative gain for the first 10 search results (ndcg@10) on the BEIR benchmark for English content and on MIRACL for multilingual content. We assessed latency using client-side 90th-percentile (p90) measurements and the p90 of the took value in search responses. These benchmarks provide baseline indicators for both search relevance and response times.
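For reference, ndcg@10 compares the discounted gain of the returned ranking with that of an ideal ranking. The following sketch uses the common linear-gain form, DCG@k = sum of rel_i / log2(i + 1) over ranks 1 to k. The relevance grades are hypothetical. Benchmark tooling may differ in details such as how the ideal ranking is built.

```python
import math


def dcg_at_k(relevances, k):
    """Sum of rel_i / log2(i + 1) over the first k ranks (ranks start at 1)."""
    return sum(rel / math.log2(rank + 1) for rank, rel in enumerate(relevances[:k], start=1))


def ndcg_at_k(ranked_relevances, judged_relevances=None, k=10):
    """DCG of the returned ranking divided by DCG of the ideal ranking.

    ranked_relevances: relevance grade of each returned result, in rank order.
    judged_relevances: grades of all judged documents for the query; if
    omitted, the ideal ranking is built from the returned results only.
    """
    pool = ranked_relevances if judged_relevances is None else judged_relevances
    ideal = dcg_at_k(sorted(pool, reverse=True), k)
    return dcg_at_k(ranked_relevances, k) / ideal if ideal > 0 else 0.0


# The only relevant document shows up at rank 3 instead of rank 1.
example = ndcg_at_k([0, 0, 1], judged_relevances=[1])
```

The reported benchmark figures average this per-query score across all queries in each dataset.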

The following table shows the automatic semantic enrichment benchmark.

| Language | Relevance improvement | P90 search latency |
|---|---|---|
| English | 20.0% over lexical search | 7.7% lower than lexical search (BM25: 26 ms; automatic semantic enrichment: 24 ms) |
| Multilingual | 105.1% over lexical search | 38.4% higher than lexical search (BM25: 26 ms; automatic semantic enrichment: 36 ms) |
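As a quick arithmetic check, the latency percentages follow from the rounded p90 millisecond values in the table:

```python
def relative_change_pct(baseline_ms, new_ms):
    """Percentage change from the BM25 baseline (negative means faster)."""
    return (new_ms - baseline_ms) / baseline_ms * 100


english = relative_change_pct(26, 24)       # about -7.69, reported as 7.7% lower
multilingual = relative_change_pct(26, 36)  # about +38.46, reported as 38.4% higher
```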

Given the unique nature of each workload, we encourage you to evaluate this feature in your development environment using your own benchmarking criteria before making implementation decisions.

Pricing

OpenSearch Serverless bills automatic semantic enrichment based on OpenSearch Compute Units (OCUs) consumed during sparse vector generation at indexing time. You’re charged only for actual usage during indexing. You can monitor this consumption using the Amazon CloudWatch metric SemanticSearchOCU. For specific details about model token limits and volume throughput per OCU, visit Amazon OpenSearch Service Pricing.

Prerequisites

Before you create an automatic semantic enrichment index, verify that you’ve been granted the necessary permissions for the task. Contact an account administrator for assistance if required. To work with automatic semantic enrichment in OpenSearch Serverless, you need the account-level AWS Identity and Access Management (IAM) permissions shown in the following policy. The permissions serve the following purposes:

  • The aoss:*Index IAM permissions are used to create and manage indexes.
  • The aoss:APIAccessAll IAM permission is used to perform OpenSearch API operations.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "aoss:CreateIndex",
                "aoss:GetIndex",
                "aoss:APIAccessAll"
            ],
            "Resource": "<ARN of your Serverless Collection>"
        }
    ]
}

You also need an OpenSearch Serverless data access policy to create and manage indexes and associated resources in the collection. For more information, visit Data access control for Amazon OpenSearch Serverless in the OpenSearch Serverless Developer Guide. Use the following policy:

[
    {
        "Description": "Create index permission",
        "Rules": [
            {
                "ResourceType": "index",
                "Resource": ["index/<collection_name>/*"],
                "Permission": [
                  "aoss:CreateIndex", 
                  "aoss:DescribeIndex",                  
"aoss:ReadDocument",
    "aoss:WriteDocument"
                ]
            }
        ],
        "Principal": [
            "arn:aws:iam::<account_id>:role/<role_name>"
        ]
    },
    {
        "Description": "Create pipeline permission",
        "Rules": [
            {
                "ResourceType": "collection",
                "Resource": ["collection/<collection_name>"],
                "Permission": [
                  "aoss:CreateCollectionItems",
                  "aoss:"
                ]
            }
        ],
        "Principal": [
            "arn:aws:iam::<account_id>:role/<role_name>"
        ]
    },
    {
        "Description": "Create model permission",
        "Rules": [
            {
                "ResourceType": "model",
                "Resource": ["model/<collection_name>/*"],
                "Permission": ["aoss:CreateMLResources"]
            }
        ],
        "Principal": [
            "arn:aws:iam::<account_id>:role/<role_name>"
        ]
    }
]

To access private collections, set up the following network policy:

[
   {
      "Description":"Enable automatic semantic enrichment in private collection",
      "Rules":[
         {
            "ResourceType":"collection",
            "Resource":[
               "collection/<collection_name>"
            ]
         }
      ],
      "AllowFromPublic":false,
      "SourceServices":[
         "aoss.amazonaws.com"
      ]
   }
]
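If you manage policies in code, you can build the same network policy document in Python, which avoids hand-editing JSON. The following sketch does that and shows in comments how the policy could be submitted with the create_security_policy call of the boto3 OpenSearch Serverless client. The collection and policy names are placeholders.

```python
import json


def private_collection_network_policy(collection_name):
    """Network policy letting the service reach a private collection."""
    return [
        {
            "Description": "Enable automatic semantic enrichment in private collection",
            "Rules": [
                {"ResourceType": "collection", "Resource": [f"collection/{collection_name}"]}
            ],
            "AllowFromPublic": False,
            "SourceServices": ["aoss.amazonaws.com"],
        }
    ]


policy_json = json.dumps(private_collection_network_policy("product-collection"))

# To create it (requires AWS credentials):
# import boto3
# boto3.client("opensearchserverless").create_security_policy(
#     name="semantic-enrichment-network", type="network", policy=policy_json
# )
```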

Set up an automatic semantic enrichment index

To set up an automatic semantic enrichment index, follow these steps:

  1. To create an automatic semantic enrichment index using the AWS Command Line Interface (AWS CLI), use the create-index command:
aws opensearchserverless create-index \
    --id <collection_id> \
    --index-name <index_name> \
    --index-schema <index_body>
  2. To describe the created index, use the get-index command:
aws opensearchserverless get-index \
    --id <collection_id> \
    --index-name <index_name>

You can also use AWS CloudFormation templates (Type: AWS::OpenSearchServerless::CollectionIndex) or the AWS Management Console to configure automatic semantic enrichment, either during collection provisioning or after the collection is created.

Example: Index setup for product catalog search

This section shows how to set up a product catalog search index. You’ll implement semantic search on the title_semantic field (using an English model). For the product_id field, you’ll maintain default lexical search functionality.

In the following index-schema, the title_semantic field has its type set to text and its semantic_enrichment parameter set to status ENABLED, which enables automatic semantic enrichment on that field. You can use the language_options field to specify either english or multi-lingual. For comparison, we also define a nonsemantic title field named title_non_semantic. Use the following code:

aws opensearchserverless create-index \
    --id XXXXXXXXX \
    --index-name 'product-catalog' \
    --index-schema '{
    "mappings": {
        "properties": {
            "product_id": {
                "type": "keyword"
            },
            "title_semantic": {
                "type": "text",
                "semantic_enrichment": {
                    "status": "ENABLED",
                    "language_options": "english"
                }
            },
            "title_non_semantic": {
                "type": "text"
            }
        }
    }
}'
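You can also build the same index schema in Python, which is convenient when you generate mappings programmatically or call the API from boto3. The following sketch assumes that the create_index method of the boto3 opensearchserverless client takes id, indexName, and indexSchema arguments that mirror the CLI flags. Verify this against the SDK documentation before you rely on it. The language check reflects the two options described above.

```python
import json

LANGUAGE_OPTIONS = {"english", "multi-lingual"}


def product_catalog_schema(language="english"):
    """Index schema for the product-catalog example."""
    if language not in LANGUAGE_OPTIONS:
        raise ValueError(f"language must be one of {sorted(LANGUAGE_OPTIONS)}")
    return {
        "mappings": {
            "properties": {
                "product_id": {"type": "keyword"},
                # Only this field is enriched with sparse vectors at ingestion.
                "title_semantic": {
                    "type": "text",
                    "semantic_enrichment": {"status": "ENABLED", "language_options": language},
                },
                # Plain lexical (BM25) field kept for comparison.
                "title_non_semantic": {"type": "text"},
            }
        }
    }


index_schema_arg = json.dumps(product_catalog_schema())  # value for --index-schema

# Assumed boto3 equivalent of the CLI call (requires AWS credentials):
# import boto3
# boto3.client("opensearchserverless").create_index(
#     id="<collection_id>", indexName="product-catalog", indexSchema=product_catalog_schema()
# )
```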

Data ingestion

After the index is created, you can ingest data through standard OpenSearch mechanisms, including client libraries, REST APIs, or directly through OpenSearch Dashboards. Here’s an example of how to add multiple documents using bulk API in OpenSearch Dashboards Dev Tools:

POST _bulk
{"index": {"_index": "product-catalog"}}
{"title_semantic": "Red shoes", "title_non_semantic": "Red shoes", "product_id": "12345" }
{"index": {"_index": "product-catalog"}}
{"title_semantic": "Black shirt", "title_non_semantic": "Black shirt", "product_id": "6789" }
{"index": {"_index": "product-catalog"}}
{"title_semantic": "Blue hat", "title_non_semantic": "Blue hat", "product_id": "0000" }
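The bulk body is newline-delimited JSON. Each document is preceded by an action line, and the body must end with a newline. The following sketch builds that body from Python dictionaries. bulk_body is a hypothetical helper. You could send the result with any HTTP client that signs requests for OpenSearch Serverless, or pass it to a client library such as the bulk method of opensearch-py.

```python
import json


def bulk_body(index, docs):
    """Build an NDJSON _bulk body: an action line, then a source line per doc."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(doc))
    # The bulk API requires a trailing newline after the last line.
    return "\n".join(lines) + "\n"


products = [
    {"title_semantic": "Red shoes", "title_non_semantic": "Red shoes", "product_id": "12345"},
    {"title_semantic": "Black shirt", "title_non_semantic": "Black shirt", "product_id": "6789"},
    {"title_semantic": "Blue hat", "title_non_semantic": "Blue hat", "product_id": "0000"},
]
body = bulk_body("product-catalog", products)
```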

Search against automatic semantic enrichment index

After the data is ingested, you can query the index:

POST product-catalog/_search?size=1
{
  "query": {
    "match":{
      "title_semantic":{
        "query": "crimson footwear"
      }
    }
  }
}

The following is the response:

{
    "took": 240,
    "timed_out": false,
    "_shards": {
        "total": 0,
        "successful": 0,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 7.6092715,
        "hits": [
            {
                "_index": "product-catalog",
                "_id": "Q61b35YBAkHYIP5jIOWH",
                "_score": 7.6092715,
                "_source": {
                    "title_semantic": "Red shoes",
                    "title_non_semantic": "Red shoes",
                    "title_semantic_embedding": {
                        "feet": 0.85673976,
                        "dress": 0.48490667,
                        "##wear": 0.26745942,
                        "pants": 0.3588211,
                        "hats": 0.30846077,
                        ...
                    },
                    "product_id": "12345"
                }
            }
        ]
    }
}

The search matched the Red shoes document even though the query used crimson footwear, which shares no words with the title. During ingestion, the system automatically generated a sparse vector embedding for the document (title_semantic_embedding, truncated here for brevity). That embedding enables matches based on meaning rather than exact keywords.

Comparing search results

By running a similar query against the nonsemantic field title_non_semantic, you can confirm that nonsemantic fields can't match based on context:

GET product-catalog/_search?size=1
{
  "query": {
    "match":{
      "title_non_semantic":{
        "query": "crimson footwear"
      }
    }
  }
}

The following is the search response:

{
    "took": 398,
    "timed_out": false,
    "_shards": {
        "total": 0,
        "successful": 0,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 0,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    }
}

Limitations of automatic semantic enrichment

Automatic semantic enrichment is most effective when applied to small to medium-sized fields containing natural language content, such as movie titles, product descriptions, reviews, and summaries. Although semantic search improves relevance for most use cases, it might not be optimal for certain scenarios:

  • Very long documents – The current sparse model processes only the first 8,192 tokens of each English document and the first 512 tokens of each multilingual document. For lengthy articles, consider splitting documents into chunks so that all of the content is processed.
  • Log analysis workloads – Semantic enrichment significantly increases index size, which might be unnecessary for log analysis where exact matching typically suffices. The additional semantic context rarely improves log search effectiveness enough to justify the increased storage requirements.
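The following minimal chunking sketch splits text on whitespace into overlapping windows. Whitespace words only approximate the model's tokens, because one word can map to several WordPiece tokens. In practice, you would keep max_tokens comfortably below the 8,192-token (English) or 512-token (multilingual) limit. chunk_words and its parameters are hypothetical, and each chunk would be indexed as its own document.

```python
def chunk_words(text, max_tokens, overlap=0):
    """Split text into windows of at most max_tokens whitespace-separated words.

    Consecutive windows share `overlap` words so context isn't cut mid-thought.
    """
    if max_tokens <= 0 or not 0 <= overlap < max_tokens:
        raise ValueError("need max_tokens > 0 and 0 <= overlap < max_tokens")
    words = text.split()
    if not words:
        return []
    step = max_tokens - overlap
    # Stop once the remaining words are already covered by the previous window.
    last_start = max(len(words) - overlap, 1)
    return [" ".join(words[i:i + max_tokens]) for i in range(0, last_start, step)]
```

For example, chunk_words(article_text, 400, overlap=50) would produce windows of up to 400 words that each share 50 words with the previous one.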

Consider these limitations when deciding whether to implement automatic semantic enrichment for your specific use case.

Conclusion

Automatic semantic enrichment marks a significant advancement in making sophisticated search capabilities accessible to all OpenSearch Serverless users. By eliminating the traditional complexities of implementing semantic search, search developers can now enhance their search functionality with minimal effort and cost. Our feature supports multiple languages and collection types, with a pay-as-you-use pricing model that makes it economically viable for various use cases. Benchmark results are promising, particularly for English language searches, showing both improved relevance and reduced latency. However, although semantic search enhances most scenarios, certain use cases such as processing extremely long articles or log analysis might benefit from alternative approaches.

We encourage you to experiment with this feature and discover how it can optimize your search implementation so you can deliver better search experiences without the overhead of managing ML infrastructure. Check out the video and tech documentation for additional details.


About the Authors

Jon Handler is Director of Solutions Architecture for Search Services at Amazon Web Services, based in Palo Alto, CA. Jon works closely with OpenSearch and Amazon OpenSearch Service, providing help and guidance to a broad range of customers who have generative AI, search, and log analytics workloads for OpenSearch. Prior to joining AWS, Jon’s career as a software developer included four years of coding a large-scale, eCommerce search engine. Jon holds a Bachelor of Arts from the University of Pennsylvania, and a Master of Science and a PhD in Computer Science and Artificial Intelligence from Northwestern University.

Arjun Kumar Giri is a Principal Engineer at AWS working on the OpenSearch Project. He primarily works on OpenSearch’s artificial intelligence and machine learning (AI/ML) and semantic search features. He is passionate about AI, ML, and building scalable systems.

Siddhant Gupta is a Senior Product Manager (Technical) at AWS, spearheading AI innovation within the OpenSearch Project from Hyderabad, India. With a deep understanding of artificial intelligence and machine learning, Siddhant architects features that democratize advanced AI capabilities, enabling customers to harness the full potential of AI without requiring extensive technical expertise. His work seamlessly integrates cutting-edge AI technologies into scalable systems, bridging the gap between complex AI models and practical, user-friendly applications.

Create an OpenSearch dashboard with Amazon OpenSearch Service

Post Syndicated from Smita Singh original https://aws.amazon.com/blogs/big-data/create-an-opensearch-dashboard-with-amazon-opensearch-service/

Effective log analysis is essential for maintaining the health and performance of modern applications. Amazon OpenSearch Service stands out as a powerful, fully managed solution for log analytics and observability. With its advanced indexing, full-text search, and real-time analytics capabilities, OpenSearch Service makes it possible for organizations to seamlessly ingest, process, and search log data across diverse sources—including AWS services like Amazon CloudWatch, VPC Flow Logs, and more.

With OpenSearch Dashboards, you can turn indexed log data into actionable visualizations that reveal insights and help detect anomalies. By querying data stored in OpenSearch Service, you can extract relevant information and display it using a variety of visualization types—such as line charts, bar graphs, pie charts, heatmaps, and more. These tools make it effortless to monitor system behavior, spot trends, and quickly identify issues in your environment.

This post demonstrates how to harness OpenSearch Dashboards to analyze logs visually and interactively. With this solution, IT administrators, developers, and DevOps engineers can create custom dashboards to monitor system behavior, detect anomalies early, and troubleshoot issues faster through interactive charts and graphs.

Solution overview

In this post, we show how to create an index pattern in OpenSearch Dashboards, create two types of visualizations, and display these visualizations on a custom dashboard. We also demonstrate how to export and import visualizations.

Prerequisites

Before diving into log analysis with OpenSearch Dashboards, you must have the following:

  • A properly configured OpenSearch Service domain
  • A working log collection and ingestion pipeline

The post Amazon OpenSearch Service 101: Create your first search application with OpenSearch guides you through setting up your OpenSearch Service domain and configuring the log ingestion pipeline.

For this post, we work with the following log sources, which have already been ingested into an OpenSearch Service cluster as part of the prerequisite steps:

Access OpenSearch Dashboards

Complete the following steps to access OpenSearch Dashboards:

  1. On the OpenSearch Service console, choose Domains in the navigation pane.
  2. Confirm that your domain status is Active.
  3. Choose your domain to open the domain details page.
  4. Choose the OpenSearch Dashboards URL to open it in a new browser window.
  5. Authenticate into OpenSearch Dashboards using one of the supported methods.

Create an index pattern

After you’re logged in to OpenSearch Dashboards, you must create an index pattern. An index pattern tells OpenSearch Dashboards which indexes to search. Complete the following steps:

  1. In OpenSearch Dashboards, expand the navigation pane and choose Dashboard Management under Management.
  2. Choose Index patterns in the navigation pane.
  3. Choose Create index pattern.
  4. For Index pattern name, enter a name (for example, log-aws-cloudtrail-*).
  5. Choose Next step.
  6. For Time field, choose @timestamp.
  7. Choose Create index pattern.
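An index pattern such as log-aws-cloudtrail-* uses a trailing wildcard, so it covers every index whose name starts with that prefix, including daily or rolled-over indexes created later. The following Python sketch approximates that matching with the standard library’s fnmatch module. It illustrates the wildcard semantics only and is not the OpenSearch Dashboards implementation; the sample index names are hypothetical.

```python
from fnmatch import fnmatchcase


def matching_indexes(pattern: str, indexes: list[str]) -> list[str]:
    """Return the index names covered by a wildcard index pattern.

    Comma-separated patterns (for example, "a-*,b-*") are treated as a union.
    fnmatchcase keeps matching case-sensitive on every platform.
    """
    parts = [p.strip() for p in pattern.split(",") if p.strip()]
    return [name for name in indexes if any(fnmatchcase(name, p) for p in parts)]


# Hypothetical index names in the domain.
indexes = [
    "log-aws-cloudtrail-2025.05.01",
    "log-aws-cloudtrail-2025.05.02",
    "log-aws-vpcflow-2025.05.01",
]
print(matching_indexes("log-aws-cloudtrail-*", indexes))
```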

Create visualizations

Now that the index pattern is created, let’s create some visualizations. For this post, we create a pie chart and an area graph.

Create a pie chart

Complete the following steps to create a pie chart:

  1. In OpenSearch Dashboards, choose Visualize in the navigation pane.
  2. Choose Create visualization.
  3. Choose Pie as the visualization type.
  4. For Source, choose log-aws-cloudtrail-*.
  5. Under Buckets, choose Add, then Split slices.
  6. For Aggregation, choose Terms.
  7. For Field, choose eventName.
  8. For Size, enter 10.
  9. Leave all other parameters as default and choose Update.
  10. Choose Save to save the visualization.
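Behind the pie chart, the Split slices bucket with a Terms aggregation corresponds roughly to an OpenSearch terms aggregation on eventName, limited to the 10 largest buckets within the selected time range. The following Python sketch builds a comparable request body and turns the returned buckets into slice percentages. It approximates what Dashboards sends rather than copying it; depending on your mapping, the aggregatable field might be eventName.keyword instead of eventName, and the aggregation name event_names is arbitrary.

```python
def pie_query(field: str = "eventName", size: int = 10, last_minutes: int = 30) -> dict:
    """Terms aggregation over the last N minutes, similar to the pie chart."""
    return {
        "size": 0,  # only the aggregation is needed, not the documents
        "query": {"range": {"@timestamp": {"gte": f"now-{last_minutes}m"}}},
        "aggs": {"event_names": {"terms": {"field": field, "size": size}}},
    }


def slice_percentages(response: dict) -> dict[str, float]:
    """Convert terms buckets into each slice's share of the displayed total."""
    buckets = response["aggregations"]["event_names"]["buckets"]
    total = sum(b["doc_count"] for b in buckets)
    if total == 0:
        return {}
    return {b["key"]: round(100 * b["doc_count"] / total, 1) for b in buckets}


# Hypothetical response fragment with two event types.
sample = {"aggregations": {"event_names": {"buckets": [
    {"key": "GetObject", "doc_count": 3},
    {"key": "PutObject", "doc_count": 1},
]}}}
print(slice_percentages(sample))  # {'GetObject': 75.0, 'PutObject': 25.0}
```

The percentages are relative to the returned buckets only, so events outside the top 10 are not counted in this sketch.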

Sample NDJSON file for the pie chart – EventNamePie.ndjson

Refer to Export and import visualizations for instructions on importing the samples.

The following screenshot shows our pie chart, which displays different types of events and their occurrence percentage in the last 30 minutes.

Create an area graph

Complete the following steps to create an area graph:

  1. In OpenSearch Dashboards, choose Visualize in the navigation pane.
  2. Choose Create visualization.
  3. Choose Area as the visualization type.
  4. For Source, choose log-aws-cloudtrail-*.
  5. Under Buckets, choose Add, then X-axis.
  6. For Aggregation, choose Date Histogram.
  7. For Field, choose @timestamp.
  8. Leave all other parameters as default and choose Update.
  9. Under Advanced, choose Add, then Split series.
  10. For Aggregation, choose Terms.
  11. For Field, choose eventName.
  12. For Size, enter 10.
  13. Leave all other parameters as default and choose Update.
  14. Choose Save.
  15. Update the time range to Last 60 minutes.
  16. Choose Refresh and Save.
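The area graph nests two bucket aggregations: a date histogram on @timestamp for the X-axis and, inside each time bucket, a terms aggregation on eventName for the split series. The following Python sketch builds a comparable request body and pivots the nested buckets into one series per event name. The one-minute fixed_interval and the aggregation names over_time and by_event are assumptions for illustration; by default, Dashboards picks the interval automatically.

```python
from collections import defaultdict


def area_query(interval: str = "1m", last_minutes: int = 60, size: int = 10) -> dict:
    """Date histogram with a nested terms aggregation, similar to the area graph."""
    return {
        "size": 0,
        "query": {"range": {"@timestamp": {"gte": f"now-{last_minutes}m"}}},
        "aggs": {
            "over_time": {
                "date_histogram": {"field": "@timestamp", "fixed_interval": interval},
                "aggs": {"by_event": {"terms": {"field": "eventName", "size": size}}},
            }
        },
    }


def to_series(response: dict) -> dict[str, list[tuple[int, int]]]:
    """Pivot nested buckets into {eventName: [(epoch_millis, count), ...]}."""
    series = defaultdict(list)
    for time_bucket in response["aggregations"]["over_time"]["buckets"]:
        for event in time_bucket["by_event"]["buckets"]:
            series[event["key"]].append((time_bucket["key"], event["doc_count"]))
    return dict(series)


# Hypothetical response fragment with two one-minute buckets.
sample = {"aggregations": {"over_time": {"buckets": [
    {"key": 1700000000000, "by_event": {"buckets": [{"key": "GetObject", "doc_count": 4}]}},
    {"key": 1700000060000, "by_event": {"buckets": [
        {"key": "GetObject", "doc_count": 2},
        {"key": "PutObject", "doc_count": 1},
    ]}},
]}}}
print(to_series(sample))
```

Because the terms aggregation sits inside each time bucket, the top 10 event names are chosen per interval, so a series can have gaps where an event name fell outside the top 10 for that interval.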

The following screenshot shows an area graph with different types of events and their occurrence count in the last 60 minutes.

Sample NDJSON file for the area chart – EventNameArea.ndjson

Refer to Export and import visualizations for instructions on importing the samples.

Create a dashboard

Now we will combine the visualizations we just created into a dashboard. A dashboard serves as a customizable interface that consolidates multiple visualizations, saved searches, and various content into a comprehensive view of data. Users can combine diverse visual elements—including charts, graphs, metrics, and tables—into a single cohesive display that can be arranged and resized on a flexible grid layout. You can simultaneously apply filters and time ranges across multiple visualizations, creating a coordinated analytical experience. Complete the following steps to create a dashboard:

  1. In OpenSearch Dashboards, choose Dashboards in the navigation pane.
  2. Choose Create new dashboard.
  3. Choose Add on the menu bar.
  4. Search for and choose the visualizations you created.

You can resize a panel by dragging its corners. To rearrange the layout, drag a panel by its top portion; for example, you can place panels side by side in a row. For tabular visualizations, you can also export the results in CSV format for further analysis or reporting.

  5. Choose Save.
  6. Change the time range to Last 60 minutes.
  7. Choose Refresh and Save.

Sample NDJSON file for the dashboard – CloudTrailSummary.ndjson

Refer to Export and import visualizations for instructions on importing the samples.

The following screenshot shows the CloudTrail dashboard displaying both visualizations.
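A dashboard-level time range such as Last 60 minutes applies the same time filter to every panel. Conceptually, each panel’s query is combined with one shared range filter on the time field, as in the following Python sketch. This is a simplified model for illustration, not how OpenSearch Dashboards builds its requests internally; the function names and panel queries are hypothetical.

```python
import copy


def time_range_filter(last_minutes: int, time_field: str = "@timestamp") -> dict:
    """Range filter equivalent to a relative 'Last N minutes' time range."""
    return {"range": {time_field: {"gte": f"now-{last_minutes}m", "lte": "now"}}}


def apply_shared_filter(panel_queries: list[dict], shared: dict) -> list[dict]:
    """Wrap each panel query in a bool query that also requires the shared filter."""
    return [
        {"bool": {"must": [copy.deepcopy(q)], "filter": [copy.deepcopy(shared)]}}
        for q in panel_queries
    ]


# Hypothetical panel queries: one unfiltered, one limited to a single event.
panels = [{"match_all": {}}, {"term": {"eventName": "ConsoleLogin"}}]
for query in apply_shared_filter(panels, time_range_filter(60)):
    print(query)
```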

Export and import visualizations

In OpenSearch Dashboards, NDJSON (newline-delimited JSON) files are used to export and import saved objects such as dashboards, visualizations, maps, and index patterns. Each line of an NDJSON file holds one complete JSON object, so a file can be processed one object at a time. This makes import and export operations efficient, simplifies migrating content between environments, and makes it easy to share complex dashboard configurations. Organizations can back up and restore critical visualizations, saved searches, and dashboard settings while preserving their integrity, and because the format is human-readable, you can inspect or edit a file by hand when troubleshooting. Together, these properties make NDJSON files a practical tool for keeping OpenSearch Dashboards deployments consistent across development, testing, and production environments.
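Because each line of an export file is a complete JSON object, you can inspect an export with a few lines of code before importing it elsewhere. The following Python sketch lists the type, ID, and title of each saved object. It assumes the usual saved-object shape (type, id, and attributes.title fields) and skips any line without a type, such as the export summary line that some versions append; the file name in the usage comment is one of the samples from this post.

```python
import json
from typing import Iterable


def list_saved_objects(lines: Iterable[str]) -> list[tuple[str, str, str]]:
    """Return (type, id, title) for each saved object in NDJSON lines."""
    objects = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines, such as a trailing newline
        record = json.loads(line)  # one complete JSON object per line
        if "type" not in record:
            continue  # summary or metadata line rather than a saved object
        title = record.get("attributes", {}).get("title", "")
        objects.append((record["type"], record.get("id", ""), title))
    return objects


# Usage with one of the sample files:
# with open("EventNamePie.ndjson", encoding="utf-8") as f:
#     for obj in list_saved_objects(f):
#         print(obj)
```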

Export a visualization

Complete the following steps to export a visualization:

  1. In OpenSearch Dashboards, choose Saved objects in the navigation pane.
  2. Search for and select your object (in this case, a visualization), then choose Export.

The NDJSON file is downloaded to your local machine.

Import a visualization

Complete the following steps to import a visualization:

  1. In OpenSearch Dashboards, choose Saved objects in the navigation pane.
  2. Choose Import.
  3. Choose the first NDJSON file to import from your local machine.
  4. Select Create new objects with random IDs.
  5. Choose Import.
  6. Choose Done.
  7. Choose Import.
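Selecting Create new objects with random IDs means imported objects get fresh IDs instead of overwriting existing objects that share the same IDs, and references between the imported objects (for example, a dashboard pointing at its visualizations) must follow the new IDs. The following Python sketch models that remapping on parsed NDJSON records to show the idea. It’s a conceptual illustration, not the import code that OpenSearch Dashboards runs, and it assumes each record’s references list holds objects with an id field.

```python
import uuid


def assign_new_ids(records: list[dict]) -> list[dict]:
    """Give each saved object a random ID and rewrite references to match."""
    id_map = {rec["id"]: str(uuid.uuid4()) for rec in records}
    remapped = []
    for rec in records:
        new_rec = dict(rec, id=id_map[rec["id"]])  # shallow copy with the new ID
        # References to objects in the same file follow the new IDs;
        # references to objects outside the file keep their original ID.
        new_rec["references"] = [
            dict(ref, id=id_map.get(ref["id"], ref["id"]))
            for ref in rec.get("references", [])
        ]
        remapped.append(new_rec)
    return remapped
```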

You can now open the imported object.

The following screenshot shows our updated dashboard.

Clean up

To clean up your resources, delete the OpenSearch Service domain. Deleting the domain also deletes the data stored on it and the visualizations you created. You can’t recover the data after you delete it.

  1. On the OpenSearch Service console, choose Domains in the navigation pane.
  2. Select the domain you created and choose Delete.
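If you prefer to script the cleanup, the AWS SDK for Python (Boto3) provides a delete_domain operation on the opensearch client. The following sketch takes the client as a parameter so you can pass a real boto3.client("opensearch") or a test stub; the domain name is a placeholder. Because deletion is irreversible, the hypothetical wrapper requires an explicit confirmation flag.

```python
def delete_domain(client, domain_name: str, confirm: bool = False) -> dict:
    """Delete an OpenSearch Service domain; requires confirm=True.

    `client` is expected to behave like boto3.client("opensearch").
    """
    if not confirm:
        raise ValueError(f"Refusing to delete {domain_name!r} without confirm=True")
    return client.delete_domain(DomainName=domain_name)


# Example (requires AWS credentials and permission to delete the domain):
# import boto3
# delete_domain(boto3.client("opensearch"), "my-log-domain", confirm=True)
```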

Conclusion

OpenSearch Dashboards is a powerful tool for transforming raw log data into actionable visualizations that drive insights and decision-making. In this post, we’ve shown how to create visualizations like pie charts and area graphs, build comprehensive dashboards, and efficiently export and import your work using NDJSON files. By using the fully managed OpenSearch Service features, organizations can focus on extracting valuable insights rather than managing infrastructure, ultimately enhancing their observability posture and operational efficiency.

To further enhance your OpenSearch proficiency, consider exploring advanced visualization options such as heat maps, gauge charts, and geographic maps that can represent your data in more specialized ways. Implementing automated alerting based on predefined thresholds will help you proactively identify anomalies before they become critical issues. You can also use OpenSearch’s powerful machine learning capabilities for sophisticated anomaly detection and predictive analytics to gain deeper insights from your log data. As your implementation grows, customizing security settings with fine-grained access controls will provide appropriate data visibility across different teams in your organization.

For comprehensive learning resources, refer to the Amazon OpenSearch Service Developer Guide, watch Create your first OpenSearch Dashboard on YouTube, explore best practices in Amazon OpenSearch blog posts, and gain hands-on experience through workshops available in AWS Workshops.


About the Authors

Smita Singh is a Senior Solutions Architect at AWS. She focuses on defining technical strategic vision and works on architecture, design, and implementation of modern, scalable platforms for large-scale global enterprises and SaaS providers. She is a data, analytics, and generative AI enthusiast and is passionate about building innovative, highly scalable, resilient, fault-tolerant, self-healing, multi-tenant platform solutions and accelerators.

Dipayan Sarkar is a Specialist Solutions Architect for Analytics at AWS, where he helps customers modernize their data platform using AWS analytics services. He works with customers to design and build analytics solutions, enabling businesses to make data-driven decisions.

Build a multi-tenant healthcare system with Amazon OpenSearch Service

Post Syndicated from Ezat Karimi original https://aws.amazon.com/blogs/big-data/build-a-multi-tenant-healthcare-system-with-amazon-opensearch-service/

Healthcare systems face significant challenges managing vast amounts of data while maintaining regulatory compliance, security, and performance. This post explores strategies for implementing a multi-tenant healthcare system using Amazon OpenSearch Service.

In this context, tenants are distinct healthcare entities, sharing a common platform while maintaining isolated data environments. Hospital departments (like emergency, radiology, or patient care), clinics, insurance providers, laboratories, and research institutions are examples of these tenants.

In this post, we address common multi-tenancy challenges and provide actionable solutions for security, tenant isolation, workload management, and cost optimization across diverse healthcare tenants.

Understanding multi-tenant healthcare systems

Tenants in healthcare systems are diverse and have distinct requirements. For example, emergency departments need round-the-clock high availability with subsecond response times for patient care, along with strict access controls for sensitive trauma data. Research departments run complex, resource-intensive queries that are less time-sensitive but require robust anonymization protocols to maintain HIPAA compliance when working with patient data. Outpatient clinics operate during business hours with predictable usage patterns and moderate performance requirements. Administrative systems focus on financial data with scheduled batch processing and require access to billing information and insurance details only. Specialty departments like radiology and cardiology have unique requirements specific to the tasks they perform. For example, radiology requires high storage capacity and bandwidth for large medical imaging files, along with specialized indexing for metadata searches.

Understanding tenant requirements is essential for designing an effective multi-tenant architecture that balances resource sharing with appropriate isolation while maintaining regulatory compliance.

Isolation models

OpenSearch’s hierarchical structure consists of four main levels. At the top level is the domain, which contains one or more nodes that store and search data. Within the domain, indexes contain documents and define how they are stored and searched. Documents are individual records or data entries stored within an index, and each document consists of fields, which are individual data elements with specific data types and values.

Indexes include mappings and settings. Mappings define the schema of documents within an index, specifying field names and their data types. Settings configure various operational aspects of an index, such as the number of primary shards and replica shards.

The isolation model in a multi-tenant OpenSearch system can be at domain, index, or document level. The model you select for your multi-tenant healthcare system impacts security, performance, and cost. For healthcare organizations, as depicted in the following diagram, a hybrid approach typically works best, matching isolation levels to tenant requirements.

Multi-Tenancy Isolation Models


For emergency units, consider domain-based isolation, providing maximum separation by deploying separate OpenSearch domains for each tenant. Although it’s more expensive, it reduces resource contention and provides consistent performance for critical systems. This isolation simplifies compliance by physically separating sensitive patient data.

Similarly, for clinical research tenants, consider domain-based isolation despite its higher cost. Given the resource-intensive nature of research workloads—particularly genomics and population health analytics that process terabytes of data with complex algorithms—separate domains prevent these demanding operations from impacting other tenants.

For specialty departments like cardiology or radiology, where workload patterns are similar but data access patterns are distinct, index-based isolation is a good fit. These departments share a domain but maintain separate indexes. This approach provides strong logical separation while allowing more efficient resource utilization.

For administrative departments where data is less sensitive, document-based isolation is sufficient, and multiple tenants can share the same indexes.
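One way to keep this hybrid model manageable is to record each tenant’s isolation level in a single lookup and resolve every request through it. The following Python sketch is a hypothetical routing table: the tenant names, domain names, the admin-shared index, and the tenant_id field are placeholders, and the assignment of tenants to levels follows the guidance in this section.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Target:
    domain: str                    # placeholder domain name or endpoint
    index: str                     # index or index pattern to query
    tenant_filter: Optional[dict]  # extra filter needed on shared indexes


# Hypothetical routing table: (isolation level, domain) per tenant.
ROUTES = {
    "emergency": ("domain", "ed-domain"),
    "research": ("domain", "research-domain"),
    "cardiology": ("index", "clinical-shared-domain"),
    "radiology": ("index", "clinical-shared-domain"),
    "billing": ("document", "admin-shared-domain"),
}


def resolve(tenant: str) -> Target:
    """Map a tenant to the domain, indexes, and filter its requests should use."""
    level, domain = ROUTES[tenant]
    if level in ("domain", "index"):
        # Dedicated domain, or tenant-prefixed indexes in a shared domain:
        # the index pattern alone isolates the tenant's data.
        return Target(domain, f"{tenant}-*", None)
    # Document-level isolation: shared index, so filter on the tenant ID.
    return Target(domain, "admin-shared", {"term": {"tenant_id": tenant}})
```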

Data modeling

Effective data modeling is crucial for maintaining performance and manageability in a multi-tenant healthcare system. Implement a consistent index naming convention that incorporates tenant identifiers, data categories, and time periods, such as {tenant-id}-{data-type}-{time-period}, where tenant-id identifies the entity (for example, cardiology). Example index names are cardiology-ecg-202505 and radiology-mri-202505. This structured approach simplifies data management, access control, and lifecycle policies.
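A small helper keeps the {tenant-id}-{data-type}-{time-period} convention consistent across ingestion code. The following Python sketch builds and parses monthly index names. The function names are hypothetical, and the rule that each part is lowercase letters and digits without hyphens is an assumption that matches the examples above (OpenSearch requires lowercase index names, and excluding hyphens keeps names unambiguous to split).

```python
import re
from datetime import date

_PART = re.compile(r"^[a-z0-9]+$")  # lowercase, no hyphens inside a part


def index_name(tenant: str, data_type: str, day: date) -> str:
    """Build a monthly index name such as cardiology-ecg-202505."""
    for part in (tenant, data_type):
        if not _PART.match(part):
            raise ValueError(f"invalid name part: {part!r}")
    return f"{tenant}-{data_type}-{day:%Y%m}"


def parse_index_name(name: str) -> dict:
    """Split an index name back into tenant, data type, and period."""
    tenant, data_type, period = name.split("-")
    return {"tenant": tenant, "data_type": data_type, "period": period}
```

Monthly names like these also suit the time-based rotation described next, because an entire index ages out of a storage tier at once.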

Consider data access patterns when designing your index strategy. For example, for time-series data like vital signs or telemetry readings, time-based indexes with appropriate rotation policies will improve performance and simplify data lifecycle management.

For shared indexes using document-based isolation, make sure tenant identifiers are consistently applied and indexed for efficient tenant-based filtering.
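With document-level isolation, every query against a shared index should carry a filter on the tenant identifier. The following Python sketch wraps an arbitrary query in a bool query with a term filter. The tenant_id field name is an assumption, and the field should be mapped as keyword so the term filter matches exact values. Client-side filtering like this is easy to bypass, so document-level security, described later in this post, is the server-side way to enforce the same restriction.

```python
import copy


def scope_to_tenant(query: dict, tenant_id: str, field: str = "tenant_id") -> dict:
    """Return a bool query that runs `query` only over one tenant's documents."""
    return {
        "bool": {
            "must": [copy.deepcopy(query)],
            # Filter context: the clause doesn't affect scoring and can be cached.
            "filter": [{"term": {field: tenant_id}}],
        }
    }


# Hypothetical search body for the billing tenant.
body = {"query": scope_to_tenant({"match": {"description": "overdue invoice"}}, "billing")}
```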

Tenant management

Effective tenant management prevents resource contention and provides consistent performance across your healthcare system. Implement a hybrid isolation model using a tenant tiering framework based on criticality. The following table outlines the tiering framework.

| Tier | Tenant types | SLA | Resources | Operational limits | Behavior |
| --- | --- | --- | --- | --- | --- |
| Tier-1 Critical | Emergency departments; ICU/critical care; Operating rooms | 24/7 SLA, 99.99% availability; Subsecond response; RPO: near zero; RTO: less than 15 minutes | Guaranteed 50% CPU, 50% memory; Dedicated hot nodes; 2 replicas minimum | 100 concurrent requests; 20 MB request size; 30-second timeout; No throttling | Priority query routing; Preemptive scaling; Automatic failover |
| Tier-2 Urgent | Inpatient units; Specialty departments; Radiology/imaging | 24/7 SLA, 99.9% availability; Less than 2-second response time; RPO: less than 15 minutes; RTO: less than 1 hour | Guaranteed 30% CPU, 30% memory; Shared hot nodes; 1–2 replicas | 50 concurrent requests; 15 MB request size; 60-second timeout; Limited throttling during peak | High-priority query routing; Automatic scaling; Automated recovery |
| Tier-3 Standard | Outpatient clinics; Primary care; Pharmacy; Laboratory | Business hours SLA (8 AM – 8 PM), 99.5% availability; Less than 5-second response time; RPO: less than 1 hour; RTO: less than 4 hours | Guaranteed 15% CPU, 15% memory; Shared nodes; 1 replica | 25 concurrent requests; 10 MB request size; 120-second timeout; Moderate throttling | Standard query routing; Fair thread allocation; Manual scaling; Business hours optimization |
| Tier-4 Research | Clinical research; Genomics; Population health | Best-effort SLA, up to 99% availability; Less than 30-second response time; RPO: less than 24 hours; RTO: less than 24 hours | Guaranteed 5% CPU, 10% memory; Burst capacity during off-hours; 0–1 replicas | 10 concurrent requests; 50 MB request size; 300-second timeout; Aggressive throttling during peak | Compute optimized instances; Large heap size; Research-specific plugins |
| Tier-5 Admin | Billing/finance; HR systems; Inventory management | Business hours SLA (9 AM – 5 PM), 99% availability; Less than 10-second response time; RPO: less than 24 hours; RTO: less than 48 hours | No guaranteed resources; Burstable capacity; UltraWarm for historical data | 5 concurrent requests; 5 MB request size; 180-second timeout; Aggressive throttling | Lowest priority query routing; Batch processing preferred; Off-hours scheduling; Cost-optimized storage |

Workload management

When you use OpenSearch Service for multi-tenancy, you must balance your tenants’ workloads to make sure you deliver the resources needed for each to ingest, store, and query their data effectively. A multi-layered workload management framework with a rule-based proxy and OpenSearch Service workload management can effectively address these challenges. For details, see this blog post: Workload management in OpenSearch-based multi-tenant centralized logging platforms.
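The operational limits in the tiering table can be enforced in such a proxy layer before a request reaches OpenSearch. The following Python sketch encodes the concurrency, request-size, and timeout limits from the table and makes a simple admit-or-reject decision. The class and function names are hypothetical, it treats 1 MB as 1,024 × 1,024 bytes (an assumption), and a real proxy would also handle throttling, queuing, and priority routing.

```python
from dataclasses import dataclass

MB = 1024 * 1024  # assumption: binary megabytes


@dataclass(frozen=True)
class TierLimits:
    max_concurrent: int
    max_request_bytes: int
    timeout_seconds: int


# Operational limits from the tiering table.
TIERS = {
    "tier-1": TierLimits(100, 20 * MB, 30),
    "tier-2": TierLimits(50, 15 * MB, 60),
    "tier-3": TierLimits(25, 10 * MB, 120),
    "tier-4": TierLimits(10, 50 * MB, 300),
    "tier-5": TierLimits(5, 5 * MB, 180),
}


def admit(tier: str, in_flight: int, request_bytes: int) -> tuple[bool, str]:
    """Decide whether a new request fits within its tenant tier's limits."""
    limits = TIERS[tier]
    if in_flight >= limits.max_concurrent:
        return False, "too many concurrent requests"
    if request_bytes > limits.max_request_bytes:
        return False, "request too large"
    return True, f"ok (timeout {limits.timeout_seconds}s)"
```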

Security framework

Healthcare data requires strong protection because of its sensitive nature and regulatory requirements. The OpenSearch Service security framework can be adapted to healthcare’s strict security requirements. This framework combines multiple layers of access control, as shown in the following diagram.

Multi-tenancy fine-grained access control in Amazon OpenSearch Service


An important step in this framework is role mapping, where AWS Identity and Access Management (IAM) roles are mapped to OpenSearch roles for role-based access control (RBAC). For example, an emergency department can define an ED-Physician role with access to patient history across departments and an ED-Staff role with access to vital sign and medication data, and then map its IAM roles to these OpenSearch roles.
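With the security plugin’s REST API, a role mapping ties an OpenSearch role to backend roles such as IAM role ARNs. The following Python sketch builds the method, path, and body for such a mapping. The ED-Physician role name comes from the example above, the ARN uses a placeholder account ID, and sending the request (signed with SigV4 or as the master user) is left out. Older Open Distro-based versions use the _opendistro/_security prefix instead of _plugins/_security, so confirm the endpoint for your domain version.

```python
import json
from typing import Sequence


def role_mapping_request(role: str, iam_role_arns: Sequence[str],
                         users: Sequence[str] = ()) -> tuple[str, str, str]:
    """Return (method, path, JSON body) for mapping an OpenSearch role."""
    body = {
        "backend_roles": list(iam_role_arns),  # IAM roles that receive this role
        "users": list(users),                  # optional individual users
        "hosts": [],
    }
    return "PUT", f"/_plugins/_security/api/rolesmapping/{role}", json.dumps(body)


method, path, body = role_mapping_request(
    "ED-Physician", ["arn:aws:iam::111122223333:role/ed-physician"]  # placeholder ARN
)
```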

With document-level security (DLS), you can limit emergency department staff to active emergency patients only while restricting access to discharged patient data only to the providers who treat them. With field-level security (FLS), you can allow access to medical fields while masking billing and insurance data. You can also provide attribute-based access control (ABAC) policies to allow access based on patient status.
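Document-level security and field masking are defined on the OpenSearch role itself. The following Python sketch builds a role body for emergency department staff: a DLS query that limits documents to active emergency patients, and masked_fields that return billing and insurance fields as hashes. The index pattern, field names, and status value are hypothetical; the body follows the index_permissions structure of the security plugin’s roles API (dls, masked_fields, allowed_actions), which you should confirm against the documentation for your domain version.

```python
import json


def ed_staff_role() -> dict:
    """Role body for PUT /_plugins/_security/api/roles/ED-Staff (sketch)."""
    dls_query = {"term": {"patient_status": "active_emergency"}}  # hypothetical field/value
    return {
        "cluster_permissions": [],
        "index_permissions": [{
            "index_patterns": ["emergency-*"],          # hypothetical index pattern
            "dls": json.dumps(dls_query),             # DLS takes the query as a JSON string
            "masked_fields": ["billing_code", "insurance_id"],  # returned hashed
            "allowed_actions": ["read"],               # built-in read action group
        }],
    }
```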

For research departments, you can create Clinical-Researcher roles with read-only access to datasets. Map academic roles to research roles so that researchers only access data for studies they’re authorized to conduct. For DLS, implement filters so that researchers only access approved documents. Use FLS to anonymize HIPAA identifiers. For research departments, ABAC should evaluate the study phase and the researcher’s location.

For outpatient care, you can define Medical-Provider roles with full access to assigned patients’ records and Medical-Assistant roles limited to documenting vitals and preliminary information. For DLS, limit access to each patient’s records to that patient’s physicians. For FLS, restrict access to medical data only, while limiting nurses to demographic, vital signs, and medication fields. Implement time-aware ABAC policies that restrict access to patient records outside of business hours unless the provider is on call.
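Time-aware rules like this are typically evaluated in the application or proxy layer in front of OpenSearch. The following Python sketch shows only the decision logic. The 8 AM to 8 PM window is borrowed from the Tier-3 business-hours SLA as an assumption, the function name is hypothetical, and the assignment and on-call flags would come from your own scheduling and care-team systems.

```python
from datetime import datetime, time

BUSINESS_START = time(8, 0)   # assumption: Tier-3 business hours
BUSINESS_END = time(20, 0)


def may_access_record(now: datetime, is_assigned_provider: bool, on_call: bool) -> bool:
    """Allow assigned providers during business hours, or at any time when on call."""
    if not is_assigned_provider:
        return False
    within_hours = BUSINESS_START <= now.time() < BUSINESS_END
    return within_hours or on_call
```

Pass `now` in the clinic’s local time zone so that business hours are evaluated where the care is delivered.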

For administrative departments, you can implement Financial roles with access to charge codes and insurance information but no clinical data. For DLS, make sure financial staff only access billing documents. FLS provides access to billing codes, dates of service, and insurance fields while masking clinical content.

For specialty departments, you can create technician roles like Radiologist and apply DLS filters that restrict access to the data to these roles and the referring physician. FLS allows technicians to see clinical history and previous findings specific to their specialty.

Enable comprehensive audit logging to track access to protected health information. Configure these logs to capture user identity, accessed data, timestamp, and access context. These audit trails are essential for regulatory compliance and security investigations.

Managing data lifecycle for compliance

Index State Management (ISM) capabilities combined with OpenSearch Service storage tiering enable an elaborate approach to data lifecycle management that can be tailored to diverse tenant needs. ISM provides a robust way to automate the lifecycle of indexes by defining policies that dictate transitions between Hot, UltraWarm, and Cold storage tiers based on criteria like index age or size. This automation can extend to the archive tier by creating snapshots, which are stored in Amazon Simple Storage Service (Amazon S3) and can be further transitioned to Amazon S3 Glacier or Glacier Deep Archive for long-term, cost-effective archiving of data that is rarely accessed.

Frame your ISM policy along the following guidelines:

Keep critical patient data in hot storage for 180 days to support immediate access. Transition to warm storage for the next 12 months, then move to cold storage for years 2–7. After 7 years, archive records.

For research data, use project-based lifecycle policies rather than strictly time-based transitions. Maintain research datasets in hot storage during active project phases, regardless of data age. When projects conclude, transition data to warm storage for 12 months. Move to cold storage for the following 5–10 years based on research significance. Afterward, archive records.

For outpatient clinic data, keep recent patient records in hot storage for 90 days, aligning index rollover with typical follow-up windows. Transition to warm storage for months 4–18, coinciding with common annual visit patterns. Move to cold storage for years 2–7. Archive after 7 years.

For administrative data, maintain current fiscal year data in hot storage with automated transitions at year-end boundaries. Move previous fiscal year data to warm storage for 18 months to support auditing and reporting. Transition to cold storage for years 3–7. Archive financial records after 7 years.

For the specialty department data, keep recent metadata in hot storage for 90 days while moving large files, like images, to warm storage after 30 days. Transition complete records to cold storage after 18 months. Archive after 7 years.
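The critical patient data guideline translates into ISM transitions whose min_index_age values are cumulative from index creation: 180 days in hot storage, then about 12 more months (approximated here as 365 days) in warm storage, so the cold transition happens at 545 days. The following Python sketch generates that policy body from a list of phase durations. It uses the warm_migration and cold_migration actions that Amazon OpenSearch Service provides for UltraWarm and cold storage; the @timestamp field, the function name, and stopping at the cold phase (leaving snapshot-based archiving to a separate process) are assumptions.

```python
import copy
from typing import Optional

# Tier-specific migration actions in Amazon OpenSearch Service.
ACTIONS = {
    "hot": [],
    "warm": [{"warm_migration": {}}],
    "cold": [{"cold_migration": {"timestamp_field": "@timestamp"}}],
}


def ism_policy(phases: list[tuple[str, Optional[int]]], description: str) -> dict:
    """Build an ISM policy from (state, days_in_state) pairs.

    min_index_age is measured from index creation, so each transition uses
    the running total of the preceding durations. The last phase has no
    duration and therefore no outgoing transition.
    """
    states, elapsed = [], 0
    for i, (name, days) in enumerate(phases):
        state = {"name": name, "actions": copy.deepcopy(ACTIONS[name]), "transitions": []}
        if days is not None and i + 1 < len(phases):
            elapsed += days
            state["transitions"].append({
                "state_name": phases[i + 1][0],
                "conditions": {"min_index_age": f"{elapsed}d"},
            })
        states.append(state)
    return {"policy": {"description": description,
                       "default_state": phases[0][0],
                       "states": states}}


# Critical patient data: 180 days hot, about 12 months warm, then cold.
critical = ism_policy([("hot", 180), ("warm", 365), ("cold", None)],
                      "Critical patient data lifecycle")
```

You could then create the policy with PUT _plugins/_ism/policies/<policy_id>, using the generated dictionary as the JSON body; the other guidelines above differ mainly in their phase durations.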

Cost management and optimization

Healthcare organizations must balance performance requirements with budget constraints. Effective cost management strategies are essential for sustainable operations.

Implement comprehensive tagging strategies that mirror your index naming conventions to create a unified approach to resource management and cost tracking. Like the index naming convention, design your tags to identify the tenant, application, and data type (for example, “tenant=cardiology” or “application=ecg”). These tags, combined with AWS Cost Explorer, provide visibility into expenses across organizational boundaries.

Develop cost allocation mechanisms that fairly distribute expenses across different tenants. Consider implementing tiered pricing structures based on data volume, query complexity, and service-level guarantees. This approach aligns costs with value and encourages efficient resource utilization.
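A simple starting point is to split a shared monthly cost in proportion to each tenant’s usage, weighted by service tier so that tenants with stronger guarantees carry more of the cost. The following Python sketch illustrates that arithmetic. The usage metric (stored GB), the tier weights, and the figures in the example are hypothetical inputs, not recommended values.

```python
def allocate_cost(total_cost: float, usage: dict[str, float],
                  weights: dict[str, float]) -> dict[str, float]:
    """Split total_cost across tenants in proportion to usage * tier weight."""
    weighted = {t: usage[t] * weights.get(t, 1.0) for t in usage}  # default weight 1.0
    denominator = sum(weighted.values())
    if denominator == 0:
        raise ValueError("total weighted usage is zero")
    return {t: round(total_cost * w / denominator, 2) for t, w in weighted.items()}


# Hypothetical inputs: stored GB per tenant and a tier weight per tenant.
shares = allocate_cost(
    1000.0,
    usage={"emergency": 200, "cardiology": 300, "billing": 500},
    weights={"emergency": 2.0, "cardiology": 1.0, "billing": 0.6},
)
print(shares)
```

Because each share is rounded to cents, the shares can differ from the total by a few cents; assign any remainder to one tenant if your chargeback process needs an exact match.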

Optimize your infrastructure based on tenant-specific metrics and usage patterns. Monitor document counts, indexing rates, and query patterns to right-size your clusters and node types. Use different instance types for different workloads—for example, use compute-optimized instances for query-intensive applications.

Use OpenSearch Service storage tiering to optimize costs. UltraWarm provides significant cost savings for infrequently accessed data while maintaining reasonable query performance. Cold storage offers even greater savings for data that’s rarely accessed but must be retained for compliance purposes.

Conclusion

Building a multi-tenant healthcare system on OpenSearch Service requires careful planning and implementation. By addressing tenant isolation, security, data lifecycle management, workload control, and cost optimization, you can create a platform that delivers improved operational efficiency while maintaining strict compliance with healthcare regulations.


About the Authors

Ezat Karimi is a Senior Solutions Architect at AWS, based in Austin, TX. Ezat specializes in designing and delivering modernization solutions and strategies for database applications. Working closely with multiple AWS teams, Ezat helps customers migrate their database workloads to the AWS Cloud.

Jon Handler is a Senior Principal Solutions Architect at Amazon Web Services based in Palo Alto, CA. Jon works closely with OpenSearch and Amazon OpenSearch Service, providing help and guidance to a broad range of customers who have vector, search, and log analytics workloads that they want to move to the AWS Cloud. Prior to joining AWS, Jon’s career as a software developer included 4 years of coding a large-scale, ecommerce search engine. Jon holds a Bachelor of Arts from the University of Pennsylvania, and a Master of Science and a PhD in Computer Science and Artificial Intelligence from Northwestern University.

Develop and deploy a generative AI application using Amazon SageMaker Unified Studio

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/develop-and-deploy-a-generative-ai-application-using-amazon-sagemaker-unified-studio/

Picture this: You’re a financial analyst starting your Monday morning with a steaming cup of coffee, ready to review your investment portfolio. But instead of manually scouring dozens of news websites, financial reports, and industry analyses, you simply ask your AI assistant: “What global events happened over the weekend that might impact my technology stock holdings?” Within seconds, you receive a comprehensive analysis of relevant news, sentiment scores, and potential investment implications—all powered by a sophisticated generative AI application you built yourself.

This scenario isn’t science fiction; it’s the reality that modern financial professionals can create today. In an era where information moves at the speed of light and industry conditions can shift dramatically overnight, staying informed isn’t just an advantage—it’s essential for survival in competitive financial landscapes. The challenge lies in processing the overwhelming volume of global information that could impact investments while distinguishing reliable insights from noise.

Amazon SageMaker – Develop and scale AI use cases with the broadest set of tools

Luckily for us, technology is making this more straightforward. The next generation of Amazon SageMaker with Amazon SageMaker Unified Studio is a single data and AI development environment where you can find and access the data in your organization and act on it using the best tools across different use cases. SageMaker Unified Studio brings together the functionality and tools from existing AWS analytics and artificial intelligence and machine learning (AI/ML) services, including Amazon EMR, AWS Glue, Amazon Athena, Amazon Redshift, Amazon Bedrock, and Amazon SageMaker AI. From within SageMaker Unified Studio, you can find, access, and query data and AI assets across your organization, then work together in projects to securely build and share analytics and AI artifacts, including data, models, and generative AI applications.

With SageMaker Unified Studio, you can efficiently build generative AI applications in a trusted and secure environment using Amazon Bedrock. You can choose from a selection of high-performing foundation models (FMs) and advanced customization capabilities like Amazon Bedrock Knowledge Bases, Amazon Bedrock Guardrails, Amazon Bedrock Agents, and Amazon Bedrock Flows. You can rapidly tailor and deploy generative AI applications and share them through the built-in catalog for discovery.

What makes SageMaker Unified Studio particularly powerful for organizations is its integration with Amazon Bedrock Flows to build generative AI workflows, which is changing how organizations think about AI application development.

Amazon Bedrock Flows for generative AI application development

With Amazon Bedrock Flows, you can build and execute complex generative AI workflows without writing code, using an intuitive visual interface that democratizes AI development. This capability is transformative for organizations where speed, accuracy, and adaptability are paramount. It offers the following benefits:

  • Visual workflow development – Users can design AI applications by dragging and dropping components onto a canvas, making AI logic transparent and modifiable
  • Business logic flexibility – The service supports complex business logic through conditional branching, multi-path decision trees, and dynamic routing
  • Democratizing AI development – Business experts can directly contribute to AI application development without requiring extensive technical expertise
  • Seamless integration – Amazon Bedrock Flows integrates with FMs, knowledge bases, guardrails, and other AWS services
  • Reduced development complexity – The service handles infrastructure management and scaling through serverless execution and SDK APIs

Solution overview

In this post, we explore a financial use case in which we want to stay on top of the latest global events and determine our investment or financial exposure to them. We can use a SageMaker Unified Studio flow application to pull in the latest news summaries, derive sentiment from each news summary, and determine the effects on our investments. The following diagram illustrates this use case.

In the following sections, we show how to create a new project and build a flow application using a generative AI profile in SageMaker Unified Studio.

Prerequisites

For this walkthrough, you must have the following prerequisites:

  • A demo project – Create a demo project in your SageMaker Unified Studio domain. For instructions, see Create a project. For this example, we choose All capabilities in the project profile section, which has the generative AI capabilities enabled.

Create new project and build a flow application in SageMaker Unified Studio

In this section, we create a new flow application that uses an Amazon Bedrock knowledge base to provide information about your personal portfolio. Complete the following steps:

  1. In SageMaker Unified Studio, open the project you created as a prerequisite and choose Build and then Flow.
  2. Drag Knowledge Base from Nodes to the design panel to add a knowledge base that will include the user’s investment portfolio, news articles, and other information such as earnings call transcripts and financial analyst reports.
  3. Choose the Knowledge Base node and configure the knowledge base as follows:
     a. Add a name for your knowledge base (for example, portfolio…).
     b. Choose the model (for example, Claude 3.5 Haiku).
     c. Choose Create new Knowledge Base.
     d. Enter a name for the knowledge base.
     e. Select Project data source.
     f. For Select a data source, choose the Amazon Simple Storage Service (Amazon S3) bucket location where you uploaded your data.
     g. Choose Create.

The knowledge base creation process takes a few minutes to complete.

  4. When the knowledge base is ready, choose Save to save it to the flow.
  5. Choose My components, and on the options menu (three vertical dots), choose Sync to sync the knowledge base.

Make sure the S3 bucket has all the data (user portfolio data and latest news information data) before syncing the knowledge base.

We don’t provide any financial or news information data as part of this post. Upload current events or news data and investment portfolio data from your own data sources.

Test the flow application

After the knowledge base sync is complete, you can return to the flow application and ask questions. Using SageMaker Unified Studio flows, a financial analyst can provide a more personalized financial outlook to customers by combining rich internal information about each customer’s investment portfolio with the latest publicly available news. The following are some example questions that you can ask to test the knowledge base:

Check if Tesla or Apple is in any of the user's investment portfolios

Please check the latest news information to determine whether Tesla has a positive, negative, or neutral outlook in the near future

Flow-based applications offer a visual approach to creating complex AI workflows. By chaining different nodes, each optimized for specific functions, you can create sophisticated solutions that are more reliable, maintainable, and efficient than single-prompt approaches. These flows allow for conditional logic and branching paths, mimicking human decision-making processes and enabling more nuanced responses based on context and intermediate results.

Clean up

To avoid ongoing charges in your AWS account, delete the resources you created during this tutorial:

  1. Delete the project.
  2. Delete the domain created as part of the prerequisites.

Conclusion

In this post, we demonstrated how to use Amazon Bedrock Flows in SageMaker Unified Studio to build a generative AI application for financial analysis and investment decision-making without extensive coding knowledge. With this integration, you can create financial analysis workflows through an intuitive visual interface that process industry data, analyze news sentiment, and assess investment implications in real time. The solution integrates seamlessly with AWS services and FMs while providing essential features like automatic scaling, compliance controls, and audit capabilities. The implementation process involves setting up a SageMaker Unified Studio domain, configuring knowledge bases with portfolio and news data, and creating visual workflows that can analyze complex financial information. This democratized approach to AI development allows both technical and business teams to collaborate effectively, significantly reducing development time while maintaining the sophisticated capabilities needed for modern financial analysis.

To get started, explore the SageMaker Unified Studio documentation, set up a project in your AWS environment, and discover how this solution can transform your organization’s data analytics capabilities.


About the authors

Amit Maindola is a Senior Data Architect focused on data engineering, analytics, and AI/ML at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Arghya Banerjee is a Sr. Solutions Architect at AWS in the San Francisco Bay Area, focused on helping customers adopt and use the AWS Cloud. He is focused on big data, data lakes, streaming and batch analytics services, and generative AI technologies.

Melody Yang is a Principal Analytics Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interest are open source frameworks and automation, data engineering, and DataOps.

Gaurav Parekh is a Solutions Architect at AWS, specializing in generative AI and data analytics, with extensive experience building production AI systems on AWS.

Optimize traffic costs of Amazon MSK consumers on Amazon EKS with rack awareness

Post Syndicated from Austin Groeneveld original https://aws.amazon.com/blogs/big-data/optimize-traffic-costs-of-amazon-msk-consumers-on-amazon-eks-with-rack-awareness/

Are you incurring significant cross Availability Zone traffic costs when running an Apache Kafka client in containerized environments on Amazon Elastic Kubernetes Service (Amazon EKS) that consume data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) topics?

If you’re not familiar with Apache Kafka’s rack awareness feature, we strongly recommend starting with the blog post on how to Reduce network traffic costs of your Amazon MSK consumers with rack awareness for an in-depth explanation of the feature and how Amazon MSK supports it.

Although the solution described in that post uses an Amazon Elastic Compute Cloud (Amazon EC2) instance deployed in a single Availability Zone to consume messages from an Amazon MSK topic, modern cloud-native architectures demand more dynamic and scalable approaches. Amazon EKS has emerged as a leading platform for deploying and managing distributed applications. The dynamic nature of Kubernetes introduces unique implementation challenges compared to static client deployments. In this post, we walk you through a solution for implementing rack awareness in consumer applications that are dynamically deployed across multiple Availability Zones using Amazon EKS.

Here’s a quick recap of some key Apache Kafka terminology from the referenced post. A Kafka consumer subscribes to a topic and reads data from it. A topic is the logical data structure that Apache Kafka organizes data into, and it is divided into one or more partitions. Partitions are the unit of parallelism in Apache Kafka. Amazon MSK provides high availability by replicating each partition of a topic across brokers in different Availability Zones. For each partition, one replica acts as the leader (the leader partition): it receives all writes and, by default, serves all reads, while the other replicas (followers) copy its data. Amazon MSK tracks which replicas are fully caught up with the leader; this set is called the in-sync replicas. The list of in-sync replicas is used internally when the cluster needs to elect a new leader partition if the current leader becomes unavailable.

When consumer applications read from a topic, the Apache Kafka protocol facilitates a network exchange to determine which broker currently hosts the leader partition that the consumer needs to read from. This means that the consumer could be directed to a broker in a different Availability Zone than its own, leading to cross-AZ data transfer charges in your AWS account. To help optimize this cost, Amazon MSK supports the rack awareness feature, with which clients can ask an Amazon MSK cluster for a replica partition in the same Availability Zone as the client to read from, even if it isn’t the current leader partition. The cluster accomplishes this by checking for an in-sync replica on a broker within the same Availability Zone as the consumer.

The challenge with Kafka clients on Amazon EKS

In Amazon EKS, the underlying units of compute are EC2 instances that are abstracted as Kubernetes nodes. The nodes are organized into node groups for ease of management, scaling, and grouping of applications on certain EC2 instance types. As a best practice for resilience, the nodes in a node group are spread across multiple Availability Zones. Amazon EKS uses the Amazon EC2 metadata about the Availability Zone that each instance is located in and injects that information into the node’s metadata during node configuration. In particular, the Availability Zone ID (AZ ID) is injected into the node metadata.

When an application is deployed in a Kubernetes Pod on Amazon EKS, it goes through a process of binding to a node that meets the pod’s requirements. As shown in the following diagram, when you deploy client applications on Amazon EKS, the pod for the application can be bound to a node with available capacity in any Availability Zone. Also, the pod doesn’t automatically inherit the Availability Zone information from the node that it’s bound to, a piece of information necessary for rack awareness. The following architecture diagram illustrates Kafka consumers running on Amazon EKS without rack awareness.

AWS Cloud architecture showing MSK brokers, EKS pods, and EC2 instances in three Availability Zones

To set the client configuration for rack awareness, the pod needs to learn dynamically which Availability Zone it’s located in as it is bound to a node. During its lifecycle, a pod can be evicted from the node it was bound to, and its replacement can be scheduled on a node in a different Availability Zone if the scheduling criteria permit that. Making the pod aware of its Availability Zone lets the application container set the rack awareness parameter client.rack dynamically when the container initializes.

After rack awareness is enabled on the MSK cluster, what happens if the broker in the same Availability Zone as the client (hosted on Amazon EKS or elsewhere) becomes unavailable? The Apache Kafka protocol is designed to support a distributed data storage system. Assuming customers follow the best practice of implementing a replication factor > 1, Apache Kafka can dynamically reroute the consumer client to the next available in-sync replica on a different broker. This resilience remains consistent even after implementing nearest replica fetching, or rack awareness. Enabling rack awareness optimizes the networking exchange to prefer a partition within the same Availability Zone, but it doesn’t compromise the consumer’s ability to operate if the nearest replica is unavailable.

In this post, we walk you through an example of how to use the Kubernetes metadata label, topology.k8s.aws/zone-id, assigned to each node by Amazon EKS, and use an open source policy engine, Kyverno, to deploy a policy that mutates the pods that are in the binding state to dynamically inject the node’s AZ ID into the pod’s metadata as an annotation, as depicted in the following diagram. This annotation, in turn, is used by the container to create an environment variable that is assigned the pod’s annotated AZ ID information. The environment variable is then used in the container postStart lifecycle hook to generate the Kafka client configuration file with rack awareness setting. The following architecture diagram illustrates Kafka consumers running on Amazon EKS with rack awareness.

AWS architecture with MSK, EKS, Kyverno, and EC2 across three Availability Zones, detailing topology
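The following is a minimal sketch of the kind of script the postStart hook could run to turn the annotated AZ ID into client configuration. It assumes the annotation reaches the container as an environment variable named NODE_AZ_ID (a hypothetical name; the Kubernetes Downward API can map metadata.annotations['node_az_id'] to an environment variable), and build_client_properties is likewise a hypothetical helper. The MSK brokers report their rack as an AZ ID (for example, rack: use1-az6 in the consumer logs later in this post), which is why client.rack is set to the AZ ID rather than the Availability Zone name.

```shell
#!/usr/bin/env bash
# Minimal sketch (assumption): print a Kafka client configuration for MSK IAM
# authentication and, when an AZ ID is known, add the rack-awareness setting.
# build_client_properties and NODE_AZ_ID are hypothetical names.
build_client_properties() {
  local az_id="${1:-}"

  # IAM authentication settings, the same as in the non-rack-aware manifest.
  cat << 'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF

  # The Kyverno policy writes the literal 'empty' when the node has no
  # topology.k8s.aws/zone-id label. In that case (or when no AZ ID is passed),
  # omit client.rack so the consumer keeps fetching from partition leaders.
  if [ -n "$az_id" ] && [ "$az_id" != "empty" ]; then
    printf 'client.rack=%s\n' "$az_id"
  fi
}

# Example use inside the container (not executed here):
#   build_client_properties "${NODE_AZ_ID:-}" > "${KAFKA_CONFIG}/client.properties"
```

Leaving client.rack out when no AZ ID is available keeps the consumer functional; it only loses the same-AZ fetch optimization.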

Solution walkthrough

Prerequisites

For this walkthrough, we use AWS CloudShell to run the scripts that are provided inline as you progress. For a smooth experience, before getting started, make sure that kubectl and eksctl are installed and configured in the AWS CloudShell environment, following the installation instructions for Linux (amd64). Helm must also be installed in AWS CloudShell, following its installation instructions for Linux.

Also, check if the envsubst tool is installed in your CloudShell environment by invoking:

which envsubst

If the tool isn’t installed, you can install it using the command:

sudo dnf -y install gettext-devel

We also assume you already have an MSK cluster deployed in an Amazon Virtual Private Cloud (VPC) in three Availability Zones with the name MSK-AZ-Aware. In this walkthrough, we use AWS Identity and Access Management (IAM) authentication for client access control to the MSK cluster. If you’re using a cluster in your account with a different name, replace the instances of MSK-AZ-Aware in the instructions.

We follow the same MSK cluster configuration described in the rack awareness post referenced earlier, with some modifications. (Make sure you’ve set replica.selector.class = org.apache.kafka.common.replica.RackAwareReplicaSelector, for the reasons discussed in that post.) In our configuration, we add one line: num.partitions = 6. Although not mandatory, this gives automatically created topics six partitions, which makes the demonstrations in subsequent sections clearer.
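As a sketch, the two broker settings mentioned above could be kept in a properties file like the one below. This is only a fragment, under the assumption that the remaining settings come from the rack awareness post; the file name and the configuration name MSK-AZ-Aware-Config are hypothetical. The commented AWS CLI call shows one way to register the file as an MSK configuration, which you would then still need to apply to the cluster.

```shell
#!/usr/bin/env bash
# Sketch: write only the two broker settings discussed in this section.
# replica.selector.class lets consumers that set client.rack fetch from an
# in-sync replica in their own AZ; num.partitions=6 applies to auto-created topics.
cat > msk-az-aware-cluster.properties << 'EOF'
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
num.partitions=6
EOF

# Hypothetical follow-up, not executed here (requires AWS credentials):
# aws kafka create-configuration \
#   --name MSK-AZ-Aware-Config \
#   --server-properties fileb://msk-az-aware-cluster.properties
```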

Finally, we use the Amazon MSK Data Generator with the following configuration:

{
    "name": "msk-data-generator",
    "config": {
        "connector.class": "com.amazonaws.mskdatagen.GeneratorSourceConnector",
        "genkp.MSK-AZ-Aware-Topic.with": "#{Internet.uuid}",
        "genv.MSK-AZ-Aware-Topic.product_id.with": "#{number.number_between '101','200'}",
        "genv.MSK-AZ-Aware-Topic.quantity.with": "#{number.number_between '1','5'}",
        "genv.MSK-AZ-Aware-Topic.customer_id.with": "#{number.number_between '1','5000'}"
    }
}

Running the MSK Data Generator with this configuration will automatically create a six-partition topic named MSK-AZ-Aware-Topic on our cluster for us, and it will push data to that topic. To follow along with the walkthrough, we recommend and assume that you deploy the MSK Data Generator to create the topic and populate it with simulated data.

Create the EKS cluster

The first step is to create an EKS cluster in the same Amazon VPC subnets as the MSK cluster. If your MSK cluster has a different name than the one suggested, change the MSK_CLUSTER_NAME environment variable accordingly. You can also change the Amazon EKS cluster name by changing EKS_CLUSTER_NAME.

The environment variables that we define here are used throughout the walkthrough.

The following script defines these variables, looks up the security groups and client subnets of the MSK cluster, creates the EKS cluster with eksctl, and, as a last step, updates the kubeconfig with an entry for the EKS cluster:

AWS_ACCOUNT=$(aws sts get-caller-identity --output text --query Account)
export AWS_ACCOUNT
export AWS_REGION=${AWS_DEFAULT_REGION}
export MSK_CLUSTER_NAME=MSK-AZ-Aware
export EKS_CLUSTER_NAME=EKS-AZ-Aware
export EKS_CLUSTER_SIZE=3
export K8S_VERSION=1.32
export POD_ID_VERSION=1.3.5
 
MSK_BROKER_SG=$(aws kafka list-clusters \
  --query  'ClusterInfoList[?ClusterName==`'${MSK_CLUSTER_NAME}'`].BrokerNodeGroupInfo.SecurityGroups'  \
  --output text | xargs)
export MSK_BROKER_SG

MSK_BROKER_CLIENT_SUBNETS=$(aws kafka list-clusters \
  --query  'ClusterInfoList[?ClusterName==`'${MSK_CLUSTER_NAME}'`].BrokerNodeGroupInfo.ClientSubnets'  \
  --output text | xargs)
export MSK_BROKER_CLIENT_SUBNETS
 
VPC_ID=$(aws ec2 describe-subnets \
  --subnet-ids "$(echo "${MSK_BROKER_CLIENT_SUBNETS}" | cut -d' ' -f1)" \
  --query 'Subnets[0].VpcId' \
  --output text)
export VPC_ID

EKS_SUBNETS=$(echo ${MSK_BROKER_CLIENT_SUBNETS} | sed 's/ \+/,/g')
export EKS_SUBNETS

# Create a minimal config file for encrypted node volumes
cat > eks-config.yaml << EOF
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: ${EKS_CLUSTER_NAME}
  region: ${AWS_REGION}
  version: "${K8S_VERSION}"
vpc:
  id: "${VPC_ID}"
  subnets:
    public:
$(for subnet in ${MSK_BROKER_CLIENT_SUBNETS}; do
  AZ=$(aws ec2 describe-subnets --subnet-ids "$subnet" --query 'Subnets[0].AvailabilityZone' --output text)
  echo "      $AZ: { id: $subnet }"
done)
nodeGroups:
  - name: ng1
    instanceType: m5.xlarge
    desiredCapacity: ${EKS_CLUSTER_SIZE}
    minSize: ${EKS_CLUSTER_SIZE}
    maxSize: ${EKS_CLUSTER_SIZE}
    securityGroups:
      attachIDs: ["${MSK_BROKER_SG}"]
    volumeSize: 100
    volumeType: gp3
    volumeEncrypted: true
EOF

eksctl create cluster -f eks-config.yaml

aws eks update-kubeconfig \
  --region "${AWS_REGION}" \
  --name ${EKS_CLUSTER_NAME}
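Before continuing, you may want to confirm that the nodes actually landed in different Availability Zones, because that spread is what makes rack awareness matter. The hypothetical helper below reads the topology.k8s.aws/zone-id label of every node through kubectl’s JSONPath output and counts the distinct AZ IDs; with the three-subnet configuration above, you would typically expect it to print 3.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count the distinct AZ IDs across the cluster's nodes,
# using the topology.k8s.aws/zone-id label that Amazon EKS sets on each node.
count_node_az_ids() {
  kubectl get nodes \
    -o jsonpath='{range .items[*]}{.metadata.labels.topology\.k8s\.aws/zone-id}{"\n"}{end}' \
    | sort -u \
    | grep -c .
}

# Usage: count_node_az_ids
# For a per-node view, kubectl can also show the label as a column:
#   kubectl get nodes -L topology.k8s.aws/zone-id
```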

Next, you need to create an IAM policy, MSK-AZ-Aware-Policy, to allow access from the Amazon EKS pods to the MSK cluster. Note here that we’re using MSK-AZ-Aware as the cluster name.

Create a file, msk-az-aware-policy.json, with the IAM policy template:

cat > msk-az-aware-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:DescribeClusterDynamicConfiguration",
                "kafka-cluster:AlterClusterDynamicConfiguration"
            ],
            "Resource": [
                "arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT}:cluster/${MSK_CLUSTER_NAME}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT}:topic/${MSK_CLUSTER_NAME}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT}:group/${MSK_CLUSTER_NAME}/*"
            ]
        }
    ]
}
EOF

To create the IAM policy, use the following command. It first replaces the placeholders in the policy file with values from relevant environment variables, and then creates the IAM policy:

aws iam create-policy \
  --policy-name MSK-AZ-Aware-Policy \
  --policy-document "$(envsubst < msk-az-aware-policy.json)"
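One caveat with this approach: envsubst takes its values from exported environment variables and substitutes an empty string for a variable that isn’t set, rather than failing. A missing variable (for example, after opening a new CloudShell session) would therefore silently produce malformed ARNs. A small guard such as the hypothetical require_vars function below can be run first; it uses printenv, which, like envsubst, only sees exported variables.

```shell
#!/usr/bin/env bash
# Hypothetical guard: return non-zero and name each variable that is unset,
# empty, or not exported, so that envsubst is never run with missing values.
require_vars() {
  local name missing=0
  for name in "$@"; do
    # printenv only reports exported variables, which is exactly what envsubst reads.
    if [ -z "$(printenv "$name")" ]; then
      echo "missing or unexported variable: ${name}" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage, before rendering the policy:
#   require_vars AWS_REGION AWS_ACCOUNT MSK_CLUSTER_NAME && envsubst < msk-az-aware-policy.json
```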

Configure EKS Pod Identity

Amazon EKS Pod Identity offers a simplified experience for obtaining IAM permissions for pods on Amazon EKS. It requires installing the Amazon EKS Pod Identity Agent add-on on the EKS cluster:

eksctl create addon \
  --cluster ${EKS_CLUSTER_NAME} \
  --name eks-pod-identity-agent \
  --version ${POD_ID_VERSION}

Confirm that the add-on has been installed and its status is ACTIVE and that the status of all the pods associated with the add-on is Running.

eksctl get addon \
  --cluster ${EKS_CLUSTER_NAME} \
  --region "${AWS_REGION}" \
  --name eks-pod-identity-agent -o json

kubectl get pods \
  -n kube-system \
  -l app.kubernetes.io/instance=eks-pod-identity-agent

After you’ve installed the add-on, create a pod identity association between the Kubernetes service account kafka-sa in the kafka-ns namespace and a new IAM role, EKS-AZ-Aware-Role, that has the IAM policy created earlier attached:

eksctl create podidentityassociation \
  --namespace kafka-ns \
  --service-account-name kafka-sa \
  --role-name EKS-AZ-Aware-Role \
  --permission-policy-arns arn:aws:iam::"${AWS_ACCOUNT}":policy/MSK-AZ-Aware-Policy \
  --cluster ${EKS_CLUSTER_NAME} \
  --region "${AWS_REGION}"

Install Kyverno

Kyverno is an open source policy engine for Kubernetes that validates, mutates, and generates Kubernetes resources using policies written in YAML, simplifying the enforcement of security and compliance requirements. Here, you use Kyverno to dynamically inject the Availability Zone ID into the metadata of Amazon EKS pods as they enter the binding state.

In AWS CloudShell, create a file named kyverno-values.yaml. This file grants Kyverno’s Admission Controller the Kubernetes RBAC permissions to read Amazon EKS node metadata, which the default settings of Kyverno 1.13 and later don’t allow:

cat > kyverno-values.yaml << EOF
admissionController:
  rbac:
    clusterRole:
      extraResources:
        - apiGroups:
            - ""
          resources:
            - "nodes"
          verbs:
            - get
            - list
            - watch
EOF

After this file is created, install Kyverno using Helm, providing the values file created in the previous step:

helm repo add kyverno https://kyverno.github.io/kyverno/
helm repo update

helm install kyverno kyverno/kyverno \
  -n kyverno \
  --create-namespace \
  --version 3.3.7 \
  -f kyverno-values.yaml

Starting with Kyverno 1.13, the Admission Controller is configured to ignore AdmissionReview requests for pods in binding state. To change this, edit the Kyverno ConfigMap:

kubectl -n kyverno edit configmap kyverno

The kubectl edit command opens the ConfigMap in the default editor configured in your environment (in our case, Vim).

As highlighted in the following screenshot, remove [Pod/binding,*,*] from the resourceFilters field so that the Kyverno Admission Controller processes AdmissionReview requests for pods in binding state.

Kubernetes YAML configuration detailing Kyverno policy resource filters and cluster roles

If Vim is your default editor, place the cursor on the opening bracket of [Pod/binding,*,*] and use the Vim command 17x, which deletes (cuts) the 17 characters of that entry starting at the cursor position; make sure no characters of the neighboring entries are removed. Save the modified configuration using the Vim command :wq, which writes (saves) the file and quits.

After deleting, the resourceFilters field should look similar to the following screenshot.

Kubernetes YAML configuration with ReplicaSet resource filter highlighted for Kyverno policy management

If you have a different editor configured in your environment, follow the appropriate steps to achieve a similar outcome.
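If you prefer a non-interactive alternative, the same change can be scripted. The sketch below is an assumption rather than a documented Kyverno procedure: it reads the resourceFilters value with kubectl’s JSONPath output, removes the [Pod/binding,*,*] entry with sed, and writes the result back with a merge patch. It assumes the filter string contains no double quotes or backslashes, so it can be embedded in the JSON patch as-is; both function names are hypothetical.

```shell
#!/usr/bin/env bash
# Remove the [Pod/binding,*,*] entry from a resourceFilters string read on stdin.
# In the sed pattern, \[ and \* match literal characters; a lone ] is already literal.
strip_pod_binding_filter() {
  sed 's|\[Pod/binding,\*,\*]||g'
}

# Hypothetical wrapper: apply the change to the kyverno ConfigMap in place.
remove_pod_binding_filter() {
  local filters
  filters="$(kubectl -n kyverno get configmap kyverno -o jsonpath='{.data.resourceFilters}')"
  filters="$(printf '%s' "$filters" | strip_pod_binding_filter)"
  kubectl -n kyverno patch configmap kyverno --type merge \
    -p "$(printf '{"data":{"resourceFilters":"%s"}}' "$filters")"
}

# Usage: remove_pod_binding_filter
```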

Configure Kyverno policy

You need to configure the policy that will make the pods rack aware. This policy is adapted from the suggested approach in the Kyverno blog post, Assigning Node Metadata to Pods. Create a new file with the name kyverno-inject-node-az-id.yaml:

cat > kyverno-inject-node-az-id.yaml  << EOF
apiVersion: kyverno.io/v2beta1
kind: ClusterPolicy
metadata:
  name: inject-node-az-id
spec:
  background: false
  rules:
    - name: inject-node-az-id
      match:
        any:
        - resources:
            kinds:
            - Pod/binding
      context:
      - name: node
        variable:
          jmesPath: request.object.target.name
          default: ''
      - name: node_az_id
        apiCall:
          urlPath: "/api/v1/nodes/{{node}}"
          jmesPath: "metadata.labels.\"topology.k8s.aws/zone-id\" || 'empty'"
      mutate:
        patchStrategicMerge:
          metadata:
            annotations:
              node_az_id: "{{ node_az_id }}"
EOF

The policy instructs Kyverno to watch for pods in binding state. When Kyverno receives the AdmissionReview request for a pod, it sets the variable node to the name of the node to which the pod is being bound. It then sets the variable node_az_id to the node’s Availability Zone ID by calling the Kubernetes API /api/v1/nodes/{{node}} and reading the node metadata label topology.k8s.aws/zone-id (or the string empty if the label is missing). Finally, it defines a mutate rule that injects the obtained AZ ID into the pod’s metadata as the annotation node_az_id.

After you’ve created the file, apply the policy using the following command:

kubectl apply -f kyverno-inject-node-az-id.yaml

Deploy a pod without rack awareness

Now let’s observe the problem. To do this, deploy Kafka client pods without rack awareness, connect to one of them, and check how it interacts with the MSK cluster when you run a Kafka consumer from the pod.

First, get the bootstrap broker string of the MSK cluster. Start by looking up the Amazon Resource Name (ARN) of the MSK cluster:

MSK_CLUSTER_ARN=$(
    aws kafka list-clusters \
      --query 'ClusterInfoList[?ClusterName==`'${MSK_CLUSTER_NAME}'`].ClusterArn' \
      --output text)
export MSK_CLUSTER_ARN

Using the cluster ARN, you can get the bootstrap string with the following command:

BOOTSTRAP_SERVER_LIST=$(
    aws kafka get-bootstrap-brokers \
        --cluster-arn "${MSK_CLUSTER_ARN}" \
        --query 'BootstrapBrokerStringSaslIam' \
        --output text)
export BOOTSTRAP_SERVER_LIST

Create a new file named kafka-no-az.yaml:

cat > kafka-no-az.yaml << EOF
apiVersion: v1
kind: Namespace
metadata:
 name: kafka-ns
---
apiVersion: v1
kind: ServiceAccount
metadata:
 name: kafka-sa
 namespace: kafka-ns
automountServiceAccountToken: false
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-no-az
  namespace: kafka-ns
  labels:
    app: kafka-no-az
  annotations:
    node_az_id: ''
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka-no-az
  template:
    metadata:
      labels:
        app: kafka-no-az
    spec:
      serviceAccountName: kafka-sa
      containers:
      - image: bitnami/kafka:3.8.0
        name: kafka-no-az
        command: ["/bin/sh", "-ec", "while :; do echo '.'; sleep 5 ; done"]
        env:
        - name: BootstrapServerString
          value: ${BOOTSTRAP_SERVER_LIST}
        - name: MSK_TOPIC
          value: "MSK-AZ-Aware-Topic"
        - name: KAFKA_HOME
          value: /opt/bitnami/kafka
        - name: KAFKA_BIN
          value: /opt/bitnami/kafka/bin
        - name: KAFKA_CONFIG
          value: /opt/bitnami/kafka/config
        - name: KAFKA_LIBS
          value: /opt/bitnami/kafka/libs
        - name: KAFKA_LOG4J_OPTS
          value: "-Dlog4j.configuration=file:/opt/bitnami/kafka/config/log4j.properties"
        lifecycle:
          postStart:
            exec:
              command: 
              - "sh"
              - "-c"
              - |
                export KAFKA_HOME=/opt/bitnami/kafka
                export KAFKA_BIN=\${KAFKA_HOME}/bin
                export KAFKA_CONFIG=\${KAFKA_HOME}/config
                cat > \${KAFKA_CONFIG}/client.properties << EOF1
                security.protocol=SASL_SSL
                sasl.mechanism=AWS_MSK_IAM
                sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
                sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
                EOF1
                
                cat >> \${KAFKA_CONFIG}/log4j.properties << EOF2
                #
                # Enable logging of Kafka Client to stderr
                #
                log4j.rootLogger=WARN, stderr
                log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractFetch=DEBUG
                log4j.appender.stderr=org.apache.log4j.ConsoleAppender
                log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
                log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
                log4j.appender.stderr.Target=System.err
                EOF2
                cd \${KAFKA_HOME}/libs
                /usr/bin/curl -sS -L https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar --output \${KAFKA_LIBS}/aws-msk-iam-auth-2.2.0-all.jar
EOF

This pod manifest doesn’t make use of the Availability Zone ID injected into the metadata annotation and hence doesn’t add client.rack to the client.properties configuration.
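Although this manifest ignores it, every pod bound after the Kyverno policy was applied should still receive the node_az_id annotation. After you deploy the pods in the following steps, a hypothetical check like the one below can confirm this: it prints each pod’s name, node, and annotation through kubectl’s JSONPath output and flags pods whose annotation is missing or set to the policy’s fallback value, empty.

```shell
#!/usr/bin/env bash
# Hypothetical check: list pod name, node, and node_az_id annotation for every
# pod in kafka-ns, and return non-zero if any pod lacks a usable AZ ID.
check_pod_az_annotations() {
  kubectl -n kafka-ns get pods \
    -o jsonpath='{range .items[*]}{.metadata.name}{" "}{.spec.nodeName}{" "}{.metadata.annotations.node_az_id}{"\n"}{end}' \
  | awk '
      { print }
      $3 == "" || $3 == "empty" { print "  -> missing AZ ID annotation: " $1; bad = 1 }
      END { exit bad }'
}

# Usage: check_pod_az_annotations
```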

Deploy the pods using the following command:

kubectl apply -f kafka-no-az.yaml

Run the following command to confirm that the pods have been deployed and are in the Running state:

kubectl -n kafka-ns get pods

Choose a pod name from the output of the previous command and connect to that pod, replacing POD_ID with the name:

kubectl -n kafka-ns exec -it POD_ID -- sh

Run the Kafka consumer:

"${KAFKA_BIN}"/kafka-console-consumer.sh \
  --bootstrap-server  "${BootstrapServerString}" \
  --consumer.config  "${KAFKA_CONFIG}"/client.properties \
  --topic "${MSK_TOPIC}" \
  --from-beginning > /tmp/non-rack-aware-consumer.log 2>&1 &

This command runs the consumer in the background and writes all of its output to the file /tmp/non-rack-aware-consumer.log. There’s a lot of information in those logs, and we encourage you to open the file and take a deeper look. Next, examine the EKS pod in action: run the following command to show the most recent fetch requests that the consumer sent to the MSK cluster. You’ll notice a handful of meaningful log lines as the consumer accesses various partitions of the Kafka topic:

grep -E "DEBUG.*Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-[0-9]+" /tmp/non-rack-aware-consumer.log | tail -5

Observe your log output, which should look similar to the following:

[2025-03-12 23:59:05,308] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-3 at position FetchPosition{offset=100, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,308] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-0 at position FetchPosition{offset=83, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,542] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-5 at position FetchPosition{offset=100, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,542] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-2 at position FetchPosition{offset=107, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,720] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-4 at position FetchPosition{offset=84, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,720] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-1 at position FetchPosition{offset=85, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,811] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-3 at position FetchPosition{offset=100, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,811] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-0 at position FetchPosition{offset=83, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)

You’ve now connected to a specific pod in the EKS cluster and run a Kafka consumer to read from the MSK topic without rack awareness. Remember that this pod is running within a single Availability Zone.

Reviewing the log output, you can see the rack: values use1-az2, use1-az4, and use1-az6 as the pod makes calls to different partitions of the topic. These rack values are the Availability Zone IDs that our brokers are running in. This means that our EKS pod is opening network connections to brokers across three different Availability Zones, which accrues cross-AZ data transfer charges in our account.
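With many partitions, it helps to tally how often the consumer fetches from each Availability Zone instead of reading individual lines. The following is a minimal POSIX shell sketch, assuming the AbstractFetch DEBUG line format shown above; fetch_rack_counts is a hypothetical helper, not part of Kafka or Amazon MSK tooling. It reads log lines on standard input and prints each destination broker's rack (the rack after "to node") together with the number of fetch requests sent to it.

```shell
# Hypothetical helper (not part of Kafka or Amazon MSK tooling).
# Reads Kafka consumer DEBUG log lines on stdin and prints "<rack> <count>"
# for each destination broker rack, that is, the rack after "to node".
# Assumes the AbstractFetch log format shown above; other lines are ignored.
fetch_rack_counts() {
  sed -n 's/.*Added read_uncommitted fetch request for partition .* to node [^(]*(id: [0-9]* rack: \([^)]*\)).*/\1/p' \
    | sort \
    | uniq -c \
    | while read -r count rack; do
        # uniq -c prints "<count> <rack>"; swap to "<rack> <count>".
        printf '%s %s\n' "$rack" "$count"
      done
}
```

Paste the function into the pod's shell and run fetch_rack_counts < /tmp/non-rack-aware-consumer.log. Applied to only the eight lines shown above, it would print use1-az2 2, use1-az4 2, and use1-az6 4 (one rack per line), confirming that this consumer fetches from all three Availability Zones.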

Also notice that you have no way to check which node, and therefore Availability Zone, this EKS pod is running in. You can observe in the logs that it’s calling to MSK brokers in different Availability Zones, but there is no way to know which broker is in the same Availability Zone as the EKS pod you’ve connected to. Delete the deployment when you’re done:

kubectl -n kafka-ns delete -f kafka-no-az.yaml

Deploy a pod with rack awareness

Now that you have experienced the consumer behavior without rack awareness, you need to inject the Availability Zone ID to make your pods rack-aware.

Create a new file named kafka-az-aware.yaml:

cat > kafka-az-aware.yaml << EOF
apiVersion: v1
kind: Namespace
metadata:
 name: kafka-ns
---
apiVersion: v1
kind: ServiceAccount
metadata:
 name: kafka-sa
 namespace: kafka-ns
automountServiceAccountToken: false
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-az-aware
  namespace: kafka-ns
  labels:
    app: kafka-az-aware
  annotations:
    node_az_id: ''
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka-az-aware
  template:
    metadata:
      labels:
        app: kafka-az-aware
    spec:
      serviceAccountName: kafka-sa
      containers:
      - image: bitnami/kafka:3.8.0
        name: kafka-az-aware
        command: ["/bin/sh", "-ec", "while :; do echo '.'; sleep 5 ; done"]
        env:
        - name: BootstrapServerString
          value: ${BOOTSTRAP_SERVER_LIST}
        - name: MSK_TOPIC
          value: "MSK-AZ-Aware-Topic"
        - name: KAFKA_HOME
          value: /opt/bitnami/kafka
        - name: KAFKA_BIN
          value: /opt/bitnami/kafka/bin
        - name: KAFKA_CONFIG
          value: /opt/bitnami/kafka/config
        - name: KAFKA_LIBS
          value: /opt/bitnami/kafka/libs
        - name: KAFKA_LOG4J_OPTS
          value: "-Dlog4j.configuration=file:/opt/bitnami/kafka/config/log4j.properties"
        - name: NODE_AZ_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.annotations['node_az_id']
        lifecycle:
          postStart:
            exec:
              command: 
              - "sh"
              - "-c"
              - |
                export KAFKA_HOME=/opt/bitnami/kafka
                export KAFKA_BIN=\${KAFKA_HOME}/bin
                export KAFKA_CONFIG=\${KAFKA_HOME}/config
                cat > \${KAFKA_CONFIG}/client.properties << EOF1
                security.protocol=SASL_SSL
                sasl.mechanism=AWS_MSK_IAM
                sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
                sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
                EOF1
                if [ \$NODE_AZ_ID ]
                then
                  echo "client.rack=\$NODE_AZ_ID" >> \${KAFKA_CONFIG}/client.properties
                fi
                
                cat >> \${KAFKA_CONFIG}/log4j.properties << EOF2
                #
                # Enable logging of Kafka Client to stderr
                #
                log4j.rootLogger=WARN, stderr
                log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractFetch=DEBUG
                log4j.appender.stderr=org.apache.log4j.ConsoleAppender
                log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
                log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
                log4j.appender.stderr.Target=System.err
                EOF2
                
                /usr/bin/curl -sS -L https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar --output \${KAFKA_LIBS}/aws-msk-iam-auth-2.2.0-all.jar
EOF

As you can observe, the pod manifest defines an environment variable NODE_AZ_ID, assigning it the value from the pod’s own metadata annotation node_az_id that was injected by Kyverno. The manifest then uses the pod’s postStart lifecycle script to add client.rack into the client.properties configuration, setting it equal to the value in the environment variable NODE_AZ_ID.
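To check what the postStart hook writes without deploying anything, you can re-create its client.properties logic in a local shell. The following is a minimal sketch, assuming a POSIX shell; render_client_properties is a hypothetical name, and it prints to standard output rather than writing to ${KAFKA_CONFIG}/client.properties. It uses a quoted [ -n "${NODE_AZ_ID:-}" ] test in place of the manifest's unquoted [ $NODE_AZ_ID ]; both treat an AZ ID such as use1-az6 as set and an empty value as unset, but the quoted form also copes with an unset variable under set -u.

```shell
# Hypothetical local re-creation of the postStart hook's client.properties logic.
# Prints the file contents to stdout instead of writing into the Kafka config dir.
render_client_properties() {
  # Static IAM authentication settings, identical for every pod.
  cat <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
  # Append client.rack only when the Kyverno-injected AZ ID is non-empty.
  if [ -n "${NODE_AZ_ID:-}" ]; then
    printf 'client.rack=%s\n' "$NODE_AZ_ID"
  fi
}
```

For example, NODE_AZ_ID=use1-az6 render_client_properties prints the four SASL/IAM settings followed by client.rack=use1-az6, while running it with NODE_AZ_ID unset omits the client.rack line, which matches the configuration the non-rack-aware pods used.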

Deploy the pods using the following command:

kubectl apply -f kafka-az-aware.yaml

Run the following command to confirm that the pods have been deployed and are in the Running state:

kubectl -n kafka-ns get pods

Verify that Availability Zone IDs have been injected into the pods:

for pod in $(kubectl -n kafka-ns get pods --field-selector=status.phase==Running -o=name | grep "pod/kafka-az-aware-" | xargs)
do
  kubectl -n kafka-ns get "$pod" -o yaml | grep "node_az_id:"
done

Your output should look similar to:

node_az_id: use1-az2
node_az_id: use1-az4
node_az_id: use1-az6

Or:

AWS CloudShell showing Kafka namespace pods and node assignments in Kubernetes cluster

Select a pod ID from the output of the get pods command and open a shell in it:

kubectl -n kafka-ns exec -it POD_ID -- sh

The output of the get $pod command matches the order of results from the get pods command. This matching will help you understand what Availability Zone your pod is running in so you can compare it to log outputs later.

After you’ve connected to your pod, run the Kafka consumer:

"${KAFKA_BIN}"/kafka-console-consumer.sh \
  --bootstrap-server "${BootstrapServerString}" \
  --consumer.config "${KAFKA_CONFIG}"/client.properties \
  --topic "${MSK_TOPIC}" \
  --from-beginning > /tmp/rack-aware-consumer.log 2>&1 &

Similar to before, this command writes the consumer's output to a file, this time /tmp/rack-aware-consumer.log. Using a new file keeps the output of the two Kafka consumers you’ve run from overlapping. There’s a lot of information in those logs, and we encourage you to open them and take a deeper look. To see the rack awareness of your EKS pod in action, run the following command to view the most recent fetch requests to the MSK cluster. You can observe a handful of meaningful logs here as the consumer accesses various partitions of the Kafka topic:

grep -E "DEBUG.*Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-[0-9]+" /tmp/rack-aware-consumer.log | tail -5

Observe your log output, which should look similar to the following:

[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-5 at position FetchPosition{offset=527, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-4 at position FetchPosition{offset=509, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-3 at position FetchPosition{offset=527, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-2 at position FetchPosition{offset=522, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-1 at position FetchPosition{offset=533, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-0 at position FetchPosition{offset=520, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)

Each log line contains two rack: values. The first identifies the rack of the partition's current leader, and the second identifies the rack of the broker that the fetch request is actually sent to.

For example, look at MSK-AZ-Aware-Topic-5. The leader is identified as rack: use1-az4, but the fetch request is sent to use1-az6, as indicated by to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch).

All the other log lines show the same pattern: every fetch goes to the broker in use1-az6, which is what we expect, because the pod we connected to is running in this Availability Zone.
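You can also check this from the log file rather than by eye. The following POSIX shell sketch assumes the same AbstractFetch DEBUG format: fetch_leader_vs_replica prints the partition, leader rack, and fetch rack for each fetch line, and all_fetches_local succeeds only if every fetch went to the rack you pass in, such as the pod's own $NODE_AZ_ID. Both function names are hypothetical.

```shell
# Hypothetical helper: prints "<partition> <leader-rack> <fetch-rack>" for each
# fetch request line read from stdin (AbstractFetch DEBUG format shown above).
fetch_leader_vs_replica() {
  sed -n 's/.*Added read_uncommitted fetch request for partition \([^ ]*\) at position .*leader=Optional\[[^(]*(id: [0-9]* rack: \([^)]*\))].* to node [^(]*(id: [0-9]* rack: \([^)]*\)).*/\1 \2 \3/p'
}

# Hypothetical check: succeeds only if at least one fetch line was found on stdin
# and every fetch rack (third field) equals the rack given as $1.
all_fetches_local() {
  expected_rack=$1
  parsed=$(fetch_leader_vs_replica)
  [ -n "$parsed" ] || return 1
  # grep -c prints the number of non-matching racks; "|| true" keeps a zero
  # count from being treated as a failure.
  mismatches=$(printf '%s\n' "$parsed" | cut -d' ' -f3 | grep -cvxF "$expected_rack" || true)
  [ "$mismatches" -eq 0 ]
}
```

Inside the rack-aware pod, you might run tail -n 50 /tmp/rack-aware-consumer.log | all_fetches_local "$NODE_AZ_ID" && echo "all recent fetches are AZ-local". Checking only recent lines is deliberate: the consumer may send its first fetches to the partition leader before it learns which replica to read from, so early entries in the file can legitimately point to other Availability Zones.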

Congratulations! You’re consuming from the closest replica on Amazon EKS.

Clean up

Delete the deployment when finished:

kubectl -n kafka-ns delete -f kafka-az-aware.yaml

To delete the EKS Pod Identity association:

eksctl delete podidentityassociation \
--cluster ${EKS_CLUSTER_NAME} \
--namespace kafka-ns \
--service-account-name kafka-sa

To delete the IAM policy:

aws iam delete-policy \
  --policy-arn arn:aws:iam::"${AWS_ACCOUNT}":policy/MSK-AZ-Aware-Policy

To delete the EKS cluster:

eksctl delete cluster -n ${EKS_CLUSTER_NAME} --disable-nodegroup-eviction

If you followed along with this post using the Amazon MSK Data Generator, be sure to delete your deployment so it’s no longer attempting to generate and send data after you delete the rest of your resources.

Cleanup depends on which deployment option you used. To read more about the deployment options and the resources created for the Amazon MSK Data Generator, refer to Getting Started in the GitHub repository.

Creating an MSK cluster was a prerequisite of this post, and if you’d like to clean up the MSK cluster as well, you can use the following command:

aws kafka delete-cluster --cluster-arn "${MSK_CLUSTER_ARN}"

There is no additional cost to using AWS CloudShell, but if you’d like to delete your shell, refer to Delete a shell session home directory in the AWS CloudShell User Guide.

Conclusion

Apache Kafka nearest replica fetching, or rack awareness, is a strategic cost-optimization technique. By implementing it for Amazon MSK consumers on Amazon EKS, you can significantly reduce cross-zone traffic costs while maintaining robust, distributed streaming architectures. Open source tools such as Kyverno can simplify complex configuration challenges and drive meaningful savings. The solution we’ve demonstrated provides a powerful, repeatable approach to dynamically inject Availability Zone information into Kubernetes pods, optimize Kafka consumer routing, and minimize data transfer costs.

Additional resources

To learn more about rack awareness with Amazon MSK, refer to Reduce network traffic costs of your Amazon MSK consumers with rack awareness.


About the authors

Austin Groeneveld is a Streaming Specialist Solutions Architect at Amazon Web Services (AWS), based in the San Francisco Bay Area. In this role, Austin is passionate about helping customers accelerate insights from their data using the AWS platform. He is particularly fascinated by the growing role that data streaming plays in driving innovation in the data analytics space. Outside of his work at AWS, Austin enjoys watching and playing soccer, traveling, and spending quality time with his family.

Farooq Ashraf is a Senior Solutions Architect at AWS, specializing in SaaS, Generative AI, and MLOps. He is passionate about blending multi-tenant SaaS concepts with cloud services to innovate scalable solutions for the digital enterprise, and has several blog posts and workshops to his credit.

Automate data lineage in Amazon SageMaker using AWS Glue Crawlers supported data sources

Post Syndicated from Mohit Dawar original https://aws.amazon.com/blogs/big-data/automate-data-lineage-in-amazon-sagemaker-using-aws-glue-crawlers-supported-data-sources/

The next generation of Amazon SageMaker is the center for all your data, analytics, and AI. Bringing together widely adopted Amazon Web Services (AWS) machine learning (ML) and analytics capabilities, it delivers an integrated experience for analytics and AI with unified access to all your data. From Amazon SageMaker Unified Studio, a single data and AI development environment, you can access your data and use a suite of powerful tools for data processing, SQL analytics, model development, training and inference, and generative AI development.

With data lineage, now part of Amazon SageMaker Catalog, you can centralize lineage metadata of your data assets in a single place. You can track the flow of data over time, gaining a clear understanding of where it originated, how it has changed, and how it is used across the business. By providing this level of transparency, data lineage helps data consumers gain trust that the data is correct and compliant for their use cases. With data lineage captured at the table, column, and job level, data producers can conduct impact analysis of changes in their data pipelines and respond to data issues when needed, for example, when a column in the resulting dataset is missing the quality required by the business.

Data lineage is a powerful tool that can transform how organizations understand and manage their data flows. In this post, we explore its real-world impact through the lens of an ecommerce company striving to boost their bottom line.

To illustrate this practical application, we walk you through how you can use the prebuilt integration between SageMaker Catalog and AWS Glue crawlers to automatically capture lineage for data assets stored in Amazon Simple Storage Service (Amazon S3) and Amazon DynamoDB. Using this workflow, you can capture lineage automatically from additional data sources using AWS Glue crawlers. Refer to the Data lineage support matrix in the SageMaker Unified Studio User Guide for supported sources. We also use SageMaker Unified Studio to navigate these data assets and learn about their origin, transformations, and dependencies, thanks to the lineage metadata captured using the AWS Glue crawlers.

Key features of the SageMaker Catalog lineage graph

In SageMaker Unified Studio, you can explore and discover data assets of your organization suited for your use case. As you dive into these data assets, you can learn more about their business context, schema, quality, and lineage. When you decide to work with a subset of these assets, you can subscribe to them in a self-service fashion and start working with them. For more detail, visit Data discovery, subscription, and consumption in the SageMaker Unified Studio User Guide.

SageMaker Unified Studio provides a visual lineage graph that shows how a data asset has evolved from its source through transformations to its final state. This helps data scientists, engineers, and analysts answer key questions such as:

  • Where did this data come from?
  • What transformations has it gone through?
  • Which downstream assets will be impacted by a change?

With this level of visibility, teams can perform faster impact analysis, find the root cause of data quality issues, and ensure models are built on trusted data. It also supports better collaboration so users can confidently use and share data across the organization. The following screenshot shows how SageMaker Unified Studio visualizes data lineage, making it straightforward to trace data flow and understand dependencies.

  • Column-level lineage – You can expand column-level lineage when available in dataset nodes. This automatically shows relationships with upstream or downstream dataset nodes if source column information is available.
  • Column search – If the dataset has more than 10 columns, the node presents pagination to navigate to columns not initially shown. To quickly view a particular column, you can search within the dataset node, which then lists only the matching column.
  • Details pane – Each lineage node captures and displays the following details:
    • Every dataset node has three tabs: LINEAGE, SCHEMA, and HISTORY. The HISTORY tab lists the different versions of lineage events captured for that node.
    • The job node has a details pane to display job details with the tabs Job info and History. The details pane also captures queries or expressions run as part of the job.
  • View dataset nodes only – If you want to filter out the job nodes, you can choose the open view control icon in the graph viewer and toggle Display dataset nodes only, which removes all the job nodes from the graph and lets you navigate only the dataset nodes.
  • Version tabs – All lineage nodes in Amazon DataZone data lineage have versioning, captured as history, based on the lineage events captured. Viewing lineage at a selected timestamp opens a new tab on the lineage page, which helps you compare different timestamps.

You can try some of these features as you explore the data assets of this post. To learn more about data lineage in SageMaker, refer to Data lineage in Amazon SageMaker Unified Studio.

Solution overview

Imagine a scenario where an ecommerce company aims to optimize conversion rates and enhance customer experience by gaining deeper insights into the customer journey. They need to connect the dots between user interactions and actual purchases, but with data scattered across multiple sources, where do they begin? This is where data lineage becomes invaluable. To perform their analysis, they need data from two primary sources:

  • Clickstream data stored in Amazon S3 (in JSON or Parquet format)
  • Transactional order data stored as items in Amazon DynamoDB

To make these datasets discoverable across the business, you need to:

  1. Create a project in SageMaker Unified Studio that will be used to source and manage the datasets
  2. Enable data lineage capture in the SageMaker Unified Studio project
  3. Set up the resources for this use case, which includes an AWS Glue data source (set up in SageMaker Unified Studio) and AWS Glue crawler (set up in AWS Glue)
  4. Run the AWS Glue crawler to catalog the datasets in AWS Glue Data Catalog
  5. Source the metadata of the data assets into the SageMaker Catalog by running the data source
  6. Use SageMaker Unified Studio to navigate through the lineage of the data assets and visualize their origin
  7. Understand how schema evolution is captured in the data asset’s lineage

Prerequisites

To complete the steps in this post, you need a SageMaker Unified Studio domain already deployed in your AWS account. To get started quickly in a testing environment, we suggest creating your SageMaker domain using the quick setup option as explained in Create an Amazon SageMaker Unified Studio domain – quick setup.

Solution steps

To capture data lineage for AWS Glue tables managed with AWS Glue crawlers using SageMaker Unified Studio, complete the steps in the following sections.

Set up a SageMaker project with SQL capability

In SageMaker Unified Studio, a project profile defines a template for projects in your SageMaker Unified Studio domain. By setting up a project with the right tooling (project profile), you provision resources you can use to work with data, which might include cataloging it in SageMaker, transforming it into new data assets, analyzing it to drive business value, or even using it for ML or AI applications.

To demonstrate data lineage effectively, we use the SQL analytics project profile for a streamlined setup. Although this profile offers comprehensive data analytics capabilities, we focus specifically on two key components:

  • AWS Glue database – A lakehouse for storing and managing technical metadata
  • Data source job – Automatically collects and tracks metadata into SageMaker Catalog

We’ve chosen this profile to bypass complex manual configurations so we can focus on the core concepts of data lineage.

To create a new project in your SageMaker domain using the SQL analytics project profile, follow the steps detailed in SQL analytics project profile. Keep all default configurations when creating the project.

After creating your project in SageMaker Unified Studio, you’ll unlock powerful data lineage capabilities that make tracking and understanding your data flows intuitive. Through the data sourcing feature, you can easily monitor how data moves from source to the AWS Glue database. This visibility becomes particularly valuable when debugging data issues: you can quickly trace data back to its source, understand how changes impact downstream processes, and identify affected analyses or reports. Next, populate the AWS Glue database with sample data to observe these features in action and demonstrate how they can streamline your data operations.

For further guidance on how to access the details of the new SageMaker project, refer to Get project details. After you access the data source details, take note of the AWS Glue database name associated with the SageMaker project, shown in the Database name field.

Enable data lineage capture in the SageMaker project’s data source

To enable lineage capture, follow these steps:

  1. Expand the Actions menu, then choose Edit data source.
  2. Go to the connections and select Import data lineage to configure lineage capture from the source, as shown in the following screenshot.
  3. Make other changes to the data source fields as desired, then choose Save.

With lineage enabled, the data source job captures lineage on its next run.

Deploy resources for the use case

Follow these steps:

  1. To deploy the resources required for this post, download the AWS CloudFormation template from amazon-datazone-examples in the AWS Samples GitHub repository and deploy it in your AWS account.

For further guidance on how to deploy a CloudFormation stack, refer to Create a stack from the CloudFormation console. You need to provide a Stack name and, for the GlueDatabaseName parameter, the name of the AWS Glue database associated with your SageMaker project, as shown in the following screenshot.

  2. Choose Next.

The template will deploy the following resources:

  • An S3 bucket with a sample file of clickstream data. The bucket name and location of the file follow the path pattern s3://ecomm-analytics-<ACCOUNT_ID>-<REGION>/clickstream/<YYYY>/<MM>/<DD>/data.json. The file contains a sample record with the following structure:
{
    "session_id": "abc123",
    "user_id": "u789",
    "event_type": "product_view",
    "product_id": "prod456",
    "timestamp": "2025-06-04T09:23:12Z"
}
  • A DynamoDB table with a sample item of order data (transactions). The table will be named OrderTransactionTable. The sample item will have the following structure:
{
    "order_id": "ord789",
    "user_id": "u789",
    "product_id": "prod456",
    "order_total": 79.99,
    "order_timestamp": "2025-06-04T09:27:10Z"
}
  • An AWS Glue crawler configured to crawl the S3 bucket and DynamoDB table deployed as part of the stack and store the metadata in the AWS Glue database associated with the SageMaker project. You can access the crawler’s details in the AWS Glue console, as shown in the following screenshot.
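If you want to inspect the sample clickstream file before crawling it, you need its full S3 URI, which follows the path pattern above. The following is a minimal sketch, assuming a POSIX shell, that builds that URI from an account ID, a Region, and a date in YYYY-MM-DD form. clickstream_uri is a hypothetical helper, and the date must be whichever date prefix appears in your bucket, because this post doesn’t state which date the template uses.

```shell
# Hypothetical helper: builds the clickstream object URI from the pattern
# s3://ecomm-analytics-<ACCOUNT_ID>-<REGION>/clickstream/<YYYY>/<MM>/<DD>/data.json
# Arguments: account ID, Region, and date as YYYY-MM-DD.
clickstream_uri() {
  account_id=$1
  region=$2
  ymd=$3
  year=${ymd%%-*}          # text before the first "-"
  month_day=${ymd#*-}      # text after the first "-", for example "06-04"
  month=${month_day%%-*}
  day=${month_day#*-}
  printf 's3://ecomm-analytics-%s-%s/clickstream/%s/%s/%s/data.json\n' \
    "$account_id" "$region" "$year" "$month" "$day"
}
```

For example, aws s3 cp "$(clickstream_uri 111122223333 us-east-1 2025-06-04)" - streams the object to standard output (the account ID and date here are placeholders). If you’re unsure of the date prefix, aws s3 ls s3://ecomm-analytics-<ACCOUNT_ID>-<REGION>/clickstream/ --recursive lists the objects under the clickstream prefix.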

Run the AWS Glue crawler

The AWS Glue crawler deployed in the previous step captures metadata from the two data sources, Amazon S3 and DynamoDB, and stores it in the AWS Glue Data Catalog, specifically in the database associated with the SageMaker project. After the metadata is stored, it is accessible to SageMaker.

Before running the crawler, you need to provide AWS Lake Formation permissions to the IAM role that the AWS Glue crawler will use to interact with your data source and target AWS Glue database. The following command will grant the permissions needed for the crawler to store metadata into the AWS Glue database of the SageMaker project.

To invoke this command, we recommend using AWS CloudShell on the AWS console as explained in AWS CloudShell Concepts. Update the <REGION>, <ACCOUNT_ID> and <GLUE_DATABASE_NAME> placeholders with the right values for your AWS Region, AWS account ID, and name of the AWS Glue database associated to the SageMaker project.

aws lakeformation grant-permissions \
  --region <REGION> \
  --principal DataLakePrincipalIdentifier=arn:aws:iam::<ACCOUNT_ID>:role/glue-crawler-role \
  --permissions CREATE_TABLE \
  --resource '{ "Database": { "Name": "<GLUE_DATABASE_NAME>" } }'

Next, run the AWS Glue crawler on the AWS Glue console. After the crawler successfully finishes, two new tables, clickstream and ordertransactiontable, are created in the AWS Glue database associated with the SageMaker project. Refer to Viewing crawler results and details to learn more about AWS Glue crawler results.

Source metadata from the AWS Glue database into SageMaker

To source metadata from data assets in the AWS Glue database, including their lineage, into SageMaker, use the data source that was deployed as part of the SageMaker project creation.

  1. To run the data source, go to the data source details page.
  2. Choose Run. (You can also schedule data source runs; for this demonstration, we trigger a manual run.)

After the data source run is complete, metadata from both data assets in the AWS Glue database will be imported into the SageMaker domain as the project’s inventory assets. You can find the details of the data source run from within SageMaker Unified Studio, which include:

  • The data assets from the AWS Glue database that were ingested into SageMaker.
  • The status of the data lineage import for each data asset, which includes an event ID for traceability. This lineage event ID can be used to debug inconsistencies in the resulting lineage graph. You can use the GetLineageEvent API to retrieve the raw payload of the lineage event.

Visualizing the data lineage graph of the data assets in SageMaker Unified Studio

With SageMaker Unified Studio, you have a single place to manage and discover data assets. When accessing a data asset published in the SageMaker central catalog or in your project’s own inventory, you can dive into the asset’s metadata, which includes its schema, business description, custom metadata forms, quality, lineage, and more. To visualize the lineage graph of each data asset of this post, follow these steps:

  1. In SageMaker Unified Studio, navigate to the Assets section of the SageMaker project details page and choose INVENTORY.
  2. Select the asset that you want to explore. You can also access the asset directly from the data source run by selecting the asset name.
  3. To view the lineage graph of the data asset up to its origin, shown in the following screenshots, choose the LINEAGE tab.
    • For the clickstream table (sourced from Amazon S3)

    • For the order transactions table (sourced from Amazon DynamoDB)

With lineage, you can now confirm that the data originated from sources such as Amazon S3 and Amazon DynamoDB and understand how it has been transformed along the way. Because of this end-to-end visibility, you can trust the data, make informed decisions, and demonstrate compliance with confidence. The lineage graph captures essential metadata that forms the foundation of lineage tracking.

  • This includes table schemas, column definitions and their data types.
  • Column-level lineage becomes particularly powerful in this context. Imagine your clickstream AWS Glue table powers an Amazon QuickSight dashboard that analyzes customer purchase patterns, and you notice discrepancies in your revenue reports. With column lineage, you can immediately trace those columns back to their source.
  • This granular visibility not only accelerates debugging but also proves invaluable during schema changes, as we show in the following section by changing the source schema.
  • The crawler details such as crawlerRunId (present in the source identifier of the lineage node) and crawler start and end times can be used to debug which crawler runs updated the table.
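Conceptually, tracing a column to its origin is an upstream walk over the lineage graph. The following sketch illustrates the idea on a hypothetical, simplified in-memory graph in which each (table, column) pair maps to the columns it was derived from. It does not call any SageMaker API, and the table names are invented for illustration.

```python
from collections import deque


def trace_column_origins(lineage, column):
    """Walk upstream from `column` and return the columns with no further parents.

    `lineage` is a hypothetical, simplified mapping from a (table, column) pair
    to the list of (table, column) pairs it was derived from.
    """
    origins = set()
    seen = {column}
    queue = deque([column])
    while queue:
        node = queue.popleft()
        parents = lineage.get(node, [])
        if not parents and node != column:
            origins.add(node)  # nothing further upstream: this is a source column
        for parent in parents:
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return origins
```

A breadth-first walk with a seen set terminates even if the graph accidentally contains a cycle, and it returns every source when a column is derived from several upstream columns.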

Understanding your data asset’s schema evolution through lineage in SageMaker Unified Studio

Imagine the order transactions source in DynamoDB was updated with new information. Because this source feeds an Amazon QuickSight report for the customer through the AWS Glue table, consumers need to know which changes in the data pipeline affected the report.

  1. Edit the DynamoDB table item to add new attributes, so you can see how the lineage graph shows historical updates:
{
    "order_id": "ord789",
    "user_id": "u789",
    "product_id": "prod456",
    "order_total": 79.99,
    "order_timestamp": "2025-06-04T09:27:10Z",
    "customerSegment": "new-customer",
    "conversionSource": "primeDayEmailCampaign"
}
  2. Run the OrderTransactionsCrawler AWS Glue crawler again from the AWS Glue console. After it completes, you’ll notice that it updated the ordertransactiontable AWS Glue table, as shown in the following screenshot.

  3. Run the data source associated with the project in SageMaker Unified Studio again to import the latest metadata into the SageMaker Catalog. After it completes, you’ll notice the data source updated the ordertransactiontable data asset in the SageMaker Catalog, as shown in the following screenshot.
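If you prefer to make the step 1 item edit programmatically rather than in the DynamoDB console, the following sketch writes the same item with the Boto3 DynamoDB resource API. The table name isn't given in this post, so you would pass in your own Table object. The sketch also assumes the item includes the table's key attributes, because PutItem replaces any existing item with the same key.

```python
import json
from decimal import Decimal

# The updated item from step 1, including the two new attributes.
UPDATED_ITEM = """{
    "order_id": "ord789",
    "user_id": "u789",
    "product_id": "prod456",
    "order_total": 79.99,
    "order_timestamp": "2025-06-04T09:27:10Z",
    "customerSegment": "new-customer",
    "conversionSource": "primeDayEmailCampaign"
}"""


def to_dynamodb_item(item_json):
    """Parse JSON for the Boto3 DynamoDB resource API, which rejects Python floats."""
    return json.loads(item_json, parse_float=Decimal)


def put_updated_item(table, item_json=UPDATED_ITEM):
    """Write the item with a Boto3 DynamoDB Table resource (replaces an item with the same key)."""
    table.put_item(Item=to_dynamodb_item(item_json))
```

For example: put_updated_item(boto3.resource("dynamodb").Table("<order transactions table name>")). Parsing numbers as Decimal matters here because the Boto3 resource API raises an error for Python float values.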

This section explores how lineage can be useful to track the updates.

Navigate to the ordertransactiontable data asset in SageMaker Catalog by selecting it from the data source run and choose the LINEAGE tab, as shown in the following screenshot.

Notice how the new columns are available in the lineage graph. A new crawler run ID is present as the source identifier of the crawler lineage node. The history tab shows multiple crawler runs. You can navigate to check the state of the system during the first run.

Cleanup

After you’re done, we recommend cleaning up the resources created for this post to avoid unintended charges:

  1. Delete the inventory assets that were cataloged in the SageMaker project’s inventory, as explained in Delete an Amazon SageMaker Unified Studio asset.
  2. Delete the SageMaker project that was created as part of this post, as explained in Delete a project.
  3. Delete the CloudFormation stack that was deployed as part of this post, as explained in Delete a stack from the CloudFormation console.
  4. The S3 bucket created by the CloudFormation stack remains after the stack is deleted because it still contains a data file. Empty and delete the bucket, as explained in Deleting a general purpose bucket.
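Emptying the bucket before deleting it can be scripted as well. This sketch assumes an unversioned bucket and a Boto3 S3 client: it lists every key with the list_objects_v2 paginator, removes the keys with DeleteObjects in batches of up to 1,000 (the per-request limit), and then deletes the bucket. The bucket name is whatever your CloudFormation stack created.

```python
def batches(items, size=1000):
    """Yield consecutive slices of at most `size` items (DeleteObjects accepts up to 1,000 keys)."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def empty_and_delete_bucket(s3, bucket):
    """Delete every object in an unversioned bucket, then the bucket itself.

    `s3` is expected to be a Boto3 S3 client. Versioned buckets also need their
    object versions and delete markers removed, which this sketch does not do.
    Returns the number of objects deleted.
    """
    keys = []
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    for batch in batches(keys):
        s3.delete_objects(
            Bucket=bucket,
            Delete={"Objects": [{"Key": key} for key in batch], "Quiet": True},
        )
    s3.delete_bucket(Bucket=bucket)
    return len(keys)
```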

Conclusion

In this post, you explored the data lineage capabilities of Amazon SageMaker, specifically when working with AWS Glue crawlers. You learned how to set up an AWS Glue crawler to infer metadata from data assets in multiple sources, such as Amazon S3 and DynamoDB, and store it in the AWS Glue Data Catalog. You also imported this metadata, including data lineage, into Amazon SageMaker through the data source capability of a SageMaker project. Finally, you explored the resulting lineage graph of data assets in SageMaker Unified Studio and saw some of the functionality available to understand where the assets originated, how columns are transformed, and what impact changes have at any step of the pipeline. We encourage you to now test the capabilities you explored in this post with your own data. By following the pattern presented in this post, many customers have been able to achieve governance of their data lake and lakehouse platforms on top of Amazon SageMaker with data lineage and more.


About the authors

Mohit Dawar is a Senior Software Engineer at Amazon Web Services (AWS) working on Amazon DataZone. Over the past 3 years, he has led efforts around the core metadata catalog, generative AI–powered metadata curation, and lineage visualization. He enjoys working on large-scale distributed systems, experimenting with AI to improve user experience, and building tools that make data governance feel effortless. Connect with him on LinkedIn: Mohit Dawar.

Jose Romero is a Senior Solutions Architect for Startups at Amazon Web Services (AWS) based in Austin, TX, US. He is passionate about helping customers architect modern platforms at scale for data, AI, and ML. As a former senior architect in AWS Professional Services, he enjoys building and sharing solutions for common complex problems so that customers can accelerate their cloud journey and adopt best practices. Connect with him on LinkedIn: Jose Romero.

Accelerate your data quality journey for lakehouse architecture with Amazon SageMaker, Apache Iceberg on AWS, Amazon S3 tables, and AWS Glue Data Quality

Post Syndicated from Brody Pearman original https://aws.amazon.com/blogs/big-data/accelerate-your-data-quality-journey-for-lakehouse-architecture-with-amazon-sagemaker-apache-iceberg-on-aws-amazon-s3-tables-and-aws-glue-data-quality/

In an era where data drives innovation and decision-making, organizations are increasingly focused on not only accumulating data but on maintaining its quality and reliability. High-quality data is essential for building trust in analytics, enhancing the performance of machine learning (ML) models, and supporting strategic business initiatives.

By using AWS Glue Data Quality, you can measure and monitor the quality of your data. It analyzes your data, recommends data quality rules, evaluates data quality, and provides a score that quantifies the quality of your data, so you can make business decisions with confidence. With this launch, AWS Glue Data Quality is now integrated with the lakehouse architecture of Amazon SageMaker, Apache Iceberg on general purpose Amazon Simple Storage Service (Amazon S3) buckets, and Amazon S3 Tables. This integration brings together serverless data integration, quality management, and advanced ML capabilities in a unified environment.

This post explores how you can use AWS Glue Data Quality to maintain data quality of S3 Tables and Apache Iceberg tables on general purpose S3 buckets. We’ll discuss strategies for verifying the quality of published data and how these integrated technologies can be used to implement effective data quality workflows.

Solution overview

This launch supports the lakehouse architecture of Amazon SageMaker, Apache Iceberg on general purpose S3 buckets, and Amazon S3 Tables. As example use cases, we demonstrate data quality on an Apache Iceberg table stored in a general purpose S3 bucket as well as on Amazon S3 Tables. The steps cover the following:

  1. Create an Apache Iceberg table on a general purpose Amazon S3 bucket and an Amazon S3 table in a table bucket using two AWS Glue extract, transform, and load (ETL) jobs
  2. Grant appropriate AWS Lake Formation permissions on each table
  3. Run data quality recommendations at rest on the Apache Iceberg table on general purpose S3 bucket
  4. Run the data quality rules and visualize the results in Amazon SageMaker Unified Studio
  5. Run data quality recommendations at rest on the S3 table
  6. Run the data quality rules and visualize the results in SageMaker Unified Studio

The following diagram illustrates the solution architecture.

Prerequisites

To implement the instructions, you must have the following prerequisites:

Create the S3 table and the Apache Iceberg table on a general purpose S3 bucket

First, complete the following steps to upload data and scripts:

  1. Upload the attached AWS Glue job scripts to your designated script bucket in S3
    1. create_iceberg_table_on_s3.py
    2. create_s3_table_on_s3_bucket.py
  2. To download the New York City Taxi – Yellow Trip Data dataset for January 2025 (Parquet file), navigate to NYC TLC Trip Record Data, expand 2025, and choose Yellow Taxi Trip records under the January section. A file called yellow_tripdata_2025-01.parquet will be downloaded to your computer.
  3. On the Amazon S3 console, open an input bucket of your choice and create a folder called nyc_yellow_trip_data. The stack will create a GlueJobRole with permissions to this bucket.
  4. Upload the yellow_tripdata_2025-01.parquet file to the folder.
  5. Download the CloudFormation stack file. Navigate to the CloudFormation console. Choose Create stack. Choose Upload a template file and select the CloudFormation template you downloaded. Choose Next.
  6. Enter a unique name for Stack name.
  7. Configure the stack parameters. Default values are provided in the following table:
Parameter | Default value | Description
--- | --- | ---
ScriptBucketName | N/A (user-supplied) | Name of the referenced Amazon S3 general purpose bucket containing the AWS Glue job scripts
DatabaseName | iceberg_dq_demo | Name of the AWS Glue database to be created for the Apache Iceberg table on the general purpose Amazon S3 bucket
GlueIcebergJobName | create_iceberg_table_on_s3 | Name of the created AWS Glue job that creates the Apache Iceberg table on the general purpose Amazon S3 bucket
GlueS3TableJobName | create_s3_table_on_s3_bucket | Name of the created AWS Glue job that creates the Amazon S3 table
S3TableBucketName | dataquality-demo-bucket | Name of the Amazon S3 table bucket to be created
S3TableNamespaceName | s3_table_dq_demo | Name of the Amazon S3 table bucket namespace to be created
S3TableTableName | ny_taxi | Name of the Amazon S3 table to be created by the AWS Glue job
IcebergTableName | ny_taxi | Name of the Apache Iceberg table on the general purpose Amazon S3 bucket to be created by the AWS Glue job
IcebergScriptPath | scripts/create_iceberg_table_on_s3.py | Referenced Amazon S3 path to the AWS Glue script file for the Apache Iceberg table creation job. Verify that the file name matches the corresponding GlueIcebergJobName
S3TableScriptPath | scripts/create_s3_table_on_s3_bucket.py | Referenced Amazon S3 path to the AWS Glue script file for the Amazon S3 table creation job. Verify that the file name matches the corresponding GlueS3TableJobName
InputS3Bucket | N/A (user-supplied) | Name of the referenced Amazon S3 bucket to which the NY Taxi data was uploaded
InputS3Path | nyc_yellow_trip_data | Referenced Amazon S3 path to which the NY Taxi data was uploaded
OutputBucketName | N/A (user-supplied) | Name of the Amazon S3 general purpose bucket to be created for the AWS Glue job's Apache Iceberg table data
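Steps 5 through 7 can also be performed with Boto3. The following sketch assumes you saved the downloaded template locally and supplies only the parameters that have no default (ScriptBucketName, InputS3Bucket, and OutputBucketName), so the others keep the defaults listed in the table. The capability flags are an assumption based on the stack creating IAM roles, and the helper names are hypothetical.

```python
def to_cfn_parameters(values):
    """Convert {"Name": "value"} into the CloudFormation Parameters list format."""
    return [{"ParameterKey": key, "ParameterValue": value} for key, value in values.items()]


def create_demo_stack(cfn, stack_name, template_path, values):
    """Create the stack from a local template file with a Boto3 CloudFormation client.

    Parameters left out of `values` keep the template defaults.
    """
    with open(template_path, encoding="utf-8") as template:
        body = template.read()
    return cfn.create_stack(
        StackName=stack_name,
        TemplateBody=body,
        Parameters=to_cfn_parameters(values),
        # Assumption: the template creates IAM roles, so acknowledge IAM capabilities.
        Capabilities=["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
    )
```

For example: create_demo_stack(boto3.client("cloudformation"), "<stack name>", "<template file>", {"ScriptBucketName": "<script bucket>", "InputS3Bucket": "<input bucket>", "OutputBucketName": "<output bucket>"}).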

Complete the following steps to configure AWS Identity and Access Management (IAM) and Lake Formation permissions:

  1. If you haven’t previously worked with S3 Tables and analytics services, navigate to Amazon S3.
  2. Choose Table buckets.
  3. Choose Enable integration to enable analytics service integrations with your S3 table buckets.
  4. Navigate to the Resources tab for your AWS CloudFormation stack. Note the IAM role with the logical ID GlueJobRole and the database name with the logical ID GlueDatabase. Additionally, note the name of the S3 table bucket with the logical ID S3TableBucket as well as the namespace name with the logical ID S3TableBucketNamespace. The S3 table bucket name is the portion of the Amazon Resource Name (ARN) which follows: arn:aws:s3tables:<region>:<accountID>:bucket/{S3 Table bucket Name}. The namespace name is the portion of the namespace ARN which follows: arn:aws:s3tables:<region>:<accountID>:bucket/{S3 Table bucket Name}|{namespace name}.
  5. Navigate to the Lake Formation console with a Lake Formation data lake administrator.
  6. Navigate to the Databases tab and select your GlueDatabase. Note that the selected default catalog should match your AWS account ID.
  7. Select the Actions dropdown menu and under Permissions, choose Grant.
  8. Grant your GlueJobRole from step 4 the necessary permissions. Under Database permissions, select Create table and Describe, as shown in the following screenshot.

Navigate back to the Databases tab in Lake Formation and select the catalog that matches the value of S3TableBucket you noted in step 4, in the format <AWS account ID>:s3tablescatalog/<S3 Table Bucket name>.

  1. Select your namespace name. From the Actions dropdown menu, under Permissions, choose Grant.
  2. Grant your GlueJobRole from step 4 the necessary permissions. Under Database permissions, select Create table and Describe, as shown in the following screenshot.

To run the jobs created in the CloudFormation stack to create the sample tables and configure Lake Formation permissions for the DataQualityRole, complete the following steps:

  1. In the Resources tab of your CloudFormation stack, note the AWS Glue job names for the logical resource IDs: GlueS3TableJob and GlueIcebergJob.
  2. Navigate to the AWS Glue console and select ETL jobs. Select the GlueIcebergJob you noted in the previous step and choose Run job. Select your GlueS3TableJob and choose Run job.
  3. To verify the successful creation of your Apache Iceberg table on general purpose S3 bucket in the database, navigate to Lake Formation with your Lake Formation data lake administrator permissions. Under Databases, select your GlueDatabase. The selected default catalog should match your AWS account ID.
  4. On the dropdown menu, choose View and then Tables. You should see a new tab with the table name you specified for IcebergTableName. You have verified the table creation.
  5. Select this table and grant your DataQualityRole (<stack_name>-DataQualityRole-<xxxxxx>) the necessary Lake Formation permissions by choosing Grant from the Actions menu. Under Table permissions, choose Select and Describe for the new Apache Iceberg table.
  6. To verify the S3 table in the S3 table bucket, navigate to Databases in the Lake Formation console with your Lake Formation data lake administrator permissions. Make sure the selected catalog is your S3 table bucket catalog: <AWS account ID>:s3tablescatalog/<S3 Table Bucket name>
  7. Select your S3 table namespace and choose the dropdown menu View.
  8. Choose Tables and you should see a new tab with the table name you specified for S3TableTableName. You have verified the table creation.
  9. Choose the link for the table and, under Actions, choose Grant. Grant your DataQualityRole the necessary Lake Formation permissions: under Table permissions, choose Select and Describe for the S3 table.
  10. In the Lake Formation console with your Lake Formation data lake administrator permissions, on the Administration tab, choose Data lake locations.
  11. Choose Register location. Input your OutputBucketName as the Amazon S3 path. Input the LakeFormationRole from the stack resources as the IAM role. Under Permission mode, choose Lake Formation.
  12. On the Lake Formation console under Application integration settings, select Allow external engines to access data in Amazon S3 locations with full table access, as shown in the following screenshot.
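The two job runs started from the AWS Glue console above can also be started from a script. This sketch assumes a Boto3 Glue client and the default job names from the stack parameters (create_iceberg_table_on_s3 and create_s3_table_on_s3_bucket). It covers only starting the jobs and waiting for them to finish, not the Lake Formation grants that follow.

```python
import time

# JobRunState values after which a run will not change again.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def run_glue_job(glue, job_name, poll_seconds=30):
    """Start an AWS Glue job run and return (run_id, final_state).

    `glue` is expected to be a Boto3 Glue client. The job's own timeout
    setting bounds how long this loop can wait (the run ends in TIMEOUT).
    """
    run_id = glue.start_job_run(JobName=job_name)["JobRunId"]
    while True:
        state = glue.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["JobRunState"]
        if state in TERMINAL_STATES:
            return run_id, state
        time.sleep(poll_seconds)
```

For example: for name in ("create_iceberg_table_on_s3", "create_s3_table_on_s3_bucket"): print(run_glue_job(boto3.client("glue"), name)).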

Generate recommendations for Apache Iceberg table on general purpose S3 bucket managed by Lake Formation

In this section, we show how to generate data quality rules using the data quality rule recommendations feature of AWS Glue Data Quality for your Apache Iceberg table on a general purpose S3 bucket. Follow these steps:

  1. Navigate to the AWS Glue console. Under Data Catalog, choose Databases. Choose the GlueDatabase.
  2. Under Tables, select your IcebergTableName. On the Data quality tab, choose Run history.
  3. Under Recommendation runs, choose Recommend rules.
  4. Use the DataQualityRole (<stack_name>-DataQualityRole-<xxxxxx>) to generate data quality rule recommendations, leaving the other settings as default. The results are shown in the following screenshot.

Run data quality rules for Apache Iceberg table on general purpose S3 bucket managed by Lake Formation

In this section, we show how to create a data quality ruleset with the recommended rules. After creating the ruleset, we run the data quality rules. Follow these steps:

  1. Copy the resulting rules from your recommendation run by selecting the dq-run ID and choosing Copy.
  2. Navigate back to the table under the Data quality tab and choose Create data quality rules. Paste the ruleset from step 1 here. Choose Save ruleset, as shown in the following screenshot.

  3. After saving your ruleset, navigate back to the Data quality tab for your Apache Iceberg table on the general purpose S3 bucket. Select the ruleset you created. To start a data quality evaluation run of the ruleset using your data quality role, choose Run, as shown in the following screenshot.

Generate recommendations for the S3 table on the S3 table bucket

In this section, we show how to use the AWS Command Line Interface (AWS CLI) to generate recommendations for your S3 table on the S3 table bucket. This will also create a data quality ruleset for the S3 table. Follow these steps:

  1. Fill in your S3 table namespace name, S3 table name, catalog ID, and Data Quality role ARN in the following JSON file and save it locally:
{
    "DataSource": {
        "GlueTable": {
            "DatabaseName": "<namespace name>",
            "TableName": "<table name>",
            "CatalogId": "<account ID>:s3tablescatalog/<s3 table bucket name>"
        }
    },
    "Role": "<Data Quality role ARN>",
    "NumberOfWorkers": 5,
    "Timeout": 120,
    "CreatedRulesetName": "data_quality_s3_table_demo_ruleset"
}
  2. Run the following AWS CLI command, replacing the local file name and Region with your own values:
aws glue start-data-quality-rule-recommendation-run --cli-input-json file://<file name> --region <region>
  3. Run the following AWS CLI command to confirm that the recommendation run succeeds:
aws glue get-data-quality-rule-recommendation-run --run-id <input run ID from step 2> --region <region>
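The same recommendation run can be driven from Python. Because the --cli-input-json keys match the Boto3 parameter names, this sketch loads the JSON file you saved in step 1 and passes it directly to StartDataQualityRuleRecommendationRun, then polls GetDataQualityRuleRecommendationRun until the run finishes. It assumes a Boto3 Glue client; the helper names and polling interval are illustrative.

```python
import json
import time

FINISHED = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"}


def start_recommendation_run(glue, input_path):
    """Start a recommendation run from the same JSON file used with --cli-input-json."""
    with open(input_path, encoding="utf-8") as f:
        request = json.load(f)  # keys match the Boto3 parameter names
    return glue.start_data_quality_rule_recommendation_run(**request)["RunId"]


def wait_for_recommendations(glue, run_id, poll_seconds=30):
    """Poll the run and return (status, recommended DQDL ruleset or None)."""
    while True:
        run = glue.get_data_quality_rule_recommendation_run(RunId=run_id)
        if run["Status"] in FINISHED:
            return run["Status"], run.get("RecommendedRuleset")
        time.sleep(poll_seconds)
```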

Run data quality rules for the S3 table on the S3 table bucket

In this section, we show how to use the AWS CLI to evaluate the data quality ruleset we just created against the S3 table in the S3 table bucket. Follow these steps:

  1. Replace the S3 table namespace name, S3 table name, catalog ID, and Data Quality role ARN with your own information in the following JSON file and save it locally:
{
    "DataSource": {
        "GlueTable": {
            "DatabaseName": "<namespace name>",
            "TableName": "<table name>",
            "CatalogId": "<account ID>:s3tablescatalog/<s3 table bucket name>"
        }
    },
    "Role": "<Data Quality role ARN>",
    "NumberOfWorkers": 2,
    "Timeout": 120,
    "AdditionalRunOptions": {
        "CloudWatchMetricsEnabled": true,
        "CompositeRuleEvaluationMethod": "COLUMN"
    },
    "RulesetNames": ["data_quality_s3_table_demo_ruleset"]
}
  2. Run the following AWS CLI command, replacing the local file name and Region with your own values:
aws glue start-data-quality-ruleset-evaluation-run --cli-input-json file://<file name> --region <region>
  3. Run the following AWS CLI command, replacing the Region and data quality run ID with your own values:
aws glue get-data-quality-ruleset-evaluation-run --run-id <input run ID from step 2> --region <region>
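After the evaluation run succeeds, each result ID can be resolved to a score. This sketch assumes a Boto3 Glue client and uses GetDataQualityRulesetEvaluationRun followed by GetDataQualityResult. The helper name is hypothetical, and it returns the current status without scores while the run has not yet succeeded.

```python
def fetch_evaluation_scores(glue, run_id):
    """Return (status, {ruleset name: score}) for a ruleset evaluation run.

    `glue` is expected to be a Boto3 Glue client. This sketch only looks up
    results once the run has SUCCEEDED.
    """
    run = glue.get_data_quality_ruleset_evaluation_run(RunId=run_id)
    status = run["Status"]
    if status != "SUCCEEDED":
        return status, {}
    scores = {}
    for result_id in run.get("ResultIds", []):
        result = glue.get_data_quality_result(ResultId=result_id)
        scores[result.get("RulesetName", result_id)] = result.get("Score")
    return status, scores
```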

View results in SageMaker Unified Studio

Complete the following steps to view results from your data quality evaluation runs in SageMaker Unified Studio:

  1. Log in to the SageMaker Unified Studio portal using your single sign-on (SSO).
  2. Navigate to your project and note the project role ARN.
  3. Navigate to the Lake Formation console with your Lake Formation data lake administrator permissions. Select your Apache Iceberg table that you created on general purpose S3 bucket and choose Grant from the Actions dropdown menu. Grant the following Lake Formation permissions to your SageMaker Unified Studio project role from step 2:
    1. Describe for Table permissions and Grantable permissions
  4. Next, select your S3 table from the S3 table bucket catalog in Lake Formation and choose Grant from the Actions dropdown menu. Grant the following Lake Formation permissions to your SageMaker Unified Studio project role from step 2:
    1. Describe for Table permissions and Grantable permissions
  5. Follow the steps at Create an Amazon SageMaker Unified Studio data source for AWS Glue in the project catalog to configure your data source for your GlueDatabase and your S3 tables namespace.
    1. Choose a name and optionally enter a description for your data source details.
    2. Choose AWS Glue (Lakehouse) for your Data source type. Leave connection and data lineage as the default values.
    3. Choose Use the AwsDataCatalog for the Apache Iceberg table on general purpose S3 bucket AWS Glue database.
    4. Choose the Database name corresponding to the GlueDatabase. Choose Next.
    5. Under Data quality, select Enable data quality for this data source. Leave the rest of the defaults.
    6. Configure the next data source with a name for your S3 table namespace. Optionally, enter a description for your data source details.
    7. Choose AWS Glue (Lakehouse) for your Data source type. Leave connection and data lineage as the default values.
    8. Choose to enter the catalog name: s3tablescatalog/<S3TableBucketName>
    9. Choose the Database name corresponding to the S3 table namespace. Choose Next.
    10. Select Enable data quality for this data source. Leave the rest of the defaults.
  6. Run each data source.
  7. Navigate to your project’s Assets and select the related asset that you created for Apache Iceberg table on general purpose S3 bucket. Navigate to the Data Quality tab to view your data quality results. You should be able to see the data quality results for the S3 table asset similarly.

The data quality results in the following screenshot show each rule evaluated in the selected data quality evaluation run and its result. The data quality score is the percentage of rules that passed, and the overview shows how each rule type fared across the evaluation. For example, Completeness rules all passed, but ColumnValues rules passed only three out of nine times.
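The arithmetic behind that overview is straightforward: the score is the number of passed rules divided by the number of evaluated rules. The following sketch reproduces it, overall and per rule type, from a list shaped like the RuleResults returned by GetDataQualityResult. The field names Description and Result, and taking the rule type as the first word of the DQDL rule, are assumptions of this sketch rather than something the post specifies.

```python
from collections import Counter


def summarize_rule_results(rule_results):
    """Compute the share of passed rules overall and (passed, total) per rule type.

    Each entry is assumed to have "Description" holding the DQDL rule text and
    "Result" set to PASS, FAIL, or ERROR. The rule type is the first word of the rule.
    """
    passed, total = Counter(), Counter()
    for rule in rule_results:
        rule_type = rule["Description"].split()[0]
        total[rule_type] += 1
        if rule["Result"] == "PASS":
            passed[rule_type] += 1
    score = sum(passed.values()) / len(rule_results) if rule_results else 0.0
    return score, {rule_type: (passed[rule_type], total[rule_type]) for rule_type in total}
```

With this shape, the ColumnValues result from the screenshot would appear as (3, 9) in the per-type summary.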

Cleanup

To avoid incurring future charges, clean up the resources you created during this walkthrough:

  1. Navigate to the output bucket created for this post and delete its contents.
  2. Deregister the data lake location for your output bucket in Lake Formation.
  3. Revoke the Lake Formation permissions for your SageMaker project role, for your data quality role, and for your AWS Glue job role.
  4. Delete the input data file and the job scripts from your bucket.
  5. Delete the S3 table.
  6. Delete the CloudFormation stack.
  7. [Optional] Delete your SageMaker Unified Studio domain and the associated CloudFormation stacks it created on your behalf.

Conclusion

In this post, we demonstrated how you can now generate data quality recommendations for your lakehouse architecture using Apache Iceberg tables on general purpose Amazon S3 buckets and Amazon S3 Tables. Then we showed how to integrate and view these data quality results in Amazon SageMaker Unified Studio. Try this out for your own use case and share your feedback and questions in the comments.


About the Authors

Brody Pearman is a Senior Cloud Support Engineer at Amazon Web Services (AWS). He’s passionate about helping customers use AWS Glue ETL to transform and create their data lakes on AWS while maintaining high data quality. In his free time, he enjoys watching football with his friends and walking his dog.

Shiv Narayanan is a Technical Product Manager for AWS Glue’s data management capabilities like data quality, sensitive data detection and streaming capabilities. Shiv has over 20 years of data management experience in consulting, business development and product management.

Shriya Vanvari is a Software Developer Engineer in AWS Glue. She is passionate about learning how to build efficient and scalable systems to provide better experience for customers. Outside of work, she enjoys reading and chasing sunsets.

Narayani Ambashta is an Analytics Specialist Solutions Architect at AWS, focusing on the automotive and manufacturing sector, where she guides strategic customers in developing modern data and AI strategies. With over 15 years of cross-industry experience, she specializes in big data architecture, real-time analytics, and AI/ML technologies, helping organizations implement modern data architectures. Her expertise spans across lakehouse architecture, generative AI, and IoT platforms, enabling customers to drive digital transformation initiatives. When not architecting modern solutions, she enjoys staying active through sports and yoga.

Build an analytics pipeline that is resilient to Avro schema changes using Amazon Athena

Post Syndicated from Mohammad Sabeel original https://aws.amazon.com/blogs/big-data/build-an-analytics-pipeline-that-is-resilient-to-avro-schema-changes-using-amazon-athena/

As technology progresses, the Internet of Things (IoT) expands to encompass more and more things. As a result, organizations collect vast amounts of data from diverse sensor devices monitoring everything from industrial equipment to smart buildings. These sensor devices frequently undergo firmware updates, software modifications, or configuration changes that introduce new monitoring capabilities or retire obsolete metrics. As a result, the data structure (schema) of the information transmitted by these devices evolves continuously.

Organizations commonly choose Apache Avro as their data serialization format for IoT data because of its compact binary format, built-in schema evolution support, and compatibility with big data processing frameworks. This becomes crucial when sensor manufacturers release updates that add new metrics or deprecate old ones. For example, when a sensor manufacturer releases a firmware update that adds new temperature precision metrics or deprecates legacy vibration measurements, Avro’s schema evolution capabilities let you handle these changes without breaking existing data processing pipelines.

However, managing schema evolution at scale presents significant challenges. For example, organizations need to store and process data from thousands of sensors and update their schemas independently, handle schema changes occurring as frequently as every hour due to rolling device updates, maintain historical data compatibility while accommodating new schema versions, query data across multiple time periods with different schemas for temporal analysis, and ensure minimal query failures due to schema mismatches.

To address these challenges, this post demonstrates how to build a solution that combines Amazon Simple Storage Service (Amazon S3) for data storage, the AWS Glue Data Catalog for schema management, and Amazon Athena for one-time querying. We focus specifically on handling Avro-formatted data in partitioned S3 buckets, where schemas can change frequently, while still providing consistent query capabilities across all data regardless of schema version.

This solution is specifically designed for Hive-based tables, such as those in the AWS Glue Data Catalog, and is not applicable for Iceberg tables. By implementing this approach, organizations can build a highly adaptive and resilient analytics pipeline capable of handling extremely frequent Avro schema changes in partitioned S3 environments.

Solution overview

In this post, we simulate a real-world IoT data pipeline with the following requirements:

  • IoT devices continuously upload sensor data in Avro format to an S3 bucket, simulating real-time IoT data ingestion
  • The schema changes frequently over time
  • Data will be partitioned hourly to reflect typical IoT data ingestion patterns
  • Data needs to be queryable using the most recent schema version through Amazon Athena

To achieve these requirements, we demonstrate the solution using automated schema detection. We use AWS Command Line Interface (AWS CLI) and AWS SDK for Python (Boto3) scripts to simulate an automated mechanism that continually monitors the S3 bucket for new data, detects schema changes in incoming Avro files, and triggers necessary updates to the AWS Glue Data Catalog.

For schema evolution handling, our solution demonstrates how to create and update table definitions in the AWS Glue Data Catalog, incorporate Avro schema literals to handle schema changes, and use Athena partition projection for efficient querying across schema versions. The data steward or admin needs to know when and how the schema is updated so that they can manually change the columns in the UpdateTable API call. For validation and querying, we use Amazon Athena queries to verify table definitions and partition details and demonstrate successful querying of data across different schema versions. By simulating these components, our solution addresses the key requirements outlined in the introduction:

  • Handling frequent schema changes (as often as hourly)
  • Managing data from thousands of sensors updating independently
  • Maintaining historical data compatibility while accommodating new schemas
  • Enabling querying across multiple time periods with different schemas
  • Minimizing query failures due to schema mismatches
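As an illustration of the partition projection piece, the following sketch builds the Athena table properties for an hourly, date-typed dt partition key. The value format (yyyy-MM-dd-HH), start date, and S3 layout are assumptions, because the post doesn't specify them. The resulting dictionary would be merged into the table Parameters next to avro.schema.literal in a CreateTable or UpdateTable call.

```python
def hourly_projection_parameters(bucket, prefix="", start="2025-01-01-00"):
    """Build Athena partition projection table parameters for an hourly `dt` key.

    Assumptions (not taken from the post): partition values look like
    2025-01-01-00, and each hour's data lives under s3://<bucket>/<prefix><dt>/.
    `prefix`, if given, should end with "/".
    """
    return {
        "projection.enabled": "true",
        "projection.dt.type": "date",
        "projection.dt.format": "yyyy-MM-dd-HH",
        "projection.dt.range": f"{start},NOW",
        "projection.dt.interval": "1",
        "projection.dt.interval.unit": "HOURS",
        # Athena substitutes ${dt} with each projected partition value.
        "storage.location.template": f"s3://{bucket}/{prefix}" + "${dt}/",
    }
```

With projection enabled, Athena computes partition locations from these properties at query time, so new hourly partitions don't need to be registered in the Data Catalog.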

Although in a production environment this would be integrated into a sophisticated IoT data processing application, our simulation using AWS CLI and Boto3 scripts effectively demonstrates the principles and techniques for managing schema evolution in large-scale IoT deployments.

The following diagram illustrates the solution architecture.

Prerequisites

To implement this solution, you need the following prerequisites:

Create the base table

In this section, we simulate the initial setup of a data pipeline for IoT sensor data. This step establishes the foundation for our schema evolution demonstration: the initial table is the starting point from which the schema evolves, and it lets us demonstrate how to handle schema changes over time. In this scenario, the base table contains three key fields: customerID (bigint), sentiment (a struct containing customerrating), and dt (string) as a partition column. The table also defines the Avro schema literal (avro.schema.literal) along with other configurations. Follow these steps:

  1. Create a new file named `CreateTableAPI.py` with the following content. Replace 'Location': 's3://amzn-s3-demo-bucket/' with your S3 bucket details and <AWS Account ID> with your AWS account ID:
import boto3
import time

if __name__ == '__main__':
    database_name = "blogpostdatabase"
    table_name = "blogpost_table_test"
    catalog_id = '<AWS Account ID>'  # replace with your AWS account ID
    client = boto3.client('glue')

    response = client.create_table(
        CatalogId=catalog_id,
        DatabaseName=database_name,
        TableInput={
            'Name': table_name,
            'Description': 'sampletable',
            'Owner': 'root',
            'TableType': 'EXTERNAL_TABLE',
            'LastAccessTime': int(time.time()),
            'LastAnalyzedTime': int(time.time()),
            'Retention': 0,
            'Parameters' : {
                'avro.schema.literal': '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 }] } ], "default" : 0 }]}'
            },
            'StorageDescriptor': {
                'Columns': [
                    {
                        'Name': 'customerID',
                        'Type': 'bigint',
                        'Comment': 'from deserializer'
                    },
                    {
                        'Name': 'sentiment',
                        'Type': 'struct<customerrating:bigint>',
                        'Comment': 'from deserializer'
                    }
                ],
                'Location': 's3://amzn-s3-demo-bucket/',  # replace with your S3 bucket
                'InputFormat': 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat',
                'OutputFormat': 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat',
                'SerdeInfo': {
                    'SerializationLibrary': 'org.apache.hadoop.hive.serde2.avro.AvroSerDe',
                    'Parameters': {
                        'avro.schema.literal': '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 } ] } ], "default" : 0 }]}'
                    }
                }
            },
            'PartitionKeys': [
                {
                    'Name': 'dt',
                    'Type': 'string'
                }
            ]
        }
    )

    print(response)
  1. Run the script using the command:
python3 CreateTableAPI.py

The schema literal serves as a form of metadata, providing a clear description of your data structure. In Amazon Athena, the Avro table's Serializer/Deserializer (SerDe) properties are essential for making sure that the table schema is compatible with the data stored in the files, so that query engines can translate the data accurately. These properties enable precise interpretation of Avro-formatted data, allowing query engines to correctly read and process the information during execution.

The Avro schema literal provides a detailed description of the data structure at the partition level. It defines the fields, their data types, and any nested structures within the Avro data. Amazon Athena uses this schema to correctly interpret the Avro data stored in Amazon S3. It makes sure that each field in the Avro file is mapped to the correct column in the Athena table.

The schema information helps Athena optimize query runs by understanding the data structure in advance, so it can make informed decisions about how to process and retrieve data efficiently. When the Avro schema changes (for example, when new fields are added), updating the schema literal allows Athena to recognize and work with the new structure. This is crucial for maintaining query compatibility as your data evolves over time. The schema literal also provides explicit type information, which is essential for Avro's type system and for accurate data type conversion between Avro and Athena SQL types.
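To illustrate the type conversion, the following is a simplified sketch of how Avro types map to the Hive-style types that appear in the AWS Glue column definitions (for example, Avro long becomes bigint and a nested record becomes struct<...>). The avro_to_hive_type helper is hypothetical and covers only a few common types; it is not the AvroSerDe's code, and it handles unions only in the nullable ["null", X] form used in this post:

```python
# Simplified Avro-to-Hive type mapping for common primitive types
PRIMITIVES = {
    "boolean": "boolean",
    "int": "int",
    "long": "bigint",
    "float": "float",
    "double": "double",
    "string": "string",
    "bytes": "binary",
}


def avro_to_hive_type(avro_type) -> str:
    """Translate an Avro type (name, union list, or dict) to a Hive type string."""
    if isinstance(avro_type, str):
        return PRIMITIVES[avro_type]
    if isinstance(avro_type, list):
        # Nullable union ["null", X]: the column type comes from X
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) != 1:
            raise ValueError("this sketch only handles ['null', X] unions")
        return avro_to_hive_type(non_null[0])
    kind = avro_type["type"]
    if kind in PRIMITIVES:
        return PRIMITIVES[kind]
    if kind == "record":
        members = ",".join(
            f"{f['name']}:{avro_to_hive_type(f['type'])}" for f in avro_type["fields"]
        )
        return f"struct<{members}>"
    if kind == "array":
        return f"array<{avro_to_hive_type(avro_type['items'])}>"
    if kind == "map":
        # Avro map keys are always strings
        return f"map<string,{avro_to_hive_type(avro_type['values'])}>"
    raise ValueError(f"unsupported Avro type: {avro_type!r}")


# The sentiment field type from the base schema in CreateTableAPI.py
sentiment_type = [
    "null",
    {
        "type": "record",
        "name": "sentiment",
        "fields": [{"name": "customerrating", "type": "long", "default": 0}],
    },
]
print(avro_to_hive_type(sentiment_type))  # struct<customerrating:bigint>
```

The printed type matches the struct<customerrating:bigint> column type declared for sentiment in the create_table call.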

For complex Avro schemas with nested structures, the schema literal informs Athena how to navigate and query these nested elements. The Avro schema can specify default values for fields, which Athena can use when querying data where certain fields might be missing. Athena can use the schema to perform compatibility checks between the table definition and the actual data, helping to identify potential issues. In the SerDe properties, the schema literal tells the Avro SerDe how to deserialize the data when reading it from Amazon S3.

It’s crucial for the SerDe to correctly interpret the binary Avro format into a form Athena can query. The detailed schema information aids in query planning, allowing Athena to make informed decisions about how to run queries efficiently. The Avro schema literal specified in the table’s SerDe properties provides Athena with the exact field mappings, data types, and structure of the Avro records, which the reader needs to decode each record correctly. Because Avro is a row-oriented format that stores records in data blocks, Athena reads those blocks from Amazon S3 and returns only the columns that a query selects; unlike columnar formats such as Parquet, it can't skip directly to the bytes of individual fields.

The following excerpt from CreateTableAPI.py shows where the schema literal is set in the table parameters:

'Parameters' : {
    'avro.schema.literal': '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 }] } ], "default" : 0 }]}'
},
  1. After creating the table, verify its structure by running the SHOW CREATE TABLE command in Athena. The output should look similar to the following:
CREATE EXTERNAL TABLE `blogpost_table_test`(
  `customerid` bigint COMMENT 'from deserializer', 
  `sentiment` struct<customerrating:bigint> COMMENT 'from deserializer')
PARTITIONED BY ( 
  `dt` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe' 
WITH SERDEPROPERTIES ( 
  'avro.schema.literal'='{\"type\" : \"record\", \"name\" : \"customerdata\", \"namespace\" : \"com.data.test.avro\", \"fields\" : [{ \"name\" : \"customerID\", \"type\" : \"long\", \"default\" : -1 },{ \"name\" : \"sentiment\", \"type\" : [ \"null\", { \"type\" : \"record\", \"name\" : \"sentiment\", \"doc\" : \"***** CoreETL ******\", \"fields\" : [ { \"name\" : \"customerrating\", \"type\" : \"long\", \"default\" : 0 } ] } ], \"default\" : 0 }]}') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION
  's3://amzn-s3-demo-bucket/'
TBLPROPERTIES (
  'avro.schema.literal'='{\"type\" : \"record\", \"name\" : \"customerdata\", \"namespace\" : \"com.data.test.avro\", \"fields\" : [{ \"name\" : \"customerID\", \"type\" : \"long\", \"default\" : -1 },{ \"name\" : \"sentiment\", \"type\" : [ \"null\", { \"type\" : \"record\", \"name\" : \"sentiment\", \"doc\" : \"***** CoreETL ******\", \"fields\" : [ { \"name\" : \"customerrating\", \"type\" : \"long\", \"default\" : 0 } ] } ], \"default\" : 0 }]}')
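As the SHOW CREATE TABLE output shows, the same schema literal is stored twice: in SERDEPROPERTIES and in TBLPROPERTIES. Hand-editing escaped JSON in two places is error-prone. The following is a minimal sketch, assuming you keep the schema as a Python dict; BASE_SCHEMA and build_schema_literal are hypothetical names, not part of the original scripts. It serializes the schema once with json.dumps and reuses the same string for both parameter maps:

```python
import json

# Hypothetical: the base Avro schema from CreateTableAPI.py, kept as a dict
BASE_SCHEMA = {
    "type": "record",
    "name": "customerdata",
    "namespace": "com.data.test.avro",
    "fields": [
        {"name": "customerID", "type": "long", "default": -1},
        {
            "name": "sentiment",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "sentiment",
                    "doc": "***** CoreETL ******",
                    "fields": [{"name": "customerrating", "type": "long", "default": 0}],
                },
            ],
            "default": 0,  # copied as-is from CreateTableAPI.py
        },
    ],
}


def build_schema_literal(schema: dict) -> str:
    """Serialize the schema; json.dumps takes care of quoting and escaping."""
    return json.dumps(schema)


literal = build_schema_literal(BASE_SCHEMA)

# Reuse the one string in both places that the table carries it
table_parameters = {"avro.schema.literal": literal}
serde_parameters = {"avro.schema.literal": literal}
```

You could then pass table_parameters as TableInput['Parameters'] and serde_parameters as StorageDescriptor['SerdeInfo']['Parameters'] in the create_table call, so the two copies can't drift apart.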

The table is created with the following initial schema:

[
  {
    "Name": "customerid",
    "Type": "bigint",
    "Comment": "from deserializer"
  },
  {
    "Name": "sentiment",
    "Type": "struct<customerrating:bigint>",
    "Comment": "from deserializer"
  },
  {
    "Name": "dt",
    "Type": "string",
    "PartitionKey": "Partition (0)"
  }
]

With the table structure in place, you can load the first set of IoT sensor data and establish the initial partition. This step is crucial for setting up the data pipeline that will handle incoming sensor data.

  1. Download the example sensor data from the following S3 bucket:
s3://aws-blogs-artifacts-public/artifacts/BDB-4745

Download the initial schema sample from the first partition:

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4745/dt=2024-03-21/initial_schema_sample1.avro .

Download the second schema sample from the second partition:

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4745/dt=2024-03-22/second_schema_sample2.avro .

Download the third schema sample from the third partition:

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4745/dt=2024-03-23/third_scehama_sample3avro .
  1. Upload the Avro-formatted sensor data to your partitioned S3 location. This represents your first day of sensor readings, organized in the date-based partition structure. Replace the bucket name amzn-s3-demo-bucket with your S3 bucket name and add a partitioned folder for the dt field.
s3://amzn-s3-demo-bucket/dt=2024-03-21/
  1. Register this partition in the AWS Glue Data Catalog to make it discoverable. This tells AWS Glue where to find your sensor data for this specific date:
ALTER TABLE blogpost_table_test ADD PARTITION (dt='2024-03-21');
  1. Validate your sensor data ingestion by querying the newly loaded partition. This query helps verify that your sensor readings are correctly loaded and accessible:
SELECT * FROM "blogpostdatabase"."blogpost_table_test" WHERE dt='2024-03-21';

The following screenshot shows the query results.

This initial data load establishes the foundation for the IoT data pipeline, so you can begin tracking sensor measurements while preparing for future schema evolution as sensor capabilities expand or change.
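Registering partitions one ALTER TABLE statement at a time becomes tedious as daily partitions accumulate. Athena's ALTER TABLE ADD PARTITION accepts several PARTITION ... LOCATION clauses in one statement, so a small generator can help. This is a minimal sketch; add_partitions_sql is a hypothetical helper, it assumes the Hive-style dt=yyyy-MM-dd layout used in this post, and the bucket and date range are placeholders:

```python
from datetime import date, timedelta


def add_partitions_sql(table: str, location: str, start: date, days: int) -> str:
    """Build one ALTER TABLE statement that registers `days` daily dt partitions.

    Assumes the layout <location>/dt=yyyy-MM-dd/ used in this post.
    """
    base = location.rstrip("/")
    clauses = []
    for offset in range(days):
        dt = (start + timedelta(days=offset)).isoformat()  # e.g. 2024-03-21
        clauses.append(f"PARTITION (dt='{dt}') LOCATION '{base}/dt={dt}/'")
    return f"ALTER TABLE {table} ADD IF NOT EXISTS\n  " + "\n  ".join(clauses) + ";"


print(add_partitions_sql("blogpost_table_test", "s3://amzn-s3-demo-bucket/", date(2024, 3, 21), 3))
```

For three days, the result is a single statement with one PARTITION clause each for 2024-03-21 through 2024-03-23. With very large partition counts, even batched statements become unwieldy, which is the problem partition projection addresses.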

Now, we demonstrate how the IoT data pipeline handles evolving sensor capabilities by introducing a schema change in the second data batch. As sensors receive firmware updates or new monitoring features, their data structure needs to adapt accordingly. To show this evolution, we add data from sensors that now include visibility measurements:

  1. Examine the evolved schema structure that accommodates the new sensor capability:
{
  "fields": [
    {
      "Name": "customerid",
      "Type": "bigint",
      "Comment": "from deserializer"
    },
    {
      "Name": "sentiment",
      "Type": "struct<customerrating:bigint,visibility:bigint>",
      "Comment": "from deserializer"
    },
    {
      "Name": "dt",
      "Type": "string",
      "PartitionKey": "Partition (0)"
    }
  ]
}

Note the addition of the visibility field within the sentiment structure, representing the sensor’s enhanced monitoring capability.

  1. Upload this enhanced sensor data to a new date partition:
s3://amzn-s3-demo-bucket/dt=2024-03-22/
  1. Verify data consistency across both the original and enhanced sensor readings:
SELECT * FROM "blogpostdatabase"."blogpost_table_test" LIMIT 10;

This demonstrates how the pipeline can handle sensor upgrades while maintaining compatibility with historical data. In the next section, we explore how to update the table definition to properly manage this schema evolution, so you can query all sensor data seamlessly regardless of when the sensors were upgraded. This approach is particularly valuable in IoT environments where sensor capabilities evolve frequently, because you can keep historical data while accommodating new monitoring features.
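Before updating the table, the data steward needs to know when and how the schema changed. The following minimal sketch, assuming you can obtain the old and new Avro schemas as dicts (for example, by parsing their schema literals), reports which fields were added to or removed from the nested sentiment record. The helpers nested_field_names, diff_nested_fields, and sentiment_schema are hypothetical, and the schemas are trimmed to the sentiment field:

```python
def nested_field_names(schema: dict, record_field: str) -> list:
    """Return the field names of the nested record stored in `record_field`."""
    for field in schema["fields"]:
        if field["name"] != record_field:
            continue
        # The field type may be a bare record or a union such as ["null", record]
        branches = field["type"] if isinstance(field["type"], list) else [field["type"]]
        for branch in branches:
            if isinstance(branch, dict) and branch.get("type") == "record":
                return [f["name"] for f in branch["fields"]]
    raise KeyError(record_field)


def diff_nested_fields(old: dict, new: dict, record_field: str) -> dict:
    """Report fields added to or removed from a nested record between schemas."""
    old_names = nested_field_names(old, record_field)
    new_names = nested_field_names(new, record_field)
    return {
        "added": [n for n in new_names if n not in old_names],
        "removed": [n for n in old_names if n not in new_names],
    }


def sentiment_schema(nested_fields: list) -> dict:
    """Build a trimmed customerdata schema around the given sentiment fields."""
    return {
        "type": "record",
        "name": "customerdata",
        "fields": [{
            "name": "sentiment",
            "type": ["null", {"type": "record", "name": "sentiment", "fields": nested_fields}],
            "default": None,
        }],
    }


old = sentiment_schema([{"name": "customerrating", "type": "long", "default": 0}])
new = sentiment_schema([
    {"name": "customerrating", "type": "long", "default": 0},
    {"name": "visibility", "type": "long", "default": 0},
])
print(diff_nested_fields(old, new, "sentiment"))  # {'added': ['visibility'], 'removed': []}
```

A non-empty "added" list is the signal that the Glue column type and the schema literal need the update described next.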

Update the AWS Glue table

To accommodate evolving sensor capabilities, you need to update the AWS Glue table schema. Traditional methods such as MSCK REPAIR TABLE or ALTER TABLE ADD PARTITION work well for updating partition information in small datasets, but for tables with more than 100K partitions, you can use a more efficient alternative.

We use Athena partition projection, which eliminates the need to process extensive partition metadata, a step that can be time-consuming for large datasets. Instead, Athena infers partition values and locations dynamically from table properties, allowing for more efficient data management. This method also speeds up query planning by quickly identifying relevant partitions, leading to faster query runs. Additionally, it reduces the number of API calls to the metadata store, potentially lowering costs associated with these operations. Perhaps most importantly, performance holds up as the number of partitions grows, providing scalability for evolving datasets. Together, these benefits make handling schema evolution in large-scale data environments more efficient and cost-effective.
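To make the idea concrete, the following sketch models how the projection settings used in this post ('projection.dt.range' = '2024-03-21,NOW' with format 'yyyy-MM-dd') describe a set of partition values that can be computed rather than looked up. It's a simplified illustration, not Athena's implementation: projected_dt_values and partition_locations are hypothetical helpers, the sketch assumes a one-day step, and it takes the current date as a parameter so the output is deterministic. In practice, Athena also narrows these values using the predicates in your query.

```python
from datetime import date, datetime, timedelta


def projected_dt_values(range_spec: str, today: date) -> list:
    """Expand a 'start,end' date range (end may be NOW) into daily dt values."""
    start_text, end_text = (part.strip() for part in range_spec.split(","))
    start = datetime.strptime(start_text, "%Y-%m-%d").date()
    if end_text.upper() == "NOW":
        end = today
    else:
        end = datetime.strptime(end_text, "%Y-%m-%d").date()
    values = []
    current = start
    while current <= end:
        # Java pattern 'yyyy-MM-dd' corresponds to Python '%Y-%m-%d'
        values.append(current.strftime("%Y-%m-%d"))
        current += timedelta(days=1)
    return values


def partition_locations(table_location: str, values: list) -> list:
    """Map each dt value to its Hive-style S3 prefix under the table location."""
    base = table_location.rstrip("/")
    return [f"{base}/dt={v}/" for v in values]


dts = projected_dt_values("2024-03-21,NOW", today=date(2024, 3, 23))
print(dts)  # ['2024-03-21', '2024-03-22', '2024-03-23']
print(partition_locations("s3://amzn-s3-demo-bucket/", dts)[0])  # s3://amzn-s3-demo-bucket/dt=2024-03-21/
```

Because every value and location is derived from a few table properties, no per-partition metadata has to be stored or fetched, which is why this approach scales with the partition count.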

To update your table schema to handle the new sensor data, follow these steps:

  1. Copy the following code into the UpdateTableAPI.py file:
import boto3

client = boto3.client('glue')

db = 'blogpostdatabase'
tb = 'blogpost_table_test'

# Read the current table definition from the AWS Glue Data Catalog
response = client.get_table(
    DatabaseName=db,
    Name=tb
)

print(response)

# GetTable also returns read-only metadata (such as CreateTime) that
# UpdateTable rejects, so copy only the fields that TableInput accepts
table_input = {
    'Description': response['Table'].get('Description', ''),
    'Name': response['Table'].get('Name', ''),
    'Owner': response['Table'].get('Owner', ''),
    'Parameters': response['Table'].get('Parameters', {}),
    'PartitionKeys': response['Table'].get('PartitionKeys', []),
    'Retention': response['Table'].get('Retention', 0),
    'StorageDescriptor': response['Table'].get('StorageDescriptor', {}),
    'TableType': response['Table'].get('TableType', ''),
    'ViewExpandedText': response['Table'].get('ViewExpandedText', ''),
    'ViewOriginalText': response['Table'].get('ViewOriginalText', '')
}

# Add the new visibility field to the sentiment struct column
for col in table_input['StorageDescriptor']['Columns']:
    if col['Name'] == 'sentiment':
        col['Type'] = 'struct<customerrating:bigint,visibility:bigint>'

# Update the Avro schema literal in both the SerDe and the table parameters
table_input['StorageDescriptor']['SerdeInfo']['Parameters']['avro.schema.literal'] = '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 },{"name":"visibility","type":"long","default":0}] } ], "default" : 0 }]}'
table_input['Parameters']['avro.schema.literal'] = '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 },{"name":"visibility","type":"long","default":0} ] } ], "default" : 0 }]}'

# Enable partition projection on the dt partition column
table_input['Parameters']['projection.dt.type'] = 'date'
table_input['Parameters']['projection.dt.format'] = 'yyyy-MM-dd'
table_input['Parameters']['projection.enabled'] = 'true'
table_input['Parameters']['projection.dt.range'] = '2024-03-21,NOW'

# Write the updated definition back to the Data Catalog
response = client.update_table(
    DatabaseName=db,
    TableInput=table_input
)

This Python script demonstrates how to update an AWS Glue table to accommodate schema evolution and enable partition projection:

  1. Uses Boto3 to interact with the AWS Glue API.
  2. Retrieves the current table definition from the AWS Glue Data Catalog.
  3. Updates the 'sentiment' column structure to include new fields.
  4. Modifies the Avro schema literal to reflect the updated structure.
  5. Adds partition projection parameters for the partition column dt:
    table_input['Parameters']['projection.dt.type'] = 'date'
    table_input['Parameters']['projection.dt.format'] = 'yyyy-MM-dd'
    table_input['Parameters']['projection.enabled'] = 'true'
    table_input['Parameters']['projection.dt.range'] = '2024-03-21,NOW'

    1. projection.dt.type='date' sets the projection type of the partition column to date
    2. projection.dt.format='yyyy-MM-dd' defines the date format of the partition values
    3. projection.enabled='true' enables partition projection
    4. projection.dt.range='2024-03-21,NOW' sets the range of partition values from 2024-03-21 to the current date
  1. Run the script using the following command:
python3 UpdateTableAPI.py

The script applies all changes back to the AWS Glue table using the UpdateTable API call. The following screenshot shows the table property with the new Avro schema literal and the partition projection.
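UpdateTable expects a TableInput, which is a subset of what GetTable returns; fields such as DatabaseName and CreateTime are read-only metadata that boto3 rejects inside TableInput. The script above copies the accepted keys one by one. The following sketch does the same with a whitelist and a deep copy, so edits don't modify the GetTable response. TABLE_INPUT_KEYS and to_table_input are hypothetical names, and the key list only mirrors the fields the script copies; check the AWS Glue TableInput reference if you need other fields:

```python
import copy

# Mirrors the keys UpdateTableAPI.py copies from GetTable into TableInput
TABLE_INPUT_KEYS = (
    "Name", "Description", "Owner", "Retention", "StorageDescriptor",
    "PartitionKeys", "TableType", "Parameters", "ViewOriginalText", "ViewExpandedText",
)


def to_table_input(table: dict) -> dict:
    """Return a deep copy of `table` restricted to the whitelisted keys it contains."""
    return {key: copy.deepcopy(table[key]) for key in TABLE_INPUT_KEYS if key in table}


# Example shaped like response['Table'] from get_table (values are made up)
table = {
    "Name": "blogpost_table_test",
    "DatabaseName": "blogpostdatabase",
    "Retention": 0,
    "Parameters": {"projection.enabled": "true"},
}
table_input = to_table_input(table)
table_input["Parameters"]["projection.dt.type"] = "date"  # edits stay on the copy
```

You would then call client.update_table(DatabaseName=db, TableInput=table_input). Unlike the script's .get(..., '') defaults, this version omits keys that GetTable did not return instead of sending empty values.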

After the table property is updated, you don’t need to add the partitions manually using the MSCK REPAIR TABLE or ALTER TABLE command. You can validate the result by running the query in the Athena console.

SELECT * FROM "blogpostdatabase"."blogpost_table_test" LIMIT 10;

The following screenshot shows the query results.

This schema evolution strategy efficiently handles new data fields across different time periods. Consider the 'visibility' field introduced on 2024-03-22. For data from 2024-03-21, where this field doesn't exist, Athena returns the default value of 0 declared in the Avro schema. This keeps query results consistent across all partitions, regardless of their schema version.

Here’s the Avro schema configuration that enables this flexibility:

{
  "type": "record",
  "name": "customerdata",
  "fields": [
    {"name": "customerID", "type": "long", "default": -1},
    {"name": "sentiment", "type": ["null", {
      "type": "record",
      "name": "sentiment",
      "fields": [
        {"name": "customerrating", "type": "long", "default": 0},
        {"name": "visibility", "type": "long", "default": 0}
      ]
    }], "default": null}
  ]
}

Using this configuration, you can run queries across all partitions without modifications, maintain backward compatibility without data migration, and support gradual schema evolution without breaking existing queries.
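This behavior follows Avro schema resolution: when data written with an older schema is read with a newer reader schema, a reader field that the writer didn't have takes its declared default, and a writer field that the reader doesn't declare is ignored. The following is a simplified, standard-library-only model of that rule for flat record fields, not Athena's or Avro's actual reader; resolve_record is a hypothetical helper and the sample value 5 is made up:

```python
def resolve_record(writer_record: dict, reader_fields: list) -> dict:
    """Simplified Avro record resolution for added and removed fields."""
    resolved = {}
    for field in reader_fields:
        name = field["name"]
        if name in writer_record:
            resolved[name] = writer_record[name]  # value present in the data
        elif "default" in field:
            resolved[name] = field["default"]  # field added after the data was written
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    # Writer fields absent from reader_fields are never copied, so they are dropped
    return resolved


reader_sentiment_fields = [
    {"name": "customerrating", "type": "long", "default": 0},
    {"name": "visibility", "type": "long", "default": 0},
]

# A record from the 2024-03-21 partition, written before visibility existed
old_record = {"customerrating": 5}
print(resolve_record(old_record, reader_sentiment_fields))
# {'customerrating': 5, 'visibility': 0}
```

The model also shows why every newly added field needs a default: without one, older records could not be resolved against the new schema.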

Building on the schema evolution example, we now introduce a third enhancement to the sensor data structure. This new iteration adds a text-based classification capability through a 'category' field (string type) to the sentiment structure. This represents a real-world scenario where sensors receive updates that add new classification capabilities, requiring the data pipeline to handle both numeric measurements and textual categorizations.

The following is the enhanced schema structure:

{
  "fields": [
    {
      "Name": "customerid",
      "Type": "bigint"
    },
    {
      "Name": "sentiment",
      "Type": "struct<customerrating:bigint,visibility:bigint,category:string>"
    },
    {
      "Name": "dt",
      "Type": "string"
    }
  ]
}

This evolution demonstrates how the solution flexibly accommodates different data types as sensor capabilities expand while maintaining compatibility with historical data.
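Rewriting the whole schema literal by hand for each change, as the UpdateTableAPI.py script in this post does, risks typos in fields that didn't change. As an alternative, the following minimal sketch parses the current literal, appends the new field to the nested record, and serializes it again. The add_nested_field helper is hypothetical, and you would still update the AWS Glue column type (the struct<...> string) separately:

```python
import json


def add_nested_field(literal: str, record_field: str, new_field: dict) -> str:
    """Return a new schema literal with `new_field` appended to a nested record."""
    schema = json.loads(literal)
    for field in schema["fields"]:
        if field["name"] != record_field:
            continue
        # The nested record may sit inside a union such as ["null", record]
        branches = field["type"] if isinstance(field["type"], list) else [field["type"]]
        for branch in branches:
            if isinstance(branch, dict) and branch.get("type") == "record":
                if any(f["name"] == new_field["name"] for f in branch["fields"]):
                    raise ValueError(f"field {new_field['name']!r} already exists")
                branch["fields"].append(new_field)  # existing fields keep their order
                return json.dumps(schema)
    raise KeyError(record_field)


# Trimmed version of the literal after the visibility change
current = json.dumps({
    "type": "record",
    "name": "customerdata",
    "fields": [{
        "name": "sentiment",
        "type": ["null", {"type": "record", "name": "sentiment", "fields": [
            {"name": "customerrating", "type": "long", "default": 0},
            {"name": "visibility", "type": "long", "default": 0},
        ]}],
        "default": None,
    }],
})
updated = add_nested_field(
    current, "sentiment", {"name": "category", "type": "string", "default": "null"}
)
```

Appending rather than rebuilding keeps the existing fields byte-for-byte identical in meaning, which is what backward compatibility with older partitions depends on.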

To implement this latest schema evolution for the new partition (dt=2024-03-23), we update the table definition to include the ‘category’ field. Here’s the modified UpdateTableAPI.py script that handles this change:

  1. Update the file UpdateTableAPI.py:
import boto3

client = boto3.client('glue')

db = 'blogpostdatabase'
tb = 'blogpost_table_test'

# Read the current table definition from the AWS Glue Data Catalog
response = client.get_table(
    DatabaseName=db,
    Name=tb
)

print(response)

# Copy only the fields that UpdateTable accepts in TableInput
table_input = {
    'Description': response['Table'].get('Description', ''),
    'Name': response['Table'].get('Name', ''),
    'Owner': response['Table'].get('Owner', ''),
    'Parameters': response['Table'].get('Parameters', {}),
    'PartitionKeys': response['Table'].get('PartitionKeys', []),
    'Retention': response['Table'].get('Retention', 0),
    'StorageDescriptor': response['Table'].get('StorageDescriptor', {}),
    'TableType': response['Table'].get('TableType', ''),
    'ViewExpandedText': response['Table'].get('ViewExpandedText', ''),
    'ViewOriginalText': response['Table'].get('ViewOriginalText', '')
}

# Add the new category field to the sentiment struct column
for col in table_input['StorageDescriptor']['Columns']:
    if col['Name'] == 'sentiment':
        col['Type'] = 'struct<customerrating:bigint,visibility:bigint,category:string>'

# Update the Avro schema literal in both the SerDe and the table parameters
table_input['StorageDescriptor']['SerdeInfo']['Parameters']['avro.schema.literal'] = '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 },{"name":"visibility","type":"long","default":0},{"name":"category","type":"string","default":"null"} ] } ], "default" : 0 }]}'
table_input['Parameters']['avro.schema.literal'] = '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 },{"name":"visibility","type":"long","default":0},{"name":"category","type":"string","default":"null"} ] } ], "default" : 0 }]}'

# Keep the existing partition projection settings
table_input['Parameters']['projection.dt.type'] = 'date'
table_input['Parameters']['projection.dt.format'] = 'yyyy-MM-dd'
table_input['Parameters']['projection.enabled'] = 'true'
table_input['Parameters']['projection.dt.range'] = '2024-03-21,NOW'

# Write the updated definition back to the Data Catalog
response = client.update_table(
    DatabaseName=db,
    TableInput=table_input
)
  1. Verify the changes by running the following query:
SELECT * FROM "blogpostdatabase"."blogpost_table_test" LIMIT 10;

The following screenshot shows the query results.

There are three key changes in this update:

  1. Added 'category' field (string type) to the sentiment structure
  2. Set the default value of the category field to the string "null"
  3. Maintained existing partition projection settings

To support this latest sensor data enhancement, we updated the table definition to include a new text-based 'category' field in the sentiment structure. The modified UpdateTableAPI.py script adds this capability while following the established schema evolution pattern: it updates both the AWS Glue table schema and the Avro schema literal, and it sets the default value of the category field to the string "null".

This provides backward compatibility. For older data (before 2024-03-23), the category field shows "null" (a string value, not a SQL NULL), and new data includes actual category values. The script keeps the partition projection settings, enabling efficient querying across all time periods.
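The distinction matters when you filter on the field. Because the field's type is string, the JSON default "null" in the schema is the four-character string null, not an absent value. The following snippet parses both the field definition used in this post and a hypothetical nullable alternative (a ["null", "string"] union with a JSON null default, which this post does not use) to show the difference:

```python
import json

# The category field as defined in UpdateTableAPI.py
string_field = json.loads('{"name":"category","type":"string","default":"null"}')

# Hypothetical nullable alternative: a union whose first branch is "null"
nullable_field = json.loads('{"name":"category","type":["null","string"],"default":null}')

print(repr(string_field["default"]))    # 'null'  (a string value)
print(repr(nullable_field["default"]))  # None    (a real null)
```

With the definition used in this post, older rows carry the string value 'null', so a filter such as sentiment.category IS NULL would not match them.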

You can verify this update by querying the table in Athena, which will now show the complete data structure, including numeric measurements (customerrating, visibility) and text categorization (category) across all partitions. This enhancement demonstrates how the solution can seamlessly incorporate different data types while preserving historical data integrity and query performance.

Cleanup

To avoid incurring future costs, delete your Amazon S3 data if you no longer need it.

Conclusion

By combining Avro’s schema evolution capabilities with the power of AWS Glue APIs, we’ve created a robust framework for managing diverse, evolving datasets. This approach not only simplifies data integration but also enhances the agility and effectiveness of your analytics pipeline, paving the way for more sophisticated predictive and prescriptive analytics.

This solution offers several key advantages. It’s flexible, adapting to changing data structures without disrupting existing analytics processes. It’s scalable, able to handle growing volumes of data and evolving schemas efficiently. You can automate it and reduce the manual overhead in schema management and updates. Finally, because it minimizes data movement and transformation costs, it’s cost-effective.


About the authors

Mohammad Sabeel is a Senior Cloud Support Engineer at Amazon Web Services (AWS) with over 14 years of experience in Information Technology (IT). As a member of the Technical Field Community (TFC) Analytics team, he is a subject matter expert in the analytics services AWS Glue, Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and Amazon Athena. Sabeel provides expert guidance and technical support to enterprise and strategic customers, helping them optimize their data analytics solutions and overcome complex challenges. With deep subject matter expertise, he enables organizations to build scalable, efficient, and cost-effective data processing pipelines.

Indira Balakrishnan is a Principal Solutions Architect in the Amazon Web Services (AWS) Analytics Specialist Solutions Architect (SA) Team. She helps customers build cloud-based data and AI/ML solutions to address business challenges. With over 25 years of experience in Information Technology (IT), Indira actively contributes to the AWS Analytics Technical Field community, supporting customers across various domains and industries. Indira participates in the Women in Engineering and Women at Amazon tech groups to encourage girls to pursue STEM paths and enter careers in IT. She also volunteers in early career mentoring circles.

Secure generative SQL with Amazon Q

Post Syndicated from Gregory Knowles original https://aws.amazon.com/blogs/big-data/secure-generative-sql-with-amazon-q/

Amazon Q generative SQL brings generative AI capabilities to help speed up deriving insights from your Amazon Redshift data warehouses and AWS Glue Data Catalog, generating SQL for Amazon Redshift or Amazon Athena. With Amazon Q, you get SQL commands generated with your context. This means you can focus on deriving insights faster, rather than having to first learn potentially complex schemas. Without generative SQL, your data analysts might have to frequently switch between different types of SQL, which can further slow analysis down. Amazon Q generative SQL can help by generating SQL statements from natural language and speeding up development. This can help onboard analysts faster and improve analyst productivity. The generative SQL experience is available through Amazon SageMaker Unified Studio and Amazon Redshift Query Editor v2.

To scale the use of generative SQL in production scenarios, you need to consider how relevant and accurate SQL is generated. In doing so, it’s important to understand what data is used and how your information is protected. Amazon Q generative SQL is designed to keep your data secure and private. Your queries, data, and database schemas are not used to train generative AI foundation models (FMs). For more information, see Considerations when interacting with Amazon Q generative SQL.

In the post Write queries faster with Amazon Q generative SQL for Amazon Redshift, we provided general advice around getting started with generative SQL. In this post, we discuss the design and security controls in place when using generative SQL and its use in both SageMaker Unified Studio and Amazon Redshift Query Editor v2.

Solution overview

Generating relevant SQL requires context from your data warehouse or data catalog schemas. Your analysts can ask free text or natural language questions in the Amazon Q chat window and have SQL statements returned that reference your tables and columns. It’s important that the generated SQL is consistent with your schema so that it can find the most relevant fields to answer questions and generate queries that accurately reference data. In SageMaker Unified Studio or Amazon Redshift Query Editor v2, when the Amazon Q chat window is open, database metadata that is viewable under the connection context is made available to Amazon Q for SQL generation. This means that only the schema information that the connecting user can access is used. Tables or database objects the user doesn’t have access to are excluded.

When a user submits questions in the Amazon Q chat window, a search algorithm is used to find the most relevant context from the available database schema metadata information. This context is combined with the user’s question and used as a prompt to a large language model (LLM) to generate a SQL statement. The supporting information is cached so that your data source doesn’t need to be queried every time a user initiates SQL generation. Instead, data source metadata will be periodically refreshed if it remains in use, or you can trigger a manual refresh. If the data is not being used, Amazon Q will automatically delete it. Where applicable, the information used to support SQL generation is encrypted with an AWS Key Management Service (AWS KMS) customer managed KMS key where one has been specified in the SageMaker Unified Studio or Amazon Redshift Query Editor v2 settings. Otherwise, an AWS managed key is used. Your information is encrypted in transit and at rest.
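The caching behavior described above (reuse while in use, periodic refresh, manual refresh, and deletion when unused) can be illustrated with a small toy model. This is not Amazon Q's implementation, which the post doesn't describe; MetadataCache, its fetch callback, the connection name, and the refresh and eviction intervals are all hypothetical, and time is passed in explicitly to keep the example deterministic:

```python
from dataclasses import dataclass, field
from typing import Callable, Dict


@dataclass
class CacheEntry:
    metadata: dict
    fetched_at: float
    last_used: float


@dataclass
class MetadataCache:
    """Toy cache of schema metadata keyed by connection."""

    fetch: Callable[[str], dict]  # hypothetical loader that queries the data source
    refresh_after: float  # seconds before an in-use entry is re-fetched
    evict_after: float  # seconds without use before an entry is deleted
    entries: Dict[str, CacheEntry] = field(default_factory=dict)

    def get(self, connection: str, now: float) -> dict:
        self.evict_unused(now)
        entry = self.entries.get(connection)
        if entry is None or now - entry.fetched_at >= self.refresh_after:
            self.refresh(connection, now)  # first use, or cached copy is stale
            entry = self.entries[connection]
        entry.last_used = now
        return entry.metadata

    def refresh(self, connection: str, now: float) -> None:
        """Re-fetch immediately (also usable as a manual refresh)."""
        self.entries[connection] = CacheEntry(self.fetch(connection), now, now)

    def evict_unused(self, now: float) -> None:
        unused = [c for c, e in self.entries.items() if now - e.last_used >= self.evict_after]
        for connection in unused:
            del self.entries[connection]


calls = []


def fake_fetch(connection: str) -> dict:
    calls.append(connection)
    return {"tables": ["sales", "stores"]}


cache = MetadataCache(fetch=fake_fetch, refresh_after=3600, evict_after=86400)
cache.get("redshift-dev", now=0)     # fetches
cache.get("redshift-dev", now=60)    # served from cache
cache.get("redshift-dev", now=3700)  # stale, so re-fetched
```

The point of the model is the trade-off the paragraph describes: the data source is queried only on first use or after the refresh interval, not on every question.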

The following diagram shows the process flow for SQL generation when using SageMaker Unified Studio or the Amazon Redshift Query Editor and using Amazon Redshift or Data Catalog source data.

Process diagram for SQL generation

The Amazon Q generative SQL process can be summarized as the following steps:

  1. A user interacts with the Amazon Q chat pane through SageMaker Unified Studio or the Amazon Redshift Query Editor.
  2. The SQL chat frontend sends the prompt along with the connection configuration to Amazon Q.
  3. Amazon Q uses the connection context to retrieve information that will support SQL generation if this data is not already available.
  4. Amazon Q encrypts the retrieved information under the appropriate AWS managed or customer managed KMS key. The information is subsequently decrypted on retrieval.
  5. The information is stored along with custom context information, if this has been provided.
  6. Relevant context from the combined information is selected and added to the user’s questions and sent to an LLM to generate a SQL statement, which is returned to the user.
  7. The user can decide whether to run the statement and can provide feedback on usefulness and accuracy.

Additional context to enhance SQL generation

You can provide further context to supplement the database schema information, which can help improve the accuracy and relevancy of the generated SQL.

One option is to provide custom context. Custom context gives the option to specify instructions and extra information, such as descriptions of tables and columns. These descriptions can then be used to help the selection of relevant tables and attributes when generating SQL statements. This is particularly relevant when your schema uses more obscure naming that might not directly relate to business terms or uses non-standard abbreviations. For example, consider a table called sls_r1_2024. With custom context, you can add a table description specifying that, for example, the table includes sales information across stores in the US region for the calendar year 2024. This information can help the LLM generate SQL referencing the correct tables. The same approach can be applied to columns within the table. Your custom context is encrypted using a customer managed KMS key if one has been specified (during Amazon Redshift Query Editor account creation or SageMaker Unified Studio project creation) or an AWS managed key otherwise.
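To see why a table description helps, consider a toy relevance score that counts words shared between the question and each table's name plus description. This only illustrates the idea; the post doesn't specify Amazon Q's search algorithm, and rank_tables, the competing table sales_forecast_2023, and the scoring are all hypothetical:

```python
import re


def tokens(text: str) -> set:
    """Lowercase word tokens; underscores split identifiers such as sls_r1_2024."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def rank_tables(question: str, tables: dict) -> list:
    """Order table names by word overlap with the question (ties broken by name)."""
    q = tokens(question)
    scores = {name: len(q & (tokens(name) | tokens(desc))) for name, desc in tables.items()}
    return sorted(scores, key=lambda name: (-scores[name], name))


question = "What were total sales in the US in 2024?"

without_context = {"sls_r1_2024": "", "sales_forecast_2023": ""}
with_context = {
    "sls_r1_2024": "Sales information across stores in the US region for calendar year 2024",
    "sales_forecast_2023": "",
}

print(rank_tables(question, without_context))  # ['sales_forecast_2023', 'sls_r1_2024']
print(rank_tables(question, with_context))     # ['sls_r1_2024', 'sales_forecast_2023']
```

Without the description, the abbreviated name shares only "2024" with the question and loses a tie to an unrelated table; with the description, the intended table ranks first.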

You can also introduce constraints using custom context. For example, you can explicitly include or exclude specific schemas, tables, or columns from SQL query generation. Similarly, specific topics can also be disallowed, such as not generating SQL statements to support financial reporting. For more details about the information that can be supplied, refer to Custom context.

Another option is to grant SQL query history access to the user establishing the connection. This information is then also made available to enhance SQL generation and to provide the LLM with examples of relevant queries. Be aware that granting wider SQL query history access to the connecting user, and therefore also the generative SQL workflow, allows viewing of queries over tables or objects the user might not have access to. Furthermore, string literals might be present in historic statements that might contain sensitive information. To help mitigate this risk, you could instead use the CuratedQueries section of custom context to provide predefined question and answer examples, without exposing all user queries.

Generated statement response

Before a SQL statement is returned to the user, Amazon Q tries to detect syntax issues. This step helps improve the likelihood that only valid SQL syntax is returned. Amazon Q uses the information available for the user to return statements that align with the user's permissions, which reduces scenarios where users can't run generated statements. For example, if you have given access to SQL query history information, the SQL generation step might produce a statement referencing a table that the user asking the question doesn't have access to. Amazon Q minimizes this scenario by assessing whether the generated SQL aligns with user permissions and updating the statement if it doesn't. User permissions are not bypassed through the use of Amazon Q generative SQL: if a returned statement references a table the user doesn't have access to, the authorization applied to the user enforces access control when the statement is run.
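To make the idea of a permission check concrete, here is a toy illustration that compares the tables a statement references against a set the user is allowed to read. This is not how Amazon Q performs its assessment. It is a naive regular-expression sketch with hypothetical table names, and it ignores CTEs, subqueries, and quoted identifiers.

```python
import re

# Naive pattern: the identifier after FROM or JOIN. It ignores CTEs,
# subqueries, quoted identifiers, and comma-separated FROM lists.
TABLE_REF = re.compile(r"\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)", re.IGNORECASE)


def unauthorized_tables(sql: str, allowed: set) -> set:
    """Return referenced tables that are not in the caller's allowed set."""
    referenced = {name.lower() for name in TABLE_REF.findall(sql)}
    return referenced - {name.lower() for name in allowed}


sql = "SELECT s.id FROM public.sales s JOIN public.payroll p ON s.id = p.id"
print(unauthorized_tables(sql, {"public.sales"}))  # {'public.payroll'}
```

Whatever a check like this reports, the database still enforces the user's grants when the statement runs.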

Statements generated by Amazon Q that could potentially change your database, such as DML or DDL statements, are returned with a warning. The warning highlights to the user that running the statement could potentially modify the database. Again, these statements are only executable if the user has the required permissions.
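The following sketch shows, in simplified form, how a client might decide whether to show a "this could modify your database" warning. It looks only at the leading keyword. The keyword set is illustrative and not exhaustive, and it is not the logic Amazon Q uses.

```python
# Leading keywords of statements that can change data, schema, or permissions.
# Illustrative only: it does not handle CTEs, comments, or multi-statement text.
MODIFYING_KEYWORDS = {
    "INSERT", "UPDATE", "DELETE", "MERGE", "COPY", "TRUNCATE",
    "CREATE", "ALTER", "DROP", "GRANT", "REVOKE",
}


def may_modify(sql: str) -> bool:
    """Return True when the statement starts with a modifying keyword."""
    words = sql.strip().split(None, 1)
    return bool(words) and words[0].rstrip(";").upper() in MODIFYING_KEYWORDS


for statement in ["SELECT * FROM sales", "delete from sales where id = 7", "DROP TABLE tmp;"]:
    print(statement, "->", "warn" if may_modify(statement) else "ok")
```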

Prerequisites

Amazon Q generative SQL works with your Redshift data warehouses and Data Catalog tables. To get started, you should have data available in either or both of these environments. To use Amazon Q generative SQL with your AWS Glue tables, you need a SageMaker Unified Studio domain. Within your domain, you can use the Amazon Q chat integration to ask questions of your data and have SQL generated. This also works for Amazon Redshift data sources available in the domain. If you don't have a SageMaker Unified Studio domain, you can use Amazon Q generative SQL through the Amazon Redshift Query Editor. Access to the editor enables Amazon Q chat integration against your Amazon Redshift data sources.

Enable Amazon Q generative SQL

You can control access to generative SQL at the account-Region level in the Amazon Redshift Query Editor or at the SageMaker Unified Studio domain level. To enable this feature, an account admin must explicitly turn on Amazon Q generative SQL; by default, the feature is not accessible to your users. Administrators that have permission for the sqlworkbench:UpdateAccountQSqlSettings AWS Identity and Access Management (IAM) action can turn the Amazon Q generative SQL feature on or off through the admin window, as illustrated in the following sections. When the feature is turned off, users are restricted from opening the Amazon Q chat pane, which helps prevent interaction with generative SQL.

Enable Amazon Q in your SageMaker domain

To enable Amazon Q in your SageMaker domain, you can navigate to the Amazon Q tab on the domain settings page and choose to enable the service. For more information, see Amazon Q in Amazon SageMaker Unified Studio.

Enable Amazon Q in SageMaker Unified Studio domain

Enable Amazon Q in Amazon Redshift

To enable Amazon Q generative SQL from the Amazon Redshift Query Editor, access the Amazon Q generative SQL settings. This requires the administrator to have the sqlworkbench:UpdateAccountQSqlSettings permission in their IAM policy. For more information, see Updating generative SQL settings as an administrator.

Enabling Amazon Q generative SQL from Redshift query editor

With generative SQL enabled at the account-Region level, you can restrict access to specific users with IAM controls. IAM administrators can build IAM policies that allow or deny access to the action sqlworkbench:GetQSqlRecommendations. For more information, refer to Actions, resources, and condition keys for AWS SQL Workbench. Policies can then be associated with IAM users or roles to control access to SQL generation at a more granular level. An appropriately scoped service control policy (SCP) can be used to limit access to SQL generation to specific accounts within your organization if required.

The following is an example policy denying access to use SQL generation:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DenyAccessToAmazonQGenerativeSql",
            "Effect": "Deny",
            "Action": [
                "sqlworkbench:GetQSqlRecommendations"
            ],
            "Resource": "*"
        }
    ]
}
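Before you attach a policy like this, you might want a quick local check that it parses and denies the intended action. The following sketch does that with a simplified model of IAM evaluation. It considers only Deny statements and their Action patterns, and it ignores Resource, Condition, NotAction, and any other policies that apply to the principal. It is not a substitute for the IAM policy simulator.

```python
import fnmatch
import json

DENY_POLICY = json.loads("""
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DenyAccessToAmazonQGenerativeSql",
            "Effect": "Deny",
            "Action": ["sqlworkbench:GetQSqlRecommendations"],
            "Resource": "*"
        }
    ]
}
""")


def explicitly_denies(policy: dict, action: str) -> bool:
    """Return True if a Deny statement in `policy` lists an action matching `action`.

    Simplified: ignores Resource, Condition, NotAction, and other policies."""
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a single statement may be an object
        statements = [statements]
    for statement in statements:
        if statement.get("Effect") != "Deny":
            continue
        patterns = statement.get("Action", [])
        if isinstance(patterns, str):
            patterns = [patterns]
        # IAM action names are case-insensitive and support * and ? wildcards
        if any(fnmatch.fnmatchcase(action.lower(), p.lower()) for p in patterns):
            return True
    return False


print(explicitly_denies(DENY_POLICY, "sqlworkbench:GetQSqlRecommendations"))  # True
```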

Cross-Region inference

Amazon Q Developer uses cross-Region inference to distribute traffic across different AWS Regions, which provides increased throughput and resilience during high demand periods, improved performance, and access to the latest Amazon Q Developer capabilities.

When a request is made from an Amazon Q Developer profile, it is kept within the Regions in the same geography as the original data. Although this doesn’t change where the data is stored, the requests and output results might move across Regions during the inference process. Data is encrypted when transmitted across Amazon’s network. For more information on cross-Region inference, see Cross-region processing in Amazon Q Developer.

Monitoring

To monitor which IAM users or roles are interacting with generative SQL, you can use AWS CloudTrail. CloudTrail records API calls and logs which identities performed particular actions. When a user first asks a question, CloudTrail records an IngestQSqlMetadata event as Amazon Q starts the metadata ingestion process. Ingestion is an asynchronous operation, so you might see a series of GetQSqlMetadataStatus events as the workflow checks the ingestion status.
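The following sketch summarizes this activity per caller. It counts the three generative SQL event names by principal ARN. It assumes that you already have the event records as Python dictionaries, for example parsed from CloudTrail log files in Amazon S3, or from the CloudTrailEvent JSON string that the CloudTrail LookupEvents API returns for each event. The sample records are hypothetical.

```python
from collections import Counter, defaultdict

GENERATIVE_SQL_EVENTS = {"IngestQSqlMetadata", "GetQSqlMetadataStatus", "GetQSqlRecommendation"}


def summarize_generative_sql(records):
    """Count generative SQL CloudTrail events by caller ARN and event name."""
    summary = defaultdict(Counter)
    for record in records:
        if record.get("eventSource") != "sqlworkbench.amazonaws.com":
            continue
        name = record.get("eventName")
        if name in GENERATIVE_SQL_EVENTS:
            caller = record.get("userIdentity", {}).get("arn", "unknown")
            summary[caller][name] += 1
    return summary


role = "arn:aws:sts::111122223333:assumed-role/DemoUser"
records = [  # hypothetical, heavily trimmed records
    {"eventSource": "sqlworkbench.amazonaws.com", "eventName": "IngestQSqlMetadata", "userIdentity": {"arn": role}},
    {"eventSource": "sqlworkbench.amazonaws.com", "eventName": "GetQSqlMetadataStatus", "userIdentity": {"arn": role}},
    {"eventSource": "sqlworkbench.amazonaws.com", "eventName": "GetQSqlRecommendation", "userIdentity": {"arn": role}},
    {"eventSource": "s3.amazonaws.com", "eventName": "GetObject", "userIdentity": {"arn": role}},
]
for caller, counts in summarize_generative_sql(records).items():
    print(caller, dict(counts))
```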

After the workflow has completed successfully, each question produces a GetQSqlRecommendation event, because users submitting questions trigger generation of SQL statements. The following is an example CloudTrail event for GetQSqlRecommendation. In this example, Amazon Q emits a detailed CloudTrail event highlighting the warehouse being queried, the IAM principal calling Amazon Q, and the entire response structure from Amazon Q in responseElements:

{
    "eventVersion": "1.09",
    "userIdentity": {
        "type": "AssumedRole",
        "principalId": "AROA123456789EXAMPLE:demouser",
        "arn": "arn:aws:sts::111122223333:assumed-role/DemoUser",
        "accountId": "111122223333",
        "accessKeyId": "ASIAIOSFODNN7EXAMPLE",
        "sessionContext": {
            "sessionIssuer": {
                "type": "Role",
                "principalId": "AROA123456789EXAMPLE",
                "arn": "arn:aws:iam::111122223333:role/DemoUser",
                "accountId": "111122223333",
                "userName": "DemoUser"
            },
            "attributes": {
                "creationDate": "2025-01-17T05:31:01Z",
                "mfaAuthenticated": "false"
            }
        }
    },
    "eventTime": "2025-01-17T05:34:51Z",
    "eventSource": "sqlworkbench.amazonaws.com",
    "eventName": "GetQSqlRecommendation",
    "awsRegion": "us-east-1",
    "sourceIPAddress": "122.171.17.139",
    "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:133.0) Gecko/20100101 Firefox/133.0",
    "requestParameters": {
        "dbConfig": {
            "database": "sample_data_dev"
        },
        "databaseConfiguration": {
            "redshiftConfig": {
                "clusterIdentifier": "redshift-cluster-1",
                "database": "sample_data_dev"
            }
        },
        "prompt": "HIDDEN_DUE_TO_SECURITY_REASONS",
        "clientToken": "HIDDEN_DUE_TO_SECURITY_REASONS",
        "logConfig": {},
        "sqlworkbenchConnectionArn": "arn:aws:sqlworkbench:us-east-1:111122223333:connection/47ahg61-ce0b-4646-831b-a140ea4055ae"
    },
    "responseElements": {
        "data": {
            "extractionErrors": false,
            "guardRails": {
                "isDml": false
            },
            "sqlStatement": "HIDDEN_DUE_TO_SECURITY_REASONS",
            "syntaxErrors": "HIDDEN_DUE_TO_SECURITY_REASONS"
        },
        "logSessionId": "623318ad-dbcc-4f69-ae08-f85d1b63a70f",
        "questionId": "623318ad-dbcc-4f69-ae08-f85d1b63a70f",
        "originalQuestionId": "623318ad-dbcc-4f69-ae08-ae08asd1a"
    },
    "requestID": "623318ad-dbcc-4f69-ae08-f85d1b63a70f",
    "eventID": "ac2c1932-49b1-41b3-a1af-20fa4461cf7d",
    "readOnly": false,
    "eventType": "AwsApiCall",
    "managementEvent": true,
    "recipientAccountId": "111122223333",
    "eventCategory": "Management",
    "tlsDetails": {
        "tlsVersion": "TLSv1.3",
        "cipherSuite": "TLS_AES_128_GCM_SHA256",
        "clientProvidedHostHeader": "qsql.sqlworkbench.us-east-1.amazonaws.com"
    },
    "sessionCredentialFromConsole": "true"
}
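For auditing, you usually need only a handful of these fields. The following sketch extracts the caller, the target cluster and database, and the guardRails.isDml flag from a GetQSqlRecommendation event. The field paths are taken from the example event. The function name and output keys are our own.

```python
import json


def audit_fields(event: dict) -> dict:
    """Extract who asked, against which warehouse, and the DML guardrail flag."""
    redshift = (event.get("requestParameters", {})
                .get("databaseConfiguration", {})
                .get("redshiftConfig", {}))
    data = event.get("responseElements", {}).get("data", {})
    return {
        "time": event.get("eventTime"),
        "caller": event.get("userIdentity", {}).get("arn"),
        "cluster": redshift.get("clusterIdentifier"),
        "database": redshift.get("database"),
        "is_dml": data.get("guardRails", {}).get("isDml"),
    }


# Trimmed copy of the example event
raw = """{
  "eventTime": "2025-01-17T05:34:51Z",
  "userIdentity": {"arn": "arn:aws:sts::111122223333:assumed-role/DemoUser"},
  "requestParameters": {"databaseConfiguration": {"redshiftConfig":
      {"clusterIdentifier": "redshift-cluster-1", "database": "sample_data_dev"}}},
  "responseElements": {"data": {"guardRails": {"isDml": false}}}
}"""
print(audit_fields(json.loads(raw)))
```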

Conclusion

In this post, we discussed the Amazon Q generative SQL workflow. We highlighted the process around using your schema context alongside metadata such as historic SQL queries and custom context. Using this metadata allows the generation of relevant SQL that helps accelerate your analysts’ productivity. Although it’s important to assist analysts, it’s also imperative to make sure data remains secure and protected. To support this, generative SQL uses only the data the connected user has access to. This helps prevent exposure to information beyond their authorization.

When you’re looking to increase the relevance of generated SQL by sharing additional query history, it’s important to consider the trade-off of exposing additional information to the user. Your approach should take into account the domain context of the data and the possible exposure of metadata the user doesn’t have access to, or of potentially sensitive information that might appear in query strings. Keeping these considerations in mind can help you achieve the appropriate security posture for your workloads.

To get started with Amazon Q generative SQL, see Write queries faster with Amazon Q generative SQL for Amazon Redshift and Interacting with Amazon Q generative SQL.


About the authors

Gregory Knowles is a data and AI specialist solution architect at AWS, focusing on the UK public sector. With extensive experience in cloud-based architectures, Greg guides public sector customers in implementing modern data solutions. His expertise spans governance, analytics, and AI/ML. Greg’s passion lies in accelerating transformation and innovation to improve productivity and outcomes. He has successfully led projects that moved data systems into the cloud, adopted new data architectures, and implemented AI at scale in production.

Abhinav Tripathy is a Software Engineer and Security Guardian at AWS, where he develops Amazon Q generative SQL by combining machine learning, databases, and web systems. Abhinav is passionate about building scalable web systems from scratch that solve real customer challenges. Outside of work, he enjoys traveling, watching soccer, and playing badminton.

Erol Murtezaoglu, a Technical Product Manager at AWS, is an inquisitive and enthusiastic thinker with a drive for self-improvement and learning. He has a strong and proven technical background in software development and architecture, balanced with a drive to deliver commercially successful products. Erol highly values the process of understanding customer needs and problems in order to deliver solutions that exceed expectations.

Optimize industrial IoT analytics with Amazon Data Firehose and Amazon S3 Tables with Apache Iceberg

Post Syndicated from Ashok Padmanabhan original https://aws.amazon.com/blogs/big-data/optimize-industrial-iot-analytics-with-amazon-data-firehose-and-amazon-s3-tables-with-apache-iceberg/

Manufacturing organizations are racing to digitize their operations through Industry 4.0 initiatives. A key challenge they face is capturing, processing, and analyzing real-time data from industrial equipment to enable data-driven decision making.

Modern manufacturing facilities generate massive amounts of real-time data from their production lines. Capturing this valuable data requires a two-tier architecture: first, an edge device that understands industrial protocols collects data directly from the shop floor sensors. Then, these edge gateways securely buffer and transmit the data to the AWS Cloud, providing reliability during network interruptions.

In this post, we show how to use AWS service integrations to minimize custom code while providing a robust platform for industrial data ingestion, processing, and analytics. By using Amazon S3 Tables and its built-in optimizations, you can maximize query performance and minimize costs without additional infrastructure setup. Additionally, because AWS IoT Greengrass supports VPC endpoints, you can securely communicate between the edge gateway (hosted on premises) and AWS.

Solution overview

Let’s consider a manufacturing line with equipment sensors capturing flow rate, temperature, and pressure. To analyze this data, you ingest real-time streaming data from these sensors into the AWS environment using an edge gateway. After the data lands in AWS, you can use various analytics services to gain insights.

To demonstrate the data flow from the edge to the cloud, we have assets, machines, and tools publish data using MQTT. Optionally, we use a simulated edge device that publishes data to a local MQTT endpoint. We use an edge gateway with an AWS IoT Greengrass V2 edge runtime to stream data through Amazon Data Firehose in the cloud to S3 Tables.

The following diagram illustrates the solution architecture.


Fig 1: High Level Architecture

The workflow consists of the following steps:

  1. Collect data from Internet of Things (IoT) sensors and stream real-time data from edge devices to the AWS Cloud using AWS IoT Greengrass.
  2. Ingest, transform, and land data in near real time using Data Firehose, with the Firehose component on AWS IoT Greengrass, and S3 Tables integration.
  3. Store and organize the tabular data using S3 Tables, which provides purpose-built storage for Apache Iceberg format with a simple, performant, and cost-effective querying solution.
  4. Query and analyze the tabular data using Amazon Athena.

The edge data flow consists of the following key components:

  • IoT device to local MQTT broker – A simulated device used to generate data for the purposes of this post. In a typical production implementation, this would be your equipment or gateway that supports MQTT. IoT devices can publish messages to a local MQTT broker (Moquette) running on AWS IoT Greengrass.
  • MQTT bridge – The MQTT bridge component relays messages between:
    • MQTT broker (where client devices communicate)
    • Local AWS IoT Greengrass publish/subscribe (IPC)
  • Local PubSub (custom) component – This component completes the following tasks:
    • Subscribes to the local IPC messages using the IPC interface.
    • Forwards messages to the kinesisfirehose/message topic.
  • Firehose component – The Firehose component subscribes to the kinesisfirehose/message topic. The component then streams the data to Data Firehose in the cloud. It uses QoS 1 for reliable message delivery.

You can scale this solution to multiple edge locations, so you have a seamless view of data across multiple locations of the manufacturing site, as a low-code solution.

In the following sections, we walk through the steps to configure the cloud data ingestion flow:

  1. Create an S3 Tables bucket and enable integration with AWS analytics services.
  2. Create a namespace in the table bucket using the AWS Command Line Interface (AWS CLI).
  3. Create a table in the table bucket with the defined schema using the AWS CLI.
  4. Create an AWS Identity and Access Management (IAM) role for Data Firehose with necessary permissions.
  5. Configure AWS Lake Formation permissions:
    • Grant Super permissions on specific tables for the Data Firehose role.
  6. Set up a Data Firehose stream:
    • Choose Direct PUT as the source and Iceberg tables as the destination.
    • Configure the destination settings with database and table names.
    • Specify an Amazon Simple Storage Service (Amazon S3) bucket for error output.
    • Associate the IAM role created earlier.
  7. Verify and query data using Athena:
    • Grant Lake Formation permissions for Athena access.
    • Query the table to verify data ingestion.

Prerequisites

You must have the following prerequisites:

  • An AWS account
  • The required IAM privileges to launch AWS IoT Greengrass on an edge gateway (or another supported device)
  • An Amazon Elastic Compute Cloud (Amazon EC2) instance with a supported operating system to perform a proof of concept

Install AWS IoT Greengrass on the edge gateway

For instructions to install AWS IoT Greengrass, refer to Install the AWS IoT Greengrass Core software. After you complete the installation, you will have a core device provisioned, as shown in the following screenshot. The status of the device says Healthy, which means that your account is able to communicate with the device successfully.

For a proof of concept, you can use an Ubuntu-based EC2 instance as your edge gateway.


Fig 2: Greengrass Core Device

Provision a Data Firehose stream

For detailed steps on setting up Data Firehose to deliver data to Iceberg tables, refer to Deliver data to Apache Iceberg Tables with Amazon Data Firehose. For S3 Tables integration, refer to Build a data lake for streaming data with Amazon S3 Tables and Amazon Data Firehose.

Because you’re using AWS IoT Greengrass to stream data, you can skip the Kinesis Data Generator steps mentioned in these tutorials. The data will instead flow from your edge devices through the Greengrass components to Data Firehose.

After you complete these steps, you will have a Firehose stream and S3 Tables bucket, as shown in the following screenshot. Note the Amazon Resource Name (ARN) of the Firehose stream to use in subsequent steps.


Fig 3: Amazon Data Firehose Stream

Deploy the Greengrass components

Complete the following steps to configure and deploy the Greengrass components. For more details, refer to Create deployments.

  1. Use the following configuration to enable message routing from local MQTT to the AWS IoT Greengrass PubSub component. Note the topic in the code: this is the MQTT topic filter that the devices publish their data to.
{
  "reset": [""],
  "merge": {
    "mqttTopicMapping": {
      "HelloWorldIotCoreMapping": {
        "topic": "client/#",
        "source": "LocalMqtt",
        "target": "Pubsub"
      }
    }
  }
}
  2. Use the following configuration to deploy the Firehose component. Use the Firehose stream ARN that you noted earlier.
{
  "reset": [""],
  "merge": {
    "lambdaExecutionParameters": {
      "EnvironmentVariables": {
        "DEFAULT_DELIVERY_STREAM_ARN": "arn:aws:firehose:us-east-1:<<account-id>>:deliverystream/<<stream name>>"
      }
    },
    "containerMode": "NoContainer"
  }
}
  3. Use the following configuration to deploy the legacy subscription router component (this component is a dependency of the Firehose component):
{
  "reset": [""],
  "merge": {
    "subscriptions": {
      "aws-greengrass-kinesisfirehose": {
        "id": "aws-greengrass-kinesisfirehose",
        "source": "component:aws.greengrass.KinesisFirehose",
        "subject": "kinesisfirehose/message/status",
        "target": "cloud"
      }
    }
  }
}
  4. Create and deploy a custom PubSub component that subscribes to the client/devices/telemetry IPC topic and republishes each message to the kinesisfirehose/message topic. You can write the component in your preferred language, and you can use the Greengrass Development Kit (GDK) CLI to create custom components.

After you deploy the components, you will see them on the Components tab of your core device.


Fig 4: AWS IoT Greengrass components

Ingest data

In this step, you ingest the data from your device to AWS IoT Greengrass, which will subsequently land in Data Firehose. Complete the following steps:

  1. From your edge device that is MQTT aware, or your edge gateway, publish the data to the topic defined earlier (client/#). For example, we publish the data to the client/devices/telemetry MQTT topic.
  2. If you want to do this as a proof of concept, refer to Create a virtual device with Amazon EC2 to create a sample IoT device.
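The MQTT bridge relays a message only if the device's topic matches the topic filter in the bridge's mqttTopicMapping. The following sketch shows how MQTT filters match topics, with the + wildcard matching a single level and the # wildcard matching the remaining levels. It is a simplified implementation for checking your own topic names, and it doesn't special-case topics that start with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a topic filter with + and # wildcards.

    Simplified: does not special-case topics that start with $."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


print(topic_matches("client/#", "client/devices/telemetry"))   # True
print(topic_matches("clients/#", "client/devices/telemetry"))  # False
```

The second call shows a common pitfall. A filter of clients/# doesn't match client/devices/telemetry because the first level differs, so the bridge would silently drop those messages.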

The following code is a sample payload for our example:

# DEVICE_ID, TIMESTAMP, and the sensor variables are assumed to be set by your device or simulator script
PAYLOAD="{
\"device_id\": \"$DEVICE_ID\",
\"timestamp\": \"$TIMESTAMP\",
\"temperature\": $TEMPERATURE,
\"pressure\": $PRESSURE,
\"flow_rate\": $FLOW_RATE,
\"vibration\": $VIBRATION,
\"motor_speed\": $MOTOR_SPEED,
\"status\": \"$STATUS\",
\"battery\": $((RANDOM % 30 + 70))
}"
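If your simulator is written in Python rather than shell, the following sketch produces a payload with the same field names. Using json.dumps guarantees valid JSON, so you avoid problems such as stray trailing commas. The ISO 8601 timestamp format and all sensor value ranges are hypothetical placeholders. The battery range mirrors the shell expression.

```python
import json
import random
from datetime import datetime, timezone


def make_payload(device_id: str, status: str, rng: random.Random) -> str:
    """Build one telemetry message with the same fields as the shell sample."""
    reading = {
        "device_id": device_id,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        # Hypothetical value ranges standing in for real sensor readings
        "temperature": round(rng.uniform(60.0, 90.0), 2),
        "pressure": round(rng.uniform(90.0, 110.0), 2),
        "flow_rate": round(rng.uniform(5.0, 20.0), 2),
        "vibration": round(rng.uniform(0.0, 1.0), 3),
        "motor_speed": rng.randint(1000, 3000),
        "status": status,
        # Same range as the shell sample: RANDOM % 30 + 70 yields 70 to 99
        "battery": rng.randint(70, 99),
    }
    return json.dumps(reading)


print(make_payload("sensor-01", "RUNNING", random.Random(42)))
```

You can then publish the resulting string to client/devices/telemetry with whichever MQTT client your device already uses.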

For additional details on how to publish messages from a sample device, refer to Just-in-time provisioning.

The MQTT bridge component will route the payload from the MQTT topic (client/devices/telemetry) to an IPC topic by the same name. The custom component that you deployed earlier will listen to the IPC topic client/devices/telemetry and publish to the IPC topic kinesisfirehose/message. The message must follow the structure described in Input data.
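The core of the custom component is a small transformation: take the raw bytes received on client/devices/telemetry and wrap them in the request envelope that the Firehose component reads from kinesisfirehose/message. The following sketch shows only that transformation. The envelope shape (a request object with a data field, plus an id) is our assumption of the Input data structure, so confirm the field names against the component documentation. The IPC wiring is omitted so that the logic runs anywhere. In the deployed component, you would subscribe and publish with the Greengrass IPC client from the AWS IoT Device SDK.

```python
import uuid


def to_firehose_request(payload: bytes) -> dict:
    """Wrap one device message for the kinesisfirehose/message topic.

    The envelope ("request" with "data", plus an "id") is an assumed
    structure; check the Firehose component's Input data documentation."""
    return {
        "request": {"data": payload.decode("utf-8")},
        # Unique per request; assumed to help correlate status messages
        "id": str(uuid.uuid4()),
    }


message = to_firehose_request(b'{"device_id": "sensor-01", "temperature": 71.2}')
print(message["request"]["data"])
```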

Validate the data in Athena

You can now query the data published from the edge IoT device using Athena. On the Athena console, find the catalog and database that you set up, and run the following query:

SELECT * FROM <<database>>."device_telemetry" LIMIT 10;

You should see the data displayed as shown in the following screenshot. Use the database and table names that you defined in the Provision a Data Firehose stream step.


Fig 5: Validate Data in Athena

Scale out the solution

In the preceding sections, we showed how multiple pieces of equipment can ingest data into the cloud using a single Greengrass edge gateway device. Because manufacturing locations are distributed in real-world scenarios, you might set up Greengrass devices at other sites and publish their data to the same Firehose stream. This way, data from different sites lands in the same table in your S3 table bucket, is partitioned appropriately (by device_id in our example), and can be queried seamlessly.

Clean up

After you validate the results, you can delete the following resources to avoid incurring additional costs:

  1. Delete the EC2 Ubuntu instance you created for your proof of concept.
  2. Delete the Firehose delivery stream and associated resources.
  3. Drop the Athena tables created for querying the data.
  4. Delete the S3 Tables bucket you provisioned.

Conclusion

In this post, we showed how to set up a scalable edge-to-cloud near real-time data ingestion framework using AWS IoT Greengrass and start performing analytics on the data within AWS services using a low-code approach. We demonstrated how to optimize the data storage into Iceberg format with S3 Tables, and transform the streaming data before it lands on the storage layer using Data Firehose. We also discussed how you can scale this solution horizontally across multiple manufacturing locations (plants or sites) to create a low-code solution to analyze data in near real time.

To learn more, refer to the following resources:


About the authors

Joyson Neville Lewis is a Sr. Conversational AI Architect with AWS Professional Services. Joyson worked as a software and data engineer before diving into the conversational AI and industrial IoT space. He helps AWS customers realize their AI visions using voice assistant, chatbot, and IoT solutions.

Anil Vure is a Sr. IoT Data Architect with AWS Professional Services. Anil has extensive experience building large-scale data platforms and works with manufacturing customers designing high-speed data ingestion systems.

Ashok Padmanabhan is a Sr. IoT Data Architect with AWS Professional Services. Ashok primarily works with manufacturing and automotive customers to design and build Industry 4.0 solutions.

Use Databricks Unity Catalog Open APIs for Spark workloads on Amazon EMR

Post Syndicated from Venkat Viswanathan original https://aws.amazon.com/blogs/big-data/use-databricks-unity-catalog-open-apis-for-spark-workloads-on-amazon-emr/

This post was written with John Spencer, Sreeram Thoom, and Dipankar Kushari from Databricks.

Organizations need seamless access to data across multiple platforms and business units. A common scenario involves one team using Amazon EMR for data processing while needing to access data that another team manages in Databricks Unity Catalog. Traditionally, this would require data duplication or complex manual setups.

Although both Amazon EMR and Databricks Unity Catalog are powerful tools on their own, integrating them effectively is crucial for maintaining strong data governance, security, and operational efficiency. In this post, we demonstrate how to achieve this integration using Amazon EMR Serverless, though the approach works well with other Amazon EMR deployment options and Unity Catalog OSS.

EMR Serverless makes running big data analytics frameworks straightforward by offering a serverless option that automatically provisions and manages the infrastructure required to run big data applications. Teams can run Apache Spark and other workloads without the complexity of cluster management, and EMR Serverless scales cost-effectively based on actual workload demand and integrates with AWS services and security controls.

Databricks Unity Catalog serves as a unified governance solution for data and AI assets, providing centralized access control and auditing capabilities. It enables fine-grained permissions across workspaces and cloud platforms, while supporting comprehensive metadata management and data discovery across the organization, and can complement governance tools like AWS Lake Formation.

To enable Amazon EMR to process data maintained in Unity Catalog, data teams have traditionally copied data products across platforms to a location accessible by Amazon EMR. Data duplication not only increases storage costs, but can also degrade data quality and makes it challenging to enforce the same governance policies across different systems, track data lineage, enforce data retention policies, and maintain consistent access controls across the organization.

Now using Unity Catalog’s Open REST APIs, Amazon EMR customers can read from and write to Databricks Unity Catalog and Unity Catalog OSS tables using Spark, enabling cross-platform interoperability while maintaining governance and access controls across Amazon EMR and Unity Catalog.

Solution overview

In this post, we provide an overview of EMR Spark workload integration with Databricks Unity Catalog and walk through the end-to-end process of reading from and writing to Databricks Unity Catalog tables using Amazon EMR and Spark. We show you how to configure EMR Serverless to interact with Databricks Unity Catalog, run an interactive Spark workload to access the data, and run an analysis to derive insights.

The following diagram illustrates the solution architecture.

Prerequisites

You must have the following prerequisites:

In the following sections, we walk through the process of reading and writing to Unity Catalog with EMR Serverless.

Enable Unity Catalog for external access

Log in to your workspace as a Databricks admin and complete the following steps to configure external access to read Databricks objects:

  1. Enable external data access for your metastore. For instructions, see Enable external data access on the metastore.
  2. Set up a principal that will be configured with Amazon EMR for data access.
  3. Grant the principal the EXTERNAL USE SCHEMA privilege on the schema that contains the objects. For instructions, see Grant a principal EXTERNAL USE SCHEMA.
  4. For this post, we generate a Databricks personal access token (PAT) for the principal and note it down. For instructions, refer to Authorizing access to Databricks resources and Databricks personal access token authentication.

For a production deployment, store the PAT in AWS Secrets Manager. You can use it in a later step to read and write to Unity Catalog with Amazon EMR.
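The following minimal sketch reads the PAT from Secrets Manager inside a notebook instead of pasting it in. It calls get_secret_value on a boto3 Secrets Manager client. The client is passed in as a parameter so that the function can be exercised offline with a stub. Two details are assumptions: the secret name dbx/emr-pat and storing the token under a token key in a JSON secret. The EMR Serverless runtime role would also need secretsmanager:GetSecretValue on that secret.

```python
import json


def get_databricks_pat(secrets_client, secret_id: str, key: str = "token") -> str:
    """Return the Databricks PAT stored in AWS Secrets Manager.

    `secrets_client` is expected to be boto3.client("secretsmanager") or any
    object with a compatible get_secret_value method. A JSON secret is read
    under `key` (a naming assumption); a plain-text secret is returned as-is."""
    secret = secrets_client.get_secret_value(SecretId=secret_id)["SecretString"]
    try:
        parsed = json.loads(secret)
    except json.JSONDecodeError:
        return secret
    return parsed[key] if isinstance(parsed, dict) else secret


class StubSecretsClient:
    """Offline stand-in that mimics the shape of a get_secret_value response."""

    def __init__(self, secret_string: str):
        self.secret_string = secret_string

    def get_secret_value(self, SecretId: str) -> dict:
        return {"Name": SecretId, "SecretString": self.secret_string}


# In EMR Studio you would pass boto3.client("secretsmanager") instead of the stub.
print(get_databricks_pat(StubSecretsClient('{"token": "dapi-example"}'), "dbx/emr-pat"))
```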

Configure EMR Spark to access Unity Catalog

In this walkthrough, we run PySpark interactive queries through notebooks using EMR Studio. Complete the following steps:

  1. Open the AWS Management Console with administrator permission.
  2. Create an EMR Studio to run interactive workloads. To create a workspace, you need to specify the S3 bucket created in the prerequisites and the minimum service role for EMR Serverless. For instructions, see Set up an EMR Studio.
  3. For this post, we create two EMR Serverless applications. For instructions, see Creating an EMR Serverless application from the EMR Studio console.
    1. For Iceberg tables, create an EMR Serverless application called dbx-demo-application-iceberg with version 7.8.0 or higher. Make sure to deselect Use AWS Glue Data Catalog as Metastore under Additional Configurations, Metastore configuration. Add the following Spark configuration (see Configure applications). Provide the name of the catalog in Unity Catalog that contains your tables and the URL of the Databricks workspace.
      {
          "runtimeConfiguration": [
              {
                  "classification": "spark-defaults",
                  "properties": {
                      "spark.jars": "/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
                      "spark.jars.packages": "io.unitycatalog:unitycatalog-spark_2.12:0.2.0",
                      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                      "spark.sql.defaultCatalog": "<uc-catalog-name>",
                      "spark.sql.catalog.<uc-catalog-name>": "org.apache.iceberg.spark.SparkCatalog",
                      "spark.sql.catalog.<uc-catalog-name>.uri": "https://<workspace-url>/api/2.1/unity-catalog/iceberg-rest",
                      "spark.sql.catalog.<uc-catalog-name>.type": "rest",
                      "spark.sql.catalog.<uc-catalog-name>.warehouse": "<uc-catalog-name>"
                  }
              }
          ]
      }

    2. For Delta tables, create an EMR Serverless application called dbx-demo-application with version 7.8.0 or higher. Make sure to deselect Use AWS Glue Data Catalog as Metastore under Additional Configurations, Metastore configuration. Add the following Spark configuration (see Configure applications). Provide the name of the catalog in Unity Catalog that contains your tables and the URL of the Databricks workspace.
      {
          "runtimeConfiguration": [
              {
                  "classification": "spark-defaults",
                  "properties": {
                      "spark.jars": "/usr/share/aws/delta/lib/delta-spark.jar,/usr/share/aws/delta/lib/delta-storage.jar",
                      "spark.jars.packages": "io.unitycatalog:unitycatalog-spark_2.12:0.2.0",
                      "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
                      "spark.sql.defaultCatalog": "<uc-catalog-name>",
                      "spark.sql.catalog.spark_catalog": "io.unitycatalog.spark.UCSingleCatalog",
                      "spark.sql.catalog.<uc-catalog-name>": "io.unitycatalog.spark.UCSingleCatalog",
                      "spark.sql.catalog.<uc-catalog-name>.uri": "https://<workspace-url>/api/2.1/unity-catalog"
                  }
              }
          ]
      }

  4. To set up your interactive workload with a runtime role, see Run interactive workloads with EMR Serverless through EMR Studio.
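If you create applications from scripts rather than the console, filling the placeholders programmatically helps avoid typos in the repeated catalog name. The following sketch reproduces the Iceberg spark-defaults shown earlier for a given catalog and workspace host, using the same property names and values. We believe the list under runtimeConfiguration matches what the EMR Serverless CreateApplication API expects in its runtimeConfiguration parameter, but verify this against the API reference. The catalog name main is hypothetical.

```python
import json


def iceberg_runtime_configuration(catalog: str, workspace_url: str) -> dict:
    """Return the Iceberg spark-defaults runtimeConfiguration for one catalog."""
    host = workspace_url
    if host.startswith("https://"):
        host = host[len("https://"):]
    host = host.rstrip("/")
    prefix = f"spark.sql.catalog.{catalog}"
    properties = {
        "spark.jars": "/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
        "spark.jars.packages": "io.unitycatalog:unitycatalog-spark_2.12:0.2.0",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.defaultCatalog": catalog,
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.uri": f"https://{host}/api/2.1/unity-catalog/iceberg-rest",
        f"{prefix}.type": "rest",
        f"{prefix}.warehouse": catalog,
    }
    return {"runtimeConfiguration": [{"classification": "spark-defaults", "properties": properties}]}


# Hypothetical catalog name and workspace host
print(json.dumps(iceberg_runtime_configuration("main", "dbc-example.cloud.databricks.com"), indent=2))
```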

Read and write to Unity Catalog with Amazon EMR

Launch the workspace created in the previous step. Download the notebooks create-delta-table and create-iceberg-table and upload them to the EMR Studio workspace.

The create-delta-table.ipynb notebook configures the metastore properties to work with Delta tables. The create-iceberg-table.ipynb notebook configures the metastore properties to work with Iceberg tables.

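Add the Databricks personal access token (PAT) that you generated earlier to the Spark session.

For a production deployment, store the PAT in AWS Secrets Manager instead of embedding it in the notebook.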
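One way to wire this up is sketched below. It assumes the PAT is stored as a plain-string secret in AWS Secrets Manager and that the notebook sets session properties with a Sparkmagic %%configure -f cell. The spark.sql.catalog.<catalog>.token property name is an assumption based on the Iceberg REST and Unity Catalog Spark connector conventions, so confirm it for your connector versions; the secret ID and helper names are hypothetical.

```python
import json


def fetch_databricks_pat(secrets_client, secret_id: str) -> str:
    """Read the PAT from Secrets Manager.

    `secrets_client` is assumed to be boto3.client("secretsmanager"); the
    secret is assumed to hold the token as a plain string.
    """
    response = secrets_client.get_secret_value(SecretId=secret_id)
    return response["SecretString"]


def configure_cell_body(catalog: str, token: str) -> str:
    """Return a JSON body for a `%%configure -f` cell.

    Assumption: the catalog reads its credential from the
    spark.sql.catalog.<catalog>.token property.
    """
    return json.dumps({"conf": {f"spark.sql.catalog.{catalog}.token": token}})
```

Because the resulting cell body contains the secret, avoid committing or sharing notebooks that include it.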

For Iceberg tables, connect to the EMR Serverless application dbx-demo-application-iceberg under Compute, using the runtime role created in earlier steps, and run the create-iceberg-table notebook. Select PySpark as the kernel and run each cell in the notebook by choosing the run icon. For more details about running an interactive notebook, refer to Submit a job run or interactive workload.

We use the following code to create an external Iceberg table in the catalog:

CREATE SCHEMA IF NOT EXISTS customerschema;
USE SCHEMA customerschema;
CREATE TABLE IF NOT EXISTS iceberg_customer (id string, name string, country string) USING iceberg; 
INSERT INTO iceberg_customer VALUES ('1', 'Alice', 'US');

For Delta tables, connect to the EMR Serverless application dbx-demo-application with the runtime role created in earlier steps and run the notebook (create-delta-table). Select PySpark as the kernel and execute each cell in the notebook by choosing the run icon. Refer to Submit a job run or interactive workload for further details about how to run an interactive notebook.

We use the following code to create an external Delta table in the catalog:

CREATE SCHEMA IF NOT EXISTS customerschema;
USE SCHEMA customerschema;
CREATE TABLE IF NOT EXISTS delta_customer (id int, name string, country string) USING delta LOCATION 's3://<bucket_name>/emr-dbx/external/customerschema/delta_customer';
INSERT INTO delta_customer VALUES (1, 'Bob', 'US');
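Because the notebooks use the PySpark kernel, the same statements can also be issued through spark.sql(), which runs one statement per call and does not expect a trailing semicolon. The following sketch is not the contents of the downloadable notebooks; the helper names are hypothetical, spark is the SparkSession the kernel provides, and each format must run against its own application (dbx-demo-application-iceberg for Iceberg, dbx-demo-application for Delta).

```python
from typing import List, Optional


def customer_table_statements(table_format: str,
                              bucket: Optional[str] = None) -> List[str]:
    """Return the example statements for one table format (hypothetical helper)."""
    statements = [
        "CREATE SCHEMA IF NOT EXISTS customerschema",
        "USE SCHEMA customerschema",
    ]
    if table_format == "iceberg":
        statements += [
            "CREATE TABLE IF NOT EXISTS iceberg_customer "
            "(id string, name string, country string) USING iceberg",
            "INSERT INTO iceberg_customer VALUES ('1', 'Alice', 'US')",
        ]
    elif table_format == "delta":
        if not bucket:
            raise ValueError("the Delta example needs an S3 bucket for its LOCATION")
        location = f"s3://{bucket}/emr-dbx/external/customerschema/delta_customer"
        statements += [
            "CREATE TABLE IF NOT EXISTS delta_customer "
            f"(id int, name string, country string) USING delta LOCATION '{location}'",
            "INSERT INTO delta_customer VALUES (1, 'Bob', 'US')",
        ]
    else:
        raise ValueError(f"unsupported table format: {table_format!r}")
    return statements


def run_statements(spark, statements: List[str]) -> None:
    # spark.sql() executes exactly one statement per call.
    for statement in statements:
        spark.sql(statement)
```

For example, run_statements(spark, customer_table_statements("delta", "<bucket_name>")) followed by spark.table("customerschema.delta_customer").show(). The CREATE ... IF NOT EXISTS statements are safe to rerun, but the INSERT is not idempotent, so rerunning the cell adds duplicate rows.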

Verify in Databricks for both Iceberg and Delta tables

Now you can run queries in Databricks Unity Catalog to show the records inserted into the Iceberg and Delta tables from EMR Serverless:

  1. Log in to your Databricks workspace.
  2. Choose SQL Editor in the navigation pane.
  3. Run queries for both Iceberg and Delta tables.
  4. Verify that the results match what you saw in the Jupyter notebook in EMR Studio.

The following screenshot shows an example of querying the Iceberg table.

The following screenshot shows an example of querying the Delta table.

Clean up

Clean up the resources used in this post to avoid additional charges:

  1. Delete the IAM roles for this post.
  2. Delete the EMR applications and EMR Studio setup created for this post.
  3. Delete the resources created in Unity Catalog.
  4. Empty and then delete the S3 bucket.
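Steps 2 and 4 can be scripted. The following sketch assumes boto3 clients created with boto3.client("emr-serverless") and boto3.client("s3"); the helper names are hypothetical. It deletes only current object versions, so a versioned bucket would also need its object versions and delete markers removed before delete_bucket succeeds. It doesn't cover IAM roles, EMR Studio, or Unity Catalog resources.

```python
import time


def empty_and_delete_bucket(s3, bucket: str) -> int:
    """Delete every current object in `bucket`, then the bucket itself.

    Returns the number of objects deleted.
    """
    deleted = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        # A page holds at most 1,000 keys, which is also the delete_objects limit.
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            s3.delete_objects(Bucket=bucket, Delete={"Objects": keys, "Quiet": True})
            deleted += len(keys)
    s3.delete_bucket(Bucket=bucket)
    return deleted


def _app_state(emr, app_id: str) -> str:
    return emr.get_application(applicationId=app_id)["application"]["state"]


def stop_and_delete_application(emr, app_id: str, poll_seconds: float = 10.0,
                                timeout_seconds: float = 600.0) -> None:
    """Stop an EMR Serverless application if it is running, then delete it.

    An application can be deleted only in the CREATED or STOPPED state.
    """
    state = _app_state(emr, app_id)
    if state in ("STARTING", "STARTED"):
        emr.stop_application(applicationId=app_id)
    deadline = time.monotonic() + timeout_seconds
    while state not in ("CREATED", "STOPPED"):
        if time.monotonic() > deadline:
            raise TimeoutError(f"{app_id} still {state} after {timeout_seconds}s")
        time.sleep(poll_seconds)
        state = _app_state(emr, app_id)
    emr.delete_application(applicationId=app_id)
```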

Summary

In this post, we demonstrated the powerful interoperability between Amazon EMR and Databricks Unity Catalog by walking through how to enable external access to Unity Catalog, configure EMR Spark to connect seamlessly with Unity Catalog, and perform DML and DDL operations on Unity Catalog tables using EMR Serverless.

To learn more about using EMR Serverless, see Getting started with Amazon EMR Serverless. To learn more about using tools like EMR Spark with Unity Catalog, see Unity Catalog integrations.


About the authors

Venkatavaradhan (Venkat) Viswanathan is a Global Partner Solutions Architect at Amazon Web Services. Venkat is a Technology Strategy Leader in Data, AI, ML, generative AI, and Advanced Analytics. Venkat is a Global SME for Databricks and helps AWS customers design, build, secure, and optimize Databricks workloads on AWS.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She works with the product team and customers to build robust features and solutions for their analytical data platform. She enjoys building data mesh solutions and sharing them with the community.

Ramkumar Nottath is a Principal Solutions Architect at AWS focusing on Analytics services. He enjoys working with various customers to help them build scalable, reliable big data and analytics solutions. His interests extend to various technologies such as analytics, data warehousing, streaming, data governance, and machine learning. He loves spending time with his family and friends.

John Spencer is a Product Manager at Databricks, dedicated to making Unity Catalog work seamlessly with customers’ ecosystems of tools and platforms so they can easily access, govern, and use their data.

Sreeram Thoom is a Specialist Solutions Architect at Databricks helping customers design secure, scalable applications on the Data Lakehouse.

Dipankar Kushari is a Specialist Solutions Architect at Databricks helping customers architect and build secure applications on the Data Lakehouse.

Trusted identity propagation using IAM Identity Center for Amazon OpenSearch Service

Post Syndicated from Sohaib Katariwala original https://aws.amazon.com/blogs/big-data/trusted-identity-propagation-using-iam-identity-center-for-amazon-opensearch-service/

Enterprise customers of Amazon OpenSearch Service require comprehensive security controls with seamless authentication and authorization mechanisms when accessing data in provisioned domains and Amazon OpenSearch Serverless collections. Security teams within these organizations must not only maintain compliance with enterprise policies but also make sure that their users can access data securely, with robust identity management. AWS IAM Identity Center is a popular mechanism for identity management that provides single sign-on (SSO) capabilities for these enterprise customers. IAM Identity Center can use Security Assertion Markup Language (SAML) with both OpenSearch Service provisioned domains and OpenSearch Serverless. Now, with trusted identity propagation, IAM Identity Center also provides a new, direct method for accessing data in OpenSearch Service.

In this post, we outline how you can take advantage of this new access method to simplify data access using the OpenSearch UI and still maintain robust role-based access control for your OpenSearch data.

Trusted identity propagation overview

Trusted identity propagation in IAM Identity Center adds the identity context of a user to a role when accessing OpenSearch Service, which in turn uses this context to authorize and scope OpenSearch data access. This simplifies the authentication and authorization flow for customers because the applications access the data on their behalf. Unlike methods such as SAML, where a user agent must sit between the application and the backend services as a go-between for exchanging assertions, no user or user agent needs to be present between these entities for authorization to happen. This flexibility simplifies access to a wide variety of data sources, such as data residing within the Amazon Virtual Private Cloud (Amazon VPC) of an OpenSearch Service domain or in an OpenSearch Serverless collection. By using the OpenSearch UI, you can additionally simplify the backend connections, resulting in seamless access to the data. The following figure shows how identity propagation works with OpenSearch Service.

Prerequisites

Before you start using IAM Identity Center with OpenSearch Service, you must enable a few options. First, set up an organization or account instance of IAM Identity Center by following the instructions in this guide. For OpenSearch Service provisioned domains, you must enable the IAM Identity Center (IDC) Authentication – new option. You can do this through AWS CloudFormation, the OpenSearch REST API, an AWS SDK, or the AWS Management Console.
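For the AWS SDK route, a minimal boto3 sketch follows. It assumes a recent boto3 release whose opensearch client accepts an IdentityCenterOptions parameter on update_domain_config with EnabledAPIAccess, IdentityCenterInstanceARN, SubjectKey, and RolesKey fields; verify these names against the OpenSearch Service API reference for your SDK version. The allowed key values come from the options described later in this post, and the helper name is hypothetical.

```python
# Allowed values, per the Subject key and Roles key options in this post.
SUBJECT_KEYS = {"UserId", "UserName", "Email"}
ROLES_KEYS = {"GroupId", "GroupName"}


def enable_identity_center(client, domain_name: str, instance_arn: str,
                           subject_key: str = "UserName",
                           roles_key: str = "GroupName") -> dict:
    """Turn on IAM Identity Center authentication for a provisioned domain.

    `client` is assumed to be boto3.client("opensearch"); the defaults match
    the console choices below (UserName and GroupName).
    """
    if subject_key not in SUBJECT_KEYS:
        raise ValueError(f"subject_key must be one of {sorted(SUBJECT_KEYS)}")
    if roles_key not in ROLES_KEYS:
        raise ValueError(f"roles_key must be one of {sorted(ROLES_KEYS)}")
    return client.update_domain_config(
        DomainName=domain_name,
        IdentityCenterOptions={
            "EnabledAPIAccess": True,
            "IdentityCenterInstanceARN": instance_arn,
            "SubjectKey": subject_key,
            "RolesKey": roles_key,
        },
    )
```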

To enable Identity Center using the console

  1. To add the capability for an existing provisioned domain, go to the OpenSearch Service console and navigate to the Security configuration tab and choose Edit.

Amazon OpenSearch Service console highlighting security settings, cluster health status, and VPC endpoint configuration

  2. After this step, or if you are creating a new domain, select the check box for IAM Identity Center (IDC) Authentication – new.
  3. You can choose among several options for Subject Key and Roles Key, depending on how you want to establish the role-based access control discussed later in this post. For now, select UserName for Subject Key and GroupName for Roles Key.

AWS IAM Identity Center SSO configuration interface with API authentication controls and identity mapping settings

  4. For OpenSearch Serverless, choose Serverless in the navigation pane, then Security and Authentication. Choose Edit in the IAM Identity Center (IdC) authentication – new section.

AWS OpenSearch Service authentication management console with SAML providers list and IAM Identity Center integration status

  5. Select the check box for Authenticate with IAM Identity Center, and then choose Save.

AWS IAM Identity Center configuration modal for enabling centralized authentication and resource access management

  6. When creating an OpenSearch UI application, select the check box for Authentication with IAM Identity Center under Single sign-on authentication. For step-by-step instructions on how to create an OpenSearch UI application, see Creating an OpenSearch UI application.

AWS single sign-on setup interface with option to enable IAM Identity Center for unified access management

After these steps, you’re ready to configure IAM Identity Center by creating new users and groups, or by using existing user identities.

Propagating IAM Identity Center identities

Currently, you can add single sign-on authentication with IAM Identity Center only while setting up a new OpenSearch UI application; single sign-on can't be turned on after an application is created. After single sign-on is enabled, you should see an AWS managed application under Applications in the IAM Identity Center console.

AWS Identity Center interface showing managed applications tab with OpenSearch integration and service details

Assigning users and groups

After the application is created and its status shows as Active, you need to assign users and groups to the application. These assignments are important because they determine the scope and permissions for data access within OpenSearch Service. To do this, select the application you created in the previous steps in the OpenSearch Service console. Under Single sign-on authentication, you will see an option for IAM Identity Center users and groups. Choose Assign users and groups and select the appropriate IAM Identity Center users and groups.

 AWS IAM Identity Center dashboard showing connected status, role details, and option to assign users and groups for SSO

For OpenSearch Serverless, you must create a new data access policy or add a rule to an existing one to grant IAM Identity Center principals the appropriate permissions to access collections. For example, the following figure shows a data access policy that grants specific permissions to one user with Rule 1 and more restrictive permissions to a group with Rule 2.

Amazon OpenSearch Service console displaying sample data access policy with IAM principal and collection/index permissions
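A policy with the same two-rule shape can also be created programmatically. The sketch below builds the policy JSON; the permission sets are illustrative rather than a copy of the figure, and the iamidentitycenter/<instance>/user/<name> and /group/<name> principal format is an assumption to confirm in the OpenSearch Serverless data access policy documentation. Helper names and example values are hypothetical.

```python
import json


def idc_principal(instance_id: str, kind: str, name: str) -> str:
    # Assumed principal format for IAM Identity Center identities.
    if kind not in ("user", "group"):
        raise ValueError("kind must be 'user' or 'group'")
    return f"iamidentitycenter/{instance_id}/{kind}/{name}"


def two_rule_data_access_policy(collection: str, instance_id: str,
                                user: str, group: str) -> str:
    """Rule 1: broader access for one user; Rule 2: read-only for a group."""
    indexes = f"index/{collection}/*"
    rule_1 = {
        "Description": "Rule 1: read and write for a single user",
        "Rules": [
            {"ResourceType": "collection", "Resource": [f"collection/{collection}"],
             "Permission": ["aoss:DescribeCollectionItems"]},
            {"ResourceType": "index", "Resource": [indexes],
             "Permission": ["aoss:ReadDocument", "aoss:WriteDocument",
                            "aoss:DescribeIndex", "aoss:CreateIndex",
                            "aoss:UpdateIndex"]},
        ],
        "Principal": [idc_principal(instance_id, "user", user)],
    }
    rule_2 = {
        "Description": "Rule 2: read-only for a group",
        "Rules": [
            {"ResourceType": "index", "Resource": [indexes],
             "Permission": ["aoss:ReadDocument", "aoss:DescribeIndex"]},
        ],
        "Principal": [idc_principal(instance_id, "group", group)],
    }
    return json.dumps([rule_1, rule_2], indent=2)
```

You could then pass the result to boto3.client("opensearchserverless").create_access_policy(name="idc-demo-access", type="data", policy=...), where the policy name is a placeholder.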

At this point your OpenSearch Service domains, OpenSearch Serverless collections and OpenSearch UI are set up for identity propagation.

Fine-grained access control for IAM Identity Center identities

Fine-grained access control is a role-based access control mechanism for OpenSearch Service that provides security at the index, document, and field levels for provisioned domains. You can choose which attributes of the identity context you propagate to OpenSearch Service: UserId, UserName, or Email for your Subject key, and GroupId or GroupName for your Roles key. This choice is important because the propagated values must exactly match the user and backend role mappings within OpenSearch Service provisioned domains. Note that if IAM Identity Center sign-on isn't enabled, OpenSearch Service can only evaluate the request signature with AWS Signature Version 4, which means the role your OpenSearch UI uses won't carry identity context for authorization. To complete authorization, add the values of the identity context fields to the OpenSearch role mapping (see Mapping roles to users under Managing permissions). You can map roles using the OpenSearch REST API, an AWS SDK, or OpenSearch Dashboards.
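For the REST API route, the OpenSearch Security plugin exposes role mappings at _plugins/_security/api/rolesmapping/<role>. The following sketch builds such a request with the Python standard library, putting Subject key values in users and Roles key values in backend_roles. It assumes you authenticate as a user allowed to manage security settings; the HTTP basic auth header shown applies only if you use the internal user database, and the endpoint, role, and identity values are placeholders.

```python
import base64
import json
import urllib.parse
import urllib.request


def basic_auth_header(username: str, password: str) -> str:
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def role_mapping_request(endpoint: str, role: str, users: list,
                         backend_roles: list,
                         auth_header: str) -> urllib.request.Request:
    """Build a PUT request that replaces the mapping for `role`.

    users: Subject key values (for example, UserName values).
    backend_roles: Roles key values (for example, GroupName values).
    """
    url = (f"https://{endpoint}/_plugins/_security/api/rolesmapping/"
           f"{urllib.parse.quote(role, safe='')}")
    body = json.dumps({"users": users, "backend_roles": backend_roles}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json", "Authorization": auth_header},
    )
```

Send it with urllib.request.urlopen(req). Note that PUT replaces the role's entire mapping; the same API also accepts PATCH for incremental changes.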

To map roles using OpenSearch Dashboards

  1. From the menu icon in the top left corner of your screen, choose Management, Security, Roles, and then <Your role>.
  2. Choose the Mapped Users tab and select Manage mapping.

AWS security console showing Map User interface with internal user creation and backend role management for IAM

  3. When mapping the role, enter the value that corresponds to your Subject key; it must exactly match the value in your identity context. Additionally, use the Roles key values to map IAM Identity Center groups as backend roles.

With OpenSearch Serverless, access control is granular only down to the index level, so you need to add rules to the data access policy to control which principals can access collections or indices within a collection.

Verifying identity propagation

The final step is to verify identity propagation.

  1. Open the OpenSearch UI application and select IAM Identity Center from the Login dropdown list.

  2. After you complete the login process with IAM Identity Center, the OpenSearch UI opens. Choose the user icon in the lower left corner of the screen to verify that the correct IAM Identity Center principal is shown. It should match the Identity Center attribute you chose earlier for the Subject key.

OpenSearch welcome interface showing available workspaces, documentation links, and user profile menu with logout option

  3. To verify that the identity is propagated correctly, choose the Dev Tools icon just above the user profile icon in the lower left corner of the screen.
  4. Select the correct OpenSearch domain or OpenSearch Serverless collection data source in the top right corner of the screen and run a _search query. Results returned from the data source confirm that the identity is correctly propagated to OpenSearch Service.

Dev tools showing successful search operation with 5ms response time and config details

Conclusion

In this post, we showed you how to use Trusted Identity Propagation using IAM Identity Center for Amazon OpenSearch Service, providing a streamlined approach to secure data access while maintaining robust access controls. This solution offers several key benefits:

  • Simplified authentication: By eliminating the need for user agents between applications and backend services, the solution streamlines the authentication process compared to traditional SAML-based approaches.
  • Enhanced security: The integration maintains comprehensive security controls while providing seamless authentication and authorization mechanisms for both OpenSearch Service provisioned domains and Amazon OpenSearch Serverless collections.
  • Flexible identity management: Organizations can use existing IAM Identity Center implementations to manage user access, making it easier to maintain compliance with enterprise security policies.
  • Fine-grained access control: The solution supports detailed access control at the index, document, and field level for provisioned domains, allowing organizations to implement precise security measures.

Get started implementing this solution in your environment today!



About the authors

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.

Sohaib Katariwala is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service based out of Chicago, IL. His interests are in all things data and analytics. More specifically he loves to help customers use AI in their data strategy to solve modern day challenges.

Introducing MCP Server for Apache Spark History Server for AI-powered debugging and optimization

Post Syndicated from Manabu McCloskey original https://aws.amazon.com/blogs/big-data/introducing-mcp-server-for-apache-spark-history-server-for-ai-powered-debugging-and-optimization/

Organizations running Apache Spark workloads, whether on Amazon EMR, AWS Glue, Amazon Elastic Kubernetes Service (Amazon EKS), or self-managed clusters, invest countless engineering hours in performance troubleshooting and optimization. When a critical extract, transform, and load (ETL) pipeline fails or runs slower than expected, engineers end up spending hours navigating through multiple interfaces such as logs or Spark UI, correlating metrics across different systems and manually analyzing execution patterns to identify root causes. Although Spark History Server provides detailed telemetry data, including job execution timelines, stage-level metrics, and resource consumption patterns, accessing and interpreting this wealth of information requires deep expertise in Spark internals and navigating through multiple interconnected web interface tabs.

Today, we’re announcing the open source release of Spark History Server MCP, a specialized Model Context Protocol (MCP) server that transforms this workflow by enabling AI assistants to access and analyze your existing Spark History Server data through natural language interactions. This project, developed collaboratively by AWS open source and Amazon SageMaker Data Processing, turns complex debugging sessions into conversational interactions that deliver faster, more accurate insights without requiring changes to your current Spark infrastructure. You can use this MCP server with your self-managed or AWS managed Spark History Servers to analyze Spark applications running in the cloud or on-premises deployments.

Understanding the Spark observability challenge

Apache Spark has become the standard for large-scale data processing, powering critical ETL pipelines, real-time analytics, and machine learning (ML) workloads across thousands of organizations. Building and maintaining Spark applications is, however, still an iterative process, where developers spend significant time testing, optimizing, and troubleshooting their code. Spark application developers focused on data engineering and data integration use cases often encounter significant operational challenges for several reasons:

  • Complex connectivity and configuration options to a variety of resources with Spark – Although this flexibility makes Spark a popular data processing platform, it often makes it challenging to find the root cause of inefficiencies or failures when Spark isn't configured correctly or optimally.
  • Spark’s in-memory processing model and distributed partitioning of datasets across its workers – Although good for parallelism, this often makes it difficult for users to identify inefficiencies. The result can be slow application execution, or failures whose root cause is resource exhaustion, such as out-of-memory and disk errors.
  • Lazy evaluation of Spark transformations – Although lazy evaluation optimizes performance, it makes it challenging to accurately and quickly identify the application code and logic that caused the failure from the distributed logs and metrics emitted from different executors.

Spark History Server

Spark History Server provides a centralized web interface for monitoring completed Spark applications, serving comprehensive telemetry data including job execution timelines, stage-level metrics, task distribution, executor resource consumption, and SQL query execution plans. Although Spark History Server helps developers with performance debugging, code optimization, and capacity planning, it still has challenges:

  • Time-intensive manual workflows – Engineers spend hours navigating through the Spark History Server UI, switching between multiple tabs to correlate metrics across jobs, stages, and executors. Engineers must constantly switch between the Spark UI, cluster monitoring tools, code repositories, and documentation to piece together a complete picture of application performance, which often takes days.
  • Expertise bottlenecks – Effective Spark debugging requires deep understanding of execution plans, memory management, and shuffle operations. This specialized knowledge creates dependencies on senior engineers and limits team productivity.
  • Reactive problem-solving – Teams typically discover performance issues only after they impact production systems. Manual monitoring approaches don’t scale to proactively identify degradation patterns across hundreds of daily Spark jobs.

How MCP transforms Spark observability

The Model Context Protocol provides a standardized interface for AI agents to access domain-specific data sources. Unlike general-purpose AI assistants operating with limited context, MCP-enabled agents can access technical information about specific systems and provide insights based on actual operational data rather than generic recommendations.

With Spark History Server accessible through MCP, engineers no longer need to manually gather performance metrics from multiple sources and correlate them to understand application behavior. Instead, they can engage with AI agents that have direct access to all Spark execution data. These agents can analyze execution patterns, identify performance bottlenecks, and provide optimization recommendations based on actual job characteristics rather than general best practices.

Introduction to Spark History Server MCP

The Spark History Server MCP is a specialized bridge between AI agents and your existing Spark History Server infrastructure. It connects to one or more Spark History Server instances and exposes their data through standardized tools that AI agents can use to retrieve application metrics, job execution details, and performance data.

Importantly, the MCP server functions purely as a data access layer, enabling AI agents such as Amazon Q Developer CLI, Claude desktop, Strands Agents, LlamaIndex, and LangGraph to access and reason about your Spark data. The following diagram shows this flow.

The Spark History Server MCP directly addresses these operational challenges by enabling AI agents to access Spark performance data programmatically. This transforms the debugging experience from manual UI navigation to conversational analysis. Instead of spending hours in the UI, you can ask, “Why did job spark-abcd fail?” and receive a root cause analysis of the failure. This lets users rely on AI agents for expert-level performance analysis and optimization recommendations, without requiring deep Spark expertise.

The MCP server provides comprehensive access to Spark telemetry across multiple granularity levels. Application-level tools retrieve execution summaries, resource utilization patterns, and success rates across job runs. Job and stage analysis tools provide execution timelines, stage dependencies, and task distribution patterns for identifying critical path bottlenecks. Task-level tools expose executor resource consumption patterns and individual operation timings for detailed optimization analysis. SQL-specific tools provide query execution plans, join strategies, and shuffle operation details for analytical workload optimization. You can review the complete set of tools available in the MCP server in the project README.
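To make the granularity levels concrete, the sketch below calls Spark's own monitoring REST API (/api/v1), which exposes the per-application jobs, stages, executors, and SQL data that History Server tools work from. It is not code from the MCP project; the helper names are hypothetical, and it assumes a History Server reachable at its default port 18080.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, List


def shs_url(base_url: str, app_id: str, resource: str) -> str:
    """URL of a per-application resource such as jobs, stages, executors, or sql."""
    return (f"{base_url.rstrip('/')}/api/v1/applications/"
            f"{urllib.parse.quote(app_id, safe='')}/{resource}")


def fetch_json(url: str, timeout: float = 30.0) -> Any:
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)


def failed_stages(stages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Summarize FAILED stage attempts, longest executor run time first."""
    failed = [s for s in stages if s.get("status") == "FAILED"]
    failed.sort(key=lambda s: s.get("executorRunTime", 0), reverse=True)
    return [
        {"stageId": s["stageId"], "attemptId": s.get("attemptId", 0),
         "failureReason": s.get("failureReason", "")}
        for s in failed
    ]
```

For example, failed_stages(fetch_json(shs_url("http://localhost:18080", "<application-id>", "stages"))) lists failed stage attempts with their failure reasons, the kind of raw material an AI agent would reason over.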

How to use the MCP server

The MCP is an open standard that enables secure connections between AI applications and data sources. This MCP server implementation supports both Streamable HTTP and STDIO protocols for maximum flexibility.

The MCP server runs as a local service within your infrastructure either on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon EKS, connecting directly to your Spark History Server instances. You maintain complete control over data access, authentication, security, and scalability.

All the tools are available over both the Streamable HTTP and STDIO transports:

  • Streamable HTTP – Full advanced tools for LlamaIndex, LangGraph, and programmatic integrations
  • STDIO mode – Core functionality for Amazon Q CLI and Claude Desktop
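Under either transport, clients and the server exchange JSON-RPC 2.0 messages defined by the MCP specification: an initialize request, a notifications/initialized notification, then requests such as tools/list and tools/call. The sketch below only builds those messages and frames them for STDIO, which carries one JSON message per line. The protocol version string is one published MCP revision, and the tool name in the usage note is hypothetical; use tools/list to discover the server's real tool names.

```python
import itertools
import json
from typing import Optional

_request_ids = itertools.count(1)


def rpc_request(method: str, params: Optional[dict] = None) -> dict:
    """A JSON-RPC 2.0 request; every request carries a unique id."""
    message = {"jsonrpc": "2.0", "id": next(_request_ids), "method": method}
    if params is not None:
        message["params"] = params
    return message


def initialize_request(client_name: str = "shs-mcp-sketch",
                       client_version: str = "0.1.0",
                       protocol_version: str = "2025-03-26") -> dict:
    return rpc_request("initialize", {
        "protocolVersion": protocol_version,
        "capabilities": {},
        "clientInfo": {"name": client_name, "version": client_version},
    })


# Sent after the initialize response; notifications carry no id.
INITIALIZED_NOTIFICATION = {"jsonrpc": "2.0", "method": "notifications/initialized"}


def list_tools_request() -> dict:
    return rpc_request("tools/list")


def call_tool_request(name: str, arguments: dict) -> dict:
    return rpc_request("tools/call", {"name": name, "arguments": arguments})


def stdio_frame(message: dict) -> str:
    """STDIO transport: one JSON-RPC message per line, no embedded newlines."""
    return json.dumps(message, separators=(",", ":")) + "\n"
```

For example, stdio_frame(call_tool_request("get_application", {"app_id": "spark-abcd"})) produces a single line to write to the server process's standard input, where get_application and app_id are placeholders.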

For deployment, the server supports multiple Spark History Server instances and provides deployment options for AWS Glue, Amazon EMR, and Kubernetes.

Quick local setup

To set up Spark History MCP server locally, execute the following commands in your terminal:

git clone 
cd spark-history-server-mcp

# Install Task (if not already installed)
brew install go-task # macOS, see  for others

# Setup and start testing
task install            # Install dependencies
task start-spark-bg     # Start Spark History Server with sample data
task start-mcp-bg       # Start MCP Server
task start-inspector-bg # Start MCP Inspector

# Opens  for interactive testing
# When done, run task stop-all

For comprehensive configuration examples and integration guides, refer to the project README.

Integration with AWS managed services

The Spark History Server MCP integrates seamlessly with AWS managed services, offering enhanced debugging capabilities for Amazon EMR and AWS Glue workloads. This integration adapts to various Spark History Server deployments available across these AWS managed services while providing a consistent, conversational debugging experience:

  • AWS Glue – You can use the Spark History Server MCP integration with a self-managed Spark History Server running on an EC2 instance or launched locally in a Docker container. Setting up the integration is straightforward. Follow the step-by-step instructions in the README to configure the MCP server with your preferred Spark History Server deployment. Using this integration, AWS Glue users can analyze AWS Glue ETL job performance regardless of their Spark History Server deployment approach.
  • Amazon EMR – Integration with Amazon EMR uses the service-managed Persistent UI feature for EMR on Amazon EC2. The MCP server requires only an EMR cluster Amazon Resource Name (ARN) to discover the available Persistent UI on the EMR cluster, or to automatically configure a new one with token-based authentication if it's missing. This eliminates the need to manually set up a Spark History Server while providing secure access to detailed execution data from EMR Spark applications. Using this integration, data engineers can ask questions about their Spark workloads, such as “Can you get the job bottlenecks for spark-<emr-applicationId>?” The MCP server responds with detailed analysis of execution patterns, resource utilization differences, and targeted optimization recommendations, so teams can fine-tune their Spark applications for optimal performance across AWS services.
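For readers curious about the EMR building blocks, the following sketch obtains a presigned Spark History Server URL for a cluster through the EMR Persistent UI API with boto3. It is an assumption about how such a lookup can be done, not the MCP server's actual discovery logic, and the helper name is hypothetical. Presigned URLs are short-lived, so a long-running client would need to request new ones.

```python
import time


def persistent_shs_url(emr, cluster_arn: str, poll_seconds: float = 5.0,
                       max_attempts: int = 24) -> str:
    """Return a presigned Spark History Server URL for an EMR on EC2 cluster.

    `emr` is assumed to be boto3.client("emr").
    """
    ui_id = emr.create_persistent_app_ui(TargetResourceArn=cluster_arn)["PersistentAppUIId"]
    for _ in range(max_attempts):
        response = emr.get_persistent_app_ui_presigned_url(
            PersistentAppUIId=ui_id, PersistentAppUIType="SHS")
        if response.get("PresignedURLReady"):
            return response["PresignedURL"]
        time.sleep(poll_seconds)  # the UI may still be provisioning
    raise TimeoutError(f"Persistent UI {ui_id} not ready after {max_attempts} attempts")
```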

For comprehensive configuration examples and integration details, refer to the AWS Integration Guides.

Looking ahead: The future of AI-assisted Spark optimization

This open source release establishes the foundation for enhanced AI-powered Spark capabilities, including deeper integration with AWS Glue and Amazon EMR to simplify the debugging and optimization experience for customers using these Spark environments. The Spark History Server MCP is open source under the Apache 2.0 license. We welcome contributions including new tool extensions, integrations, documentation improvements, and deployment experiences.

Get started today

Transform your Spark monitoring and optimization workflow today by providing AI agents with intelligent access to your performance data.

  • Explore the GitHub repository
  • Review the comprehensive README for setup and integration instructions
  • Join discussions and submit issues for enhancements
  • Contribute new features and deployment patterns

Acknowledgment: A special thanks to everyone who contributed to the development and open-sourcing of the Apache Spark history server MCP: Vaibhav Naik, Akira Ajisaka, Rich Bowen, Savio Dsouza.


About the authors

Manabu McCloskey is a Solutions Architect at Amazon Web Services. He focuses on contributing to open source application delivery tooling and works with AWS strategic customers to design and implement enterprise solutions using AWS resources and open source technologies. His interests include Kubernetes, GitOps, Serverless, and Souls Series.

Vara Bonthu is a Principal Open Source Specialist SA leading Data on EKS and AI on EKS at AWS, driving open source initiatives and helping diverse AWS customer organizations. He specializes in open source technologies, data analytics, AI/ML, and Kubernetes, with extensive experience in development, DevOps, and architecture. Vara focuses on building highly scalable data and AI/ML solutions on Kubernetes, enabling customers to maximize cutting-edge technology for their data-driven initiatives.

Andrew Kim is a Software Development Engineer at AWS Glue, with a deep passion for distributed systems architecture and AI-driven solutions, specializing in intelligent data integration workflows and cutting-edge feature development on Apache Spark. Andrew focuses on re-inventing and simplifying solutions to complex technical problems, and he enjoys creating web apps and producing music in his free time.

Shubham Mehta is a Senior Product Manager at AWS Analytics. He leads generative AI feature development across services such as AWS Glue, Amazon EMR, and Amazon MWAA, using AI/ML to simplify and enhance the experience of data practitioners building data applications on AWS.

Kartik Panjabi is a Software Development Manager on the AWS Glue team. His team builds generative AI features and distributed systems for data integration.

Mohit Saxena is a Senior Software Development Manager on the AWS Data Processing Team (AWS Glue and Amazon EMR). His team focuses on building distributed systems to enable customers with new AI/ML-driven capabilities to efficiently transform petabytes of data across data lakes on Amazon S3, databases and data warehouses on the cloud.

Accelerating development with the AWS Data Processing MCP Server and Agent

Post Syndicated from Shubham Mehta original https://aws.amazon.com/blogs/big-data/accelerating-development-with-the-aws-data-processing-mcp-server-and-agent/

Data engineering teams face an increasingly complex landscape when building and maintaining analytics environments. From sourcing and organizing data to implementing transformation pipelines and managing access controls, the process of transforming raw data into actionable insights involves numerous interconnected components. While individual tools exist for each task, connecting them into cohesive workflows remains time-consuming and requires deep technical expertise across multiple AWS services.

To address these challenges and enhance developer productivity, we’re excited to introduce the AWS Data Processing MCP Server, an open source tool that uses the Model Context Protocol (MCP) to simplify analytics environment setup on AWS. We’re also open sourcing a stand-alone Data Processing Agent, implemented with the AWS Strands SDK, that uses this MCP server and that customers can further customize for their use cases. This powerful integration enables AI assistants to understand your data processing environment and guide you through complex workflows using natural language interactions.

Understanding the Model Context Protocol advantage

The MCP is an emerging open standard that defines how AI models, particularly large language models (LLMs), can securely access and interact with external tools, data sources, and services. Rather than requiring developers to learn intricate API syntax across multiple services, MCP enables AI assistants to understand your environment contextually and provide intelligent guidance throughout your data processing journey.

The AWS Data Processing MCP Server harnesses this capability by providing AI code assistants with real-time visibility into your AWS data processing pipeline. This includes access to AWS Glue job statuses, Amazon Athena query results, Amazon EMR cluster metrics, and AWS Glue Data Catalog metadata through a unified interface that LLMs can understand and reason about.
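To make the "unified interface" concrete: MCP messages are JSON-RPC 2.0 objects, and a client discovers a server's capabilities with the `tools/list` method and invokes one with `tools/call`. The following minimal sketch only builds and serializes those messages for the stdio transport (one JSON object per line). It omits the `initialize` handshake that opens a real session, and the tool name and arguments are hypothetical placeholders, not the actual tools exposed by the AWS Data Processing MCP Server.

```python
import json
from typing import Optional

# JSON-RPC 2.0 messages as an MCP client would send them. The method names
# come from the MCP specification; the tool name and arguments below are
# HYPOTHETICAL placeholders.

def make_request(request_id: int, method: str, params: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 request object."""
    message = {"jsonrpc": "2.0", "id": request_id, "method": method}
    if params is not None:
        message["params"] = params
    return message

def encode_for_stdio(message: dict) -> str:
    """Serialize for the stdio transport: one newline-terminated JSON object."""
    # json.dumps escapes newlines inside strings, so the line has no embedded breaks.
    return json.dumps(message, separators=(",", ":")) + "\n"

# Ask the server which tools it offers.
list_tools = make_request(1, "tools/list")

# Invoke a tool by name with structured arguments (hypothetical tool).
call_tool = make_request(
    2,
    "tools/call",
    {
        "name": "example_get_table_metadata",
        "arguments": {"database_name": "sales", "table_name": "orders"},
    },
)

if __name__ == "__main__":
    print(encode_for_stdio(list_tools), end="")
    print(encode_for_stdio(call_tool), end="")
```

The assistant, not the user, produces calls like these; the server answers with results the LLM can reason about.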

AWS analytics integration

The AWS Data Processing MCP Server integrates deeply with AWS Glue for data cataloging and ETL operations, Amazon EMR for big data processing, and Athena for serverless analytics. This integration transforms how developers interact with these services by providing contextual awareness that enables AI assistants and the Data Processing Agent to make intelligent recommendations based on your actual infrastructure and data patterns.

Rather than requiring manual navigation between service consoles or memorizing complex API parameters, the MCP server enables natural language interactions that automatically translate to appropriate service operations. This approach reduces the learning curve for new team members while accelerating productivity for experienced developers working across multiple AWS analytics services.

Getting started with the AWS Data Processing MCP Server

You’ll need to follow the steps in the prerequisites section before you can start using MCP servers.

Prerequisites

Before configuring the MCP server, ensure you have the following prerequisites in place:

System requirements:

  • macOS or a supported Linux environment
  • Python 3.10 or higher
  • The uv package manager for Python dependency management (it provides the uvx command)
  • AWS Command Line Interface (AWS CLI) installed and configured with appropriate credentials

IAM permissions: Review and configure the AWS Identity and Access Management (IAM) roles and policies that grant the AWS Data Processing MCP Server and Agent the access they need to run AWS data processing operations on your behalf. For read-only operations, attach policies that include permissions for Data Catalog access, Amazon CloudWatch metrics, Amazon EMR cluster descriptions, and Athena query operations. For write operations, make sure that your IAM role includes the AWSGlueServiceRole managed policy along with permissions for creating and managing Amazon EMR clusters and Athena workgroups.
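As a starting point for the read-only case, the following sketch assembles an IAM policy document covering the four areas listed above. The specific action list is an assumption chosen for illustration, not an official policy for the MCP server. Running Athena queries also requires Amazon S3 access to the underlying data and the query-result location, which is not shown here, and `"Resource": "*"` should be scoped down in production.

```python
import json

# Illustrative read-only action list. This is an ASSUMPTION for demonstration;
# review it against the MCP server's documentation and your least-privilege needs.
READ_ONLY_ACTIONS = [
    # AWS Glue Data Catalog
    "glue:GetDatabases",
    "glue:GetDatabase",
    "glue:GetTables",
    "glue:GetTable",
    "glue:GetPartitions",
    # Amazon CloudWatch metrics
    "cloudwatch:GetMetricData",
    "cloudwatch:ListMetrics",
    # Amazon EMR cluster descriptions
    "elasticmapreduce:ListClusters",
    "elasticmapreduce:DescribeCluster",
    # Athena query operations
    "athena:StartQueryExecution",
    "athena:GetQueryExecution",
    "athena:GetQueryResults",
]

def build_policy(actions: list[str]) -> dict:
    """Return an IAM policy document that allows the given actions."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DataProcessingReadOnly",
                "Effect": "Allow",
                "Action": sorted(set(actions)),  # de-duplicated, stable order
                "Resource": "*",  # scope down to specific ARNs in production
            }
        ],
    }

if __name__ == "__main__":
    print(json.dumps(build_policy(READ_ONLY_ACTIONS), indent=2))
```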

Set up using Amazon Q CLI

Amazon Q Developer CLI provides an intuitive way to interact with the AWS Data Processing MCP Server directly from your terminal. This integration combines the natural language processing capabilities of Amazon Q with the data processing tools, enabling you to manage complex analytics workflows through conversational commands.

Installation and configuration:

  1. Install the Amazon Q Developer CLI.
  2. Clone the MCP Server repository:

git clone https://github.com/awslabs/mcp

  3. Edit your Q Developer CLI’s MCP configuration file, mcp.json:
{
  "mcpServers": {
    "aws.dp-mcp": {
      "autoApprove": [],
      "disabled": false,
      "command": "uvx",
      "args": [
        "awslabs.aws-dataprocessing-mcp-server@latest",
        "--allow-write"
      ],
      "env": {
        "AWS_PROFILE": "your-aws-profile",
        "AWS_REGION": "your-preferred-region"
      }
    }
  }
}
  4. Verify your setup by running the /tools command in the Q Developer CLI to see the available Data Processing MCP tools.
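If you manage several MCP servers, you may prefer to generate the "aws.dp-mcp" entry rather than edit mcp.json by hand. The following sketch rebuilds the entry from the configuration in step 3 and merges it into an existing config dictionary without disturbing other servers. The `allow_write` toggle simply adds or omits the `--allow-write` flag used above; check the server's README for exactly which operations that flag gates. The helper names are hypothetical.

```python
import json

def dp_mcp_server_entry(profile: str, region: str, allow_write: bool = False) -> dict:
    """Build the "aws.dp-mcp" server entry from the configuration shown above."""
    args = ["awslabs.aws-dataprocessing-mcp-server@latest"]
    if allow_write:
        # Include only when the assistant should be able to create or modify resources.
        args.append("--allow-write")
    return {
        "autoApprove": [],
        "disabled": False,
        "command": "uvx",
        "args": args,
        "env": {"AWS_PROFILE": profile, "AWS_REGION": region},
    }

def merge_into_config(existing: dict, entry: dict, name: str = "aws.dp-mcp") -> dict:
    """Return a copy of an mcp.json document with the server entry added or replaced."""
    merged = json.loads(json.dumps(existing))  # deep copy via a JSON round trip
    merged.setdefault("mcpServers", {})[name] = entry
    return merged

if __name__ == "__main__":
    current = {"mcpServers": {"other-server": {"command": "uvx", "args": ["some-server"]}}}
    updated = merge_into_config(current, dp_mcp_server_entry("default", "us-east-1"))
    print(json.dumps(updated, indent=2))
```

Writing the result back to the file is left out deliberately so that the sketch never overwrites a real configuration.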

Set up using Claude Desktop

Claude Desktop offers another powerful way to interact with the AWS Data Processing MCP Server through Anthropic’s Claude interface, providing a user-friendly chat experience for managing your data processing workflows.

Installation and configuration:

  1. Download and install Claude Desktop for your operating system.
  2. Open Claude Desktop and navigate to Settings (gear icon in the bottom left).
  3. Go to the Developer tab and configure your MCP server by adding the same configuration shown in step 3 of the Amazon Q CLI setup.
  4. Restart Claude Desktop to activate the MCP server connection.
  5. Test the integration by starting a new conversation and asking: What data processing tools are available to me?

Enhanced developer experience

After you configure the server with either Amazon Q CLI or Claude Desktop, your workflow changes dramatically. Instead of constructing complex AWS CLI commands with multiple parameters, you can use natural language requests. For example, rather than memorizing the syntax for creating AWS Glue crawlers, you can ask:

Create a Glue crawler for my S3 bucket that runs weekly and updates the data catalog with any schema changes
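For comparison, here is roughly what that one sentence replaces. This sketch builds parameters for the AWS Glue CreateCrawler API: an S3 target, a weekly schedule in AWS cron syntax (Mondays at 00:00 UTC), and a schema change policy that updates Data Catalog tables in place. The crawler name, role ARN, database, and S3 path are hypothetical placeholders, and the exact settings an assistant chooses may differ.

```python
# Parameters for the AWS Glue CreateCrawler API. All names, the role ARN,
# and the S3 path are HYPOTHETICAL placeholders.

def weekly_crawler_request(name: str, role_arn: str, database: str, s3_path: str) -> dict:
    """Build a CreateCrawler request for a weekly crawl of one S3 location."""
    if not s3_path.startswith("s3://"):
        raise ValueError("s3_path must be an s3:// URI")
    return {
        "Name": name,
        "Role": role_arn,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
        # AWS cron fields: minutes hours day-of-month month day-of-week year.
        # Runs every Monday at 00:00 UTC.
        "Schedule": "cron(0 0 ? * MON *)",
        # Apply detected schema changes to the existing table definitions,
        # and only log (do not delete) tables whose source objects disappear.
        "SchemaChangePolicy": {
            "UpdateBehavior": "UPDATE_IN_DATABASE",
            "DeleteBehavior": "LOG",
        },
    }

if __name__ == "__main__":
    request = weekly_crawler_request(
        "customer-interactions-weekly",
        "arn:aws:iam::111122223333:role/ExampleGlueCrawlerRole",
        "marketing",
        "s3://example-bucket/customer-interactions/",
    )
    print(request)
    # With boto3 installed and credentials configured, the same dict could be
    # passed as: boto3.client("glue").create_crawler(**request)
```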

Accelerating development with MCP servers

Next, we explore the common patterns that emerge when using MCP in data processing development workflows.

Data onboarding and discovery

One of the most common challenges data teams face is efficiently onboarding new datasets and making them immediately useful for analysis. Consider a scenario where your marketing team receives a CSV file containing customer interaction data that needs to be quickly analyzed for campaign insights. Traditionally, this process involves multiple manual steps: uploading the file to Amazon Simple Storage Service (Amazon S3), configuring an AWS Glue crawler to discover the schema, creating appropriate table definitions, setting up proper partitioning, and finally making the data queryable through Athena.
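The partitioning step usually comes down to how objects are laid out in Amazon S3. When folder names use the Hive-style `key=value` format, AWS Glue crawlers register the keys as partition columns. The following minimal sketch builds such a key for a daily file; the prefix and file name are hypothetical, and a year/month/day scheme is only one of several reasonable choices.

```python
from datetime import date

def partitioned_key(prefix: str, event_date: date, filename: str) -> str:
    """Build an S3 object key with Hive-style key=value partition folders.

    A Glue crawler reading this layout would expose year, month, and day
    as partition columns.
    """
    prefix = prefix.strip("/")
    return (
        f"{prefix}/year={event_date.year:04d}/month={event_date.month:02d}"
        f"/day={event_date.day:02d}/{filename}"
    )

if __name__ == "__main__":
    print(partitioned_key("raw/customer_interactions/", date(2024, 5, 1), "interactions.csv"))
    # raw/customer_interactions/year=2024/month=05/day=01/interactions.csv
```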

With the AWS Data Processing MCP Server, this entire workflow becomes conversational. You can describe your goal using natural language:

I have a customer interaction CSV file that I need to analyze for marketing insights. Help me get this data ready for business users to query

The AI assistant, powered by the MCP server’s deep AWS integration, automatically handles the technical implementation details: it guides you through uploading the file to an appropriate Amazon S3 location, configures and runs an AWS Glue crawler with suitable settings, creates properly formatted table definitions, and sets up Athena access with appropriate workgroup configurations for cost control.
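"Workgroup configurations for cost control" can be made concrete with the Athena CreateWorkGroup API. In this sketch the workgroup enforces its own settings, publishes query metrics to CloudWatch, and cancels any query that scans more than a per-query byte limit. The workgroup name, output location, and 10 GB limit are hypothetical values; the 10 MB floor reflects the minimum Athena accepts for `BytesScannedCutoffPerQuery`.

```python
# Parameters for the Athena CreateWorkGroup API. Names, the S3 output
# location, and the limit are HYPOTHETICAL values for illustration.

MIN_CUTOFF_BYTES = 10_000_000  # smallest per-query scan limit Athena accepts (10 MB)

def cost_controlled_workgroup(name: str, output_location: str, max_gb_per_query: float) -> dict:
    """Build a CreateWorkGroup request with a per-query scan limit."""
    cutoff = int(max_gb_per_query * 1024**3)
    if cutoff < MIN_CUTOFF_BYTES:
        raise ValueError("per-query limit is below Athena's 10 MB minimum")
    return {
        "Name": name,
        "Configuration": {
            "ResultConfiguration": {"OutputLocation": output_location},
            # Client-side settings cannot override the workgroup's settings.
            "EnforceWorkGroupConfiguration": True,
            # Emit per-query metrics to CloudWatch.
            "PublishCloudWatchMetricsEnabled": True,
            # Cancel any query that scans more than this many bytes.
            "BytesScannedCutoffPerQuery": cutoff,
        },
        "Description": "Business-user queries with a per-query scan limit",
    }

if __name__ == "__main__":
    params = cost_controlled_workgroup(
        "marketing-analysts", "s3://example-bucket/athena-results/", 10
    )
    print(params)
    # With boto3: boto3.client("athena").create_work_group(**params)
```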

The following video demonstration showcases how developers can use Amazon Q CLI with Data Processing MCP server for data onboarding.

Business insights and automated reporting

Modern organizations require timely, accurate insights to drive business decisions, but traditional analytics workflows often create bottlenecks between data availability and business consumption. Imagine you need to identify potentially fraudulent transactions across multiple data sources including cardholder information, credit card details, merchant data, and transaction records. Rather than manually writing complex SQL queries with multiple joins and filters, you can describe your analytical goal:

Analyze our transaction data across cardholders, credit cards, and merchants to identify suspicious activities involving transactions over $5,000 and create an automated weekly report.

The MCP server interprets this request and automatically constructs the appropriate analytical workflow. It examines your data catalog to understand table relationships, generates optimized SQL queries with proper joins across your datasets, executes the analysis using Athena with cost-effective query patterns, and formats the results into actionable reports. The system can also set up automated delivery, such as email reports or dashboard updates, so that stakeholders receive timely insights without manual intervention, and it can create scheduled AWS Glue jobs that continuously monitor for emerging patterns.
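The core of such an analysis is a multi-table join with a filter. The following sketch shows the kind of SQL involved, using SQLite in memory so it runs locally. The table and column names are hypothetical (an assistant would read the real ones from your Data Catalog), and although the join-and-filter SQL is standard, Athena's dialect can differ in details such as date functions.

```python
import sqlite3

# HYPOTHETICAL schema standing in for cataloged tables. The query joins
# transactions to cards, cardholders, and merchants, then keeps large amounts.
SUSPICIOUS_TXNS_SQL = """
SELECT t.transaction_id,
       ch.name AS cardholder,
       m.merchant_name,
       t.amount
FROM transactions t
JOIN credit_cards cc ON t.card_id = cc.card_id
JOIN cardholders ch  ON cc.cardholder_id = ch.cardholder_id
JOIN merchants m     ON t.merchant_id = m.merchant_id
WHERE t.amount > ?
ORDER BY t.amount DESC
"""

def find_suspicious(conn: sqlite3.Connection, threshold: float = 5000.0) -> list[tuple]:
    """Return transactions strictly above the threshold, largest first."""
    return conn.execute(SUSPICIOUS_TXNS_SQL, (threshold,)).fetchall()

def demo_connection() -> sqlite3.Connection:
    """Create an in-memory database with a few sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE cardholders (cardholder_id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE credit_cards (card_id INTEGER PRIMARY KEY, cardholder_id INTEGER);
        CREATE TABLE merchants (merchant_id INTEGER PRIMARY KEY, merchant_name TEXT);
        CREATE TABLE transactions (transaction_id INTEGER PRIMARY KEY,
                                   card_id INTEGER, merchant_id INTEGER, amount REAL);
        INSERT INTO cardholders VALUES (1, 'Ana'), (2, 'Ben');
        INSERT INTO credit_cards VALUES (10, 1), (20, 2);
        INSERT INTO merchants VALUES (100, 'Electronics Hub'), (200, 'Corner Cafe');
        INSERT INTO transactions VALUES
            (1000, 10, 100, 7250.00),
            (1001, 20, 200, 12.50),
            (1002, 20, 100, 5000.00),
            (1003, 20, 100, 9800.00);
    """)
    return conn

if __name__ == "__main__":
    for row in find_suspicious(demo_connection()):
        print(row)
    # (1003, 'Ben', 'Electronics Hub', 9800.0)
    # (1000, 'Ana', 'Electronics Hub', 7250.0)
```

The filter is strictly greater than the threshold, matching "over $5,000", so the $5,000.00 transaction is excluded.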

We’re also releasing a stand-alone Data Processing Agent developed using AWS Strands SDK that you can customize further with your system prompts and context for your use cases. You can run it locally or deploy it using Amazon Bedrock AgentCore. The following video demonstration showcases how developers can use Data Processing Agent for driving business insights.

Observability and performance monitoring

Maintaining visibility across complex data processing environments requires sophisticated monitoring capabilities that traditional approaches often fail to provide. The AWS Data Processing MCP Server enables intelligent observability by synthesizing real-time telemetry from across your AWS analytics infrastructure into actionable insights. For AWS Glue environments, the MCP server continuously analyzes job metadata, execution logs, resource configurations, and data catalog statistics to provide operational intelligence. Rather than manually navigating CloudWatch dashboards or parsing log files, you can ask questions like “Show me performance trends across my ETL jobs and identify optimization opportunities.” The following video demonstration showcases how developers can use Claude Desktop with Data Processing MCP Server to monitor Glue jobs and catalogs.
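A "performance trend" can be as simple as comparing each job's latest run with its history. The following sketch summarizes records shaped like a subset of the JobRun fields returned by the Glue GetJobRuns API (`JobName`, `JobRunState`, and `ExecutionTime` in seconds). The slowdown rule and the 1.5x ratio are assumptions for illustration, and the function expects runs in chronological order, so sort by `StartedOn` first if your source returns newest runs first.

```python
from collections import defaultdict
from statistics import mean

def summarize_runs(runs: list[dict], slowdown_ratio: float = 1.5) -> dict:
    """Per job: run count, failure rate, mean successful duration, slowdown flag.

    A job is flagged when its latest successful run took more than
    `slowdown_ratio` times the mean of its earlier successful runs.
    Runs must be in chronological order (oldest first).
    """
    by_job = defaultdict(list)
    for run in runs:
        by_job[run["JobName"]].append(run)

    summary = {}
    for job, job_runs in by_job.items():
        durations = [r["ExecutionTime"] for r in job_runs if r["JobRunState"] == "SUCCEEDED"]
        failures = sum(1 for r in job_runs if r["JobRunState"] == "FAILED")
        slowed = len(durations) >= 2 and durations[-1] > slowdown_ratio * mean(durations[:-1])
        summary[job] = {
            "runs": len(job_runs),
            "failure_rate": failures / len(job_runs),
            "mean_seconds": mean(durations) if durations else None,
            "slowdown": slowed,
        }
    return summary

if __name__ == "__main__":
    # HYPOTHETICAL sample runs.
    sample = [
        {"JobName": "orders_etl", "JobRunState": "SUCCEEDED", "ExecutionTime": 300},
        {"JobName": "orders_etl", "JobRunState": "SUCCEEDED", "ExecutionTime": 320},
        {"JobName": "orders_etl", "JobRunState": "SUCCEEDED", "ExecutionTime": 610},
        {"JobName": "clicks_etl", "JobRunState": "FAILED", "ExecutionTime": 45},
        {"JobName": "clicks_etl", "JobRunState": "SUCCEEDED", "ExecutionTime": 200},
    ]
    for job, stats in summarize_runs(sample).items():
        print(job, stats)
```

Here `orders_etl` is flagged because its latest run (610 seconds) exceeds 1.5 times the 310-second mean of its earlier runs.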

For Amazon EMR clusters, the MCP server aggregates cluster metadata, instance usage patterns, and failure events into unified operational views. This enables proactive management where you can request “Analyze my EMR environment for cost optimization opportunities and potential reliability risks.” The system responds with detailed analysis of cluster utilization patterns, recommendations for right-sizing instance types, identification of long-running clusters that might represent cost leakage, and alerts about configuration patterns that could impact reliability. The observability capabilities extend beyond simple monitoring to predictive insights, analyzing historical patterns to forecast resource needs and recommend preventive actions. The following video demonstration showcases how developers can use Claude Desktop with Data Processing MCP Server to monitor EMR clusters.
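One of the checks mentioned above, spotting long-running clusters, can be sketched over records shaped like a subset of the cluster summaries returned by the EMR ListClusters API (`Id`, `Status.State`, `Status.Timeline.CreationDateTime`). The 24-hour threshold is a hypothetical choice, and a `WAITING` cluster (no steps running) is only a candidate for review, since interactive clusters can legitimately sit in that state. Pass a timezone-aware `now`, because boto3 returns timezone-aware timestamps.

```python
from datetime import datetime, timedelta, timezone

ACTIVE_STATES = {"STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"}

def flag_clusters(clusters: list[dict], now: datetime,
                  max_age: timedelta = timedelta(hours=24)) -> list[tuple[str, str]]:
    """Return (cluster_id, reason) pairs for active clusters worth reviewing."""
    findings = []
    for c in clusters:
        state = c["Status"]["State"]
        if state not in ACTIVE_STATES:
            continue  # skip clusters that are terminating or terminated
        age = now - c["Status"]["Timeline"]["CreationDateTime"]
        if age > max_age:
            findings.append((c["Id"], "long-running"))
        elif state == "WAITING":
            findings.append((c["Id"], "waiting with no active steps"))
    return findings

if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    # HYPOTHETICAL cluster summaries.
    sample = [
        {"Id": "j-EXAMPLE1", "Status": {"State": "RUNNING",
            "Timeline": {"CreationDateTime": now - timedelta(days=3)}}},
        {"Id": "j-EXAMPLE2", "Status": {"State": "WAITING",
            "Timeline": {"CreationDateTime": now - timedelta(hours=2)}}},
    ]
    for cluster_id, reason in flag_clusters(sample, now):
        print(cluster_id, reason)
```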

Security and architectural considerations

All MCP server operations occur within your AWS account boundaries, helping to ensure that sensitive data does not leave your controlled environment. The server provides contextual information to AI assistants through metadata and API responses based on IAM access permissions available to the role being used. Integration with IAM helps ensure that operations respect existing permission boundaries and organizational policies.

The architecture supports graduated autonomy where routine operations can proceed automatically while high-impact changes require human approval. This balanced approach enables productivity gains while maintaining appropriate oversight for critical business operations.
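The idea of graduated autonomy can be illustrated with a small approval gate: calls that only read run immediately, and anything else waits for a human decision. This is a hypothetical sketch of the pattern, not the MCP server's own implementation. The name-prefix rule for recognizing read operations and the tool names are assumptions, and treating every unrecognized call as high impact is the conservative default.

```python
from dataclasses import dataclass
from typing import Callable

# HYPOTHETICAL rule: tool names starting with these prefixes only read data.
READ_ONLY_PREFIXES = ("get_", "list_", "describe_")

@dataclass
class ToolCall:
    name: str
    arguments: dict

def requires_approval(call: ToolCall) -> bool:
    """Treat anything not clearly a read operation as high impact."""
    return not call.name.startswith(READ_ONLY_PREFIXES)

def execute(call: ToolCall,
            run: Callable[[ToolCall], str],
            approve: Callable[[ToolCall], bool]) -> str:
    """Run routine calls directly; ask `approve` before high-impact ones."""
    if requires_approval(call) and not approve(call):
        return f"skipped {call.name}: not approved"
    return run(call)

if __name__ == "__main__":
    run = lambda c: f"ran {c.name}"
    ask = lambda c: input(f"Allow {c.name} {c.arguments}? [y/N] ").strip().lower() == "y"
    print(execute(ToolCall("list_tables", {"database": "sales"}), run, ask))
    print(execute(ToolCall("terminate_cluster", {"cluster_id": "j-EXAMPLE"}), run, ask))
```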

Conclusion

In this post, we explored how the AWS Data Processing MCP Server accelerates analytics solution development across AWS analytics services. We demonstrated how data engineers can transform raw data into business-ready insights through AI-assisted workflows, significantly reducing development time and complexity. The AWS Data Processing MCP Server offers extensive capabilities beyond these use cases. You can use the MCP server’s context-rich APIs to develop customized solutions for observability, automation, and optimization, tailoring workflows to your specific data environments and business needs. By bringing AWS data processing capabilities directly into development workflows, whether through the AWS CLI, IDEs, or AI-assisted tools, teams can focus on solving business problems rather than managing infrastructure. We encourage you to explore innovative applications of the MCP Server, combining its powerful context engine with AI-driven analysis to uncover new opportunities for efficiency and insight across your data ecosystems.

Get started today by accessing the open source code, documentation, and setup instructions in the AWS Labs GitHub repository. Integrate the MCP Server into your development workflow and transform how you build analytics solutions on AWS. We’ll continue to iterate based on customer feedback and look forward to seeing how customers extend these capabilities to solve complex data challenges.

Acknowledgment: A special thanks to everyone who contributed to the development and open-sourcing of the AWS Data Processing MCP server and Agent: Raghavendhar Thiruvoipadi Vidyasagar, Chris Kha, Sandeep Adwankar, Nidhi Gupta, Xiaoxi Liu, Kathryn Lin, Alexa Perlov, Alain Krok, Xiaorun Yu, Maheedhar Reddy Chapiddi, and Rajendra Gujja. 


About the authors

Shubham Mehta is a Senior Product Manager at AWS Analytics. He leads generative AI feature development across services such as AWS Glue, Amazon EMR, and Amazon MWAA, using AI/ML to simplify and enhance the experience of data practitioners building data applications on AWS.

Vaibhav Naik is a software engineer at AWS Glue, passionate about building robust, scalable solutions to tackle complex customer problems. With a keen interest in generative AI, he likes to explore innovative ways to develop enterprise-level solutions that harness the power of cutting-edge AI technologies.

Liyuan Lin is a Software Engineer at AWS Glue, where she works on building generative AI and data integration tools to help customers solve their data challenges. She specializes in developing solutions that combine AI capabilities with data integration workflows, making it easier for customers to manage and transform their data effectively.

Arun A K is a Big Data Solutions Architect with AWS. He works with customers to provide architectural guidance for running analytics solutions on the cloud. In his free time, Arun enjoys spending quality time with his family.

Sarath Krishnan is a Senior Solutions Architect with Amazon Web Services. He is passionate about enabling enterprise customers on their digital transformation journey. Sarath has extensive experience in architecting highly available, scalable, cost-effective, and resilient applications on the cloud. His area of focus includes DevOps, machine learning, MLOps, and generative AI.

Pradeep Patel is a Software Development Manager on the AWS Data Processing Team (AWS Glue and Amazon EMR). His team focuses on building distributed systems to enable seamless Spark Code Transformation using AI.

Mohit Saxena is a Senior Software Development Manager on the AWS Data Processing Team (AWS Glue and Amazon EMR). His team focuses on building distributed systems to enable customers with new AI/ML-driven capabilities to efficiently transform petabytes of data across data lakes on Amazon S3, databases and data warehouses on the cloud.