Tag Archives: Amazon Managed Streaming for Apache Kafka (Amazon MSK)

Medidata’s journey to a modern lakehouse architecture on AWS

Post Syndicated from Mike Araujo original https://aws.amazon.com/blogs/big-data/medidatas-journey-to-a-modern-lakehouse-architecture-on-aws/

This post was co-authored by Mike Araujo, Principal Engineer at Medidata Solutions.

The life sciences industry is transitioning from fragmented, standalone tools towards integrated, platform-based solutions. Medidata, a Dassault Systèmes company, is building a next-generation data platform that addresses the complex challenges of modern clinical research. In this post, we show you how Medidata created a unified, scalable, real-time data platform that serves thousands of clinical trials worldwide with AWS services, Apache Iceberg, and a modern lakehouse architecture.

Challenges with legacy architecture

As the Medidata clinical data repository expanded, the team recognized that the legacy data solution fell short in providing quality data products to customers across a growing portfolio of data offerings. Several data tenets began to erode. The following diagram shows Medidata’s legacy extract, transform, and load (ETL) architecture.

Built upon a series of scheduled batch jobs, the legacy system proved ill-equipped to provide a unified view of the data across the entire ecosystem. Batch jobs ran at different intervals, often requiring a sufficient degree of scheduling buffer to make sure upstream jobs completed within the expected window. As the data volume expanded, the jobs and their schedules continued to inflate, introducing a latency window between ingestion and processing for dependent consumers. Different consumers operating from various underlying data services further magnified the problem as pipelines had to be continuously built across a variety of data delivery stacks.

The expanding portfolio of pipelines began to overwhelm existing maintenance operations. With more operations, the opportunity for failure expanded and recovery efforts became more complicated. Existing observability systems were inundated with operational data, and identifying the root cause of data quality issues became a multi-day endeavor. Increases in the data volume required scaling considerations across the entire data estate.

Additionally, the proliferation of data pipelines and copies of the data in different technologies and storage systems necessitated expanding access controls with enhanced security features to make sure only the correct users had access to the subset of data to which they were permitted. Making sure access control changes were correctly propagated across all systems added a further layer of complexity to consumers and producers.

Solution overview

With the advent of Clinical Data Studio (Medidata’s unified data management and analytics solution for clinical trials) and Data Connect (Medidata’s data solution for acquiring, transforming, and exchanging electronic health record (EHR) data across healthcare organizations), Medidata introduced a new world of data discovery, analysis, and integration to the life sciences industry powered by open source technologies and hosted on AWS. The following diagram illustrates the solution architecture.

Fragmented batch ETL jobs were replaced by real-time streaming pipelines built on Apache Flink, an open source, distributed engine for stateful processing, running on Amazon Elastic Kubernetes Service (Amazon EKS), a fully managed Kubernetes service. The Flink jobs write to Apache Kafka running in Amazon Managed Streaming for Apache Kafka (Amazon MSK), a streaming data service that manages Kafka infrastructure and operations, and the data then lands in Iceberg tables backed by the AWS Glue Data Catalog, a centralized metadata repository for data assets. From this collection of Iceberg tables, a central, single source of data is now accessible from a variety of consumers without additional downstream processing, alleviating the need for custom pipelines to satisfy the requirements of downstream consumers. Through these fundamental architectural changes, the team at Medidata solved the issues presented by the legacy solution.

Data availability and consistency

With the introduction of the Flink jobs and Iceberg tables, the team was able to deliver a consistent view of their data across the Medidata data experience. Pipeline latency was reduced from days to minutes, helping Medidata customers realize a 99% performance gain from the data ingestion to the data analytics layers. Due to Iceberg’s interoperability, Medidata users saw the same view of the data regardless of where they viewed that data, minimizing the need for consumer-driven custom pipelines because Iceberg could plug into existing consumers.

Maintenance and durability

Iceberg’s interoperability provided a single copy of the data to satisfy their use cases, so the Medidata team could focus its observation and maintenance efforts on a five-times smaller subset of operations than previously required. Observability was enhanced by tapping into the various metadata components and metrics exposed by Iceberg and the Data Catalog. Quality management transformed from cross-system traces and queries to a single analysis of unified pipelines, with an added benefit of point in time data queries thanks to the Iceberg snapshot feature. Data volume increases are handled with out-of-box scaling supported by the entire infrastructure stack and AWS Glue Iceberg optimization features that include compaction, snapshot retention, and orphan file deletion, which provide a set-and-forget experience for solving a number of common Iceberg frustrations, such as the small file problem, orphan file retention, and query performance.

Security

With Iceberg at the center of its solution architecture, the Medidata team no longer had to spend the time building custom access control layers with enhanced security features at each data integration point. Iceberg on AWS centralizes the authorization layer using familiar systems such as AWS Identity and Access Management (IAM), providing a single and durable control for data access. The data also stays entirely within the Medidata virtual private cloud (VPC), further reducing the opportunity for unintended disclosures.

Conclusion

In this post, we demonstrated how a legacy universe of consumer-driven custom ETL pipelines can be replaced with a scalable, high-performance streaming lakehouse. By putting Iceberg on AWS at the center of data operations, you can have a single source of data for your consumers.

To learn more about Iceberg on AWS, refer to Optimizing Iceberg tables and Using Apache Iceberg on AWS.


About the authors

Mike Araujo

Mike is a Principal Engineer at Medidata Solutions, working on building a next generation data and AI platform for clinical data and trials. By using the power of open source technologies such as Apache Kafka, Apache Flink, and Apache Iceberg, Mike and his team have enabled the delivery of billions of clinical events and data transformations in near real time to downstream consumers, applications, and AI agents. His core skills focus on architecting and building big data and ETL solutions at scale as well as their integration in agentic workflows.

Sandeep Adwankar

Sandeep is a Senior Product Manager at AWS, who has driven feature launches across Amazon SageMaker, AWS Glue, and AWS Lake Formation. He has led initiatives in Amazon S3 Tables analytics, Iceberg compaction strategies, and AWS Glue Iceberg optimizations. His recent work focuses on generative AI and autonomous systems, including the AWS Glue Data Catalog model context protocol and Amazon Bedrock structured knowledge bases. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that accelerate their business outcomes.

Ian Beatty

Ian is a Technical Account Manager at AWS, where he specializes in supporting independent software vendor (ISV) customers in the healthcare and life sciences (HCLS) and financial services industry (FSI) sectors. Based in the Rochester, NY area, Ian helps ISV customers navigate their cloud journey by maintaining resilient and optimized workloads on AWS. With over a decade of experience building on AWS since 2014, he brings deep technical expertise from his previous roles as an AWS Architect and DevSecOps team lead for SaaS ISVs before joining AWS more than 3 years ago.

Ashley Chen

Ashley is a Solutions Architect at AWS based in Washington D.C. She supports independent software vendor (ISV) customers in the healthcare and life sciences industries, focusing on customer enablement, generative AI applications, and container workloads.

Improving throughput of serverless streaming workloads for Kafka

Post Syndicated from Anton Aleksandrov original https://aws.amazon.com/blogs/compute/improving-throughput-of-serverless-streaming-workloads-for-kafka/

Event-driven applications often need to process data in real-time. When you use AWS Lambda to process records from Apache Kafka topics, you frequently encounter two typical requirements: you need to process very high volumes of records in close to real-time, and you want your consumers to have the ability to scale rapidly to handle traffic spikes. Achieving both necessitates understanding how Lambda consumes Kafka streams, where the potential bottlenecks are, and how to optimize configurations for high throughput and best performance.

In this post, we discuss how to optimize Kafka processing with Lambda for both high throughput and predictable scaling. We explore how Lambda’s Kafka event source mappings (ESMs) scale, the optimization techniques available during record consumption, how to use ESM Provisioned Mode for bursty workloads, and which observability metrics to use for performance optimization.

Overview

To start processing records from a Kafka topic with a Lambda function, whether using Amazon Managed Streaming for Apache Kafka (Amazon MSK) or a self-managed Kafka cluster, you create an ESM: a lightweight serverless resource that consumes records from Kafka topics and invokes your function.

The scaling behavior of Kafka ESMs is based on the offset lag. This is a metric indicating the number of records in the topic that have not yet been consumed by the Lambda function. This metric typically grows when producers publish new records faster than consumers process them. As the lag grows, the Lambda service gradually adds more Kafka consumers (also known as pollers) to your ESM. To preserve ordering guarantees, the maximum number of pollers is capped by the number of partitions in the topic. Lambda also scales pollers down automatically when lag decreases.

Each ESM follows a consistent polling workflow: poll -> filter -> batch -> invoke, as shown in the following diagram. Every stage has configurable options that directly affect performance, latency, and cost.


Figure 1. ESM processing workflow.

Polling: Increasing predictability with Provisioned Mode

By default, Kafka ESM uses the on-demand polling mode. In this mode, ESM starts with one poller, automatically adds more pollers when the offset lag grows, and scales the number of pollers down as lag decreases. On-demand mode does not need upfront scaling configuration and is the lowest-cost option for steady workloads. For many applications, this behavior is sufficient: scaling up can take several minutes, but the throughput eventually catches up, and you only pay for the resources you use, such as number of invocations.

However, if your workloads are bursty and latency-sensitive, then on-demand scaling may not be fast enough and can result in a rapidly growing lag. This can be addressed by switching to Provisioned Mode, which gives you more fine-grained control to configure a minimum and maximum number of always-on pollers for your Kafka ESM. These pollers remain connected even when traffic is low, so consumption begins immediately when a spike occurs, and scaling within the configured range is faster and more predictable.

The following diagram shows the performance improvements of using the ESM in Provisioned Mode for bursty workloads. You can see that in on-demand mode it took ESM over 15 minutes to eventually catch up to the new traffic volume, while in Provisioned Mode the ESM handled the traffic increase instantly.


Figure 2. Comparing Kafka ESM on-demand and Provisioned Mode.

Best practices for using Provisioned Mode:

  • Start small: Provisioned Mode is a paid capability. AWS recommends that for smaller topics (fewer than 10 partitions) you start with a single provisioned poller to evaluate throughput and observe workload behavior. For larger topics, you can start with a higher number of provisioned pollers to accommodate the baseline consumption. You can adjust this configuration at any time as you learn traffic patterns and refine your performance targets.
  • Estimate throughput: A single provisioned poller can process up to 5 MB/s of Kafka data. Monitor your average record size and per-record processing time to establish a baseline for minimum and maximum pollers, then validate with real workload metrics.
  • Set a low floor and flexible ceiling: Choose a minimum number of pollers that makes sure that latency targets are met when a traffic burst occurs, then allow the ESM to scale toward a higher maximum as needed.
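
The throughput estimate above can be turned into a rough starting point with a little arithmetic. The following is a minimal sketch, not an official sizing formula: it assumes 1 MB equals 1,000,000 bytes, treats 5 MB/s as the per-poller ceiling stated above, and caps the result at the partition count. The function names are hypothetical; the `MinimumPollers` and `MaximumPollers` keys mirror the provisioned poller settings of the Lambda event source mapping API, which you should confirm against the current documentation. Real throughput also depends on function duration, so validate the numbers with workload metrics.

```python
import math

MB_PER_SEC_PER_POLLER = 5.0  # per-poller ceiling quoted in the text above


def estimate_pollers(records_per_sec: float, avg_record_bytes: float, partitions: int) -> int:
    """Estimate pollers for a traffic level, never exceeding the partition count."""
    throughput_mb_s = records_per_sec * avg_record_bytes / 1_000_000  # assumes decimal MB
    needed = math.ceil(throughput_mb_s / MB_PER_SEC_PER_POLLER)
    return max(1, min(needed, partitions))


def provisioned_poller_config(baseline_rps: float, peak_rps: float,
                              avg_record_bytes: float, partitions: int) -> dict:
    """Derive a floor from baseline traffic and a ceiling from peak traffic."""
    return {
        "MinimumPollers": estimate_pollers(baseline_rps, avg_record_bytes, partitions),
        "MaximumPollers": estimate_pollers(peak_rps, avg_record_bytes, partitions),
    }


# Hypothetical workload: 1 KB records, 20,000 records/s baseline, 100,000 records/s peak.
config = provisioned_poller_config(20_000, 100_000, 1_000, partitions=16)
print(config)
```

In this hypothetical case the baseline of 20 MB/s maps to 4 pollers, while the 100 MB/s peak would need 20 pollers and is capped at the topic's 16 partitions.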

See Low latency processing for Kafka event sources for more information.

To summarize:

  • Use Provisioned Mode for bursty traffic, strict SLOs, or when backlogs pose downstream risk.
  • Use on-demand polling mode for steady traffic, flexible latency requirements, or when minimizing cost is the primary objective.

Filtering: Drop irrelevant records early

By default, all records from Kafka are delivered to your Lambda function. This approach is direct and flexible. Your handler code decides which records to process and which to ignore. This default behavior is highly efficient for workloads where nearly all records are valuable.

When you find yourself discarding a large portion of records in your handler code, you can use native ESM filtering capabilities to drop irrelevant records before they reach your function. You can filter early to reduce cost, free up concurrency, increase throughput, and make sure that your Lambda function spends cycles on valuable work only.

The following diagram shows the application of an ESM filter to only process telemetry that meets a specified condition.


Figure 3. ESM filtering configuration.
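
As an illustration of what such a filter can look like, the following sketch builds filter criteria that keep only telemetry records above a temperature threshold. The `temperature` field and the threshold are hypothetical. The `Filters`/`Pattern` structure, the `value` key for Kafka records, and the `numeric` comparison follow the Lambda event filtering syntax as we understand it, so check the details against the current documentation. The `matches` helper is only a local approximation for testing the rule, not the service's matching engine.

```python
import json


def build_filter_criteria(min_temperature: float) -> dict:
    """Build filter criteria that pass only records whose JSON value exceeds the threshold."""
    pattern = {"value": {"temperature": [{"numeric": [">", min_temperature]}]}}
    # Each filter pattern is passed as a JSON-encoded string.
    return {"Filters": [{"Pattern": json.dumps(pattern)}]}


def matches(record_value: dict, min_temperature: float) -> bool:
    """Local approximation of the rule above, useful for unit-testing expectations."""
    temperature = record_value.get("temperature")
    return isinstance(temperature, (int, float)) and temperature > min_temperature


criteria = build_filter_criteria(30)
print(criteria["Filters"][0]["Pattern"])
```

Records that fail the pattern are dropped by the ESM before invocation, so the handler never spends time or concurrency on them.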

Batching: Processing more records per invocation

You can batch multiple Kafka records together to process more data per invocation and increase the efficiency of your Lambda functions. Larger batches help you achieve higher throughput and reduce costs by making better use of each invocation run. To get the best results, you should balance batch size and latency targets and adjust the configuration based on your workload’s specific traffic patterns and SLOs.

Lambda gives you two primary controls for configuring ESM batching behavior:

  • Batch window: This is how long the ESM waits to accumulate records before invoking your function. A shorter window produces smaller batches and more frequent invocations. A longer window (up to 5 minutes) produces larger batches and less frequent invocations.
  • Batch size: This is the maximum number of records that the ESM can accumulate before invoking your function, up to 10,000.

There’s no single setting that universally works for all workloads. Your optimal configuration depends on workload characteristics such as latency tolerance and record size. AWS recommends starting with the default values and then gradually adjusting the configuration based on your requirements. For example, you can increase the batch size while monitoring function duration, error rates, and end-to-end latency.

The following diagram shows how to configure batch window and size using Terraform:


Figure 4. ESM batch window and batch size configuration with Terraform.

The ESM invokes your function when one of the following three conditions is met:

  1. The batch window elapses.
  2. The accumulated batch reaches the configured maximum batch size.
  3. The accumulated payload approaches the 6 MB maximum invocation payload limit of Lambda.
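
The interaction between the two batching controls and the payload limit can be sketched as a small decision function. This is a simplified model written for illustration, not the ESM implementation: the class and function names are hypothetical, the limits are the ones quoted above, and the payload check triggers at the limit itself rather than when the payload "approaches" it.

```python
from dataclasses import dataclass

MAX_BATCH_WINDOW_SECONDS = 300        # batch window can be up to 5 minutes
MAX_BATCH_SIZE = 10_000               # maximum records per batch
MAX_PAYLOAD_BYTES = 6 * 1024 * 1024   # 6 MB invocation payload limit


@dataclass(frozen=True)
class BatchingConfig:
    batch_size: int
    window_seconds: float

    def __post_init__(self):
        # Reject values outside the limits described in the text.
        if not 1 <= self.batch_size <= MAX_BATCH_SIZE:
            raise ValueError("batch_size must be between 1 and 10,000")
        if not 0 <= self.window_seconds <= MAX_BATCH_WINDOW_SECONDS:
            raise ValueError("window_seconds must be between 0 and 300")


def invoke_reason(cfg: BatchingConfig, elapsed_seconds: float,
                  record_count: int, payload_bytes: int):
    """Return which condition would trigger an invocation, or None to keep accumulating."""
    if record_count == 0:
        return None  # assumption: an empty batch never triggers an invocation
    if payload_bytes >= MAX_PAYLOAD_BYTES:
        return "payload_limit"
    if record_count >= cfg.batch_size:
        return "batch_size"
    if elapsed_seconds >= cfg.window_seconds:
        return "batch_window"
    return None


cfg = BatchingConfig(batch_size=500, window_seconds=5)
print(invoke_reason(cfg, elapsed_seconds=5, record_count=100, payload_bytes=1_000))
```

With a 5-second window and a batch size of 500, a quiet topic flushes on the window while a busy topic flushes on size, which is why observed batch sizes grow during traffic spikes.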

When using higher batch window values during traffic spikes, you typically see more records-per-batch and longer function invocation durations. This is normal: larger batches can take longer to process. Always interpret the Duration metric in the context of the batch size being processed.

Invoke: Process each batch faster and more efficiently

You control how quickly each batch completes through two main factors: the efficiency of your function code and the compute resources you allocate to your functions. You can improve both to process more records per second, reduce the necessary concurrency, and lower cost.

Optimize your code: Review your function handler code to identify where you can reduce work per record. For example, eliminate redundant serialization, initialize dependencies once during function startup, and consider parallel processing within the handler (where applicable). For performance-critical workloads, you can also choose languages that compile to binary, such as Go or Rust, which typically deliver high performance with lower resource usage.

Tune compute resources: Increasing the function's memory allocation proportionally increases vCPU. Use the Lambda PowerTuning tool to find the memory configuration that best balances performance and cost for your workload.

Correlate metrics: As you optimize, monitor Duration and Concurrency. You should see the concurrency drop as duration improves. That correlation confirms that your changes are improving the system throughput and efficiency.

When you combine handler optimizations with early filtering and efficient batching, even small improvements can make your pipeline noticeably faster under load.
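
The code-level advice above (initialize dependencies once, avoid redundant serialization, parallelize where safe) can be sketched in a Python handler. This is an illustrative sketch under stated assumptions: `process_record` is a hypothetical placeholder for your business logic, record values are assumed to be base64-encoded JSON, and the pool size of 4 is arbitrary. Partitions are processed in parallel while records within a partition stay sequential, which preserves per-partition ordering.

```python
import base64
import json
from concurrent.futures import ThreadPoolExecutor

# Initialized once per execution environment, then reused by every invocation.
_EXECUTOR = ThreadPoolExecutor(max_workers=4)


def process_record(record: dict) -> dict:
    """Hypothetical per-record work: decode the base64 value exactly once."""
    payload = json.loads(base64.b64decode(record["value"]))
    return {"offset": record["offset"], "payload": payload}


def process_partition(records: list) -> list:
    # Records from one partition are handled sequentially to keep their order.
    return [process_record(r) for r in records]


def handler(event, context=None):
    # Kafka events group records under "<topic>-<partition>" keys.
    partitions = list(event["records"].values())
    # Different partitions are independent, so they can run in parallel.
    results = list(_EXECUTOR.map(process_partition, partitions))
    return {"processed": sum(len(batch) for batch in results)}
```

Threads help mostly when per-record work is I/O bound. For CPU-bound work, allocating more memory (and therefore more vCPU) is usually the more direct lever.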

Observability drives good decisions

You can’t optimize what you can’t see. To tune your data processing pipeline, use a combination of OffsetLag, function invocation metrics, and Kafka broker metrics to understand your data processing performance. OffsetLag tells you whether your function is keeping up with incoming records, as shown in the following figure. Function metrics such as Duration, Concurrency, Errors, and Throttles show how efficiently your code is processing record batches. If you use Provisioned Mode, then you can use the Provisioned Pollers metric to track the poller capacity.


Figure 5. Kafka consumption observability with Amazon CloudWatch.

Always interpret function duration in the context of batch size. During traffic spikes, you can typically observe both duration and actual batch size increase, which is expected amortization, not a regression. For alerting, monitor lag growth, unexpected drops in invocation rate, and error spikes. With these signals in place, you can detect issues early and tune your configuration with confidence.
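
Two of these ideas are easy to express in code: normalizing duration by batch size, and flagging sustained lag growth. The following is a minimal sketch with hypothetical helper names and sample values. In practice, the inputs would come from your metrics system, and the three-sample window is an arbitrary choice.

```python
def per_record_ms(duration_ms: float, batch_size: int) -> float:
    """Normalize invocation duration by the number of records in the batch."""
    return duration_ms / batch_size


def lag_is_growing(lag_samples: list, min_points: int = 3) -> bool:
    """Return True when the most recent `min_points` lag samples strictly increase."""
    recent = lag_samples[-min_points:]
    if len(recent) < min_points:
        return False
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))


# Duration rises from 200 ms to 900 ms, but the cost per record falls.
print(per_record_ms(200, 100), per_record_ms(900, 1000))
print(lag_is_growing([10, 50, 200, 900]))
```

In the sample values, a 900 ms invocation over 1,000 records costs less per record than a 200 ms invocation over 100 records, which is the amortization effect described above.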

A sample step-by-step optimization loop

  1. Establish a clean baseline: Make your handler idempotent and batch-aware, start with a short batch window and moderate batch size. Monitor your ESM and confirm offset lag stays near zero at steady state.
  2. Filter early: Move static checks (record type, version, other custom properties) into ESM filtering and verify invoked counts drop relative to polled counts, proving the filter saves cost and concurrency.
  3. Increase batch size gradually while monitoring the duration, error rates, and latency metrics. Extend the batch window slightly if spikes cause too many invocations.
  4. Speed up the handler: Increase memory for more CPU, reduce per-record I/O, remove redundant serialization, and parallelize safely inside the batch while tracking duration and concurrency metrics together.
  5. Prove spike readiness: Replay realistic surges, monitor offset lag and drain time, and enable Provisioned Mode with a small minimum if recovery takes too long, adjusting with MB/s-per-poller estimates.
  6. Implement alerting: Watch for sustained lag growth, unexpected gaps between polled and invoked, and error spikes tied to partitions or large batches. Always read metrics in context with batch size.
  7. Re-evaluate periodically: Re-measure system throughput, confirm filter effectiveness, and retune batch and memory settings regularly as workloads evolve.
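
Step 1 asks for an idempotent, batch-aware handler. One common way to approach this, sketched below, is to skip records whose offsets were already applied for a given topic and partition. This is an illustrative assumption, not a technique prescribed by the post: `OffsetTracker` is a hypothetical in-memory stand-in, and a real deployment would need a durable store shared across execution environments.

```python
class OffsetTracker:
    """In-memory stand-in for a durable idempotency store (hypothetical)."""

    def __init__(self):
        self._high_water = {}

    def should_process(self, topic: str, partition: int, offset: int) -> bool:
        # A record is new only if its offset is beyond the last applied one.
        return offset > self._high_water.get((topic, partition), -1)

    def mark_done(self, topic: str, partition: int, offset: int) -> None:
        key = (topic, partition)
        self._high_water[key] = max(offset, self._high_water.get(key, -1))


def process_batch(records: list, tracker: OffsetTracker, apply) -> int:
    """Apply each unseen record once and return how many were applied."""
    applied = 0
    for record in records:
        key = (record["topic"], record["partition"], record["offset"])
        if tracker.should_process(*key):
            apply(record)
            tracker.mark_done(*key)
            applied += 1
    return applied
```

Replaying the same batch after a retry then applies nothing the second time, so redelivered records do not produce duplicate side effects.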

Conclusion

Optimizing Kafka streams processing with AWS Lambda necessitates understanding how ESMs work and tuning consumption components: polling, filtering, batching, and invoking. Filtering redundant records early removes unnecessary work, batching helps you process more records per invocation, and handler optimizations make sure that you make the most of the compute that you allocate. Together, these adjustments let you scale efficiently and keep offset lag under control.

When your workload is bursty, use Provisioned Mode to absorb spikes without long recovery times. With the right alerts on lag, errors, and unexpected polled versus invoked behavior, you can spot problems early and adjust before they impact users. Following this optimization guide gives you a practical way to measure, tune, and revisit your setup as traffic patterns change.

To learn more about optimizing Kafka consumption, see the AWS re:Invent 2024 session about Improving throughput and monitoring of serverless streaming workloads.

To learn more about building serverless architectures, see Serverless Land.

How Yelp modernized its data infrastructure with a streaming lakehouse on AWS

Post Syndicated from Umesh Dangat original https://aws.amazon.com/blogs/big-data/how-yelp-modernized-its-data-infrastructure-with-a-streaming-lakehouse-on-aws/

This is a guest post by Umesh Dangat, Senior Principal Engineer for Distributed Services and Systems at Yelp, and Toby Cole, Principal Engineer for Data Processing at Yelp, in partnership with AWS.

Yelp processes massive amounts of user data daily: over 300 million business reviews, 100,000 photo uploads, and countless check-ins. Maintaining sub-minute data freshness with this volume presented a significant challenge for our Data Processing team. Our homegrown data pipeline, built in 2015 using then-modern streaming technologies, scaled effectively for many years. As our business and data needs evolved, we began to encounter new challenges in managing observability and governance across an increasingly complex data ecosystem, prompting the need for a more modern approach. These gaps affected how we handled outages, making it harder to both assess impact and restore service. At the same time, our streaming framework struggled with using Kafka for both data streaming and permanent data storage. In addition, our connectors to analytical data stores experienced latencies exceeding 18 hours.

This came to a head when our efforts to comply with General Data Protection Regulation (GDPR) requirements revealed gaps in our infrastructure that would require us to clean up our data, while simultaneously maintaining operational reliability and reducing data processing times. Something had to change.

In this post, we share how we modernized our data infrastructure by embracing a streaming lakehouse architecture, achieving real-time processing capabilities at a fraction of the cost while reducing operational complexity. With this modernization effort, we reduced analytics data latencies from 18 hours to mere minutes, while also removing the need for using Kafka as a permanent storage for our change log streams.

The problem: Why we needed change

We started this transformation by initiating a migration from self-managed Apache Kafka to Amazon Managed Streaming for Apache Kafka (Amazon MSK), which significantly reduced our operational overhead and enhanced security. Amazon MSK’s express brokers also provided better elasticity for our Apache Kafka clusters. While these improvements were a promising start, we recognized the need for a more fundamental architectural change.

Legacy architecture pain points

Let’s examine the specific challenges and limitations of our previous architecture that prompted us to seek a modern solution.

The following diagram depicts Yelp’s original data architecture.

Kafka topics proliferated across our infrastructure, creating long processing chains. As a result, each hop added latency, operational overhead, and storage costs. The system’s reliance on Kafka for both ingestion and storage created a fundamental bottleneck: Kafka’s architecture, optimized for high-throughput messaging, wasn’t designed for long-term storage or for handling complex querying patterns.

Another challenge was our custom “Yelp CDC” format, a proprietary change data capture language. It was powerful and tailored to our needs. However, as our team grew and our use cases expanded, it introduced complexity and a steeper learning curve for new engineers. It also made integrations with off-the-shelf systems more complex and maintenance intensive.

The cost and latency trade-off

The traditional trade-off between real-time processing and cost efficiency had us caught in an expensive bind. Real-time streaming systems demand significant resources to maintain state within compute engines like Apache Flink, keep multiple copies of data across Kafka clusters, and run always-on processing jobs. Our infrastructure costs were growing, largely driven by:

  • Long Kafka chains: Data often traversed 4-5 Kafka topics before reaching its destination and each topic was replicated for reliability
  • Duplicate data storage: The same data existed in multiple formats across different systems—raw in Kafka, processed in intermediate topics, and final forms in data warehouses and Flink RocksDB for join-like use cases
  • Complex custom tooling maintenance: The proprietary nature of our tools meant engineering resources were focused on maintenance rather than building new capabilities
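
To see how long chains amplify storage, consider a back-of-the-envelope calculation. The sketch below is illustrative only: it assumes every topic in the chain retains a full copy of the payload and that each topic uses the same replication factor. The replication factor of 3 in the example is a hypothetical value, not a figure from our clusters.

```python
def kafka_bytes_stored(payload_bytes: int, topics_traversed: int, replication_factor: int) -> int:
    """Bytes held on brokers when each topic in the chain keeps a full, replicated copy."""
    return payload_bytes * topics_traversed * replication_factor


# Hypothetical: 1 GB of source data, 5 topics in the chain, replication factor 3.
one_gb = 1_000_000_000
print(kafka_bytes_stored(one_gb, 5, 3) / one_gb)
```

Under these assumptions, each gigabyte of source data occupies 15 GB of broker storage before it reaches its destination.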

Meanwhile, our business requirements became more demanding. Teams at Yelp needed faster insights, near-real-time results, and the ability to quickly run complex historical analyses without delay. This pushed us to shape our new architecture to improve streaming discovery and metadata visibility, provide more flexible transformation tooling, and simplify operational workflows with faster recovery times.

Understanding the streamhouse concept

To understand how we solved our data infrastructure challenges, it’s important to first grasp the concept of a streamhouse and how it differs from traditional architectures.

Evolution of data architecture

To understand why a streaming lakehouse or streamhouse was the answer to our challenges, it’s helpful to trace the evolution of data architectures. The journey from data warehouses to modern streaming systems reveals why each generation solved certain problems while creating new ones.

Data warehouses like Amazon Redshift and Snowflake brought structure and reliability to analytics, but their batch-oriented nature meant accepting hours or days of latency. Data lakes emerged to handle the volume and variety of big data, using low-cost object storage like Amazon S3, but often became “data swamps” without proper governance. The lakehouse architecture, pioneered by technologies like Apache Iceberg and Delta Lake, promised to combine the best of both: the structure of warehouses with the flexibility and economics of lakes.

But even lakehouses were designed with batch processing in mind. While they added streaming capabilities, these were often bolted on rather than fundamental to the architecture. What we needed was something different: a reimagining that treated streaming as a first-class citizen while maintaining lakehouse economics.

What makes a streamhouse different

A streamhouse, as we define it, is “a stream processing framework with a storage layer that leverages a table format, making intermediate streaming data directly queryable.” This seemingly simple definition represents a fundamental shift in how we think about data processing.

Traditional streaming systems maintain dynamic tables like materialized views in databases, but these aren’t directly queryable. You can only consume them as streams, limiting their utility for ad-hoc analysis or debugging. Lakehouses, conversely, excel at queries but struggle with low-latency updates and complex streaming operations like out-of-order event handling or partial updates.

The streamhouse bridges this gap by:

  • Treating batch as a special case of streaming, rather than a separate paradigm
  • Making data, including intermediate processing results, queryable via SQL
  • Providing streaming-native features like database change-data capture (CDC) and temporal joins
  • Leveraging cost-effective object storage while maintaining minute-level latencies

Core capabilities we needed

Our requirements for a streaming lakehouse were shaped by years of operating at scale:

Real-time processing with minute-level latency: While sub-second latency wasn’t necessary for most use cases, our previous hours-long delays weren’t acceptable. The sweet spot was processing latencies measured in minutes: fast enough for real-time decision-making but relaxed enough to leverage cost-effective storage.

Efficient CDC handling: With numerous MySQL databases powering our applications, the ability to efficiently capture and process database changes was crucial. The solution needed to handle both initial snapshots and ongoing changes seamlessly, without manual intervention or downtime.

Cost-effective scaling: The architecture had to break the linear relationship between data volume and cost. This meant leveraging tiered storage, with hot data on fast storage and cold data on low-cost object storage, all while maintaining query performance.

Built-in data management: Schema evolution, data lineage, time travel queries, and data quality controls needed to be first-class features, not afterthoughts. Our experience maintaining our custom Schematizer taught us that these capabilities were essential for operating at scale.

The solution architecture

Our modernized data infrastructure combines several key technologies into a cohesive streamhouse architecture that addresses our core requirements while maintaining operational efficiency.

Our technology stack selection

We carefully selected and integrated several proven technologies to build our streamhouse solution. The following diagram depicts Yelp’s new data architecture.

After extensive evaluation, we assembled a modern streaming lakehouse stack, streamhouse, built on proven open source technologies:

Amazon MSK continues to deliver existing streams from source applications and services, as it did before.

Apache Flink on Amazon EKS served as our compute engine, a natural choice given our existing expertise and investment in Flink-based processing. Its powerful stream processing capabilities, exactly-once semantics, and mature framework made it ideal for the computational layer.

Apache Paimon emerged as the key innovation, providing the streaming lakehouse storage layer. Born from the Flink community’s FLIP-188 proposal for built-in dynamic table storage, Paimon was designed from the ground up for streaming workloads. Its LSM-tree-based architecture provided the high-speed ingestion capabilities we needed.

Amazon S3 serves as our streamhouse storage layer, offering highly scalable capacity at a fraction of the cost. The shift from compute-coupled storage (Kafka brokers) to object storage represented a fundamental architectural change that unlocked massive cost savings.

Flink CDC connectors replaced our custom CDC implementations, providing battle-tested integrations with databases like MySQL. These connectors handled the complexity of initial snapshots, incremental updates, and schema changes automatically.

Architectural transformation

The transformation from our legacy architecture to the streamhouse model involved three key architectural shifts:

1. Decoupling ingestion from storage

In our old world, Kafka handled both data ingestion and storage, creating an expensive coupling. Every byte ingested had to be stored on Kafka brokers with replication for reliability. Our new architecture separated these concerns: Flink CDC handled ingestion by immediately writing to Paimon tables backed by S3. This separation reduced our storage costs by over 80% and improved reliability through Amazon S3’s 11 nines of durability.

2. Unified data format

The migration from our proprietary CDC format to the industry-standard Debezium format was more than a technical change. It reflected a broader move toward community-supported standards. We built a Data Format Converter that bridged the gap, allowing legacy streams to continue functioning while new streams leveraged standard formats. This approach facilitated backward compatibility while paving the way for future simplification.

3. Streamhouse tables

Perhaps the most radical change was replacing some of our Kafka topics with Paimon tables. These weren’t just storage locations—they were dynamic, versioned, queryable entities that supported:

  • Time travel queries within the table’s snapshot retention period
  • Automatic schema evolution without downtime
  • SQL-based access for both streaming and batch workloads
  • Built-in compaction and optimization
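
As a rough illustration of how time travel interacts with snapshot retention, the following Python sketch models a table as a sequence of committed snapshots. It is a simplified mental model and not Paimon's API: ToySnapshotTable, commit, and read are hypothetical names, and each snapshot stores a full copy of the table, whereas a real table format stores incremental files.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, int]


class ToySnapshotTable:
    """Each commit produces a new snapshot; only the newest N are retained."""

    def __init__(self, retained_snapshots: int) -> None:
        if retained_snapshots < 1:
            raise ValueError("at least one snapshot must be retained")
        self.retained = retained_snapshots
        self.snapshots: List[Tuple[int, Row]] = []  # (snapshot_id, table state)
        self.next_id = 1

    def commit(self, changes: Row) -> int:
        # Start from the latest state and apply the new changes on top.
        state = dict(self.snapshots[-1][1]) if self.snapshots else {}
        state.update(changes)
        snapshot_id = self.next_id
        self.next_id += 1
        self.snapshots.append((snapshot_id, state))
        # Expire snapshots that fall outside the retention window.
        self.snapshots = self.snapshots[-self.retained:]
        return snapshot_id

    def read(self, snapshot_id: Optional[int] = None) -> Row:
        if not self.snapshots:
            return {}
        if snapshot_id is None:
            return dict(self.snapshots[-1][1])  # latest state
        for sid, state in self.snapshots:
            if sid == snapshot_id:
                return dict(state)  # state as of that snapshot
        raise KeyError(f"snapshot {snapshot_id} has expired or never existed")


table = ToySnapshotTable(retained_snapshots=2)
s1 = table.commit({"a": 1})
s2 = table.commit({"a": 2})
s3 = table.commit({"b": 5})  # s1 now falls outside the retention window
```

In this model, reading snapshot s2 still returns the older state, while reading s1 fails because it has expired, which is why the retention period bounds how far back a time travel query can go.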

Key design decisions

Several key design decisions shaped our implementation:

SQL as the primary interface: Rather than requiring developers to write Java or Scala code for every transformation, SQL became our lingua franca. This democratized access to streaming data, allowing analysts and data scientists to work with real-time data using familiar tools.

Separation of compute and storage: By decoupling these layers, we could scale them independently. A spike in processing needs no longer meant provisioning more storage, and historical data could be kept indefinitely without impacting compute costs.

Embracing open source standards: The shift from home-grown formats and tools to community-supported projects reduced our maintenance burden and accelerated feature development. When issues arose, our engineers could leverage community knowledge rather than debugging in isolation.

Implementation journey

Our transition to the new streamhouse architecture followed a carefully planned path, encompassing prototype development, phased migration, and systematic validation of each component.

Migration strategy

Our migration to the streamhouse architecture required careful planning and execution. The strategy had to balance the need for transformation with the reality of maintaining critical production systems.

1. Prototype development

Our journey began with building foundational components:

  • Pure Java client library: Removing Scala dependencies was crucial for broader adoption. Our new library removed reliance on Yelp-specific configurations, allowing it to run in many environments.
  • Data Format Converter: This bridge component translated between our proprietary CDC format and the standard Debezium format, making sure existing consumers could continue operating during the migration.
  • Paimon ingestor: A Flink job that could ingest data from Kafka sources into Paimon tables, handling schema evolution automatically.
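
To illustrate what a format converter of this kind does, here is a minimal Python sketch. The legacy message layout (message_type, payload, previous_payload, table, timestamp) is entirely hypothetical, since the proprietary format is not described here. The target shape follows the standard Debezium change event envelope with before, after, source, op, and ts_ms fields.

```python
from typing import Any, Dict

# Hypothetical legacy message types mapped to Debezium "op" codes.
OP_CODES = {"create": "c", "update": "u", "delete": "d", "refresh": "r"}


def to_debezium(legacy: Dict[str, Any]) -> Dict[str, Any]:
    """Translate one (hypothetical) legacy CDC message into a Debezium-style envelope."""
    message_type = legacy["message_type"]
    if message_type not in OP_CODES:
        raise ValueError(f"unsupported message type: {message_type}")
    op = OP_CODES[message_type]

    payload = legacy.get("payload")
    if op == "d":
        # Assumption: for deletes, the legacy payload holds the deleted row.
        before, after = payload, None
    elif op == "u":
        before, after = legacy.get("previous_payload"), payload
    else:
        before, after = None, payload

    return {
        "before": before,
        "after": after,
        "source": {"table": legacy["table"]},
        "op": op,
        # Assumption: the legacy format carries epoch seconds; Debezium uses milliseconds.
        "ts_ms": int(legacy["timestamp"] * 1000),
    }


legacy_update = {
    "message_type": "update",
    "table": "business",
    "timestamp": 1700000000,
    "previous_payload": {"id": 7, "name": "Old Cafe"},
    "payload": {"id": 7, "name": "New Cafe"},
}
converted = to_debezium(legacy_update)
```

A converter like this is stateless and works per message, which is what makes it practical to run alongside existing consumers during a migration.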

2. Phased rollout approach

Rather than attempting a “big bang” migration, we adopted a per-use case approach—moving a vertical slice of data rather than the entire system at once. Our phased rollout followed these steps:

  • Select a representative, real-world use case that provides broad coverage of the existing feature set.
    • In our use case, this included data sourced from both databases and event streams, with writes going to Cassandra and Nrtsearch
  • Re-implement the use case on the new stack in a development environment using sample data to test the logic
  • Shadow-launch the new stack in production to test it at scale
    • This was a critical step for us, as we had to iterate through various configuration tweaks before the system could reliably sustain our production traffic.
  • Verify the new production deployment against the legacy system’s output
  • Switch live traffic to the new system only after both the Yelp Platform team and data owners are confident in its performance and reliability
  • Decommission the legacy system for that use case once the migration is complete

This phased approach allowed our team to build confidence, identify issues early, and refine our processes before touching business-critical systems in production.

Technical challenges we overcame

The migration surfaced several technical challenges that required innovative solutions:

System integration: We developed comprehensive monitoring to track end-to-end latencies and built automated alerting to detect any degradation in performance.

Performance tuning: Initial write performance to Paimon tables was suboptimal for our higher-throughput streams. After careful analysis, we identified that Paimon was re-reading manifest files from S3 on every commit. To alleviate this, we enabled Paimon’s sink writer coordinator cache setting, which is disabled by default. This massively reduced the number of S3 calls during commits. We also found that write parallelism in Paimon is limited by the number of “buckets” within a partition. Choosing a bucket count that is high enough to scale writes horizontally, but not so high that data is spread too thinly, is important for balancing write performance against query performance.
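
The bucket trade-off can be sketched with a small Python model. This is an illustration of hash bucketing in general, not Paimon's actual hashing or writer assignment: the function names are hypothetical, CRC32 is used only because it is a stable hash in the standard library, and the model assumes one writer per bucket at a time.

```python
import zlib
from collections import Counter
from typing import Iterable


def bucket_for(key: str, num_buckets: int) -> int:
    # Stable hash so the same key always lands in the same bucket.
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def effective_writers(requested_parallelism: int, num_buckets: int) -> int:
    # Assumption: each bucket is written by one writer at a time,
    # so writers beyond the bucket count have nothing to do.
    return min(requested_parallelism, num_buckets)


def rows_per_bucket(keys: Iterable[str], num_buckets: int) -> Counter:
    # Count how many rows land in each bucket for a given bucket count.
    return Counter(bucket_for(key, num_buckets) for key in keys)


keys = [f"business-{i}" for i in range(1000)]
few = rows_per_bucket(keys, num_buckets=4)
many = rows_per_bucket(keys, num_buckets=256)

print("writers usable with 4 buckets:", effective_writers(16, 4))
print("largest bucket with 4 buckets:", max(few.values()))
print("largest bucket with 256 buckets:", max(many.values()))
```

With few buckets, most of the requested writers sit idle. With many buckets, each bucket holds very little data, which tends to produce many small files that queries then have to open.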

Data validation: Validating data consistency between our legacy Yelp CDC streams and the new Debezium-based format presented notable challenges. During the parallel run phase, we implemented comprehensive validation frameworks to make sure the Data Format Converter accurately transformed messages, while maintaining data integrity, ordering guarantees, and schema compatibility across both systems.

Data migration complexity: For consistency, we developed custom tooling to verify ordering guarantees and implemented parallel running of old and new systems. We chose Spark as the framework to implement our validations as every data source and sink in our framework has mature connectors, and Spark is a well-supported system at Yelp.
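
The kind of check described above can be sketched in a few lines of plain Python. This is not the Spark-based tooling mentioned in the post. It is a simplified illustration that assumes each event can be reduced to a hypothetical (primary key, sequence number, row fingerprint) tuple, and that only per-key ordering matters.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Event = Tuple[str, int, str]  # (primary key, sequence number, row fingerprint)


def per_key_history(events: Iterable[Event]) -> Dict[str, List[Tuple[int, str]]]:
    # Group events by key, preserving the order in which they were observed.
    history: Dict[str, List[Tuple[int, str]]] = defaultdict(list)
    for key, seq, fingerprint in events:
        history[key].append((seq, fingerprint))
    return history


def validate(legacy: Iterable[Event], candidate: Iterable[Event]) -> List[str]:
    """Return human-readable problems; an empty list means the streams agree."""
    old, new = per_key_history(legacy), per_key_history(candidate)
    problems: List[str] = []
    for key in sorted(old.keys() | new.keys()):
        seqs = [seq for seq, _ in new.get(key, [])]
        if seqs != sorted(seqs):
            problems.append(f"{key}: out-of-order events in new stream")
        if old.get(key, []) != new.get(key, []):
            problems.append(f"{key}: history differs between streams")
    return problems


legacy_stream = [("a", 1, "x"), ("b", 1, "y"), ("a", 2, "z")]
# Interleaving across keys may differ between systems; per-key order must not.
good_stream = [("b", 1, "y"), ("a", 1, "x"), ("a", 2, "z")]
bad_stream = [("a", 2, "z"), ("a", 1, "x"), ("b", 1, "y")]

print(validate(legacy_stream, good_stream))
print(validate(legacy_stream, bad_stream))
```

Comparing per-key histories rather than raw stream order avoids false alarms from harmless differences in how the two systems interleave unrelated keys.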

Practical wins we achieved

Our implementation delivered transformative results:

Simplified streaming stack: By replacing multiple custom components with standardized tools, we avoided years of technical debt in one migration. This simplified our entire streaming architecture, leading to higher reliability and less maintenance overhead. Our Schematizer, encryption layer, and custom CDC format were all replaced by built-in features from Paimon and standard Kafka, along with IAM controls across S3 and MSK.

Fine-grained access management: Moving our analytical use cases to read via Iceberg unlocked a huge win for us: the ability to enable AWS Lake Formation on our data lake. Previously, our access management relied on large, complex S3 bucket policy documents that were approaching their size limits. By moving to Lake Formation, we could build an access request lifecycle into our in-house Access Hub to automate access granting and revocation.

Built-in data management features: Capabilities that would have required months of custom development came out-of-the-box, such as automatic schema evolution, time travel queries, and incremental snapshots for efficient processing.

Potential for reduced operational costs: We anticipate that transitioning from Kafka storage to S3 in a streamhouse architecture will significantly reduce storage costs. Avoiding long Kafka chains will also simplify data pipelines and reduce compute costs.

Enhanced troubleshooting capabilities: The streamhouse architecture promises built-in observability features that will make debugging easier. Rather than having to manually look through event streams for problematic data, which can be time-consuming and complex for multi-stream pipelines, engineers can now query live data directly from tables using standard SQL.

Lessons learned and best practices

Throughout this transformation, we gained valuable insights about both technical implementation and organizational change management that can benefit others undertaking similar modernization efforts.

Technical insights

Our journey revealed several crucial technical lessons:

Battle-tested open source wins: Choosing Apache Paimon and Flink CDC over custom solutions proved wise. The community support, continuous improvements, and shared knowledge base accelerated our development and reduced risk.

SQL interfaces democratize access: Making streaming data accessible via SQL transformed who could work with real-time data. Engineers and analysts familiar with SQL can now understand how streaming pipelines work. The barrier to entry has been significantly lowered as engineers no longer need to understand Flink-specific APIs to create a streaming application.

Separation of storage and compute is fundamental: This architectural principle unlocked cost savings and operational flexibility that wouldn’t have been possible otherwise. Our teams can now optimize storage and compute independently based on their specific needs.

Organizational learnings

The human side of the transformation was equally important:

Phased migration reduces risk: Our gradual approach allowed teams to build confidence and expertise, while maintaining business continuity. Each successful phase created momentum for the next. Building trust with newer systems helps gain velocity in later stages of migrations.

Backward compatibility enables progress: By maintaining compatibility layers, our teams could migrate at their own pace without forcing synchronized changes across the organization.

Investment in learning pays dividends: Giving our teams space to learn new technologies like Paimon and streaming SQL had some opportunity cost, but it paid off through increased productivity and reduced operational burden.

Conclusion

Our transformation to a streaming lakehouse architecture (streamhouse) has revolutionized Yelp’s data infrastructure, delivering impressive results across multiple dimensions. By implementing Apache Paimon with AWS services like Amazon S3 and Amazon MSK, we reduced our analytics data latencies from 18 hours to just minutes while cutting storage costs by 80%. The migration also simplified our architecture by replacing multiple custom components with standardized tools, significantly reducing maintenance overhead and improving reliability.

Key achievements include the successful implementation of real-time processing capabilities, streamlined CDC handling, and enhanced data management features like automatic schema evolution and time travel queries. The shift to SQL-based interfaces has democratized access to streaming data, while the separation of compute and storage has given us unprecedented flexibility in resource optimization. These improvements have transformed not just our technology stack, but also how our teams work with data.

For organizations facing similar challenges with data processing latency, operational costs, and infrastructure complexity, we encourage you to explore the streamhouse approach. Start by evaluating your current architecture against modern streaming solutions, particularly those leveraging cloud services and open-source technologies like Apache Paimon. Make sure to leverage security best practices when implementing your solution. You can find AWS security best practices here. Visit the Apache Paimon website or AWS documentation to learn more about implementing these solutions in your environment.


About the authors

Umesh Dangat

Umesh is a Senior Principal Engineer for Distributed Services and Systems at Yelp, where he architects and leads the evolution of Yelp’s high-performance distributed systems—spanning streaming, storage, and real-time data retrieval. He oversees the core infrastructure powering Yelp’s search, ranking, and data platforms, driving engineering efficiency by improving platform scalability, reliability, and alignment with business needs. Umesh is also an active open source contributor to projects such as Elasticsearch, NrtSearch, Apache Paimon, and Apache Flink CDC.

Toby Cole

Toby is a Principal Engineer for Data Processing at Yelp, and has been working with distributed systems for the past 18 years. He has led groundbreaking efforts to containerize datastores like Cassandra and is currently building Yelp’s next generation of streaming infrastructure. You can often find him petting cats and taking apart electrical devices for no apparent reason.

Ali Alemi

Ali is a Principal Streaming Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems which are reliable, secure, efficient, and cost-effective. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

Bryan Spaulding

Bryan is a Senior Solutions Architect at AWS. Bryan works with AdTech customers to advise on their technology strategy, apply best practice AWS architecture patterns, and champion their interests within AWS. Prior to joining AWS, Bryan served in technology leadership roles in various Media & Entertainment and EdTech startups where he was also an AWS customer himself, and early in his career was a consultant in multiple professional services firms.

Amazon MSK Express brokers now support Intelligent Rebalancing for 180 times faster operation performance

Post Syndicated from Swapna Bandla original https://aws.amazon.com/blogs/big-data/amazon-msk-express-brokers-now-support-intelligent-rebalancing-for-180-times-faster-operation-performance/

Effective today, all new Amazon Managed Streaming for Apache Kafka (Amazon MSK) Provisioned clusters with Express brokers will support Intelligent Rebalancing at no additional cost. With this new capability you can perform automatic partition balancing operations when scaling Apache Kafka clusters up or down. Intelligent Rebalancing maximizes the capacity utilization of Amazon MSK clusters with Express brokers by optimally rebalancing Kafka resources on them for better performance, eliminating the need to manage partitions independently or by using third-party tools. Intelligent Rebalancing on Amazon MSK Express brokers performs these operations up to 180 times faster compared to Standard brokers.

We launched Amazon MSK Express brokers in November 2024 to reimagine Apache Kafka for ease of use, best-in-class price performance, and predictable availability. Amazon MSK Express brokers are designed to deliver up to three times more throughput per-broker, scale up to 20 times faster, and reduce recovery time by 90 percent as compared to Standard brokers running Apache Kafka. Since launch, we have expanded Amazon MSK Express brokers to additional AWS Regions, instance types, and most recently increased support to 5x more partitions per Express broker, improving price-performance by up to 50% for partition-bound workloads.

With Intelligent Rebalancing, Amazon MSK Express broker clusters are continuously monitored for resource imbalance or overload based on intelligent Amazon MSK defaults to maximize cluster performance. When required, brokers are efficiently scaled, without affecting cluster availability for clients to produce and consume data. Customers can now take full advantage of the scaling and performance benefits of Amazon MSK Provisioned clusters for Express brokers while simplifying cluster management operations.

In this post we’ll introduce the Intelligent Rebalancing feature and show an example of how it works to improve operation performance.

When to use Intelligent Rebalancing

With Intelligent Rebalancing, Amazon MSK Express brokers now offer a fully automated solution for managing and scaling Kafka clusters, requiring no additional tools or configuration. Intelligent Rebalancing is enabled by default on all new Amazon MSK Express brokers clusters, and we recommend always keeping it on. Intelligent Rebalancing uses Amazon MSK best practices to trigger automatic rebalancing during the following situations:

  • Scaling in and out clusters: When customers add or remove brokers from their Amazon MSK Express brokers clusters, Intelligent Rebalancing automatically redistributes partitions to balance resource utilization across the brokers. This ensures that the cluster continues to operate at peak performance, making scaling in and out possible with a single update operation.
  • Steady-state rebalancing: Even during normal operations, Intelligent Rebalancing continuously monitors the Amazon MSK Express brokers cluster and triggers rebalancing when it detects resource imbalances or hotspots. For example, if certain brokers become overloaded due to uneven distribution of partitions or skewed traffic patterns, Intelligent Rebalancing will automatically move partitions to less utilized brokers to restore balance.
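
To give a feel for what redistributing partitions means, here is a toy Python sketch. It is not the algorithm Amazon MSK uses, which is not described in this post. It balances only on partition count, whereas the service reacts to resource utilization, and the rebalance function and broker names are hypothetical.

```python
from typing import Dict, List, Tuple


def rebalance(assignment: Dict[str, List[str]]) -> List[Tuple[str, str, str]]:
    """Move partitions from the fullest broker to the emptiest until
    partition counts differ by at most one. Mutates `assignment` and
    returns the (partition, source, target) moves that were made."""
    moves: List[Tuple[str, str, str]] = []
    while True:
        fullest = max(assignment, key=lambda broker: len(assignment[broker]))
        emptiest = min(assignment, key=lambda broker: len(assignment[broker]))
        if len(assignment[fullest]) - len(assignment[emptiest]) <= 1:
            return moves
        partition = assignment[fullest].pop()
        assignment[emptiest].append(partition)
        moves.append((partition, fullest, emptiest))


# Scale-out scenario: three brokers hold all 12 partitions, three new brokers are empty.
cluster = {
    "broker-1": ["p0", "p1", "p2", "p3"],
    "broker-2": ["p4", "p5", "p6", "p7"],
    "broker-3": ["p8", "p9", "p10", "p11"],
    "broker-4": [],
    "broker-5": [],
    "broker-6": [],
}
moves = rebalance(cluster)
print(f"{len(moves)} partition moves")
for broker, partitions in cluster.items():
    print(broker, partitions)
```

In this example, six partition moves leave every broker with two partitions. The same loop also covers the steady-state case, because it only acts when an imbalance exists.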

How to use Intelligent Rebalancing

To demonstrate the power of Intelligent Rebalancing, let’s run a few tests on an Amazon MSK Express brokers cluster:

Scaling test: We’ll start by creating an Amazon MSK Express brokers cluster with 3 brokers. We’ll then rapidly scale the cluster up to 6 brokers and back down to 3 brokers, simulating a sudden spike in workload. With Intelligent Rebalancing enabled, you’ll see that the rebalancing of partitions is completed within 5-10 minutes, so that the cluster can sustain the increased throughput without any drop in performance.


You can track the current and historical rebalancing operations using the metric RebalanceInProgress. In the picture below, you can also see that the clients on the producer side are not impacted during this rebalancing.

Next, we’ll create an imbalance in the cluster by directing a large portion of the traffic to a single broker. You’ll see that Intelligent Rebalancing detects this imbalance within minutes and automatically redistributes the partitions, restoring the cluster to an optimal state.

Intelligent Rebalancing detects hotspots and automatically redistributes affected partitions across other brokers to optimize resource utilization. Without Intelligent Rebalancing, the resource imbalance would persist, potentially leading to performance issues or the need for manual intervention by the customer.

These tests showcase how Intelligent Rebalancing with Amazon MSK Express brokers enables scaling Kafka clusters seamlessly while maintaining consistently high performance, even under varying workload conditions.

Conclusion

Intelligent Rebalancing for Amazon MSK Provisioned clusters with Express brokers is being rolled out over the next few weeks in all AWS Regions where Amazon MSK Express brokers are supported. This feature is automatically enabled for all new Amazon MSK Provisioned clusters with Express brokers at no additional cost.

To get started, visit the Amazon MSK console. For more information, see the Amazon MSK Developer Guide.


About the authors

Swapna Bandla

Swapna is a Senior Streaming Solutions Architect at AWS. With a deep understanding of real-time data processing and analytics, she partners with customers to architect scalable, cloud-native solutions that align with AWS Well-Architected best practices. Swapna is passionate about helping organizations unlock the full potential of their data to drive business value. Beyond her professional pursuits, she cherishes quality time with her family.

Masudur Rahaman Sayem

Masudur is a Streaming Data Architect at AWS with over 25 years of experience in the IT industry. He collaborates with AWS customers worldwide to architect and implement sophisticated data streaming solutions that address complex business challenges. He has a keen interest and passion for distributed architecture, which he applies to designing enterprise-grade solutions at internet scale.

Shakhi Hali

Shakhi is a Principal Product Manager for Amazon Managed Streaming for Apache Kafka (Amazon MSK) at AWS. She is passionate about helping customers generate business value from real-time data. Before joining MSK, Shakhi was a PM with Amazon S3. In her free time, Shakhi enjoys traveling, cooking, and spending time with family.

Introducing AWS Lambda event source mapping tools in the AWS Serverless MCP Server

Post Syndicated from Ben Freiberg original https://aws.amazon.com/blogs/compute/introducing-aws-lambda-event-source-mapping-tools-in-the-aws-serverless-mcp-server/

Modern serverless applications increasingly rely on event-driven architectures, where AWS Lambda functions process events from various sources like Amazon Kinesis, Amazon DynamoDB Streams, Amazon Simple Queue Service (Amazon SQS), Amazon Managed Streaming for Apache Kafka (Amazon MSK), and self-managed Apache Kafka.

Although event source mappings (ESM) offer a powerful mechanism for integrating AWS Lambda with stream and queue-based sources, configuring them to align with high-level architectural goals can sometimes involve navigating a broad set of options and parameters. Achieving an optimal configuration typically requires mapping developer intent to several technical settings, which can introduce inefficiencies or operational overhead.

In May 2025, AWS launched the AWS Serverless MCP Server, which provided AI-powered assistance for serverless application development, including infrastructure provisioning, deployment automation, and architectural guidance. Building on this foundation, AWS is now expanding the Serverless MCP Server to include specialized ESM tools.

These new dedicated tools in the AWS Serverless Model Context Protocol (MCP) Server combine the power of AI assistance with ESM expertise to enhance how developers build and manage event-driven serverless applications using Lambda. The new ESM tools provide contextual guidance specific to ESM configuration that address the challenges of event-driven development.

This post describes how the new tools under Serverless MCP Server work with AI coding assistants to streamline event source mapping management. Learn how to use this solution to accelerate your event-driven development workflow and build robust, high-performing applications more efficiently.

Overview

An event source mapping is a Lambda resource that reads items from stream and queue-based services and invokes a function with batches of records. Within an event source mapping, resources called event pollers actively poll for new messages and invoke functions. Using ESMs, AWS Lambda functions can automatically consume events from various sources without requiring custom polling infrastructure. Lambda handles the complexity of scaling, batching, filtering, and error handling, helping developers focus on business logic.
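
As a minimal sketch of the function side of this arrangement, the following Python handler consumes a batch delivered by a Kafka event source mapping. It relies on the documented event shape for Kafka sources, where records are grouped under topic-partition keys and values are base64-encoded. The assumption that each value is a JSON document, and the sample topic name and payload, are illustrative only.

```python
import base64
import json
from typing import Any, Dict, List


def handler(event: Dict[str, Any], context: Any) -> Dict[str, int]:
    processed: List[Dict[str, Any]] = []
    # Records arrive grouped under "<topic>-<partition>" keys, one batch per invocation.
    for topic_partition, records in event.get("records", {}).items():
        for record in records:
            # Kafka values are base64-encoded in the Lambda event.
            # Assumption: producers write JSON documents.
            payload = json.loads(base64.b64decode(record["value"]))
            processed.append({
                "source": topic_partition,
                "offset": record["offset"],
                "payload": payload,
            })
    # Business logic would go here; this sketch only counts the records.
    return {"processed": len(processed)}


# Local smoke test with a hand-built event (hypothetical topic and payload).
sample_value = base64.b64encode(json.dumps({"order_id": 42}).encode("utf-8")).decode("ascii")
sample_event = {
    "eventSource": "aws:kafka",
    "records": {
        "orders-0": [
            {"topic": "orders", "partition": 0, "offset": 15, "value": sample_value},
            {"topic": "orders", "partition": 0, "offset": 16, "value": sample_value},
        ]
    },
}
result = handler(sample_event, None)
```

The handler contains no polling, offset tracking, or scaling logic, because the event source mapping takes care of those.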

Navigating ESM configurations

Configuring these mappings optimally, especially for virtual private cloud (VPC)-based sources like Apache Kafka, requires additional understanding of networking, permissions, and performance tuning.

When working with event source mappings, developers need to address several technical considerations. For Kafka event sources using VPC-based Amazon Managed Streaming for Apache Kafka or self-managed Apache Kafka, configuration involves networking setup to enable Lambda access to Kafka topics. Developers must manage bootstrap servers, AWS Identity and Access Management (IAM) permissions, and topic access settings, while also handling authentication including SASL/SCRAM credentials, mTLS certificate management, and Kafka ACL permissions.

Developers need to know how to translate performance requirements, such as processing 1,000 events per second, into specific ESM parameter configurations. Depending on the stream source, this involves determining appropriate batch sizes, parallelization factors, and retry policies while managing iterator age, offset lag and potential timeout issues. Additionally, developers need visibility into configuration effectiveness and other diagnostic information to optimize resource allocation and ensure reliable event processing.

Dedicated event source mapping tools

The new ESM tools in the open source AWS Serverless MCP Server address these challenges by providing AI assistants with proven knowledge of event source mapping patterns and best practices. These tools guide developers through the entire ESM lifecycle, from initial setup to optimization and troubleshooting. They also enhance the event-driven development experience by translating the developer’s intent into detailed, technical configuration, helping developers express high-level goals such as desired throughput, latency, or reliability requirements. The new tools cover all areas of event source mapping management:

  • Setup and configuration: Developers initialize new event source mapping configurations using AWS Serverless Application Model (AWS SAM) templates, select appropriate event source settings, and configure networking requirements for VPC-based sources like Amazon MSK.
  • Optimization and tuning: As applications evolve, the tools assist with fine-tuning ESM parameters like batch size, batching window, retry policies, and parallelization factors based on performance goals and telemetry data.
  • Troubleshooting and diagnostics: Specialized tools diagnose ESM connectivity issues, analyze Amazon CloudWatch Logs and metrics, and recommend solutions for common problems like VPC misconfigurations or permission errors.

Event source mapping tools in action

This example walks you through a scenario of creating, optimizing, and troubleshooting an event source mapping for Amazon MSK to demonstrate the capabilities of the new ESM tools.

Prerequisites and installation

To get started, download or update the AWS Serverless MCP Server from GitHub or the Python Package Index (PyPI) and follow the installation instructions. You can use this MCP server with any AI coding assistant of your choice, such as Amazon Q Developer, Cursor, Cline, Kiro, and more.

Add the following code to your MCP client configuration:

{
  "mcpServers": {
    "awslabs.aws-serverless-mcp-server": {
      "command": "uvx",
      "args": [
        "awslabs.aws-serverless-mcp-server@latest"
      ],
      "env": { 
        "AWS_PROFILE": "your-aws-profile",
        "AWS_REGION": "us-east-1",
        "FASTMCP_LOG_LEVEL": "ERROR"
      }
    }
  }
}
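
If your MCP client configuration already lists other servers, the entry needs to be merged in rather than pasted over the existing file. The following Python sketch shows one way to do that on an in-memory dictionary. The add_serverless_mcp_server helper and the "some-other-server" entry are hypothetical, and because the location of the configuration file depends on your MCP client, reading and writing the file is left out.

```python
import json
from typing import Any, Dict

SERVER_NAME = "awslabs.aws-serverless-mcp-server"


def add_serverless_mcp_server(config: Dict[str, Any], profile: str, region: str) -> Dict[str, Any]:
    """Return a copy of an MCP client config with the Serverless MCP Server entry added."""
    merged = json.loads(json.dumps(config))  # deep copy so the input stays untouched
    merged.setdefault("mcpServers", {})[SERVER_NAME] = {
        "command": "uvx",
        "args": ["awslabs.aws-serverless-mcp-server@latest"],
        "env": {
            "AWS_PROFILE": profile,
            "AWS_REGION": region,
            "FASTMCP_LOG_LEVEL": "ERROR",
        },
    }
    return merged


# Hypothetical existing configuration with another server already registered.
existing = {"mcpServers": {"some-other-server": {"command": "uvx", "args": ["example"]}}}
updated = add_serverless_mcp_server(existing, "your-aws-profile", "us-east-1")
print(json.dumps(updated, indent=2))
```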

The Serverless MCP Server incorporates built-in guardrails to ensure secure and controlled development. By default, the server operates in a read-only mode, allowing only non-mutating actions. With this safety-first approach, you can explore ESM capabilities and architectural patterns while preventing unintended changes to your applications or infrastructure.

Creating and configuring an event source mapping

Imagine you want to set up a Lambda function to process events from an Amazon MSK cluster. Start by prompting your AI assistant:

Create a new Kafka cluster and a VPC named <your-vpc-name> in <your-aws-region>. The cluster should be in the VPC’s private subnets. Then, create a Lambda function to consume from the stream within the same VPC cluster. Prefix all created resources with <your-prefix>.

AI prompt to create a new Kafka cluster and ESM

The agent uses the esm_guidance tool to receive tailored guidance based on your use case and performance requirements. The tool analyzes your intent and provides step-by-step instructions for setting up the ESM with optimal configurations.

In addition to deployment and initialization scripts and supporting documentation, the assistant generates properly configured IAM policies and security group rules for accessing the cluster. It then validates the ESM parameters against AWS limits and best practices.

Next, you want to understand the networking requirements:

My Kafka cluster is in a VPC. What networking configuration do I need for Lambda to access it?

AI assistant prompt for setting up Kafka ESM networking connectivity

The Serverless MCP Server provides specialized guidance for VPC-based Kafka configurations using the esm_guidance tool with guidance_type="networking". This guidance provides detailed information about subnet requirements, security group rules, and NAT gateway setup, and it validates your network topology for reliable connectivity.

Optimizing event source mapping performance

After your ESM is running, you notice that processing latency is higher than expected. You can ask for optimization guidance:

I have an ESM with UUID <your-esm-uuid> in <your-aws-region>. My target throughput is between 10 MB/s and 100 MB/s. Please update my ESM configuration to meet these throughput requirements while optimizing the cost of the event pollers.

AI prompt to optimize ESM throughput

The server uses the esm_optimize tool to analyze your current configuration and provide optimization recommendations. The tool supports three main actions:

  • Analysis mode: (action="analyze") Analyzes configuration tradeoffs for your optimization targets (throughput, latency, cost, failure rate)
  • Validation mode: (action="validate") Validates your ESM configuration against AWS limits and event source restrictions
  • Template generation: (action="generate_template") Creates updated AWS SAM templates with optimized configurations

You can use this tool to get guidance on your event source mapping configurations for Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams. Here are two examples:

I have a Kinesis stream with 100 shards receiving 100 MB/s of data. My Lambda function processes each record in about 50ms. Currently, my ESM has ParallelizationFactor=1 and BatchSize=100, but I’m seeing high iterator age (over 60 seconds) during peak times. How should I optimize my ESM configuration to reduce processing latency and handle the throughput?

AI prompt to optimize Kinesis ESM throughput

I have an SQS standard queue that receives 50,000 messages per hour during peak times. Each message takes about 2 seconds to process. My current ESM configuration has BatchSize=10 and no ScalingConfig set. I’m seeing message delays during peak hours. How should I optimize my ESM configuration for better throughput while keeping costs reasonable?

The tool generates updated AWS Serverless Application Model (AWS SAM) templates with the recommended configurations, making it easy to apply the changes through your deployment pipeline. However, it always requires explicit user confirmation before any deployment.
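
Before asking for recommendations, it can help to sanity-check the numbers in prompts like the two above. The following Python sketch is a back-of-the-envelope estimate and not output from the esm_optimize tool. It assumes records are processed one at a time within an invocation and ignores invocation overhead, batching windows, and retries. The function names are hypothetical.

```python
import math


def kinesis_records_per_second(shards: int, parallelization_factor: int, ms_per_record: int) -> float:
    # For Kinesis, Lambda processes up to `parallelization_factor` batches per shard concurrently.
    concurrent_batches = shards * parallelization_factor
    # Assumption: each concurrent invocation handles records sequentially.
    return concurrent_batches * 1000 / ms_per_record


def sqs_required_concurrency(messages_per_hour: int, seconds_per_message: int) -> int:
    # Little's law: work in progress = arrival rate x time spent per item.
    return math.ceil(messages_per_hour * seconds_per_message / 3600)


# Kinesis prompt: 100 shards, 50 ms per record, ParallelizationFactor=1.
current = kinesis_records_per_second(shards=100, parallelization_factor=1, ms_per_record=50)
# Same stream with the parallelization factor raised to 10.
tuned = kinesis_records_per_second(shards=100, parallelization_factor=10, ms_per_record=50)

# SQS prompt: 50,000 messages per hour, about 2 seconds per message.
needed = sqs_required_concurrency(messages_per_hour=50_000, seconds_per_message=2)

print(f"Kinesis capacity now: {current:.0f} records/s, with factor 10: {tuned:.0f} records/s")
print(f"SQS concurrent message slots needed at peak: {needed}")
```

Under these assumptions, the Kinesis setup can process roughly 2,000 records per second, so a growing iterator age suggests the arrival rate exceeds that. The SQS queue needs about 28 messages in flight at once to keep up, regardless of how they are grouped into batches.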

Troubleshooting event source mapping issues

When an issue arises, the ESM tools provide diagnostic capabilities. For example, if your ESM stops processing events:

I have a cluster called <your-kafka-cluster-name> and a consumer Lambda function named <your-lambda-function-name> in <your-aws-region>. Please investigate why my ESM (UUID: <your-esm-uuid>) trigger is not working and provide updated configurations to resolve the issue.

AI assistant prompt for investigating an issue with Kafka ESM

The server uses the esm_kafka_troubleshoot tool to provide comprehensive troubleshooting for Apache Kafka clusters. The tool supports two main modes:

  • Diagnostic mode: (issue_type="diagnosis") Analyzes your ESM status and provides diagnostic indicators. This helps identify whether timeouts occur before or after reaching Kafka brokers. It categorizes issues into specific types for targeted resolution.
  • Resolution mode: Provides step-by-step resolution guidance for specific issues.

AI prompt to start debugging an issue with a Kafka ESM

The tool automatically detects your event source type and provides tailored guidance. It validates VPC connectivity, examines IAM permissions, checks security group configurations, and analyzes CloudWatch Logs to provide a detailed diagnosis report with specific remediation steps.

Key benefits

The event source mapping tools in the AWS Serverless MCP Server provide unique advantages over traditional event source mapping configuration approaches:

  • AI-powered configuration translation: The tools translate high-level developer intent (such as process 1,000 events per second) into specific ESM parameters like batch size, parallelization factor, and batching window.
  • Complete infrastructure-as-code generation: Unlike generic AWS CLI tools that provide individual commands, ESM tools generate complete AWS SAM templates, initialization scripts, cleanup scripts, and validation scripts for end-to-end automation.
  • Proactive network validation: For VPC-based event sources like Amazon MSK or self-managed Kafka, the tools validate network topology, security group rules, and connectivity before deployment, preventing common silent failures.
  • Context-aware troubleshooting: The diagnostic tools correlate ESM status, CloudWatch metrics, VPC configuration, and IAM permissions to provide comprehensive root cause analysis with specific remediation steps.

New tools available in the Serverless MCP Server

The event source mapping tools are designed to minimize trust permission prompts by using a small set of primary tools that internally call specialized functions. The tools can be classified into three main categories:

  • esm_guidance: This tool provides comprehensive guidance on creating and configuring event source mappings for all event sources (DynamoDB, Kinesis, Kafka, SQS). It handles setup, networking guidance, and troubleshooting based on the guidance_type parameter. The tool automatically generates AWS SAM templates, IAM policies, and security group configurations.
  • esm_optimize: This advanced optimization tool analyzes configuration tradeoffs, validates ESM settings, and generates AWS SAM templates for performance tuning. It supports three actions:
    • analyze: Provides configuration tradeoff analysis for failure rate, latency, throughput, and cost optimization
    • validate: Validates ESM configurations against AWS limits and event source restrictions
    • generate_template: Creates AWS SAM templates with optimized configurations
  • esm_kafka_troubleshoot: This specialized troubleshooting tool for Kafka ESM issues supports both Amazon MSK and self-managed Apache Kafka clusters. It also provides diagnostic capabilities and step-by-step resolution guidance for connectivity, authentication, and network issues.

The primary tools internally call specialized helper functions that generate IAM policies, security groups, and scaling and concurrency configurations, and that validate configurations.
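To make the validate action more concrete, here is a minimal sketch of the kind of limit checks such a validation could perform. It is not the server's implementation: validate_esm_config is a hypothetical name, and the limits are those documented for Lambda event source mappings at the time of writing, so verify them against the current Lambda documentation.

```python
def validate_esm_config(source: str, config: dict) -> list[str]:
    """Return a list of problems; an empty list means no issue was found."""
    problems = []
    batch_size = config.get("BatchSize")
    window = config.get("MaximumBatchingWindowInSeconds", 0)

    if source == "kinesis":
        if batch_size is not None and not 1 <= batch_size <= 10_000:
            problems.append("BatchSize must be between 1 and 10,000 for Kinesis")
        factor = config.get("ParallelizationFactor", 1)
        if not 1 <= factor <= 10:
            problems.append("ParallelizationFactor must be between 1 and 10")
    elif source == "sqs":
        if batch_size is not None and not 1 <= batch_size <= 10_000:
            problems.append("BatchSize must be between 1 and 10,000 for standard queues")
        if batch_size is not None and batch_size > 10 and window < 1:
            problems.append("BatchSize above 10 requires MaximumBatchingWindowInSeconds >= 1")
        max_concurrency = config.get("ScalingConfig", {}).get("MaximumConcurrency")
        if max_concurrency is not None and not 2 <= max_concurrency <= 1000:
            problems.append("MaximumConcurrency must be between 2 and 1,000")
    else:
        problems.append(f"Source not covered by this sketch: {source}")

    if not 0 <= window <= 300:
        problems.append("MaximumBatchingWindowInSeconds must be between 0 and 300")
    return problems


# The Kinesis configuration from the earlier prompt passes these checks.
print(validate_esm_config("kinesis", {"BatchSize": 100, "ParallelizationFactor": 1}))
# Raising the SQS batch size without a batching window is flagged.
print(validate_esm_config("sqs", {"BatchSize": 100}))
```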

Visit the Serverless MCP Server documentation for the full list of tools and resources.

Best practices and considerations

When building event-driven applications with the AWS Serverless MCP Server, start by using its guidance tools for architectural decisions. The server helps you choose appropriate event sources, understand networking requirements, and configure optimal settings based on your performance goals.

For Kafka-based ESMs, pay special attention to VPC configuration. Use the server’s network troubleshooting tools to validate connectivity before deployment. The server can detect common issues like missing NAT gateways, incorrect security group rules, or subnet routing problems.

Monitor your event source mappings continuously using the server’s diagnostic tools. Set up alerts for key metrics like iterator age, error rates, and throttling. The server can help you interpret these metrics and recommend configuration adjustments to maintain optimal performance.

Conclusion

The new event source mapping tools in the open-source AWS Serverless MCP Server simplify event source mapping management throughout the development lifecycle, from initial setup to ongoing optimization and troubleshooting. By combining AI assistance with ESM expertise, it helps developers build and deploy event-driven applications more efficiently while avoiding common configuration pitfalls.

As organizations continue to adopt event-driven serverless computing, tools that simplify ESM management and accelerate delivery become increasingly valuable.

To get started, visit the GitHub repository and explore the documentation. Share your experiences and suggestions through the GitHub repository to improve the MCP server’s capabilities and help shape the future of AI-assisted event-driven development.

For more serverless learning resources, visit Serverless Land.

Unlock real-time data insights with schema evolution using Amazon MSK Serverless, Iceberg, and AWS Glue streaming

Post Syndicated from Nitin Kumar original https://aws.amazon.com/blogs/big-data/unlock-real-time-data-insights-with-schema-evolution-using-amazon-msk-serverless-iceberg-and-aws-glue-streaming/

Efficient real-time synchronization of data within data lakes presents challenges. Any data inaccuracies or latency issues can significantly compromise analytical insights and subsequent business strategies. Organizations increasingly require synchronized data in near real time to extract actionable intelligence and respond promptly to evolving market dynamics. Additionally, scalability remains a concern for data lake implementations, which must accommodate expanding volumes of streaming data and maintain optimal performance without incurring high operational costs.

Schema evolution is the process of modifying the structure (schema) of a data table to accommodate changes in the data over time, such as adding or removing columns, without disrupting ongoing operations or requiring a complete data rewrite. Schema evolution is vital in streaming data environments for several reasons. Unlike batch processing, streaming pipelines operate continuously, ingesting data in real time from sources that are actively serving production applications. Source systems naturally evolve over time as businesses add new features, refine data models, or respond to changing requirements. Without proper schema evolution capabilities, even minor changes to source schemas can force streaming pipeline shutdowns, requiring developers to manually reconcile schema differences and rebuild tables.

Such disruptions reduce the core value proposition of streaming architectures—continuous, low-latency data processing. Organizations can maintain uninterrupted data flows and keep source systems evolving independently by using the seamless schema evolution provided by Apache Iceberg. This reduces operational friction and maintains the availability of real-time analytics and applications even as underlying data structures change.

Apache Iceberg is an open table format, delivering essential capabilities for streaming workloads, including robust schema evolution support. This critical feature enables table schemas to adapt dynamically as source database structures evolve, maintaining operational continuity. Consequently, when database columns undergo additions, removals, or modifications, the data lake accommodates these changes seamlessly without requiring manual intervention or risking data inconsistencies.

Our comprehensive solution showcases an end-to-end real-time CDC pipeline that enables immediate processing of data modifications from Amazon Relational Database Service (Amazon RDS) for MySQL, streaming altered records directly to AWS Glue streaming jobs using Amazon Managed Streaming for Apache Kafka (Amazon MSK) Serverless. These jobs continually process incoming changes and update Iceberg tables on Amazon Simple Storage Service (Amazon S3) so that the data lake reflects the current state of the operational database environment in real time. By using Apache Iceberg’s comprehensive schema evolution support, our ETL pipeline automatically adapts to database schema modifications, providing data lake consistency and currentness without manual intervention. This approach combines complete process control with instantaneous analytics on operational data, eliminating traditional latency, and future-proofs the solution to address evolving organizational data needs. The architecture’s inherent flexibility facilitates adaptation to diverse use cases requiring immediate data insights.

Solution overview

To effectively address streaming challenges, we propose an architecture using Amazon MSK Serverless, a comprehensive managed Apache Kafka service that autonomously provisions and scales computational and storage resources. This solution offers a frictionless mechanism for ingesting and processing streaming data without the complexity of capacity management. Our implementation uses Amazon MSK Connect with the Debezium MySQL connector to capture and stream database modifications in real time. Rather than employing traditional batch processing methodologies, we implement an AWS Glue streaming job that directly consumes data from Kafka topics, processes CDC events as they occur, and writes transformed data to Apache Iceberg tables on Amazon S3.

The workflow consists of the following:

  1. Data flows from Amazon RDS through Amazon MSK Connect using the Debezium MySQL connector to Amazon MSK Serverless. This represents a CDC pipeline that captures database changes from the relational database and streams them to Kafka.
  2. From Amazon MSK Serverless, the data moves to an AWS Glue job, which processes the data and stores it in Amazon S3 as Iceberg tables. The AWS Glue job interacts with the AWS Glue Data Catalog to maintain metadata about the datasets.
  3. Amazon Athena, a serverless interactive query service, queries the Iceberg table created in the Data Catalog. This allows for interactive data analysis without managing infrastructure.

The following diagram illustrates the architecture that we implement through this post. Each number corresponds to the preceding list and shows major components that you implement.

Prerequisites

Before getting started, make sure you have the following:

  • An active AWS account with billing enabled
  • An AWS Identity and Access Management (IAM) user with specific permissions to create and manage resources, such as a virtual private cloud (VPC), subnet, security group, IAM roles, NAT gateway, internet gateway, Amazon Elastic Compute Cloud (Amazon EC2) client, MSK Serverless, an MSK Connect connector and its plugin, an AWS Glue job, and S3 buckets.
  • Sufficient VPC capacity in your chosen AWS Region.

For this post, we create the solution resources in the US East (N. Virginia) – us-east-1 Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configuring CDC and processing using AWS CloudFormation

In this post, you use the CloudFormation template vpc-msk-mskconnect-rds-client-gluejob.yaml. This template sets up the streaming CDC pipeline resources such as a VPC, subnet, security group, IAM roles, NAT, internet gateway, EC2 client, MSK Serverless, MSK Connect, Amazon RDS, S3 buckets, and AWS Glue job.

To create the solution resources for the CDC pipeline, complete the following steps:

  1. Launch the stack vpc-msk-mskconnect-rds-client-gluejob.yaml using the CloudFormation template:
  2. Provide the parameter values as listed in the following table.

    Parameter | Description | Sample value
    EnvironmentName | An environment name that is prefixed to resource names. | msk-iceberg-cdc-pipeline
    DatabasePassword | Database admin account password. | ****
    InstanceType | MSK client EC2 instance type. | t2.micro
    LatestAmiId | Latest AMI ID of Amazon Linux 2023 for the EC2 instance. You can use the default value. | /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-default-x86_64
    VpcCIDR | IP range (CIDR notation) for this VPC. | 10.192.0.0/16
    PublicSubnet1CIDR | IP range (CIDR notation) for the public subnet in the first Availability Zone. | 10.192.10.0/24
    PublicSubnet2CIDR | IP range (CIDR notation) for the public subnet in the second Availability Zone. | 10.192.11.0/24
    PrivateSubnet1CIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone. | 10.192.20.0/24
    PrivateSubnet2CIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone. | 10.192.21.0/24
    NumberOfWorkers | Number of workers for the AWS Glue streaming job. | 3
    GlueWorkerType | Worker type for the AWS Glue streaming job. For example, G.1X. | G.1X
    GlueDatabaseName | Name of the AWS Glue Data Catalog database. | glue_cdc_blogdb
    GlueTableName | Name of the AWS Glue Data Catalog table. | iceberg_cdc_tbl
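If you change the sample CIDR values, each subnet range must still fall inside the VPC range and must not overlap another subnet. The following sketch checks both conditions with Python's standard ipaddress module before you launch the stack; check_subnets is a hypothetical helper and is not part of the CloudFormation template.

```python
import ipaddress


def check_subnets(vpc_cidr: str, subnet_cidrs: dict) -> list:
    """Return a list of problems found in the subnet layout (empty if none)."""
    vpc = ipaddress.ip_network(vpc_cidr)
    nets = {name: ipaddress.ip_network(cidr) for name, cidr in subnet_cidrs.items()}
    problems = []

    # Every subnet must be contained in the VPC range.
    for name, net in nets.items():
        if not net.subnet_of(vpc):
            problems.append(f"{name} ({net}) is outside the VPC range {vpc}")

    # No two subnets may overlap.
    names = sorted(nets)
    for i, first in enumerate(names):
        for second in names[i + 1:]:
            if nets[first].overlaps(nets[second]):
                problems.append(f"{first} overlaps {second}")
    return problems


# Sample values from the parameter table.
sample = {
    "PublicSubnet1CIDR": "10.192.10.0/24",
    "PublicSubnet2CIDR": "10.192.11.0/24",
    "PrivateSubnet1CIDR": "10.192.20.0/24",
    "PrivateSubnet2CIDR": "10.192.21.0/24",
}
print(check_subnets("10.192.0.0/16", sample))
```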

The stack creation process can take approximately 25 minutes to complete. You can check the Outputs tab for the stack after the stack is created, as shown in the following screenshot.

Following the successful deployment of the CloudFormation stack, you now have a fully operational Amazon RDS database environment. The database instance contains the salesdb database with the customer table populated with 30 data records.

These records have been streamed to the Kafka topic through the Debezium MySQL connector implementation, establishing a reliable CDC pipeline. With this foundation in place, proceed to the next phase of the data architecture: near real-time data processing using the AWS Glue streaming job.

Run the AWS Glue streaming job

To transfer the data load from the Kafka topic (created by the Debezium MySQL connector for database table customer) to the Iceberg table, run the AWS Glue streaming job configured by the CloudFormation setup. This process will migrate all existing customer data from the source database table to the Iceberg table. Complete the following steps:

  1. On the CloudFormation console, choose the stack vpc-msk-mskconnect-rds-client-gluejob.yaml
  2. On the Outputs tab, retrieve the name of the AWS Glue streaming job from the GlueJobName row. In the following screenshot, the name is IcebergCDC-msk-iceberg-cdc-pipeline.
  3. On the AWS Glue console, choose ETL jobs in the navigation pane.
  4. Search for the AWS Glue job named IcebergCDC-msk-iceberg-cdc-pipeline.
  5. Choose the job name to open its details page.
  6. Choose Run to start the job. On the Runs tab, confirm that the job run started without errors.

Wait approximately 2 minutes before continuing. This pause allows the job run to fully process the records from the Kafka topic (initial load) and create the Iceberg table.

Query the Iceberg table using Athena

After the AWS Glue streaming job has successfully started and the Iceberg table has been created in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database glue_cdc_blogdb.
  4. To validate the data, enter the following query to preview the data and find the total count:
    SELECT id, name, mktsegment FROM "glue_cdc_blogdb"."iceberg_cdc_tbl" order by id desc limit 40;
    
    SELECT count(*) as total_rows FROM "glue_cdc_blogdb"."iceberg_cdc_tbl";

    The following screenshot shows the output of the example query.

After performing the preceding steps, you’ve established a complete near real-time data processing pipeline by running an AWS Glue streaming job that transfers data from Kafka topics to an Apache Iceberg table, then verified the successful data migration by querying the results through Amazon Athena.

Upload incremental (CDC) data for further processing

Now that you’ve successfully completed the initial full data load, it’s time to focus on the dynamic aspects of the data pipeline. In this section, we explore how the system handles ongoing data modifications such as insertions, updates, and deletions in the Amazon RDS for MySQL database. These changes won’t go unnoticed. Our Debezium MySQL connector stands ready to capture each modification event, transforming database changes into a continuous stream of data. Working in tandem with our AWS Glue streaming job, this architecture is designed to promptly process and propagate every change in our source database through our data pipeline.

Let’s see this real-time data synchronization mechanism in action, demonstrating how our modern data infrastructure maintains consistency across systems with minimal latency. Follow these steps:

  1. On the Amazon EC2 console, access the EC2 instance named KafkaClientInstance that you created using the CloudFormation template.
  2. Log in to the EC2 instance using AWS Systems Manager Agent (SSM Agent). Select the instance named KafkaClientInstance and then choose Connect.
  3. Enter the following commands to insert the data into the RDS table. Use the same database password you entered when you created the CloudFormation stack.
    sudo su - ec2-user
    RDS_AURORA_ENDPOINT=`aws rds describe-db-instances --region us-east-1 | jq -r '.DBInstances[] | select(.DBName == "salesdb") | .Endpoint.Address'`
    mysql -f -u master -h $RDS_AURORA_ENDPOINT  --password

  4. Run the following insert, update, and delete statements against the customer table.
    use salesdb;
    
    INSERT INTO customer VALUES(31, 'Customer Name 31', 'Market segment 31');
    INSERT INTO customer VALUES(32, 'Customer Name 32', 'Market segment 32');
    
    UPDATE customer SET name='Customer Name update 29', mktsegment='Market segment update 29' WHERE id = 29;
    UPDATE customer SET name='Customer Name update 30', mktsegment='Market segment update 30' WHERE id = 30;
    
    DELETE FROM customer WHERE id = 27;
    DELETE FROM customer WHERE id = 28;
    

  5. Validate the data to verify the insert, update, and delete records in the Iceberg table from Athena, as shown in the following screenshot.

After performing the preceding steps, you’ve learned how our CDC pipeline handles ongoing data modifications by performing insertions, updates, and deletions in the MySQL database and verifying how these changes are automatically captured by Debezium MySQL connector, streamed through Kafka, and reflected in the Iceberg table in near real time.
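To reason about what Athena should return after these statements, the following sketch replays them as Debezium-style change events against an in-memory table. It assumes the Debezium envelope has already been unwrapped to its op, before, and after fields, that the 30 initial rows have ids 1 through 30, and that their names follow a placeholder pattern. The apply_event and customer helpers are hypothetical and are not taken from the AWS Glue job.

```python
def apply_event(rows: dict, event: dict) -> None:
    """Apply one change event to an in-memory table keyed by id."""
    op = event["op"]
    if op in ("r", "c", "u"):  # snapshot read, create, update
        after = event["after"]
        rows[after["id"]] = after
    elif op == "d":  # delete
        rows.pop(event["before"]["id"], None)
    else:
        raise ValueError(f"Unexpected op: {op}")


def customer(i: int, label: str = "") -> dict:
    """Build a customer row; the initial values are placeholders (assumption)."""
    return {"id": i,
            "name": f"Customer Name {label}{i}",
            "mktsegment": f"Market segment {label}{i}"}


# Initial load: 30 snapshot events (assumed ids 1..30).
events = [{"op": "r", "before": None, "after": customer(i)} for i in range(1, 31)]
# The statements from step 4, expressed as change events.
events += [{"op": "c", "before": None, "after": customer(i)} for i in (31, 32)]
events += [{"op": "u", "before": customer(i), "after": customer(i, "update ")} for i in (29, 30)]
events += [{"op": "d", "before": customer(i), "after": None} for i in (27, 28)]

rows = {}
for event in events:
    apply_event(rows, event)

print(len(rows))                      # two inserts and two deletes cancel out
print(sorted(rows)[-4:])              # highest remaining ids
print(rows[29]["name"])
```

Under these assumptions the row count stays at 30, ids 27 and 28 are gone, and ids 29 and 30 carry the updated values.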

Schema evolution: Adding new columns to the Iceberg table

The schema evolution mechanism in this implementation provides an automated approach to detecting and adding new columns from incoming data to existing Iceberg tables. Although Iceberg inherently supports robust schema evolution capabilities (including adding, dropping, and renaming columns, updating types, and reordering), this code specifically automates the column addition process for streaming environments. This automation uses Iceberg’s underlying schema evolution capabilities, which guarantee correctness through unique column IDs that ensure new columns never read existing values from another column. By handling column additions programmatically, the system reduces operational overhead in streaming pipelines where manual schema management would create bottlenecks. However, dropping and renaming columns, updating types, and reordering still require manual intervention.

When new data arrives through Kafka streams, the handle_schema_evolution() function orchestrates a four-step process to ensure seamless table schema updates.

  1. It analyzes the incoming batch DataFrame to infer its schema structure, cataloging all column names and their corresponding data types.
  2. It retrieves the existing Iceberg table’s schema from the AWS Glue catalog to establish a baseline for comparison.
  3. The system then compares the batch schema with the existing table schema using the compare_schemas() method.
    1. If the incoming frame contains fewer columns than the catalog table, no action is taken.
    2. It identifies any new columns present in the incoming data that don’t exist in the current table structure and returns a list of new columns that need to be added.
    3. New columns are appended at the end of the table schema.
    4. Type evolution isn’t handled; a type mismatch is only logged as a warning. If you need it, add the logic at the comment # Handle type evolution in the compare_schemas() method.
    5. Columns that are dropped in the source table aren’t dropped from the destination table. If your use case requires it, drop the column manually using ALTER TABLE ... DROP COLUMN.
    6. Renaming columns isn’t supported. To rename a column, evolve the schema manually using ALTER TABLE … RENAME COLUMN.
  4. Finally, if new columns are discovered, the function executes ALTER TABLE … ADD COLUMN statements to evolve the Iceberg table schema, adding the new columns with their appropriate data types.

This approach eliminates the need for manual schema management and prevents data pipeline failures that would typically occur when encountering unexpected fields in streaming data. The implementation also includes proper error handling and logging to track schema evolution events, making it particularly valuable for environments where data structures frequently change.

def infer_schema_from_batch(batch_df):
    """
    Infer schema from the batch DataFrame
    Returns a dictionary with column names and their inferred types
    """
    schema_dict = {}
    for field in batch_df.schema.fields:
        schema_dict[field.name] = field.dataType
    return schema_dict

def get_existing_table_schema(spark, table_identifier):
    """
    Read the existing table schema from the Iceberg table
    Returns a dictionary with column names and their types
    """
    try:
        existing_df = spark.table(table_identifier)
        schema_dict = {}
        for field in existing_df.schema.fields:
            schema_dict[field.name] = field.dataType
        return schema_dict
    except Exception as e:
        print(f"Error reading existing table schema: {e}")
        return {}

def compare_schemas(batch_schema, existing_schema):
    """
    Compare batch schema with existing table schema
    Returns a list of new columns that need to be added
    """
    new_columns = []
    
    for col_name, col_type in batch_schema.items():
        if col_name not in existing_schema:
            new_columns.append((col_name, col_type))
        elif existing_schema[col_name] != col_type:
            # Handle type evolution if needed
            print(f"Warning: Column {col_name} type mismatch - existing: {existing_schema[col_name]}, new: {col_type}")
    
    return new_columns

def spark_type_to_sql_string(spark_type):
    """
    Convert Spark DataType to SQL string representation for ALTER TABLE
    """
    type_mapping = {
        'IntegerType': 'INT',
        'LongType': 'BIGINT',
        'StringType': 'STRING',
        'BooleanType': 'BOOLEAN',
        'DoubleType': 'DOUBLE',
        'FloatType': 'FLOAT',
        'TimestampType': 'TIMESTAMP',
        'DateType': 'DATE'
    }
    
    type_name = type(spark_type).__name__
    return type_mapping.get(type_name, 'STRING')

def evolve_table_schema(spark, table_identifier, new_columns):
    """
    Alter the Iceberg table to add new columns
    """
    if not new_columns:
        return
    
    try:
        for col_name, col_type in new_columns:
            sql_type = spark_type_to_sql_string(col_type)
            alter_sql = f"ALTER TABLE {table_identifier} ADD COLUMN {col_name} {sql_type}"
            print(f"Executing schema evolution: {alter_sql}")
            spark.sql(alter_sql)
            print(f"Successfully added column {col_name} with type {sql_type}")
    except Exception as e:
        print(f"Error during schema evolution: {e}")
        raise e

def handle_schema_evolution(spark, batch_df, table_identifier):
    """
    schema evolution steps
    1. Infer schema from batch DataFrame
    2. Read existing table schema
    3. Compare schemas and identify new columns
    4. Alter table if schema evolved
    """
    # Step 1: Infer schema from batch DataFrame
    batch_schema = infer_schema_from_batch(batch_df)
    print(f"Batch schema: {batch_schema}")
    
    # Step 2: Read existing table schema
    existing_schema = get_existing_table_schema(spark, table_identifier)
    print(f"Existing table schema: {existing_schema}")
    
    # Step 3: Compare schemas
    new_columns = compare_schemas(batch_schema, existing_schema)
    
    # Step 4: Evolve schema if needed
    if new_columns:
        print(f"Schema evolution detected. New columns: {new_columns}")
        evolve_table_schema(spark, table_identifier, new_columns)
        return True
    else:
        print("No schema evolution needed")
        return False
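The comparison rules are easier to see with plain dictionaries. The following standalone sketch mirrors only the membership check in compare_schemas(), using strings in place of Spark data types. find_new_columns and the sample schemas are illustrative and are not part of the job code.

```python
def find_new_columns(batch_schema: dict, table_schema: dict) -> list:
    """Columns present in the batch but missing from the table, in batch order."""
    return [(name, dtype) for name, dtype in batch_schema.items()
            if name not in table_schema]


table_schema = {"id": "int", "name": "string", "mktsegment": "string"}

# A column added in the source is reported as new.
added = find_new_columns(
    {"id": "int", "name": "string", "mktsegment": "string", "status": "string"},
    table_schema,
)

# A batch with fewer columns than the table triggers no change.
fewer = find_new_columns({"id": "int", "name": "string"}, table_schema)

# A rename in the source looks like a new column; the old one stays in the table.
renamed = find_new_columns(
    {"id": "int", "full_name": "string", "mktsegment": "string"},
    table_schema,
)

print(added)
print(fewer)
print(renamed)
```

The last case shows why renames need manual handling: with this logic, a renamed source column would be added under its new name while the old column remains in the Iceberg table.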

In this section, we demonstrate how our system handles structural changes to the underlying data model by adding a new status column to the customer table and populating it with default values. Our architecture is designed to seamlessly propagate these schema modifications throughout the pipeline so that downstream analytics and processing capabilities remain uninterrupted while accommodating the enhanced data model. This flexibility is essential for maintaining a responsive, business-aligned data infrastructure that can evolve alongside changing organizational needs.

  1. Add a new status column to the customer table and populate it with default values as Green.
    use salesdb;
    
    ALTER TABLE customer ADD COLUMN status VARCHAR(20) NOT NULL;
    
    UPDATE customer SET status = 'Green';
    

  2. Use the Athena console to validate the data and schema evolution, as shown in the following screenshot.

When schema evolution occurs in an Iceberg table, the metadata.json file is updated to track and manage the change. When the job detected the schema change, it ran the following statement to evolve the schema of the Iceberg table.

ALTER TABLE glue_catalog.glue_cdc_blogdb.iceberg_cdc_tbl ADD COLUMN status string

We checked the metadata.json file in the Iceberg table location on Amazon S3, and the following screenshot shows how the schema evolved.

In this section, we showed how our implementation handles schema evolution by automatically detecting and adding new columns from incoming data streams to existing Iceberg tables. The system employs a four-step process that analyzes incoming data schemas, compares them with existing table structures, identifies new columns, and executes the necessary ALTER TABLE statements to evolve the schema without manual intervention, though certain schema changes still require manual handling.
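One hardening step to consider: evolve_table_schema() builds the ALTER TABLE statement by string formatting, so a source column name that contains a space or another special character would produce invalid SQL. A minimal sketch of one way to guard against this, assuming Spark SQL's backtick quoting for identifiers, is shown below. quote_identifier and build_add_column_sql are hypothetical helpers and are not part of the original job.

```python
def quote_identifier(name: str) -> str:
    """Wrap a column name in backticks, doubling any backtick inside it."""
    return "`" + name.replace("`", "``") + "`"


def build_add_column_sql(table_identifier: str, col_name: str, sql_type: str) -> str:
    """Build the ADD COLUMN statement with a quoted column name."""
    return (f"ALTER TABLE {table_identifier} "
            f"ADD COLUMN {quote_identifier(col_name)} {sql_type}")


print(build_add_column_sql(
    "glue_catalog.glue_cdc_blogdb.iceberg_cdc_tbl", "status", "string"))
print(build_add_column_sql(
    "glue_catalog.glue_cdc_blogdb.iceberg_cdc_tbl", "order status", "string"))
```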

Clean up

To clean up your resources, complete the following steps:

  1. Stop the running AWS Glue streaming job:
    1. On the AWS Glue console, choose ETL jobs in the navigation pane.
    2. Search for the AWS Glue job named IcebergCDC-msk-iceberg-cdc-pipeline.
    3. Choose the job name to open its details page.
    4. On the Runs tab, select the running job run and choose Stop job run. Confirm that the job stopped successfully.
  2. Remove the AWS Glue database and table:
    1. On the AWS Glue console, choose Tables in the navigation pane, select iceberg_cdc_tbl, and choose Delete.
    2. Choose Databases in the navigation pane, select glue_cdc_blogdb, and choose Delete.
  3. Delete the CloudFormation stack vpc-msk-mskconnect-rds-client-gluejob.yaml.

Conclusion

This post showcases a solution that businesses can use to access real-time data insights without the traditional delays between data creation and analysis. By combining Amazon MSK Serverless, Debezium MySQL connector, AWS Glue streaming, and Apache Iceberg tables, the architecture captures database changes instantly and makes them immediately available for analytics through Amazon Athena. A standout feature is the system’s ability to automatically adapt when database structures change—such as adding new columns—without disrupting operations or requiring manual intervention. This eliminates the technical complexity typically associated with real-time data pipelines and provides business users with the most current information for decision-making, effectively bridging the gap between operational databases and analytical systems in a cost-effective, scalable way.


About the Authors

Nitin Kumar

Nitin is a Cloud Engineer (ETL) at AWS, specializing in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. In his free time, he likes to watch movies and spend time with his family.

Shubham Purwar

Shubham is an Analytics Specialist Solutions Architect at AWS. He helps organizations unlock the full potential of their data by designing and implementing scalable, secure, and high-performance analytics solutions on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Noritaka Sekiyama

Noritaka is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Stream mainframe data to AWS in near real time with Precisely and Amazon MSK

Post Syndicated from Supreet Padhi original https://aws.amazon.com/blogs/big-data/stream-mainframe-data-to-aws-in-near-real-time-with-precisely-and-amazon-msk/

This is a guest post by Supreet Padhi, Technology Architect, and Manasa Ramesh, Technology Architect at Precisely in partnership with AWS.

Enterprises rely on mainframes to run mission-critical applications and store essential data, enabling real-time operations that help achieve business objectives. These organizations face a common challenge: how to unlock the value of their mainframe data in today’s cloud-first world while maintaining system stability and data quality. Modernizing these systems is critical for competitiveness and innovation.

The digital transformation imperative has made mainframe data integration with cloud services a strategic priority for enterprises worldwide. Organizations that can seamlessly bridge their mainframe environments with modern cloud platforms gain significant competitive advantages through improved agility, reduced operational costs, and enhanced analytics capabilities. However, implementing such integrations presents unique technical challenges that require specialized solutions. One challenge is converting EBCDIC data to ASCII, because some data types, such as binary and COMP data, are handled in ways unique to the mainframe. Another is that data stored in Virtual Storage Access Method (VSAM) files can be quite complex because of the practice of storing multiple record types in a single file. To address these challenges, Precisely, a global leader in data integrity serving over 12,000 customers, has partnered with Amazon Web Services (AWS) to enable real-time synchronization between mainframe systems and Amazon Relational Database Service (Amazon RDS). For more on this collaboration, check out our previous blog post: Unlock Mainframe Data with Precisely Connect and Amazon Aurora.
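To illustrate why a plain character-set conversion isn't enough, the following sketch decodes three kinds of mainframe fields with the Python standard library. It is not how Precisely Connect works internally. It assumes EBCDIC code page 037 (cp037), big-endian two's-complement COMP fields, and COMP-3 packed decimal as one example of the COMP family. The function names are hypothetical.

```python
from decimal import Decimal


def decode_ebcdic(raw: bytes, codepage: str = "cp037") -> str:
    """Text fields can be converted with an EBCDIC code page (cp037 assumed)."""
    return raw.decode(codepage)


def decode_comp(raw: bytes) -> int:
    """COMP (binary) fields are integers, not text, so they must not be
    passed through a character conversion."""
    return int.from_bytes(raw, byteorder="big", signed=True)


def decode_comp3(raw: bytes, scale: int = 0) -> Decimal:
    """COMP-3 packs two decimal digits per byte; the last nibble is the sign."""
    digits = []
    for byte in raw[:-1]:
        digits.append(byte >> 4)
        digits.append(byte & 0x0F)
    digits.append(raw[-1] >> 4)
    sign_nibble = raw[-1] & 0x0F
    value = int("".join(str(d) for d in digits))
    if sign_nibble == 0x0D:  # 0xD marks a negative value
        value = -value
    return Decimal(value).scaleb(-scale)


print(decode_ebcdic(b"\xc8\xc5\xd3\xd3\xd6"))
print(decode_comp(b"\x00\x7b"))
print(decode_comp3(b"\x12\x34\x5c", scale=2))
```

A record that mixes these field types has to be split by its layout first, because applying the text conversion to the whole record would corrupt the binary and packed fields.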

In this post, we introduce an alternative architecture to synchronize mainframe data to the cloud using Amazon Managed Streaming for Apache Kafka (Amazon MSK) for greater flexibility and scalability. This event-driven approach provides additional possibilities for mainframe data integration and modernization strategies.

A key enhancement in this solution is the use of the AWS Mainframe Modernization – Data Replication for IBM z/OS Amazon Machine Image (AMI) available in AWS Marketplace, which simplifies deployment and reduces implementation time.

Real-time processing and event-driven architecture benefits

Real-time processing makes data actionable within seconds rather than waiting for batch processing cycles. For example, financial institutions such as Global Payments have leveraged this solution to modernize mission-critical banking operations, including payments processing. By migrating these operations to the AWS Cloud, they enhanced user experience, improved scalability and maintainability, while enabling advanced fraud detection – all without impacting the performance of existing mainframe systems. Change data capture (CDC) enables this by identifying database changes and delivering them in real time to cloud environments.

CDC offers two key advantages for mainframe modernization:

  • Incremental data movement – Eliminates disruptive bulk extracts by streaming only changed data to cloud targets, minimizing system impact and ensuring data currency
  • Real-time synchronization – Keeps cloud applications in sync with mainframe systems, enabling immediate insights and responsive operations
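
As a rough illustration of incremental data movement, the following Python sketch applies a short stream of change events to a keyed in-memory table. The event shape (operation, key, row) and the sample values are hypothetical and are not the Connect CDC message format.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical change event: (operation, primary key, row or None for deletes).
ChangeEvent = Tuple[str, str, Optional[dict]]


def apply_changes(target: Dict[str, dict], events: Iterable[ChangeEvent]) -> Dict[str, dict]:
    """Apply insert, update, and delete events to a keyed target table in order."""
    for op, key, row in events:
        if op in ("insert", "update"):
            # Upsert: merge the changed columns into the existing row, if any.
            target[key] = {**target.get(key, {}), **(row or {})}
        elif op == "delete":
            target.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return target


target = {"A00": {"DEPTNO": "A00", "DEPTNAME": "PLANNING"}}
events = [
    ("insert", "B01", {"DEPTNO": "B01", "DEPTNAME": "SUPPORT"}),
    ("update", "A00", {"DEPTNAME": "STRATEGY"}),
    ("delete", "B01", None),
]
apply_changes(target, events)
print(target)
```

Only the three changed rows travel to the target. The rest of the table is never re-extracted.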

Solution overview

In this post, we provide a detailed implementation guide for streaming mainframe data changes from DB2z through AWS Mainframe Modernization – Data Replication for IBM z/OS AMI to Amazon MSK and then applying those changes to Amazon Relational Database Service (Amazon RDS) for PostgreSQL using MSK Connect with the Confluent JDBC Sink Connector.

By introducing Amazon MSK into the architecture and streamlining deployment through the AWS Marketplace AMI, we create new possibilities for data distribution, transformation, and consumption that expand upon our previously demonstrated direct replication approach. This streaming-based architecture offers several additional benefits:

  • Simplified deployment – Accelerate implementation using the preconfigured AWS Marketplace AMI
  • Decoupled systems – Separate the concern of data extraction from data consumption, allowing both sides to scale independently
  • Multi-consumer support – Enable multiple downstream applications and services to consume the same data stream according to their own requirements
  • Extensibility – Create a foundation that can be extended to support additional mainframe data sources such as IMS and VSAM, as well as additional AWS targets using MSK Connect sink connectors

The following diagram illustrates the solution architecture.

Precisely MSK architecture diagram

  1. Capture/Publisher – Connect CDC Capture/Publisher captures Db2 changes from Db2 logs using IFI 306 Read and communicates captured data changes to a target engine through TCP/IP.
  2. Controller Daemon – The Controller Daemon authenticates all connection requests, managing secure communication between the source and target environments.
  3. Apply Engine – The Apply Engine is a multifaceted and multifunctional component in the target environment. It receives the changes from the Publisher agent and applies the changed data to the target Amazon MSK.
  4. Connect CDC Single Message Transform (SMT) – Performs all necessary data filtering, transformation, and augmentation required by the sink connector.
  5. JDBC Sink Connector – As data arrives in the topic, an instance of the JDBC Sink Connector running on MSK Connect writes the data to target tables in Amazon RDS.

This architecture provides a clean separation between the data capture process and the data consumption process, allowing each to scale independently. The use of MSK as an intermediary enables multiple systems to consume the same data stream, opening possibilities for complex event processing, real-time analytics, and integration with other AWS services.
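
The multi-consumer property can be pictured with a toy in-memory model: one append-only log and an independent read offset per consumer group. This is a sketch of the concept only, not the Kafka client API, and the group names are hypothetical.

```python
from typing import Dict, List


class TopicLog:
    """Append-only log with an independent read offset per consumer group."""

    def __init__(self) -> None:
        self._records: List[str] = []
        self._offsets: Dict[str, int] = {}

    def publish(self, record: str) -> None:
        self._records.append(record)

    def poll(self, group: str, max_records: int = 10) -> List[str]:
        """Return unread records for a group and advance only that group's offset."""
        start = self._offsets.get(group, 0)
        batch = self._records[start:start + max_records]
        self._offsets[group] = start + len(batch)
        return batch


log = TopicLog()
for change in ["insert A00", "update A00", "delete A00"]:
    log.publish(change)

sink_batch = log.poll("jdbc-sink")          # this group reads all three changes
analytics_batch = log.poll("analytics", 2)  # this group reads the first two only
remaining = log.poll("analytics")           # and picks up where it left off
print(sink_batch, analytics_batch, remaining)
```

Because each group tracks its own position, a slow analytics consumer does not hold back the database sink, and neither affects the capture side.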

Prerequisites

To complete the solution, you need the following prerequisites:

  1. Install AWS Mainframe Modernization – Data Replication for IBM z/OS
  2. Have access to Db2 for z/OS on the mainframe from AWS through your approved connectivity between AWS and your mainframe

Solution walkthrough

The following code content shouldn’t be deployed to production environments without additional security testing.

Configure the AWS Mainframe Modernization Data Replication with Precisely AMI on Amazon EC2

Follow the steps defined at Precisely AWS Mainframe Modernization Data Replication. Upon the initial launch of the AMI, use the following command to connect to the Amazon Elastic Compute Cloud (Amazon EC2) instance:

ssh -i ami-ec2-user.pem ec2-user@$AWS_AMI_HOST

Configure the serverless cluster

To create an Amazon Aurora PostgreSQL-Compatible Edition Serverless v2 cluster, complete the following steps:

  1. Create a DB cluster by using the following AWS Command Line Interface (AWS CLI) command. Replace the placeholder strings with your DB subnet group name and your cluster’s security group ID.
    aws rds create-db-cluster \
       --db-cluster-identifier cdc-serverless-pg-cluster \
       --engine aurora-postgresql \
       --serverless-v2-scaling-configuration MinCapacity=1,MaxCapacity=2 \
       --master-username connectcdcuser \
       --manage-master-user-password \
       --db-subnet-group-name "<db-subnet-group-name>" \
       --vpc-security-group-ids "<cluster-security-group-id>"

  2. Verify the status of the cluster by using the following command:
    aws rds describe-db-clusters --db-cluster-identifier cdc-serverless-pg-cluster

  3. Add a writer DB instance to the Aurora cluster:
    aws rds create-db-instance \
       --db-cluster-identifier cdc-serverless-pg-cluster \
       --db-instance-identifier cdc-serverless-pg-instance \
       --db-instance-class db.serverless \
       --engine aurora-postgresql

  4. Verify the status of the writer instance:
    aws rds describe-db-instances --db-instance-identifier cdc-serverless-pg-instance

Create a database in the PostgreSQL cluster

After your Aurora Serverless v2 cluster is running, you need to create a database for your replicated mainframe data. Follow these steps:

  1. Install the psql client:
    sudo yum install postgresql16

  2. Retrieve the password from AWS Secrets Manager:
    aws secretsmanager get-secret-value --secret-id '<cdc-serverless-pg-cluster-secret ARN>' --query 'SecretString' --output text

  3. Create a new database in PostgreSQL:
    PGPASSWORD="password" psql --host=<DATABASE-HOST> --username=connectcdcuser --dbname=postgres -c "CREATE DATABASE dbcdc"
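
The secret string returned in step 2 is a JSON document rather than a bare password. The following Python sketch pulls the credentials out of it, assuming the secret contains username and password keys, as secrets created with --manage-master-user-password typically do. The sample value is made up.

```python
import json
from typing import Tuple


def extract_credentials(secret_string: str) -> Tuple[str, str]:
    """Return (username, password) from a Secrets Manager SecretString."""
    secret = json.loads(secret_string)
    return secret["username"], secret["password"]


# Hypothetical output of the get-secret-value command in step 2.
sample = '{"username": "connectcdcuser", "password": "example-only"}'
user, password = extract_credentials(sample)
print(user)
```

You can then pass the extracted password to psql through the PGPASSWORD environment variable instead of pasting it into the command line.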

Configure the serverless MSK cluster

To create a serverless MSK cluster, complete the following steps:

  1. Copy the following JSON and paste it into a new file create-msk-serverless-cluster.json. Replace the placeholder strings with values that correspond to your cluster’s subnet and security group IDs.
       {
         "VpcConfigs": [
           {
             "SubnetIds": [
               "<cluster-subnet-1>",
               "<cluster-subnet-2>",
               "<cluster-subnet-3>"
             ],
             "SecurityGroupIds": ["<cluster-security-group-id>"]
           }
         ],
         "ClientAuthentication": {
           "Sasl": {
             "Iam": {
               "Enabled": true
             }
           }
         }
       }

  2. Invoke the following AWS CLI command in the folder where you saved the JSON file in the previous step:
    aws kafka create-cluster-v2 --cluster-name pgsqlmsk --serverless file://create-msk-serverless-cluster.json

  3. Verify cluster status by invoking the following AWS CLI command:
    aws kafka list-clusters-v2 --cluster-type-filter SERVERLESS

  4. Get the bootstrap broker address by invoking the following AWS CLI command:
    aws kafka get-bootstrap-brokers --cluster-arn "<msk-serverless-cluster-arn>"

  5. Define an environment variable that stores the bootstrap servers of the MSK cluster, for use by the Kafka CLI that you install in the next section:
    export BOOTSTRAP_SERVERS=<kafka_bootstrap_servers_with_ports>
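
For a cluster that uses IAM authentication, the get-bootstrap-brokers response carries the broker string in the BootstrapBrokerStringSaslIam field. The following Python sketch extracts it and prints the matching export line. The broker host name in the sample is hypothetical.

```python
import json


def iam_bootstrap_servers(cli_output: str) -> str:
    """Extract the IAM bootstrap broker string from get-bootstrap-brokers output."""
    response = json.loads(cli_output)
    brokers = response.get("BootstrapBrokerStringSaslIam")
    if not brokers:
        raise KeyError("no IAM bootstrap brokers in response")
    return brokers


# Hypothetical CLI output; the host name is illustrative only.
sample = '{"BootstrapBrokerStringSaslIam": "boot-example.kafka-serverless.us-east-1.amazonaws.com:9098"}'
print(f"export BOOTSTRAP_SERVERS={iam_bootstrap_servers(sample)}")
```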

Create a topic on the MSK cluster

To create a Kafka topic, you need to install the Kafka CLI first. Follow these steps:

  1. Download the binary distribution of Apache Kafka, extract the archive, and create a symbolic link named kafka:
    wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
    tar -xzf kafka_2.13-3.9.0.tgz
    ln -sfn kafka_2.13-3.9.0 kafka

  2. To use IAM to authenticate with the MSK cluster, download the Amazon MSK Library for IAM and copy to the local Kafka library directory as shown in the following code. For complete instructions, refer to Configure clients for IAM access control.
    wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.3.1/aws-msk-iam-auth-2.3.1-all.jar
    cp aws-msk-iam-auth-2.3.1-all.jar kafka/libs

  3. In the kafka/config directory, create a file named client-config.properties that configures a Kafka client to use IAM authentication for the Kafka console producer and consumers:
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

  4. Create the Kafka topic that the connector configuration references:
    kafka/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVERS --command-config kafka/config/client-config.properties --partitions 1 --topic pgsql-sink-topic
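
If you prefer to generate the client properties file from step 3 rather than edit it by hand, a minimal Python sketch follows. It emits one property per line, which is the format Java properties files expect. The function name is our own, and the property values are the ones used in this walkthrough.

```python
def build_client_properties() -> str:
    """Return the IAM client settings as Java properties text, one per line."""
    props = {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "AWS_MSK_IAM",
        "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }
    return "".join(f"{key}={value}\n" for key, value in props.items())


# Redirect this output to kafka/config/client-config.properties.
print(build_client_properties(), end="")
```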

Configure the MSK Connect plugin

Next, create a custom plugin from the archive available in the AMI at /opt/precisely/di/packages/sqdata-msk_connect_1.0.1.zip, which contains the following:

  • JDBC Sink Connector from Confluent
  • MSK Config provider
  • AWS Mainframe Modernization – Data Replication for IBM z/OS Custom SMT

Follow these steps:

  1. Invoke the following to upload the .zip file to an S3 bucket to which you have access:
    aws s3 cp /opt/precisely/di/packages/sqdata-msk_connect_1.0.1.zip s3://<bucket>/

  2. Copy the following JSON and paste it into a new file create-custom-plugin.json. Replace the placeholder strings with values that correspond to your bucket.
    {
         "contentType": "ZIP",
         "description": "jdbc sink connector",
         "location": {
           "s3Location": {
             "bucketArn": "arn:aws:s3:::<bucket>",
             "fileKey": "sqdata-msk_connect_1.0.1.zip"
           }
         },
         "name": "jdbc-sink-connector"
       }

  3. Invoke the following AWS CLI command in the folder where you saved the JSON file in the previous step:
    aws kafkaconnect create-custom-plugin --cli-input-json file://create-custom-plugin.json

  4. Verify plugin status by invoking the following AWS CLI command:
    aws kafkaconnect list-custom-plugins

Configure the JDBC Sink Connector

To configure the JDBC Sink Connector, follow these steps:

  1. Copy the following JSON and paste it into a new file create-connector.json. Replace the placeholder strings with appropriate values:
    {
         "connectorConfiguration": {
           "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
           "connection.url": "jdbc:postgresql://<postgresql-endpoint>/dbcdc?currentSchema=public",
           "config.providers": "secretsmanager",
           "config.providers.secretsmanager.class": "com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider",
           "connection.user": "${secretsmanager:MySecret-1234:username}",
           "connection.password": "${secretsmanager:MySecret-1234:password}",
           "config.providers.secretsmanager.param.region": "<region>",
           "tasks.max": "1",
           "topics": "pgsql-sink-topic",
           "insert.mode": "upsert",
           "delete.enabled": "true",
           "pk.mode": "record_key",
           "auto.evolve": "true",
           "auto.create": "true",
           "value.converter": "org.apache.kafka.connect.storage.StringConverter",
           "key.converter": "org.apache.kafka.connect.storage.StringConverter",
           "transforms": "ConnectCDCConverter",
           "transforms.ConnectCDCConverter.type": "com.precisely.kafkaconnect.ConnectCDCConverter",
           "transforms.ConnectCDCConverter.cdc.multiple.tables.enabled": "true",
           "transforms.ConnectCDCConverter.cdc.source.table.name.ignore.schema": "true"
         },
         "connectorName": "pgsql-sink-connector",
         "kafkaCluster": {
           "apacheKafkaCluster": {
             "bootstrapServers": "<msk-bootstrap-servers-string>",
             "vpc": {
               "subnets": [
                 "<cluster-subnet-1>",
                 "<cluster-subnet-2>",
                 "<cluster-subnet-3>"
               ],
               "securityGroups": ["<cluster-security-group-id>"]
             }
           }
         },
         "capacity": {
           "provisionedCapacity": {
             "mcuCount": 1,
             "workerCount": 1
           }
         },
         "kafkaConnectVersion": "3.7.x",
         "serviceExecutionRoleArn": "<arn-of-a-role-that-msk-connect-can-assume>",
         "plugins": [
           {
             "customPlugin": {
               "customPluginArn": "<arn-of-custom-plugin-that-contains-connector-code>",
               "revision": 1
             }
           }
         ],
         "kafkaClusterEncryptionInTransit": {"encryptionType": "TLS"},
         "kafkaClusterClientAuthentication": {"authenticationType": "IAM"},
         "logDelivery": {
           "workerLogDelivery": {
             "cloudWatchLogs": {
               "enabled": true,
               "logGroup": "<loggroup>"
             }
           }
         }
       }

  2. Invoke the following AWS CLI command in the folder where you saved the JSON file in the previous step:
    aws kafkaconnect create-connector --cli-input-json file://create-connector.json

  3. Verify connector status by invoking the following AWS CLI command:
    aws kafkaconnect list-connectors
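
Before you run create-connector, it can help to confirm that no angle-bracket placeholder is left in the JSON file. The following Python sketch walks a parsed configuration and reports the paths that still contain one. The placeholder pattern and the path notation are our own conventions, and the embedded sample is a shortened version of the file above.

```python
import json
import re
from typing import Any, List

# Matches <placeholder> markers; ${secretsmanager:...} references are left alone.
PLACEHOLDER = re.compile(r"<[^<>]+>")


def find_placeholders(node: Any, path: str = "$") -> List[str]:
    """Return the paths of all string values that still contain <placeholder> text."""
    found: List[str] = []
    if isinstance(node, dict):
        for key, value in node.items():
            found.extend(find_placeholders(value, f"{path}.{key}"))
    elif isinstance(node, list):
        for index, value in enumerate(node):
            found.extend(find_placeholders(value, f"{path}[{index}]"))
    elif isinstance(node, str) and PLACEHOLDER.search(node):
        found.append(path)
    return found


config = json.loads("""
{
  "connectorConfiguration": {
    "connection.url": "jdbc:postgresql://<postgresql-endpoint>/dbcdc?currentSchema=public",
    "connection.user": "${secretsmanager:MySecret-1234:username}",
    "topics": "pgsql-sink-topic"
  },
  "serviceExecutionRoleArn": "<arn-of-a-role-that-msk-connect-can-assume>"
}
""")
print(find_placeholders(config))
```

An empty list means every placeholder has been replaced. Parsing the file with json.loads also catches malformed JSON, such as a string broken across lines.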

Set up Db2 Capture/Publisher on Mainframe

To establish the Db2 Capture/Publisher on the mainframe for capturing changes to the DEPT table, follow these structured steps that build upon our previous blog post, Unlock Mainframe Data with Precisely Connect and Amazon Aurora:

  1. Prepare the source table. Before configuring the Capture/Publisher, ensure the DEPT source table exists on your mainframe Db2 system. The table definition should match the structure defined at $SQDATA_VAR_DIR/templates/dept.ddl. If you need to create this table on your mainframe, use the DDL from this file as a reference to ensure compatibility with the replication process.
  2. Access the Interactive System Productivity Facility (ISPF) interface. Sign in to your mainframe system and access the AWS Mainframe Modernization – Data Replication for IBM z/OS ISPF panels through the supplied ISPF application menu. Select option 3 (CDC) to access the CDC configuration panels, as demonstrated in our previous blog post.
  3. Add source tables for capture:
    1. From the CDC Primary Option Menu, choose option 2 (Define Subscriptions).
    2. Choose option 1 (Define Db2 Tables) to add source tables.
    3. On the Add DB2 Source Table to CAB File panel, enter a wildcard value (%) or the specific table name DEPT in the Table Name field.
    4. Press Enter to display the list of available tables.
    5. Type S next to the DEPT table to select it for replication, then press Enter to confirm.

This process is similar to the table selection process shown in Figure 3 and Figure 4 of our previous post, but now focuses specifically on the DEPT table.

After you complete both the Db2 Capture/Publisher setup on the mainframe and the AWS environment configuration (Amazon MSK, the Apply Engine, and the MSK Connect JDBC Sink Connector), you have a fully functional pipeline that captures data changes from the mainframe and streams them to the MSK topic. Inserts, updates, and deletions on the DEPT table on the mainframe are automatically captured and pushed to the MSK topic in near real time. From there, the MSK Connect JDBC Sink Connector and the custom SMT process these messages and apply the changes to the PostgreSQL database on Amazon RDS, completing the end-to-end replication flow. The Apply Engine configuration is covered in the next section.

Configure Apply Engine for Amazon MSK integration

Configure the AWS-side components to receive data from the mainframe and forward it to Amazon MSK. Follow these steps to define and manage a new CDC pipeline from Db2 for z/OS to Amazon MSK:

  1. Use the following command to switch to the connect user:
    sudo su connect

  2. Create the apply engine directories:
    mkdir -p $SQDATA_VAR_DIR/apply/DB2ZTOMSK/ddl
    mkdir -p $SQDATA_VAR_DIR/apply/DB2ZTOMSK/scripts

  3. Copy the sample DDL file dept.ddl:
    cp $SQDATA_VAR_DIR/templates/dept.ddl $SQDATA_VAR_DIR/apply/DB2ZTOMSK/ddl/

  4. Copy the following content and paste it in a new file $SQDATA_VAR_DIR/apply/DB2ZTOMSK/scripts/DB2ZTOMSK.sqd. Replace the placeholder strings with values that correspond to the DB2z endpoint:
    ----------------------------------------------------------------------
    -- Name: DB2TOKAF:  Z/OS DB2 To Kafka
    ----------------------------------------------------------------------
    --  SUBSTITUTION PARMS USED IN THIS SCRIPT:
    ----------------------------------------------------------------------
    JOBNAME DB2TOKAFKA;
    ----------------------------
    -- TABLE DESCRIPTIONS
    ----------------------------
    BEGIN GROUP SOURCE_TABLES;
    DESCRIPTION Db2SQL /var/precisely/di/sqdata/apply/DB2ZTOMSK/ddl/dept.ddl AS DEPT
    KEY IS DEPTNO;
    END GROUP;
    ------------------------------------------------------------
    --       DATASTORE SECTION
    ------------------------------------------------------------
    -- SOURCE DATASTORE
    DATASTORE cdc://<DB2z endpoint with port>/dbcg/DBCG_TBTSS388T6
              OF UTSCDC
              AS CDCIN
              DESCRIBED BY GROUP SOURCE_TABLES;
    -- TARGET DATASTORE
    DATASTORE kafka:///pgsql-sink-topic/table_key
              OF JSON
              AS TARGET
              KEY IS DEPTNO
              DESCRIBED BY GROUP SOURCE_TABLES;
    ----------------------------------
    PROCESS INTO TARGET
    SELECT
    {
        REPLICATE(TARGET)
    }
    FROM CDCIN;

  5. Create the working directory:
    mkdir -p /var/precisely/di/sqdata_logs/apply/DB2ZTOMSK

  6. Add the following to $SQDATA_DAEMON_DIR/cfg/sqdagents.cfg:
    [DB2ZTOMSK]
       type=engine
       program=sqdata
       args=/var/precisely/di/sqdata/apply/DB2ZTOMSK/scripts/DB2ZTOMSK.prc --log-level=8
       working_directory=/var/precisely/di/sqdata_logs/apply/DB2ZTOMSK
       stdout_file=stdout.txt
       stderr_file=stderr.txt
       auto_start=0
       comment=Apply Engine for MSK from Db2z

  7. After you add the preceding section to sqdagents.cfg, reload the configuration for the changes to take effect:
    sqdmon reload

  8. Validate the apply engine job script by using the SQData parse command to create the compiled file expected by the SQData engine:
    sqdparse $SQDATA_VAR_DIR/apply/DB2ZTOMSK/scripts/DB2ZTOMSK.sqd $SQDATA_VAR_DIR/apply/DB2ZTOMSK/scripts/DB2ZTOMSK.prc

    The following is an example of the output that you get when you invoke the command successfully:

    SQDC042I mounting/running sqdparse with arguments:
    SQDC041I args[0]:sqdparse
    SQDC041I args[1]:/var/precisely/di/sqdata/apply/DB2ZTOMSK/scripts/DB2ZTOMSK.sqd
    SQDC041I args[2]:/var/precisely/di/sqdata/apply/DB2ZTOMSK/scripts/DB2ZTOMSK.prc
    SQDC000I *******************************************************
    SQDC021I sqdparse Version 5.0.1-rel (Linux-x86_64)
    SQDC022I Build-id 4f2d7c16728aa2e40c610db7d5a6e373476a9889
    SQDC023I (c) 2001, 2025 Syncsort Incorporated. All rights reserved.
    SQDC000I *******************************************************
    SQDC000I
    SQD0000I 2025-03-31 00:59:10
    >>> Start Preprocessed /var/precisely/di/sqdata/apply/DB2ZTOMSK/scripts/DB2ZTOMSK.sqd
    000001 ----------------------------------------------------------------------
    000002 -- Name: DB2TOKAF:  Z/OS DB2 To Kafka
    000003 ----------------------------------------------------------------------
    000004 --  SUBSTITUTION PARMS USED IN THIS SCRIPT:
    000005 ----------------------------------------------------------------------
    000006
    000007 JOBNAME DB2TOKAFKA;
    000008
    000009 ----------------------------
    000010 -- TABLE DESCRIPTIONS
    000011 ----------------------------
    000012 BEGIN GROUP SOURCE_TABLES;
    000013 DESCRIPTION Db2SQL /var/precisely/di/sqdata/apply/DB2ZTOMSK/ddl/dept.ddl  AS DEPT
    000014 KEY IS DEPTNO;
    000015 END GROUP;
    000016
    000017 ------------------------------------------------------------
    000018 --       DATASTORE SECTION
    000019 ------------------------------------------------------------
    000020
    000021 -- SOURCE DATASTORE
    000022 DATASTORE /var/precisely/di/sqdata/apply/DB2ZTOMSK/scripts/DB0A.ENGINE3.DEPT.COPY
    000023           OF UTSCDC
    000024           AS CDCIN
    000025           DESCRIBED BY GROUP SOURCE_TABLES;
    000026
    000027 -- TARGET DATASTORE
    000028 DATASTORE 
    000029           OF JSON
    000030           AS TARGET
    000031           KEY IS DEPTNO
    000032           DESCRIBED BY GROUP SOURCE_TABLES;
    000033
    000034 ----------------------------------
    000035
    000036 PROCESS INTO TARGET
    000037 SELECT
    000038 {
    000039     REPLICATE(TARGET)
    000040 }
    000041 FROM CDCIN;
    <<< End Preprocessed /var/precisely/di/sqdata/apply/DB2ZTOMSK/scripts/DB2ZTOMSK.sqd
    >>> Start Preprocessed /var/precisely/di/sqdata/apply/DB2ZTOMSK/ddl/dept.ddl
    000001 CREATE TABLE DEPARTMENT
    000002 (
    000003    DEPTNO char(3) NOT NULL,
    000004    DEPTNAME varchar(36) NOT NULL,
    000005    MGRNO char(6),
    000006    ADMRDEPT char(3) NOT NULL,
    000007    LOCATION char(16),
    000008    CONSTRAINT PK_DEPTNO PRIMARY KEY (DEPTNO)
    000009 ) ;
    <<< End Preprocessed /var/precisely/di/sqdata/apply/DB2ZTOMSK/ddl/dept.ddl
    Number of Data Stores...................: 2
    Data Store..............................: /var/precisely/di/sqdata/apply/DB2ZTOMSK/scripts/DB0A.ENGINE3.DEPT.COPY
      Alias.................................: CDCIN
      Type..................................: UTS Change Data Capture
      Number of Records.....................: 1
        Record Name.........................: DEPARTMENT
        Record Description Alias............: DEPT
        Record Description Length...........: 72
        Number of Fields....................: 5
          ................................... TYPE            OFF   LEN   XLEN  EXT
          ................................... ---------- ----- ----- ----- -----
          DEPTNO............................: CHAR(3)             0     3     3
          DEPTNAME..........................: VARCHAR(36)         3    38    38
          MGRNO.............................: CHAR(6)             7     6     6
          ADMRDEPT..........................: CHAR(3)            14     3     3
          LOCATION..........................: CHAR(16)           17    16    16
    Data Store..............................: 
      Alias.................................: TARGET
      Type..................................: JSON
      Number of Records.....................: 1
        Record Name.........................: DEPARTMENT
        Record Description Alias............: DEPT
        Record Description Length...........: 70
        Number of Fields....................: 5
          ................................... TYPE            OFF   LEN   XLEN  EXT
          ................................... ---------- ----- ----- ----- -----
          DEPTNO............................: CHAR(3)             0     3     3
          DEPTNAME..........................: VARCHAR(36)         3    38    38
          MGRNO.............................: CHAR(6)            41     6     6
          ADMRDEPT..........................: CHAR(3)            47     3     3
          LOCATION..........................: CHAR(16)           50    16    16
    Section.................................: SQDSTP000
      Number of steps.......................: 1
    SQDC017I sqdparse(pid=4023) terminated successfully

  9. Copy the following content and paste it in a new file /var/precisely/di/sqdata_logs/apply/DB2ZTOMSK/sqdata_kafka_producer.conf. Replace the placeholder strings with values that correspond to your bootstrap server and AWS Region.
    metadata.broker.list=<kafka_bootstrap_servers_with_ports>
         security.protocol=SASL_SSL
         sasl.mechanism=OAUTHBEARER
         sasl.oauthbearer.config="extension_AWSMSKCB=python3,/usr/lib64/python3.9/site-packages/aws_msk_iam_sasl_signer/cli.py,--region,<region>"
         sasl.oauthbearer.method="default"

  10. Start the apply engine using the controller daemon by using the following command:
    sqdmon start ///DB2ZTOMSK

  11. Monitor the apply engine through the controller daemon by using the following command:
    sqdmon display ///DB2ZTOMSK --format=details

    The following is an example of the output that you get when you invoke the command successfully:

    Engine..................................: DB2ZTOMSK
    version.................................: 5.0.1-rel (Linux-x86_64)
    git.....................................: f021c29a84c1a99f59144288aeeb2cb8fa494485
    jobname.................................: DB2TOKAFKA
    parsed..................................: 20250320172610278108
    started.................................: 2025-03-20.17.47.23.444474
    started (UTC)...........................: 2025-03-20.17.47.23.444474 (1742492843444)
    updated (UTC)...........................: 2025-03-20.17.47.25.901018 (1742492845901)
    Input Datastore.........................: /var/precisely/di/sqdata/apply/DB2ZTOMSK/scripts/DB0A.ENGINE3.DEPT.COPY
    Alias...................................: CDCIN
    Type....................................: UTS Change Data Capture
      Records Read..........................: 14
      Records Selected......................: 14
      Bytes Read............................: 2892
    Output Datastore........................: kafka:///pgsql-sink-topic/table_key
    Alias...................................: TARGET
    Type....................................: JSON
      Records Inserted......................: 14
      Records Updated.......................: 0
      Records Deleted.......................: 0
      Formatted bytes.......................: 3458
      Unformatted bytes.....................: 448
    Total Output Formatted bytes............: 3458
    Total Output Unformatted bytes..........: 448
    SQDC017I sqdmon(pid=123540) terminated successfully

    Logs can also be found at /var/precisely/di/sqdata_logs/apply/DB2ZTOMSK.
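
The details output from step 11 lends itself to a quick scripted check. The following Python sketch parses the "Name....: value" lines into counters and compares the records selected from the source with the records written to the target. The balance rule is our own heuristic for a plain REPLICATE job such as this one, not a documented guarantee, and the sample is an excerpt of the output shown above.

```python
import re
from typing import Dict

# A label, a run of dots, a colon, then the value.
LINE = re.compile(r"^\s*(?P<name>[^.:]+?)\.{2,}:\s*(?P<value>.*)$")


def parse_counters(display_output: str) -> Dict[str, int]:
    """Collect the integer-valued 'Name....: value' lines from the display output."""
    counters: Dict[str, int] = {}
    for line in display_output.splitlines():
        match = LINE.match(line)
        if match and match.group("value").strip().isdigit():
            counters[match.group("name").strip()] = int(match.group("value"))
    return counters


def records_balanced(counters: Dict[str, int]) -> bool:
    """Heuristic: every selected source record produced one target operation."""
    written = (counters["Records Inserted"]
               + counters["Records Updated"]
               + counters["Records Deleted"])
    return counters["Records Selected"] == written


sample = """\
Input Datastore.........................: /var/precisely/di/sqdata/apply/DB2ZTOMSK/scripts/DB0A.ENGINE3.DEPT.COPY
  Records Read..........................: 14
  Records Selected......................: 14
Output Datastore........................: kafka:///pgsql-sink-topic/table_key
  Records Inserted......................: 14
  Records Updated.......................: 0
  Records Deleted.......................: 0
"""
counters = parse_counters(sample)
print(counters["Records Read"], records_balanced(counters))
```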

Verify data in the MSK topic

Invoke the Kafka CLI command to verify the JSON data in the MSK topic:

kafka/bin/kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config kafka/config/client-config.properties --topic pgsql-sink-topic --from-beginning --property print.key=true

Verify data in the PostgreSQL database

Invoke the following command to verify the data in the PostgreSQL database:

PGPASSWORD="password" psql --host=<DATABASE-HOST> --username=<user> --dbname=<database> -c "select * from \"DEPT\""

With these steps completed, you’ve successfully set up end-to-end data replication from DB2z to RDS for PostgreSQL, using AWS Mainframe Modernization – Data Replication for IBM z/OS AMI, Amazon MSK, MSK Connect, and the Confluent JDBC Sink Connector.

Cleanup

When you’re finished testing this solution, you can clean up the resources to avoid incurring additional charges. Follow these steps in sequence to ensure proper cleanup.

Step 1: Delete the MSK Connect components

Follow these steps:

  1. List existing connectors:
    aws kafkaconnect list-connectors

  2. Delete the sink connector:
    aws kafkaconnect delete-connector --connector-arn "<arn-of-connector>"

  3. List custom plugins:
    aws kafkaconnect list-custom-plugins

  4. Delete the custom plugin:
    aws kafkaconnect delete-custom-plugin --custom-plugin-arn "<arn-of-custom-plugin>"

Step 2: Delete the MSK cluster

Follow these steps:

  1. List MSK clusters:
    aws kafka list-clusters-v2 --cluster-type-filter SERVERLESS

  2. Delete the MSK serverless cluster:
    aws kafka delete-cluster --cluster-arn "<arn-of-msk-serverless-cluster>"

Step 3: Delete the Aurora resources

Follow these steps:

  1. Delete the Aurora DB instance:
    aws rds delete-db-instance --db-instance-identifier cdc-serverless-pg-instance --skip-final-snapshot

  2. Delete the Aurora DB cluster:
    aws rds delete-db-cluster --db-cluster-identifier cdc-serverless-pg-cluster --skip-final-snapshot

Conclusion

By capturing changed data from DB2z and streaming it to AWS targets, organizations can modernize their legacy mainframe data stores, enabling operational insights and AI initiatives. Businesses can use this solution to take advantage of cloud-based applications with mainframe data to provide scalability, cost-efficiency, and enhanced performance.

The integration of AWS Mainframe Modernization – Data Replication for IBM z/OS AMI with Amazon MSK and RDS for PostgreSQL provides an enhanced framework for real-time data synchronization that maintains data integrity. This architecture can be extended to support additional mainframe data sources such as VSAM and IMS, as well as other AWS targets. Organizations can then tailor their data integration strategy to specific business needs. Data consistency and latency challenges can be effectively managed through AWS and Precisely’s monitoring capabilities. By adopting this architecture, organizations keep their mainframe data continually available for analytics, machine learning (ML), and other advanced applications.

Streaming mainframe data to AWS in near real time, with sub-second data transfer latency, represents a strategic step toward modernizing legacy systems while unlocking new opportunities for innovation. With Precisely and AWS, organizations can effectively navigate their modernization journey and maintain their competitive advantage.

Learn more about AWS Mainframe Modernization – Data Replication for IBM z/OS AMI in the Precisely documentation. AWS Mainframe Modernization Data Replication is available for purchase in AWS Marketplace. For more information about the solution or to see a demonstration, contact Precisely.


About the authors

Supreet Padhi

Supreet is a Technology Architect at Precisely. He has been with Precisely for more than 14 years, specializing in streaming data use cases and technology, with an emphasis on data warehouse architecture. He is responsible for research and development in areas such as Change Data Capture (CDC), streaming ETL, metadata management, and VectorDBs.

Manasa Ramesh

Manasa is a Technology Architect at Precisely, with over 15 years of experience in software development. She has worked on several innovation-driven projects in the metadata management, data governance, and data integration space. She is currently responsible for the research, design, and development of a metadata discovery framework.

Tamara Astakhova

Tamara is a Sr. Partner Solutions Architect in Data and Analytics at AWS. She brings over two decades of expertise in architecting and developing large-scale data analytics systems. In her current role, she collaborates with strategic partners to design and implement sophisticated AWS-optimized architectures. Her deep technical knowledge and experience make her an invaluable resource in helping organizations transform their data infrastructure and analytics capabilities.

Modernization of real-time payment orchestration on AWS

Post Syndicated from Neeraj Kaushik original https://aws.amazon.com/blogs/architecture/modernization-of-real-time-payment-orchestration-on-aws/

The global real-time payments market is experiencing significant growth. According to Fortune Business Insights, the market was valued at USD 24.91 billion in 2024 and is projected to grow to USD 284.49 billion by 2032, with a CAGR of 35.4%. Similarly, Grand View Research reports that the global mobile payment market, valued at USD 88.50 billion in 2024, is expected to grow at a CAGR of 38.0% from 2025 to 2030. (Disclaimer: Third-party market research and statistics are provided for informational purposes only. AWS and IBM make no representations about the accuracy of this information.)

This rapid expansion underscores the urgency for financial institutions to modernize their payment processing infrastructure. Financial institutions often need to process a high volume of transactions with near-zero latency to meet stringent service level agreements (SLAs) and support surging mobile payment volumes.

However, traditional payment orchestration systems, often built on monolithic architectures, struggle to meet these demands due to latency, availability, and scalability challenges. Additionally, their reliance on on-premises infrastructure leads to higher costs and an impediment to innovation, reinforcing the need for modernization.

As sustainability becomes a priority, organizations are turning to cloud-based solutions to optimize infrastructure, reduce carbon footprints, and enhance energy efficiency. This shift provides scalability and performance, and aligns with global sustainability goals, securing the future of real-time payments.

In this post, we discuss the real-time payment orchestration framework. It uses an event-driven architecture and AWS serverless services to enhance the resiliency, efficiency, and scalability of real-time payments. By decomposing payment processing into distinct business capabilities, financial institutions can improve modularity and flexibility. Implementing tenant-based segregation helps with data isolation and security. Additionally, adopting asynchronous communication through Amazon Managed Streaming for Apache Kafka (Amazon MSK) enhances scalability and resilience.

Traditional real-time payment orchestration

Payment orchestration serves as a middleware solution, streamlining transaction processing across multiple payment methods, gateways, and financial institutions. It orchestrates key business functions such as payment authorization, payment processing, settlement and clearing, compliance and risk management, and account management for both inbound and outbound payment flows.

The following diagram depicts the high-level business capabilities supported by payment orchestrators across various payment flows, including real-time payments, digital disbursements, tax payments, wires, and more.

Payment processing system flowchart showing main components from acceptance to billing

Detailed flowchart depicting a payment processing system with multiple components. The diagram shows primary payment types at the top (including Realtime Payments, Digital Disbursement, Credit Transfer, and Peer to Peer Payments) flowing down through core processing stages including Payment Acceptance, Execution, Clearing, Reporting, Tracking, Reversals, and Billing.

Many financial institutions adopt a tenant-based approach organized by geography due to varying clearing processes, localized regulations, and transaction requirements across AWS Regions. However, without proper separation of services, teams often continue to add region-specific logic to existing services, gradually increasing their monolithic complexity and using the same infrastructure for all payment flows.

Traditional payment systems process transactions linearly, with each step waiting for the previous one to complete. However, analysis of payment workflows reveals numerous opportunities for parallel execution:

  • Sanctions screening and fraud detection – Compliance and fraud checks can run simultaneously with initial routing decisions, rather than sequentially blocking all subsequent processing
  • Payment routing and authorization requests – When basic validations are complete, routing and authorization can proceed in parallel rather than one after another
  • Payment execution and ledger updates – The actual payment execution doesn’t need to wait for ledger records to be updated—these can occur concurrently
  • Settlement, reconciliation, and tracking – These post-transaction processes can be initiated independently as soon as the primary transaction is complete

This parallel approach can dramatically improve throughput and reduce latency compared to traditional queue-based systems where operations form a sequential chain that extends processing time and creates bottlenecks.
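To make the latency argument concrete, the following minimal sketch compares a strictly sequential chain with the parallel layout described in the preceding list. The step names follow the list, but the per-step durations and the exact dependency graph are hypothetical values chosen for illustration, not measurements from a real system.

```python
from functools import lru_cache

# Hypothetical per-step latencies in milliseconds (illustrative, not measured).
DURATION_MS = {
    "validate": 5,
    "sanctions_screening": 40,
    "fraud_detection": 35,
    "routing": 10,
    "authorization": 25,
    "execution": 30,
    "ledger_update": 20,
}

# Assumed dependency graph: each step lists the steps that must finish first.
_CHECKS = ["sanctions_screening", "fraud_detection", "routing", "authorization"]
DEPENDS_ON = {
    "validate": [],
    "sanctions_screening": ["validate"],
    "fraud_detection": ["validate"],
    "routing": ["validate"],
    "authorization": ["validate"],
    "execution": _CHECKS,
    "ledger_update": _CHECKS,
}


def sequential_latency_ms() -> int:
    """Latency when every step waits for the previous one to complete."""
    return sum(DURATION_MS.values())


def parallel_latency_ms() -> int:
    """Latency when independent steps overlap: the longest dependency chain."""

    @lru_cache(maxsize=None)
    def finish_time(step: str) -> int:
        # A step starts once its slowest prerequisite has finished.
        start = max((finish_time(dep) for dep in DEPENDS_ON[step]), default=0)
        return start + DURATION_MS[step]

    return max(finish_time(step) for step in DURATION_MS)


if __name__ == "__main__":
    print("sequential:", sequential_latency_ms(), "ms")
    print("parallel:  ", parallel_latency_ms(), "ms")
```

With these assumed numbers the sequential chain takes 165 ms, whereas the parallel layout is bounded by its critical path (validation, then sanctions screening, then execution) at 75 ms.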

Most legacy payment orchestration systems rely heavily on on-premises virtual machines (VMs), leading to several challenges:

  • Multi-Region support for disaster recovery and multi-tenancy resulting in significant capital expenditure and operational overhead
  • High latency and SLA issues caused by sequential message processing and delays between globally separated data centers
  • Limited reusability of payment flows as monolithic architectures require region-specific changes for local clearing mechanisms and regulations, increasing complexity and costs
  • Scalability challenges and high memory consumption due to inefficient resource utilization and execution of irrelevant logic across regions
  • Complex cross-border payment routing caused by variations in clearing rules, transaction limits, and local regulations, increasing latency and routing errors
  • Integration challenges with diverse data formats because legacy systems often combine proprietary formats with multiple messaging standards (for example, ISO 20022, SWIFT MT), complicating data conversion and compliance
  • High deployment complexity for new payment flows due to monolithic architectures requiring extensive region-specific modifications, slowing time to market
  • Environmental impact and high carbon footprint from on-premises infrastructure consuming excessive energy, whereas cloud-based approaches improve efficiency

Solution overview

To overcome these challenges, the proposed architecture embraces the following design principles to build a future-ready, real-time payment orchestration solution:

  • Performance at scale – Handling over 1,000 transactions per second (TPS) with consistent low latency under varying load conditions.
  • High availability – Achieving 99.999% uptime to meet the strict requirements of financial transactions.
  • Geographic resilience – Supporting global operations with region-specific compliance while maintaining consistent performance.
  • Cost optimization – Reducing total cost of ownership through efficient resource utilization and serverless technologies.
  • Security and compliance – Supporting data protection and regulatory adherence across different jurisdictions.
  • Operational simplicity – Streamlining deployment, monitoring, and maintenance across the payment ecosystem.
  • Microservices – Decomposing payment processing into distinct business capabilities, so financial institutions can improve modularity and flexibility. This microservices-based approach allows for independent scaling and development of critical components.
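The availability and throughput targets above translate into a small downtime budget. The following sketch works through the arithmetic; the helper names are ours, and the only inputs taken from the list are the 99.999% and 1,000 TPS figures.

```python
SECONDS_PER_YEAR = 365 * 24 * 60 * 60  # non-leap year


def downtime_budget_seconds(availability_percent: float,
                            period_seconds: int = SECONDS_PER_YEAR) -> float:
    """Maximum downtime allowed in a period for a given availability target."""
    return period_seconds * (100.0 - availability_percent) / 100.0


def transactions_affected(tps: int, outage_seconds: float) -> float:
    """Transactions that would arrive during an outage at a steady rate."""
    return tps * outage_seconds


if __name__ == "__main__":
    budget = downtime_budget_seconds(99.999)
    print(f"yearly downtime budget: {budget:.2f} s ({budget / 60:.2f} min)")
    print(f"transactions arriving in that window at 1,000 TPS: "
          f"{transactions_affected(1000, budget):,.0f}")
```

At 99.999% availability the yearly budget is roughly 315 seconds, a little over five minutes, during which a steady 1,000 TPS workload would submit about 315,000 transactions.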

The following diagram depicts the high-level solution architecture for real-time payments. The existing channels using synchronous or asynchronous APIs can be modified to use edge-optimized endpoints to reduce latency.

Event-driven payment orchestration system with pub/sub channels connecting multiple payment processing modules

Architecture diagram detailing an AWS-based payment orchestration platform utilizing event-driven principles. Features reusable components across two regions, with dedicated modules for payment initiation, execution, reconciliation, billing, and risk management. Implements pub/sub messaging patterns for inter-component communication and connects to enterprise systems including accounting, compliance, and analytics.

An event-driven architecture is used for payment orchestration, which handles communication through a pub/sub pattern. This architecture maintains persistent connections, improving performance of the end-to-end real-time payment processing.

The event-driven architecture for real-time payment processing allows multiple payment operations to occur simultaneously using different adaptors, as opposed to the traditional systems where payment processes are sequential and flow through a single pipeline. Payment events are distributed to specialized payment processor microservices based on their function (initiation, execution, tracking, settlements), enabling each to process independently without waiting for others to complete.

Because we’re transitioning from sequential processing to distributed processing, maintaining transaction traceability is crucial. The payment tracking adapters shown in the preceding diagram connect to enterprise analytics systems, creating a specialized layer for monitoring transactions. The pub/sub model allows for attaching correlation IDs to events, enabling systems to track related events across different topics and processing stages.

A standardized event schema serves as the foundation for this architecture, providing consistency across regional deployments while allowing for customization at the adapter level. This schema defines uniform event structures containing tenant-specific metadata and supports versioning to accommodate evolving requirements. By isolating region-specific variations to the adapter layer, the solution maintains core functionality while interfacing with diverse enterprise systems through configuration-driven customization rather than code changes.
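As a rough illustration of how a versioned event envelope and correlation IDs might fit together, consider the following sketch. The field names (`schema_version`, `tenant`, `correlation_id`) and the topic names are assumptions made for this example; the post does not publish the actual schema.

```python
from __future__ import annotations

import uuid
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass(frozen=True)
class PaymentEvent:
    """Hypothetical event envelope shared by all regional deployments."""
    topic: str
    tenant: str                     # tenant-specific metadata, e.g. "emea"
    payload: dict
    schema_version: str = "1.0"     # lets consumers handle evolving schemas
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def follow_up(self, topic: str, payload: dict) -> PaymentEvent:
        """Create a downstream event that keeps the same correlation ID."""
        return PaymentEvent(
            topic=topic,
            tenant=self.tenant,
            payload=payload,
            schema_version=self.schema_version,
            correlation_id=self.correlation_id,
        )


def trace(events: list[PaymentEvent]) -> dict[str, list[str]]:
    """Group the topics an individual payment passed through by correlation ID."""
    by_correlation: dict[str, list[str]] = defaultdict(list)
    for event in events:
        by_correlation[event.correlation_id].append(event.topic)
    return dict(by_correlation)


if __name__ == "__main__":
    initiated = PaymentEvent("payment.initiated", "emea", {"amount": "10.00"})
    executed = initiated.follow_up("payment.executed", {"status": "ok"})
    print(trace([initiated, executed]))
```

Because every follow-up event inherits the correlation ID, a tracking consumer can rebuild the path of a single payment even when its events were processed in parallel on different topics.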

For most payment processes, especially those with independent processing steps that can run in parallel, this architecture delivers net performance gains despite the topic switching overhead, particularly for complex transactions where multiple independent validations or processing steps are required.

Deployment on the AWS Cloud

The solution uses edge-optimized Amazon API Gateway endpoints for channels. An edge-optimized API endpoint routes requests to the nearest Amazon CloudFront Point of Presence (POP), which helps when your clients are geographically distributed. This enables efficient routing within each geographical region and improves global responsiveness by minimizing network round trips, so that requests take the shortest possible path before transitioning from the public internet to the client network.

The following diagram illustrates the high-level solution architecture for real-time payments.

Multi-region AWS payment architecture with managed Kafka topics connecting Lambda microservices and DynamoDB storage

Comprehensive AWS payment orchestration solution implementing modern cloud-native architecture principles. Core processing logic implemented as Lambda functions covering initiation, execution, reconciliation, billing, tracking, risk management, and settlement workflows. Leverages Amazon MSK for reliable event streaming between components, with dedicated Kafka topics for each processing stage. Data persistence handled by Amazon DynamoDB, supporting cross-region operations. Architecture demonstrates AWS best practices for financial services, including regional redundancy, serverless computing, managed services, and event-driven design patterns. System integrates with external banking infrastructure and enterprise systems while maintaining separation of concerns through microservices architecture. Features built-in support for compliance monitoring, risk management, and payment tracking through specialized Lambda functions.

The solution uses Amazon MSK to implement an event-driven architecture that efficiently handles both inbound and outbound channels traffic through API requests and asynchronous message-based events. Amazon MSK communicates using a high-performance binary protocol between producers, consumers, and brokers, providing low latency and high throughput. Real-time payments are logically partitioned across multiple tenants within geographical regions—North America, EMEA, LATAM, and Asia-Pacific.

Each real-time payment tenant follows an active/active disaster recovery strategy by deploying MSK clusters across multiple AWS Regions, designed to achieve high availability and resilience. Amazon MSK offers both serverless and provisioned cluster options, and teams can choose between them based on their non-functional requirements and expertise. Amazon MSK automatically manages partition leadership with leaders in primary Regions and followers in secondary Regions. During failover, leaders are re-elected in healthy Regions, which is designed to help maintain processing capabilities during regional incidents. Sticky partitioning uses consistent hashing for deterministic routing, and cooperative rebalancing enables efficient failover. Multi-AZ deployment provides zone redundancy, and isolated clusters per Region support data sovereignty compliance through programmatic AWS Identity and Access Management (IAM) and virtual private cloud (VPC) boundaries.
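The idea of deterministic routing can be illustrated with a short sketch: hashing a record key always yields the same partition, so all events for one payment stay in order on one partition. This is a simplified hash-modulo scheme, not a full consistent-hashing ring, and SHA-256 is only a stand-in here because Kafka clients apply their own hash function. The key format is hypothetical.

```python
import hashlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a record key to a partition deterministically (illustrative only)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    # Use the first 8 bytes of the digest as an unsigned integer.
    return int.from_bytes(digest[:8], "big") % num_partitions


if __name__ == "__main__":
    # Hypothetical key: tenant plus payment identifier.
    for key in ("emea:payment-1001", "emea:payment-1002", "na:payment-1001"):
        print(key, "->", partition_for(key, 12))
```

Note that with plain modulo hashing, changing the partition count remaps most keys, which is one reason partition counts for ordered payment topics are usually fixed up front.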

To support seamless cross-Region replication and maintain message continuity, Amazon MSK Replicator, a fully managed feature of Amazon MSK, is used to replicate topics and synchronize consumer group offsets across clusters. MSK Replicator simplifies building multi-Region Kafka applications by removing the need for custom code, open-source tool configuration, or infrastructure management. It automatically provisions and scales the necessary resources, so teams can focus on business logic while only paying for the data being replicated. In the event of a regional outage or failover, traffic can be automatically redirected to a healthy Region without data loss or service disruption, providing near-zero Recovery Time Objectives (RTOs) and uninterrupted operations for downstream services such as payment processors and audit trail consumers.

In addition to regional redundancy, the solution uses an event-driven design to enable parallel and decoupled processing of payment transactions. Events such as transaction initiation, validation, and settlement are emitted asynchronously and consumed by various microservices independently, which drastically reduces end-to-end latency.

To process these events at scale, the architecture can use AWS Lambda, Amazon Elastic Container Service (Amazon ECS), or Amazon Elastic Kubernetes Service (Amazon EKS) depending upon non-functional requirements. Automatic scaling responds to Amazon CloudWatch metrics, and exponential backoff retry logic with dead-letter queues (DLQs) handles throttling scenarios. Circuit breakers prevent cascade failures during high error rates.
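The retry behavior described above can be sketched as follows. This is a minimal, framework-free illustration of exponential backoff with a dead-letter queue; the function name, the in-memory list standing in for the DLQ, and the default attempt count and delays are all assumptions. A circuit breaker is omitted for brevity.

```python
import time
from typing import Any, Callable, List


def process_with_retry(
    event: Any,
    handler: Callable[[Any], None],
    dead_letters: List[Any],
    sleep: Callable[[float], None] = time.sleep,
    max_attempts: int = 4,
    base_delay_s: float = 0.1,
) -> bool:
    """Run handler(event), backing off exponentially between failed attempts.

    Returns True on success. After max_attempts failures the event is
    appended to dead_letters (a stand-in for a real DLQ) and False is returned.
    """
    for attempt in range(max_attempts):
        try:
            handler(event)
            return True
        except Exception:
            # Wait base, 2x base, 4x base, ... but not after the final attempt.
            if attempt < max_attempts - 1:
                sleep(base_delay_s * (2 ** attempt))
    dead_letters.append(event)
    return False


if __name__ == "__main__":
    dlq: List[Any] = []

    def always_throttled(_event: Any) -> None:
        raise RuntimeError("throttled")

    ok = process_with_retry({"payment_id": "p-1"}, always_throttled, dlq,
                            base_delay_s=0.01)
    print("processed:", ok, "dead letters:", dlq)
```

Passing `sleep` as a parameter keeps the function easy to test without real delays; production code would typically also add jitter to the delay so that many throttled consumers do not retry at the same instant.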

One of the key benefits of the solution is the reusability of payment flows across different regions. Although each region has its own unique compliance requirements and settlement rules, the core functionalities of real-time payments (payment authorization, payment processing, settlement and clearing) are largely similar. This reusability enables rapid deployment of payment solutions across new regions without rearchitecting the entire system. For example, the real-time payment system in the US and UK might share similar business logic for real-time gross settlement but differ in the clearing and compliance requirements. The solution treats these as bounded contexts within the microservices architecture, providing flexibility while making sure each region can handle its own specific rules and regulations.

Sustainability

AWS relentlessly innovates its infrastructure design, build, and operations to make progress towards net-zero carbon by 2040 and being water positive by 2030. Amazon MSK on AWS Graviton-based instances uses up to 60% less energy than comparable M5 instances, helping you achieve your sustainability goals. Lambda is inherently sustainable by design. Its serverless model makes sure compute resources are only used when needed, drastically reducing idle infrastructure and wasted energy. Instead of keeping always-on servers for infrequent tasks, Lambda provisions compute power just-in-time, achieving near-zero idle capacity.

Security and compliance in financial services

Given the sensitive nature of payment transactions and financial data, you should apply the security controls required to meet financial regulations and standards such as PCI DSS and Federal Information Processing Standard (FIPS) 140-3, according to your organization’s needs.

The solution should incorporate multi-layered security controls, continuous monitoring, and automated compliance auditing to meet the rigorous expectations of banking regulators and internal risk teams. For more information, refer to Security Guidance.

Conclusion

The modernization of payment orchestration systems using an event-driven architecture and AWS serverless technologies marks a significant advancement in meeting the demands of today’s rapidly evolving financial services landscape. This solution addresses the key challenges faced by traditional payment systems while delivering substantial benefits in performance, scalability, cost optimization, global resilience, sustainability, and compliance. By using cutting-edge cloud technologies and robust security controls, financial institutions can now build a future-ready foundation that adapts to evolving business needs while maintaining the highest standards of performance, security, and reliability. As the real-time payments market continues its explosive growth, this modern architecture provides a solution that meets today’s demands and is also well-positioned to support tomorrow’s payment innovations. Organizations looking to modernize their payment infrastructure can use this blueprint to accelerate their digital transformation journey, supporting sustainable, secure, and efficient payment processing at scale in an increasingly competitive global marketplace.

The architecture presented here is for reference purposes only. IBM will work closely with you to deploy the solution in accordance with industry standards and compliance requirements. For additional resources, refer to:

IBM Consulting is an AWS Premier Tier Services Partner that helps customers who use AWS to harness the power of innovation and drive their business transformation. They are recognized as a Global Systems Integrator (GSI) for over 22 competencies, including Financial Services Consulting. For additional information, please contact an IBM Representative.

How Laravel Nightwatch handles billions of observability events in real time with Amazon MSK and ClickHouse Cloud

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/how-laravel-nightwatch-handles-billions-of-observability-events-in-real-time-with-amazon-msk-and-clickhouse-cloud/

Laravel, one of the world’s most popular web frameworks, launched its first-party observability platform, Laravel Nightwatch, to provide developers with real-time insights into application performance. Built entirely on AWS managed services and ClickHouse Cloud, the service already processes over one billion events per day while maintaining sub-second query latency, giving developers instant visibility into the health of their applications.

By combining Amazon Managed Streaming for Apache Kafka (Amazon MSK) with ClickHouse Cloud and AWS Lambda, Laravel Nightwatch delivers high-volume, low-latency monitoring at scale, while maintaining the simplicity and developer experience Laravel is known for.

The challenge: Delivering real-time monitoring for a global developer community

The Laravel framework powers millions of applications worldwide, serving billions of requests each month. Each request can generate potentially hundreds of observability events, such as database queries, queued jobs, cache lookups, emails, notifications, and exceptions. For Nightwatch’s launch, Laravel anticipated instant adoption from its global community, with tens of thousands of applications sending events around the clock from day one.

Laravel Nightwatch needed an architecture that could:

  • Ingest millions of JSON events per second from customer applications reliably.
  • Provide sub-second analytical queries for real-time dashboards.
  • Scale horizontally to handle unpredictable traffic spikes.
  • Deliver all of this in a cost-effective, low-maintenance manner.

The challenge was to process data on a global scale and provide deep insights into application health without compromising on a straightforward setup experience for developers.

The solution: A decoupled streaming and analytics pipeline

Laravel Nightwatch implemented a dual-database, streaming-first architecture, shown in the preceding figure, that separates transactional and analytical workloads.

  • Transactional workloads – user accounts, organization settings, billing, and similar workloads run on Amazon RDS for PostgreSQL.
  • Analytical workloads – telemetry events, metrics, query logs, and request traces are handled by ClickHouse Cloud.

Key components

The key components of the solution include the following:

  1. Ingestion layer
    • Amazon API Gateway receives telemetry from Laravel agents embedded in customer applications
    • Lambda validates and enriches events. Validated and enriched events are published to Amazon MSK, partitioned for scalability
  2. Streaming to analytics
    • ClickPipes in ClickHouse Cloud subscribe directly to MSK topics, reducing the need to build and manage extract, transform, and load (ETL) pipelines
    • Materialized views in ClickHouse pre-aggregate and transform raw JSON into query-ready formats
  3. Dashboards and delivery
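The validate-and-enrich step in the ingestion layer might look like the following sketch. The required fields, the allowed event types, and the enrichment fields (`received_at_ms`, `partition_key`) are hypothetical; the post does not describe Nightwatch's actual event schema or Lambda code.

```python
import json
import time
from typing import Optional

# Hypothetical schema: fields every telemetry event must carry.
REQUIRED_FIELDS = ("app_id", "type", "timestamp")
ALLOWED_TYPES = {"query", "job", "cache", "mail", "notification", "exception"}


def validate_and_enrich(raw: str,
                        received_at_ms: Optional[int] = None) -> Optional[dict]:
    """Parse one JSON event, reject malformed input, and add routing metadata.

    Returns the enriched event, or None if the event should be dropped.
    """
    try:
        event = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(event, dict):
        return None
    if any(name not in event for name in REQUIRED_FIELDS):
        return None
    if not isinstance(event["type"], str) or event["type"] not in ALLOWED_TYPES:
        return None

    enriched = dict(event)
    if received_at_ms is None:
        received_at_ms = int(time.time() * 1000)
    enriched["received_at_ms"] = received_at_ms
    # Keying by application keeps one application's events on one partition.
    enriched["partition_key"] = str(event["app_id"])
    return enriched


if __name__ == "__main__":
    print(validate_and_enrich('{"app_id": 7, "type": "query", "timestamp": 1}'))
    print(validate_and_enrich("not json"))
```

In a real deployment the enriched event would then be handed to a Kafka producer with `partition_key` as the record key; that call is left out here to keep the sketch dependency-free.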

Why Amazon MSK and ClickHouse Cloud?

Nightwatch requires a durable, horizontally scalable, and low maintenance streaming backbone.

With Amazon MSK Express brokers, we have achieved over 1 million events per second during load testing, benefiting from low latency, elastic scaling, and simplified operations. MSK Express brokers require no storage sizing or provisioning, scale up to 20 times faster, and recover 90% quicker than standard Apache Kafka brokers, all while enforcing best-practice defaults and client quotas for reliable performance. The seamless integration of Amazon MSK with other AWS services, such as Lambda, Amazon Simple Storage Service (Amazon S3), and Amazon CloudWatch, made it straightforward to build a resilient, end-to-end streaming architecture.

To ingest and transform these events in real time, Nightwatch uses ClickHouse Cloud and its managed integration platform, ClickPipes. ClickHouse Cloud excels at analytical workloads by delivering up to 100 times faster query performance for analytics compared to traditional row-based databases. Its advanced compression algorithms provide up to 90% storage savings, significantly reducing infrastructure costs while maintaining high performance. With its columnar architecture and optimized execution engine, ClickHouse Cloud can query billions of rows in under 1 second, enabling Laravel Nightwatch to serve real-time dashboards and analytics at global scale.

By integrating Amazon MSK and ClickHouse using ClickPipes, Laravel also reduced the operational burden of building and managing ETL pipelines, reducing latency and complexity.

Overcoming challenges

Testing complexity

While synthetic benchmarking and test datasets yield useful results, a more realistic workload is required to rigorously test infrastructure and code before deployment to production. The team used Terraform to manage infrastructure alongside application code, creating multiple dev and test environments, and allowing them to test the platform internally with their own applications before each release.

Multi-region infrastructure

The need to cater to multiple data storage regions also brought challenges—with latency, complexity, and cost the foremost concerns. However, the AWS, ClickHouse Cloud, and Cloudflare stack made available a powerful set of networking tools and scaling options. While VPC peering, RDS replication, and global server load balancing did the heavy lifting on the networking side, the ability to scale and right-size each resource kept costs to a minimum.

Query performance at scale

Materialized views, intelligent time-series partitioning, and specialized ClickHouse codecs helped ensure that queries remained sub-second even as data volumes grew into the billions. Meanwhile, compute separation allowed distinct workloads to scale separately while accessing the same data, with clusters right-sized horizontally and vertically depending on the requirements of each load.
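To show why pre-aggregation keeps dashboard queries fast, here is a conceptual Python model of what a materialized view does: it updates a small rollup at insert time so that reads never scan raw events. It is not ClickHouse code, and the per-minute bucket and the `duration_ms` metric are assumptions chosen for the example.

```python
from collections import defaultdict
from typing import Dict, Optional, Tuple


class MinuteRollup:
    """Toy model of a materialized view: aggregates are maintained on insert."""

    def __init__(self) -> None:
        self._buckets: Dict[Tuple[str, int], Dict[str, float]] = defaultdict(
            lambda: {"count": 0, "total_ms": 0.0}
        )

    def insert(self, app_id: str, timestamp_s: int, duration_ms: float) -> None:
        """Fold one raw event into its (application, minute) bucket."""
        minute_start = timestamp_s - (timestamp_s % 60)
        bucket = self._buckets[(app_id, minute_start)]
        bucket["count"] += 1
        bucket["total_ms"] += duration_ms

    def average_ms(self, app_id: str, minute_start: int) -> Optional[float]:
        """Dashboard-style read: one bucket lookup instead of a raw scan."""
        bucket = self._buckets.get((app_id, minute_start))
        if bucket is None:
            return None
        return bucket["total_ms"] / bucket["count"]


if __name__ == "__main__":
    rollup = MinuteRollup()
    rollup.insert("shop", 120, 10.0)
    rollup.insert("shop", 150, 30.0)
    print(rollup.average_ms("shop", 120))
```

The read cost depends on the number of buckets rather than the number of raw events, which is the property that lets query latency stay flat as event volume grows.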

Results

Laravel Nightwatch’s launch exceeded expectations:

  • 5,300 users registered in the first 24 hours
  • 500 million events processed on day one
  • 97 ms average dashboard request latency
  • 760,000 exceptions logged and analyzed in real time

By building on Amazon MSK and ClickHouse Cloud, we were able to scale from zero to billions of events without sacrificing performance or developer experience.

What’s next

Laravel plans to expand Nightwatch with:

  • More regions to cater to customers with data sovereignty requirements outside the US and EU
  • Broader data collection to provide even deeper insight into customers’ applications
  • SOC 2 certification to cater to customers with tighter compliance requirements
  • More advanced monitoring and analysis to identify issues before they affect users

The current architecture comfortably supports applications of all sizes, from hobby to enterprise (including a generous free tier), and is designed to handle over one trillion monthly events without performance degradation.

Conclusion

Laravel Nightwatch demonstrates how Amazon MSK, ClickHouse Cloud, and AWS serverless technologies can be combined to build a cost-effective, real-time monitoring platform at global scale. By designing for scale from day one, Laravel delivered sub-second analytics across billions of events, while maintaining the developer-friendly experience their community expects.


About the authors

Jess Archer

Jess is an Engineering Manager and Head of Nightwatch at Laravel, focusing on application observability, performance monitoring, and developer experience. She leads the Nightwatch team while staying hands-on in the codebase. Prior to Laravel, Jess worked on clinical data collection platforms, software for law enforcement, and anti-phishing solutions in banking. She later contributed extensively to Laravel’s open-source ecosystem before moving into her current leadership role. Jess is deeply passionate about open source and creating tools that make developers more productive.

James Carpenter

James is a Senior Infrastructure Engineer who joined Laravel in 2024 as Infrastructure Lead for the Nightwatch team, bringing 15 years of experience in sport and healthcare. Specialising in DevOps and Infrastructure, he is passionate about solving complex problems and creating exceptional experiences for both customers and developers.

Johnny Mirza

Johnny is a Solution Architect with ClickHouse, working with users across APAC. With over 20 years of background in solutions engineering, he’s experienced in architecting and enabling solutions for enterprise clients in the telecommunications, media, insurance, and financial services sectors. Johnny has a high level of expertise in integrating public cloud and on-premises infrastructure, while focussing on service assurance, monitoring platforms, and open-source technologies. Prior to ClickHouse, Johnny was part of the solution engineering teams at Confluent, Splunk, and Optus, to name a few.

Masudur Rahaman Sayem

Masudur is a Streaming Data Architect at AWS with over 25 years of experience in the IT industry. He collaborates with AWS customers worldwide to architect and implement sophisticated data streaming solutions that address complex business challenges. As an expert in distributed computing, Sayem specializes in designing large-scale distributed systems architecture for maximum performance and scalability. He has a keen interest and passion for distributed architecture, which he applies to designing enterprise-grade solutions at internet scale.

Export JMX metrics from Kafka connectors in Amazon Managed Streaming for Apache Kafka Connect with a custom plugin

Post Syndicated from Jaydev Nath original https://aws.amazon.com/blogs/big-data/export-jmx-metrics-from-kafka-connectors-in-amazon-managed-streaming-for-apache-kafka-connect-with-a-custom-plugin/

Organizations use streaming applications to process and analyze data in real time and adopt the Amazon MSK Connect feature of Amazon Managed Streaming for Apache Kafka (Amazon MSK) to run fully managed Kafka Connect workloads on AWS. Message brokers like Apache Kafka allow applications to handle large volumes and diverse types of data efficiently and enable timely decision-making and instant insights. It’s crucial to monitor the performance and health of each component to help ensure the seamless operation of data streaming pipelines.

Amazon MSK is a fully managed service that simplifies the deployment and operation of Apache Kafka clusters on AWS. It simplifies building and running applications that use Apache Kafka to process streaming data. Amazon MSK Connect simplifies the deployment, monitoring, and automatic scaling of connectors that transfer data between Apache Kafka clusters and external systems such as databases, file systems, and search indices. Amazon MSK Connect is fully compatible with Kafka Connect and supports Amazon MSK, Apache Kafka, and Apache Kafka compatible clusters. Amazon MSK Connect uses a custom plugin as the container for connector implementation logic.

Custom MSK Connect plugins use Java Management Extensions (JMX) to expose runtime metrics. While Amazon MSK Connect sends a set of connect metrics to Amazon CloudWatch, it currently does not natively support exporting the JMX metrics emitted by the connector plugins. These metrics can be exported by modifying the custom connect plugin code directly, but that adds maintenance overhead because the modification must be reapplied every time the plugin is updated. In this post, we demonstrate a more maintainable approach: extending a custom connect plugin with additional modules that export JMX metrics and publish them to CloudWatch as custom metrics. These additional JMX metrics provide rich insights into the performance and health of the connectors. As an example, we show how you can export the JMX metrics for the Debezium connector when used with MSK Connect.

Understanding JMX

Before we dive deep into exporting JMX metrics, let’s understand how JMX works. JMX is a technology that you can use to monitor and manage Java applications. Key components involved in JMX monitoring are:

  • Managed beans (MBeans) are Java objects that represent the metrics of the Java application being monitored. They contain the actual data points of the resources being monitored.
  • JMX server creates and registers the MBeans with the PlatformMBeanServer. The Java application that is being monitored acts as the JMX server and exposes the MBeans.
  • MBeanServer or JMX registry is the central registry that keeps track of all the registered MBeans in the JMX server. It is the access point for all the MBeans within the Java virtual machine (JVM).
  • JMXConnectorServer acts as a bridge between the JMX client and the JMX server and enables remote access to the exposed MBeans. JMXConnectorServerFactory creates and manages the JMXConnectorServer. It allows for the customization of the server’s properties and uses the JMXServiceURL to define the endpoint where the JMX client can connect to the JMX server.
  • JMXServiceURL provides the necessary information such as the protocol, host, and port for the client to connect to the JMX server and access the desired MBeans.
  • JMX client is an external application or tool that connects to the JMX server to access and monitor the exposed metrics.

JMX monitoring involves the steps shown in the following figure:

JMX architecture diagram showing connection flow from client to server with MBeans

JMX monitoring steps include:

  1. The Java application acting as the JMX server creates and configures MBeans for the desired metrics.
  2. JMX server registers the MBeans with the JMX registry.
  3. JMXConnectorServerFactory creates the JMXConnectorServer that defines the JMXServiceURL that provides the entry point details for the JMX client.
  4. JMXClient connects to the JMX registry in the JMX server using the JMXServiceURL and the JMXConnectorServer.
  5. The JMX server handles client requests, interacting with the JMX registry to retrieve the MBean data.
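The five steps above can be modeled in a few lines. The following sketch is a toy, in-process Python model of the registration and lookup flow only; it does not use JMX or any Java API, and every class name, object name, and URL in it is made up for illustration.

```python
class ConnectorMetrics:
    """Stands in for an MBean: a plain object whose attributes are metrics."""

    def __init__(self) -> None:
        self.records_processed = 0


class ToyMBeanServer:
    """Stands in for the JMX registry: maps object names to registered beans."""

    def __init__(self) -> None:
        self._beans = {}

    def register(self, object_name: str, bean: object) -> None:
        if object_name in self._beans:
            raise ValueError(f"{object_name} is already registered")
        self._beans[object_name] = bean

    def get_attribute(self, object_name: str, attribute: str):
        return getattr(self._beans[object_name], attribute)


class ToyConnectorServer:
    """Stands in for the connector server: exposes a registry at a service URL."""

    def __init__(self, service_url: str, registry: ToyMBeanServer) -> None:
        self.service_url = service_url
        self._registry = registry

    def handle(self, object_name: str, attribute: str):
        # Step 5: serve client requests by reading from the registry.
        return self._registry.get_attribute(object_name, attribute)


class ToyClient:
    """Stands in for a JMX client that locates servers by service URL."""

    def __init__(self, endpoints: dict) -> None:
        self._endpoints = endpoints

    def read(self, service_url: str, object_name: str, attribute: str):
        # Step 4: connect through the URL, then ask for an attribute.
        return self._endpoints[service_url].handle(object_name, attribute)


if __name__ == "__main__":
    metrics = ConnectorMetrics()                                    # step 1
    registry = ToyMBeanServer()
    registry.register("toy.connect:type=metrics", metrics)         # step 2
    server = ToyConnectorServer("toy://localhost:9999", registry)  # step 3
    client = ToyClient({server.service_url: server})
    metrics.records_processed = 42
    print(client.read(server.service_url, "toy.connect:type=metrics",
                      "records_processed"))
```

The point of the model is the indirection: the client never holds the metrics object itself, it only knows a URL and an object name, which is what allows an exporter to be added alongside a connector without touching the connector's own code.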

Solution overview

Wrapping supported Kafka connectors with custom code that exposes connector-specific operational metrics helps teams gain better insight by correlating connector metrics with cloud-centered metrics in monitoring systems such as Amazon CloudWatch. This approach enables consistent monitoring across the components of a change data capture (CDC) pipeline, ultimately feeding metrics into unified dashboards while respecting each connector’s architectural philosophy. The consolidated metrics can be delivered to CloudWatch or to the monitoring tool of your choice, including partner-specific application performance management (APM) tools such as Datadog and New Relic.

We have a working implementation of this approach for two popular connectors: the Debezium source connector and the MongoDB Sink Connector. You can find the GitHub sample and the ready-to-use plugins built for each connector in the repository. Review the README file for this custom implementation for more details.

For example, our custom implementation for the MongoDB Sink Connector adds a metrics export layer that calculates critical performance indicators such as latest-kafka-time-difference-ms, which measures the latency between Kafka message timestamps and connector processing time as the difference between the connector’s current clock time and the timestamp of the last received record. This custom wrapper around the MongoDB Sink Connector enables exporting relevant JMX metrics and publishing them as custom metrics to CloudWatch. We’ve open sourced this solution on GitHub, along with a ready-to-use plugin and detailed configuration guidance in the README.

CDC is the process of identifying and capturing changes made in a database and delivering those changes in real time to a downstream system. Debezium is an open source distributed platform built on top of Apache Kafka that provides CDC functionality. It provides a set of connectors to track and stream changes from databases to Kafka.

In the next section, we dive deep into the implementation details of how to export JMX metrics from Debezium MySQL Connector deployed as a custom plugin in Amazon MSK Connect. The connector plugin takes care of creating and configuring the MBeans and registering them with the JMX registry.

The following diagram shows the workflow of using Debezium MySQL Connector as a custom plugin in Amazon MSK Connect for CDC from an Amazon Aurora MySQL-Compatible Edition data source.

Data flow diagram illustrating custom Amazon MSK Connect plugin integrating Aurora, Kafka, and CloudWatch metrics

  1. MySQL binary logging (binlog) is enabled in Amazon Aurora MySQL-Compatible Edition to record all operations in the order in which they are committed to the database.
  2. The Debezium connector plugin component of the MSK Connect custom plugin continuously monitors the MySQL database, captures row-level changes by reading the MySQL binlog, and streams them as change events to Kafka topics in Amazon MSK.
  3. We’ll build a custom module to enable JMX monitoring on the Debezium connector. This module will act as a JMX client to retrieve the JMX metrics from the connector and publish them as custom metrics to CloudWatch.

The Debezium connector provides three types of metrics in addition to the built-in support for default Kafka and Kafka Connect JMX metrics.

  • Snapshot metrics provide information about connector operation while performing a snapshot.
  • Streaming metrics provide information about connector operation when the connector is reading the binlog.
  • Schema history metrics provide information about the status of the connector’s schema history.

In this solution, we export the MilliSecondsBehindSource streaming metric emitted by the Debezium MySQL connector. This metric provides the number of milliseconds that the connector is lagging behind the change events in the database.

Prerequisites

You need the following prerequisites:

  • Access to the AWS account where you want to set up this solution.
  • A source database and MSK cluster, set up by following the setup instructions in the MSK Connect workshop.

Create a custom plugin

Creating a custom plugin for Amazon MSK Connect for the solution involves the following steps:

  1. Create a custom module: Create a new Maven module or project that will contain your custom code to:
    1. Enable JMX monitoring in the connector application by starting the JMX server.
    2. Create a Remote Method Invocation (RMI) registry so that clients can access the JMX metrics.
    3. Create a JMX metrics exporter that connects to the JMX server, queries the JMX metrics, and pushes them to CloudWatch as custom metrics.
    4. Schedule the JMX metrics exporter to run at a configured interval.
  2. Package and deploy the custom module as an MSK Connect custom plugin.
  3. Create a connector using the custom plugin to capture CDC from the source and stream it, and validate the metrics in Amazon CloudWatch.

This custom module extends the connector functionality to export the JMX metrics without requiring any changes in the underlying connector implementation. This helps ensure that upgrading the custom plugin requires only upgrading the plugin version in the pom.xml of the custom module.

Let’s look at the implementation of each of these steps in detail.

1. Create a custom module

Create a new Maven project with dependencies on the Debezium MySQL Connector to enable JMX monitoring, the Kafka Connect API for configuration, and the AWS SDK for CloudWatch to push the metrics to CloudWatch.
Set up a JMX connector server to enable JMX monitoring: To enable JMX monitoring, the JMX server needs to be started at the time of initializing the connector. This is usually done by setting the environment variables with JMX options as described in Monitoring Debezium. In the case of an Amazon MSK Connect custom plugin, JMX monitoring is enabled programmatically at the time of connector plugin initialization. To achieve this:

  • Extend the MySqlConnector class and override the start method, which is the connector’s entry point, to execute custom code.
public class DebeziumMySqlMetricsConnector extends MySqlConnector {
    @Override
    public void start(Map<String, String> props) {
        // The custom JMX setup from the following steps goes here, alongside the base connector startup.
    }
}
  • In the start method of the custom connector class (DebeziumMySqlMetricsConnector) that we are creating, set the following parameters to allow customization of the JMX Server properties by retrieving connector configuration from a config file.

connect.jmx.port – The port number on which the RMI registry is created. The JMXConnectorServer listens for incoming connections on this port.

database.server.name – Name of the database that is the source for the CDC.

It also retrieves the CloudWatch configuration related properties that will be used while pushing the JMX metrics to CloudWatch.

cloudwatch.namespace.name – CloudWatch namespace to which the metrics are pushed as custom metrics

cloudwatch.region – CloudWatch Region where the custom namespace is created in your AWS account

connectJMXPort = Integer.parseInt(props.getOrDefault(CONNECT_JMX_PORT_KEY, String.valueOf(DEFAULT_JMX_PORT)));
databaseServerName = props.getOrDefault(DATABASE_SERVER_NAME_KEY, "");
cwNameSpace = props.getOrDefault(CW_NAMESPACE_KEY, DEFAULT_CW_NAMESPACE);
cwRegion = props.getOrDefault(CW_REGION_KEY, null);
  • Create an RMI registry on the specified port (connectJMXPort). This registry is used by the JMXConnectorServer to store the RMI objects corresponding to the MBeans in the JMX registry. This allows the JMX clients to look up and access the MBeans on the PlatformMBeanServer.

LocateRegistry.createRegistry(connectJMXPort);

  • Retrieve the PlatformMBeanServer and construct the JMXServiceURL which is in the format service:jmx:rmi://localhost/jndi/rmi://localhost:<<jmx.port>>/jmxrmi. Create a new JMXConnectorServer instance using the JMXConnectorServerFactory and the JMXServiceURL and start the JMXConnectorServer instance.
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
String jmxServiceURL = String.format(JMX_URL_TEMPLATE, connectJMXPort);
JMXServiceURL url = new JMXServiceURL(jmxServiceURL);
JMXConnectorServer svr = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
svr.start();
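
To make the URL format concrete, the following minimal sketch builds a JMXServiceURL for a given RMI registry port and prints its parts. The URL_TEMPLATE constant is an assumption that mirrors the format described above; the JMX_URL_TEMPLATE constant in the sample may be defined differently.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

final class JmxUrlSketch {

    // Assumed template that mirrors the format described in the text.
    static final String URL_TEMPLATE =
            "service:jmx:rmi://localhost/jndi/rmi://localhost:%d/jmxrmi";

    // Builds the service URL for the RMI registry listening on the given port.
    static JMXServiceURL forPort(int registryPort) {
        try {
            return new JMXServiceURL(String.format(URL_TEMPLATE, registryPort));
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid JMX service URL", e);
        }
    }

    public static void main(String[] args) {
        JMXServiceURL url = forPort(7098);
        System.out.println("protocol: " + url.getProtocol());
        System.out.println("host:     " + url.getHost());
        // The registry port is part of the JNDI lookup path, not the connector port field.
        System.out.println("port:     " + url.getPort());
        System.out.println("path:     " + url.getURLPath());
    }
}
```

In this form of the URL, the RMI registry host and port appear in the JNDI path, which is why the registry has to exist on that port before the JMXConnectorServer starts.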

Implement JMX metrics exporter: Create a JMX client to connect to the JMX server, query the MilliSecondsBehindSource metric from the JMX server, convert it into the required format, and export it to CloudWatch.

  • Connect to the JMX Server using the JMXConnectorFactory and JMXServiceURL
JMXServiceURL jmxUrl = new JMXServiceURL(String.format(JMX_URL_TEMPLATE, DebeziumMySqlMetricsConnector.getConnectJMXPort()));
// JMXConnectorFactory.connect returns a connector that is already connected, so no separate connect() call is needed.
JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl, null);
  • Query the MBean object that holds the corresponding metric, for example, MilliSecondsBehindSource, and retrieve the metric value using the sample code provided in msk-connect-custom-plugin-jmx. You can choose one or more metrics.
  • Schedule the execution of your JMX metrics exporter at regular intervals.

getScheduler().schedule(new JMXMetricsExporter(), SCHEDULER_INITIAL_DELAY, SCHEDULER_PERIOD);
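
The query and scheduling steps above can be sketched as follows. This is an illustrative sketch, not the code from msk-connect-custom-plugin-jmx: the LagExporterSketch class, the StreamingStub stand-in MBean, and the sink callback are hypothetical, and a ScheduledExecutorService is used in place of the sample’s scheduler. The object name follows the pattern that the Debezium 2.x documentation gives for MySQL streaming metrics.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class LagExporterSketch implements Runnable {

    // Stand-in for the connector's streaming MBean so the sketch runs without Debezium.
    public interface StreamingStubMBean {
        long getMilliSecondsBehindSource();
    }

    public static class StreamingStub implements StreamingStubMBean {
        private final long lag;

        public StreamingStub(long lag) {
            this.lag = lag;
        }

        @Override
        public long getMilliSecondsBehindSource() {
            return lag;
        }
    }

    private final MBeanServerConnection connection;
    private final ObjectName metricsBean;
    private final LongConsumer sink; // hypothetical callback, for example a CloudWatch publisher

    LagExporterSketch(MBeanServerConnection connection, ObjectName metricsBean, LongConsumer sink) {
        this.connection = connection;
        this.metricsBean = metricsBean;
        this.sink = sink;
    }

    // Name pattern documented by Debezium 2.x for MySQL streaming metrics.
    static ObjectName streamingMetricsName(String topicPrefix) {
        try {
            return new ObjectName(
                    "debezium.mysql:type=connector-metrics,context=streaming,server=" + topicPrefix);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid topic prefix: " + topicPrefix, e);
        }
    }

    @Override
    public void run() {
        try {
            if (!connection.isRegistered(metricsBean)) {
                return; // The connector has not registered its streaming metrics yet.
            }
            Number value = (Number) connection.getAttribute(metricsBean, "MilliSecondsBehindSource");
            sink.accept(value.longValue());
        } catch (Exception e) {
            // An uncaught exception would cancel all later scheduled runs, so log and continue.
            System.err.println("Metric read failed: " + e);
        }
    }

    // Runs the exporter repeatedly; the caller shuts the returned scheduler down on stop.
    static ScheduledExecutorService schedule(Runnable exporter, long initialDelaySeconds, long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(exporter, initialDelaySeconds, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    // Demonstration helper: registers the stand-in MBean and performs one export cycle.
    static long readOnceFromStub(long lagMillis) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = streamingMetricsName("salesdb");
        AtomicLong captured = new AtomicLong(-1);
        try {
            if (server.isRegistered(name)) {
                server.unregisterMBean(name);
            }
            server.registerMBean(new StreamingStub(lagMillis), name);
            new LagExporterSketch(server, name, captured::set).run();
            return captured.get();
        } catch (Exception e) {
            throw new IllegalStateException("Stub registration failed", e);
        } finally {
            try {
                server.unregisterMBean(name);
            } catch (Exception ignored) {
                // Nothing to clean up if registration never succeeded.
            }
        }
    }
}
```

Catching exceptions inside run matters with scheduleAtFixedRate, because an exception that escapes the task suppresses every subsequent execution.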

Export metrics to CloudWatch: Implement the logic to push relevant JMX metrics to CloudWatch. You can use the AWS SDK for Java to interact with the CloudWatch PutMetricData API or use the CloudWatch Logs subscription filter to ingest the metrics from a dedicated Kafka topic.

Dimension dimension = Dimension.builder()
        .name("DBServerName")
        .value(DebeziumMySqlMetricsConnector.getDatabaseServerName())
        .build();
MetricDatum datum = MetricDatum.builder()
        .metricName("MilliSecondsBehindSource")
        .unit(StandardUnit.NONE)
        .value(Double.valueOf(msBehindSource))
        .timestamp(instant)
        .dimensions(dimension)
        .build();
PutMetricDataRequest request = PutMetricDataRequest.builder()
        .namespace(DebeziumMySqlMetricsConnector.getCWNameSpace())
        .metricData(datum)
        .build();
cw.putMetricData(request);

For more information, see the sample implementation for the custom module in aws-samples on GitHub. This sample also provides custom plugins packaged with two different versions of the Debezium MySQL connector (debezium-connector-mysql-2.5.2.Final-plugin and debezium-connector-mysql-2.7.3.Final-plugin). The following steps explain how to build a custom plugin using your custom code.

2. Package the custom module and Debezium MySQL connector as a custom plugin

Build and package the Maven project with the custom code as a JAR file and include the JAR file in the debezium-connector-mysql-2.5.2.Final-plugin folder downloaded from the Maven repository. Package the updated debezium-connector-mysql-2.5.2.Final-plugin folder as a ZIP file (Amazon MSK Connect accepts custom plugins in ZIP or JAR format). Alternatively, you can use the prebuilt custom-debezium-mysql-connector-plugin.zip available in GitHub.

Choose the Debezium connector version (2.5 or 2.7) that fits your requirement.

When you have to upgrade to a new version of the Debezium MySQL connector, you can update the version of the dependency and build the custom module and deploy it. By doing this, you can maintain the custom plugin without modifying the original connector code. The GitHub samples provide ready-to-use plugins for two Debezium connector versions. However, you can follow the same approach to upgrade to the latest connector version as well.

Create a custom plugin in Amazon MSK

  1. If you have set up your AWS resources by following the Getting Started lab, open the Amazon S3 console and locate the bucket msk-lab-${ACCOUNT_ID}-plugins-bucket/debezium.
  2. Upload the custom plugin created in the previous section (custom-debezium-mysql-connector-plugin.zip) to msk-lab-${ACCOUNT_ID}-plugins-bucket/debezium, as shown in the following figure.

msk-lab-s3-plugin-bucket

  3. Switch to the Amazon MSK console and choose Custom plugins in the navigation pane. Choose Create custom plugin, browse to the S3 bucket that you created above, and select the custom plugin ZIP file you just uploaded.

custom-connector-plugin-s3-object

  4. Enter custom-debezium-mysql-connector-plugin for the plugin name. Optionally, enter a description and choose Create Custom Plugin.

msk-connect-create-custom-plugin-console

  5. After a few seconds, you should see that the plugin is created and its status is Active.
  6. Customize the worker configuration for the connector by following the instructions in the Customize worker configuration lab.

3. Create an Amazon MSK connector

The next step is to create an MSK connector.

  1. From the MSK section choose Connectors, then choose Create connector. Choose custom-debezium-mysql-connector-plugin from the list of Custom plugins, then choose Next.

msk-plugin-create

  2. Enter custom-debezium-mysql-connector in the Name textbox, and enter a description for the connector.

connector-properties-console-in-MSK-connect

  3. Select the MSKCluster-msk-connect-lab from the listed MSK clusters. From the Authentication dropdown, select IAM.
  4. Copy the following configuration and paste it in the connector configuration textbox.
  • Replace the <Your Aurora MySQL database endpoint>, <Your Database Password>, <Your MSK Bootstrap Server Address>, and <Your CloudWatch Region> placeholders with the corresponding details for the resources in your account.
  • Review the topic.prefix, database.user, database.server.id, database.server.name, database.port, and database.include.list parameters in the configuration. These parameters are configured with the values used in the workshop. Update them with the details corresponding to your configuration if you have customized it in your account.
  • Note that the connector.class parameter is set to the fully qualified name of the subclass of the MySqlConnector class that you created in the custom module.
  • The connect.jmx.port parameter specifies the port on which to start the JMX server. You can configure this to any available port.
connector.class=com.amazonaws.msk.debezium.mysql.connect.DebeziumMySqlMetricsConnector
tasks.max=1
include.schema.changes=true
topic.prefix=salesdb
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
database.user=master
database.server.id=123456
database.server.name=salesdb
database.port=3306
key.converter.schemas.enable=false
database.hostname=<Your Aurora MySQL database endpoint>
database.password=<Your Database Password>
value.converter.schemas.enable=false
database.include.list=salesdb
schema.history.internal.kafka.topic=internal.dbhistory.salesdb
schema.history.internal.kafka.bootstrap.servers=<Your MSK Bootstrap Server Address>
schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM
schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
schema.history.internal.consumer.security.protocol=SASL_SSL
schema.history.internal.producer.security.protocol=SASL_SSL
connect.jmx.port=7098
cloudwatch.namespace.name=MSK_Connect
cloudwatch.region=<Your CloudWatch Region>

connector-properties-configuration-settings

5. Follow the remaining instructions from the Create MSK Connector lab and create the connector. Verify that the connector status changes to Running.

Version 2.7.3 of the Debezium MySQL custom connector plugin provides additional flexibility through optional properties that you can add to your MSK connector configuration to selectively include and exclude the metrics emitted to CloudWatch. The following are example configuration properties that can be used with version 2.7.3:

  • cloudwatch.debezium.streaming.metrics.include – A comma-separated list of streaming metrics to export to CloudWatch as custom metrics.
  • cloudwatch.debezium.streaming.metrics.exclude – A comma-separated list of streaming metrics to exclude from being sent to CloudWatch as custom metrics.
  • cloudwatch.debezium.snapshot.metrics.include and cloudwatch.debezium.snapshot.metrics.exclude – The corresponding include and exclude properties for snapshot metrics.
  • cloudwatch.debezium.schema.history.metrics.include and cloudwatch.debezium.schema.history.metrics.exclude – The corresponding include and exclude properties for schema history metrics.

The following is a sample configuration excerpt.

  "cloudwatch.debezium.streaming.metrics.include": "LastTransactionId, TotalNumberOfEventsSeen, MilliSecondsBehindSource,CapturedTables",
  "cloudwatch.debezium.streaming.metrics.exclude": "LastTransactionId",
  "cloudwatch.debezium.schema.history.metrics.exclude": "MilliSecondsSinceLastAppliedChange",

Review the GitHub README file for more details on the use of these properties with MSK connector configurations.
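
To illustrate how comma-separated include and exclude lists of this kind could be evaluated, here is a minimal sketch. It is not taken from the plugin: the MetricFilterSketch class is hypothetical, and the precedence rules (an empty include list allows every metric, and an exclude entry overrides an include entry) are assumptions, so check the README for the actual behavior.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

final class MetricFilterSketch {

    // Splits a comma-separated property value, trimming whitespace and dropping empty entries.
    static Set<String> parseList(String csv) {
        if (csv == null || csv.trim().isEmpty()) {
            return Collections.emptySet();
        }
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    // Assumed rules: exclude wins over include, and an empty include list allows everything.
    static boolean shouldExport(String metric, Set<String> include, Set<String> exclude) {
        if (exclude.contains(metric)) {
            return false;
        }
        return include.isEmpty() || include.contains(metric);
    }

    public static void main(String[] args) {
        Set<String> include = parseList(
                "LastTransactionId, TotalNumberOfEventsSeen, MilliSecondsBehindSource,CapturedTables");
        Set<String> exclude = parseList("LastTransactionId");
        for (String metric : include) {
            System.out.println(metric + " -> " + shouldExport(metric, include, exclude));
        }
    }
}
```

Trimming each entry keeps values such as "MilliSecondsBehindSource,CapturedTables" and "LastTransactionId, TotalNumberOfEventsSeen" equivalent regardless of the spacing after the commas.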

Verify the replication in the Kafka cluster and CloudWatch metrics

Follow the instructions in the Verify the replication in the Kafka cluster lab to set up a client and make changes to the source DB and verify that the changes are captured and sent to Kafka topics by the connector.

To verify that the connector has published the JMX metrics to CloudWatch, go to the CloudWatch console and choose Metrics in the navigation pane, then choose All Metrics. Under Custom namespace, you can see MSK_Connect with database name as the dimension. Select the database name to view the metrics.

Amazon CloudWatch interface with time series graph and MSK Connect metric details

Select the MilliSecondsBehindSource metric with the statistic set to Average on the Graphed metrics tab to plot the graph. You can verify that the MilliSecondsBehindSource metric value is greater than zero whenever operations are being performed on the source database and returns to zero when the database is idle.

 Amazon CloudWatch console showing custom metric visualization with detailed controls and timeline analysis

Clean up

If you have been following along to set up the solution in your account, delete the resources that you created, such as the Aurora database, Amazon MSK cluster, and connectors, by following the instructions at Cleanup in the Amazon MSK Connect lab.

Conclusion

In this post, we showed you how to extend the Debezium MySQL connector plugin with an additional module to export the JMX metrics to CloudWatch as custom metrics. As a next step, you can create a CloudWatch alarm to monitor the metrics and take remediation actions when the alarm is triggered. In addition to exporting the JMX metrics to CloudWatch, you can export these metrics to third-party applications such as Prometheus or Datadog using CloudWatch Metric Streams. You can follow a similar approach to export the JMX metrics of other connectors from MSK Connect. You can learn more about creating your own connectors by visiting the Connector Developer Guide and how to deploy them as custom plugins in the MSK Connect documentation.


About the authors

Jaydev Nath is a Solutions Architect at AWS, where he works with ISV customers to build secure, scalable, reliable, and cost-efficient cloud solutions. He brings strong expertise in building SaaS architecture on AWS with a focus on Generative AI and data analytics technologies to help deliver practical, valuable business outcomes for customers.

David John Chakram is a Principal Solutions Architect at AWS. He specializes in building data platforms and architecting seamless data ecosystems. With a profound passion for databases, data analytics, and machine learning, he excels at transforming complex data challenges into innovative solutions and driving businesses forward with data-driven insights.

Sharmila Shanmugam is a Solutions Architect at Amazon Web Services. She is passionate about solving customers’ business challenges with technology and automation and reducing operational overhead. In her current role, she helps customers across industries in their digital transformation journey and helps them build secure, scalable, performant, and optimized workloads on AWS.

How Karrot built a feature platform on AWS, Part 2: Feature ingestion

Post Syndicated from Hyeonho Kim original https://aws.amazon.com/blogs/architecture/how-karrot-built-a-feature-platform-on-aws-part-2-feature-ingestion/

This post is co-written with Hyeonho Kim, Jinhyeong Seo and Minjae Kwon from Karrot.

In Part 1 of this series, we discussed how Karrot developed a new feature platform, which consists of three main components: feature serving, a stream ingestion pipeline, and a batch ingestion pipeline. We discussed their requirements, the solution architecture, and feature serving using a multi-level cache. In this post, we share the stream and batch ingestion pipelines and how they ingest data into an online store from various event sources.

Solution overview

The following diagram illustrates the solution architecture, as introduced in Part 1.

Stream ingestion

Stream ingestion is the process of collecting data from various event sources in real time, transforming it into features, and storing them. It consists of two main components:

Consumers handle not only the source events, but also re-published events. Features are loaded using different strategies, such as write-through and write-around, chosen by considering cardinality, data size, and access patterns.

Most features are generated based on two types of events: events that occur due to real-time user actions, and asynchronous events that occur due to state changes in user and article data. These events and features have an M:N relationship, meaning one event can be the source of multiple features, and one feature can be generated based on multiple events.

The following diagram illustrates the architecture of the stream ingestion pipeline.

To efficiently handle M:N relationships, a structure was needed to receive events and distribute them to multiple feature processing logics. Two core components were designed for this purpose:

  • Dispatcher – Receives events from multiple consumer groups and propagates them to relevant feature processing logic
  • Aggregator – Processes events received from the dispatcher into actual features

This stream processing pipeline enables real-time feature generation and storage.

Message broker optimization: Fast at-least-once delivery

The feature platform processes up to 25,000 events per second, including user behavior log events. However, when worker traffic surges, event processing failures or infrastructure failures occasionally cause event loss. To solve this problem, the existing automatic commit mode was changed to manual commit in Amazon MSK. Events are now committed only after they are successfully processed, and failed events are sent to a separate retry topic and postprocessed by a dedicated worker.

However, processing large volumes of events synchronously with manual commit resulted in approximately 10 times slower processing speed and increased latency. Although consumer group resources were available, simply increasing the number of partitions in Amazon MSK wasn’t a solution due to team-specific partitioning permissions. The platform designed parallel processing within single partitions and implemented a custom consumer supporting retry functionality. The core of the implementation is to read as many messages from the partition as the fetch size at a time and process them by spawning worker threads in parallel for each message. When processing is complete, the offsets of successful messages are sorted and a manual commit is performed for the largest offset, and failed messages are republished to the retry topic. This enables parallel processing even in a single partition, and the concurrency can be controlled automatically. As a result, the event processing speed is faster than the existing automatic commit method, and it is stably processed without delay even when the number of events increases.
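
The per-partition parallel processing described above can be sketched without a Kafka client as follows. This is an illustration under stated assumptions, not Karrot’s consumer: the PartitionBatchSketch, Message, and BatchResult types and the handler predicate are hypothetical, and the actual fetch, commit, and retry-topic calls are left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

final class PartitionBatchSketch {

    // Hypothetical stand-in for a record fetched from one partition.
    static final class Message {
        final long offset;
        final String payload;

        Message(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }
    }

    static final class BatchResult {
        final long largestSucceededOffset; // -1 when no message succeeded
        final List<Message> retries;       // messages to republish to the retry topic

        BatchResult(long largestSucceededOffset, List<Message> retries) {
            this.largestSucceededOffset = largestSucceededOffset;
            this.retries = retries;
        }
    }

    // Processes one fetched batch in parallel; the handler returns true on success.
    static BatchResult processBatch(List<Message> batch, Predicate<Message> handler, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Boolean>> outcomes = new ArrayList<>();
            for (Message message : batch) {
                Callable<Boolean> task = () -> handler.test(message);
                outcomes.add(pool.submit(task));
            }
            long largestSucceeded = -1;
            List<Message> retries = new ArrayList<>();
            for (int i = 0; i < batch.size(); i++) {
                boolean succeeded;
                try {
                    succeeded = outcomes.get(i).get();
                } catch (ExecutionException e) {
                    succeeded = false; // A handler exception counts as a failed message.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    succeeded = false;
                }
                if (succeeded) {
                    largestSucceeded = Math.max(largestSucceeded, batch.get(i).offset);
                } else {
                    retries.add(batch.get(i));
                }
            }
            return new BatchResult(largestSucceeded, retries);
        } finally {
            pool.shutdown();
        }
    }
}
```

With the Kafka consumer API, the committed value is conventionally the next offset to read, so a caller would commit largestSucceededOffset + 1 after republishing the retries. Creating a pool per batch keeps the sketch short; a long-running consumer would more likely reuse one pool.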

Stream processing

The stream ingestion pipeline performs only simple extract, transform, and load (ETL) logic and validation. There were already many requirements for complex stream processing in the feature platform, and a separate service was created to accommodate them. The feature platform didn’t address these requirements for the following reasons:

  • The purpose of stream ingestion in the feature platform is to collect and store features in real time, whereas the main purpose of stream processing is to process data.
  • Not all features require complex processing. We decided that it wasn’t appropriate to make the entire stream collection process complicated for some features.
  • The result data of stream processing could be used outside the feature platform, and there were requirements to consider this. Therefore, creating a separate service was more suitable for Karrot’s situation.
  • Additionally, some source data didn’t exist in AWS, which could have resulted in significant additional costs if everything was handled within the feature platform.

Although it’s a separate service from the feature platform, the following is a brief introduction to how the feature platform uses data through stream processing:

  • Various content embedding cases – We perform stream processing using models, and use various contents (articles, images, and so on) as input values to pre-trained models to create embeddings. These embeddings are stored in the feature platform and used as features during recommendation to improve recommendation quality.
  • Rich feature generation cases – Some of the processed data is further processed using large language models (LLMs) for use as features. One example is predicting which category a specific second-hand product belongs to and using this prediction value as a feature.

Batch ingestion

Batch ingestion is responsible for processing and storing large amounts of data into features in batches. This is divided into a cron job that runs periodically and a backfill job that loads large amounts of data one time.

For this purpose, AWS Batch based on AWS Fargate is used. AWS Batch jobs running on Fargate are provisioned independently from the other environments, enabling safe large-scale processing. For example, even if more than 1,000 servers or 10,000 vCPUs are used for backfilling large amounts of data, they are operated separately from the other services and can be operated efficiently with a usage-based billing method.

When adding new features, batch loading of past data or periodic loading of large amounts of data is one of the core functions of the feature platform. The main requirements considered in the design are as follows:

  • It must be able to process large amounts of data.
  • It must be able to start at the time desired by the user and finish the work within an appropriate time.
  • It must have low operating costs. It should be a managed service if possible, and it’s better if there is less additional work or specific domain knowledge for operation. Also, it should reuse existing service code as much as possible.
  • Complex operations for features or the configuration of Directed Acyclic Graphs (DAGs) are not necessarily required.

There were several options to choose from, such as Apache Airflow, but AWS Batch was chosen to avoid over-engineering considering the operating cost according to the current requirements.

The following diagram illustrates the architecture of the batch ingestion pipeline.

The key components are as follows:

  • Scheduler – Identifies the targets that need batch jobs according to specifications such as FeatureGroupSpec and IngestionSpec written by the user on the feature platform, and submits the corresponding job specifications to AWS Batch (submit job).
  • AWS Batch – The jobs submitted by the scheduler are executed using the preconfigured job queue and computing environment. With AWS Batch, you can configure a Fargate environment separately from the other production services, so even tasks that provision large-scale resources run stably without affecting the other production services.

Future improvements for batch ingestion

The current configuration works well and reliably, but there are some areas for improvement:

  • No DAG support – The initial feature platform performed relatively simple tasks, such as parsing batch data sources, converting them to the feature schema, and storing them. However, as the platform became more advanced, more complex operations became necessary, and therefore support for DAG configurations that can process features by sequentially performing various dependent jobs became necessary.
  • Manual configuration for parallel processing – Currently, when processing large-scale data in parallel, the worker must manually estimate the number of jobs to be processed in parallel and provide it in the specification, and the scheduler performs a submit job in parallel based on this. This method is based purely on experience, and in order for the system to become more advanced, the system must be able to automatically abstract and optimize the appropriate level of parallel processing.
  • Limited AWS Batch monitoring usability – AWS Batch monitoring has some limitations, such as jobs that don’t transition from the Runnable to the Running state, a lack of appropriate notification systems for such cases, and the inability to directly check failed jobs through URL parameters when receiving alerts. These aspects should be improved from an operational convenience perspective.

Results

As of February 2025, Karrot has addressed the major problems mentioned in the early stages of feature platform development:

  • Decoupling recommendation logic from flea market server – The recommendation system now uses the feature platform across more than 10 different recommendation spaces and services.
  • Securing scalability of features used in recommendation logic – With more than 1,000 high-quality and rich features acquired from various services such as flea market, advertisements, local jobs, and real estate, we are contributing to the advancement of recommendation logic and making it straightforward for all Karrot engineers to explore and add features.
  • Maintaining the reliability of feature data sources – Through the feature platform, we are providing reliable data using a consistent schema and ingestion pipeline.

Karrot engineers are continuously improving the user experience by advancing recommendations with high-quality features from the feature platform. By recommending articles that users might be interested in, this has contributed to increasing click-through rates by 30% and conversion rates by 70% compared to before.

This was possible because the AWS services used in the feature platform were firmly supporting it. Amazon DynamoDB has amazing scalability in all aspects of read, write, and storage, so it was possible to handle dynamically changing workloads without incurring separate operating costs. Amazon ElastiCache showed highly reliable service stability, so we could use it with confidence. In addition, it was straightforward and stable to scale up, down, in, and out, so it was possible to reduce the operational burden. It also seamlessly integrated with the ecosystem of Redis OSS, so we could use open source ecosystems such as Redis Exporter. Amazon MSK also supports reliable operation and seamless integration with the Apache Kafka ecosystem, making the development and operation of the feature platform effortless.

Furthermore, working with AWS enables cost-efficient operations thanks to its support and expertise. Recently, we had an over-provisioning problem with our ElastiCache cluster. Right-sizing the cluster with various experts (including Solutions Architects) reduced ElastiCache costs by nearly 40%. This kind of technical support from AWS has been invaluable in operating the feature platform using AWS products.

Conclusion

In this series, we discussed how Karrot built a feature platform on AWS. We believe that by combining AWS services and our experience, you can develop and operate a feature store without difficulty by modifying it to suit your company’s requirements. Try out this implementation and let us know your thoughts in the comments.



Near real-time streaming analytics on protobuf with Amazon Redshift

Post Syndicated from Konstantinos Tzouvanas original https://aws.amazon.com/blogs/big-data/near-real-time-streaming-analytics-on-protobuf-with-amazon-redshift/

Organizations must often deal with a vast array of data formats and sources in their data analytics workloads. This range of data types, such as structured relational data, semi-structured formats like JSON and XML, and even binary formats like Protobuf and Avro, has presented new challenges for companies looking to extract valuable insights.

Protocol Buffers (protobuf) has gained significant traction in industries that require efficient data serialization and transmission, particularly in streaming data scenarios. Protobuf’s compact binary representation, language-agnostic nature, and strong typing make it an attractive choice for companies in sectors such as finance, gaming, telecommunications, and ecommerce, where high-throughput and low-latency data processing is crucial.

Although protobuf offers advantages in efficient data serialization and transmission, its binary nature poses challenges when it comes to analytics use cases. Unlike formats like JSON or XML, which can be directly queried and analyzed, protobuf data requires an additional deserialization step to convert it from its compact binary format into a structure suitable for processing and analysis. This extra conversion step introduces complexity into data analytics pipelines and tools. It can potentially slow down data exploration and analysis, especially in scenarios where near real-time insights are crucial.

In this post, we explore an end-to-end analytics workload for streaming protobuf data, by showcasing how to handle these data streams with Amazon Redshift Streaming Ingestion, deserializing and processing them using AWS Lambda functions, so that the incoming streams are immediately available for querying and analytical processing on Amazon Redshift.

The solution provides a solid foundation for handling protobuf data in Amazon Redshift. You can further enhance the architecture to support schema evolution by incorporating AWS Glue Schema Registry. By integrating the AWS Glue Schema Registry, you can make sure your Lambda function uses the latest schema version for deserialization, even as your data structure changes over time. However, for the purpose of this post and to maintain simplicity, we focus on demonstrating how to invoke Lambda from Amazon Redshift to convert protobuf messages to JSON format, which serves as a solid foundation for handling binary data in near real-time analytics scenarios.

Solution overview

The following architecture diagram describes the AWS services and features needed to set up a fully functional protobuf streaming ingestion pipeline for near real-time analytics.

Protobuf deserialization flow

The workflow consists of the following steps:

  1. An Amazon Elastic Compute Cloud (Amazon EC2) event producer generates events and forwards them to a message queue. The events are created and serialized using protobuf.
  2. A message queue using Amazon Managed Streaming for Apache Kafka (Amazon MSK) or Amazon Kinesis accepts the protobuf messages sent by the event producer. For this post, we use Amazon MSK Serverless.
  3. A Redshift cluster (provisioned or serverless), in which a materialized view with an external schema is configured, points to the message queue. For this post, we use Amazon Redshift Serverless.
  4. A Lambda protobuf deserialization function is triggered by Amazon Redshift during ingestion and deserializes protobuf data into JSON data.

Schema

To showcase protobuf’s deserialization functionality, we use a sample protobuf schema that represents a financial trade transaction. This schema will be used across the AWS services mentioned in this post.

// trade.proto 
syntax = "proto3"; 
message Trade{
   int32 userId = 1;   
   string userName = 2;   
   int32 volume = 3;   
   int32 pair = 4;   
   int32 action = 5;
   string TimeStamp = 6;
}

Amazon Redshift materialized view

In order for Amazon Redshift to ingest streaming data from Amazon MSK or Kinesis, an appropriate role needs to be assigned to Amazon Redshift and a materialized view needs to be properly defined. For detailed instructions on how to accomplish this, refer to Streaming ingestion to a materialized view or Simplify data streaming ingestion for analytics using Amazon MSK and Amazon Redshift.

In this section, we focus on the materialized view definition that makes it possible to deserialize protobuf data. Our example focuses on streaming ingestion from Amazon MSK. Typically, the materialized view ingests the Kafka metadata fields and the actual data (kafka_value) like in the following example:

CREATE MATERIALIZED VIEW trade_events AUTO REFRESH YES AS 
SELECT
     kafka_partition,     
     kafka_offset,
     kafka_timestamp_type,
     kafka_timestamp,
     kafka_key,
     JSON_PARSE(kafka_value) as Data,
     kafka_headers 
FROM     
     "dev"."msk_external_schema"."entity" 
WHERE     
     CAN_JSON_PARSE(kafka_value)

When the incoming kafka_value is of type JSON, you can apply the built-in JSON_PARSE function and create a column of type SUPER so you can directly query the data.

Amazon Redshift Lambda user-defined function

In our case, accepting protobuf encoded data requires some additional steps. The first step is to create an Amazon Redshift Lambda user-defined function (UDF). This Amazon Redshift function is the link to a Lambda function that executes the actual deserialization. This way, when data is ingested, Amazon Redshift calls the Lambda function for deserialization.

Creating or updating our Amazon Redshift Lambda UDF is straightforward, as illustrated in the following code. Additional examples are available in the GitHub repo.

CREATE OR REPLACE EXTERNAL FUNCTION f_deserialize_protobuf(VARCHAR(MAX)) 
RETURNS VARCHAR(MAX) IMMUTABLE 
LAMBDA 'f-redshift-deserialize-protobuf' IAM_ROLE ':RedshiftRole';

Because Lambda functions don’t (at the time of writing) accept binary data as input, you must first convert incoming binary data to its hex representation before calling the function. You can do this by using the Amazon Redshift TO_HEX function.

With the hex conversion accounted for and the Lambda UDF available, you can now use the function in your materialized view definition:

CREATE MATERIALIZED VIEW trade_events AUTO REFRESH YES AS 
SELECT      
       kafka_partition,
       kafka_offset,
       kafka_timestamp_type,
       kafka_timestamp,
       kafka_key,
       kafka_value,
       kafka_headers,
       JSON_PARSE(f_deserialize_protobuf(to_hex(kafka_value)))::super as json_data 
FROM      
       "dev"."msk_external_schema"."entity";

Lambda layer

Lambda functions require access to appropriate protobuf libraries, so that deserialization can take place. You can implement this through a Lambda layer. The layer is provided as a zip file, respecting the following folder structure, and contains the protobuf library, its dependencies, and user-provided code inside the custom folder, which includes the protobuf generated classes:

python
      custom
      google
      protobuf-4.25.2.dist-info

Because we implemented the Lambda functions in Python, the root folder of the zip file is the python folder. For other languages, refer to the documentation on how to structure the layer’s folders.

Lambda function

A Lambda function converts incoming protobuf records to JSON records. As a first step, you must import your custom classes from the Lambda layer’s custom folder:

# Import generated protobuf classes
from custom import trade_pb2

You can now deserialize incoming hex encoded binary data to objects. This is implemented in a two-step process. The first step is to decode the hex encoded binary data:

# Convert incoming hex data to binary
binary_data = bytes.fromhex(record)
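
As a rough illustration of the hex round trip described above, the following sketch uses only the Python standard library. The byte values are an arbitrary stand-in for a kafka_value payload, and the lowercase hex text is an assumption based on the sample payload shown later in this post.

```python
# Arbitrary stand-in for the raw bytes that arrive in kafka_value.
raw_value = bytes([0x08, 0x8A, 0x11, 0x20, 0x01])

# Hex text of the kind TO_HEX hands to the Lambda UDF (assumed lowercase).
hex_value = raw_value.hex()

# What the Lambda function does on receipt: turn the hex text back into bytes.
restored = bytes.fromhex(hex_value)

print(hex_value)
print(restored == raw_value)
```

Hex encoding doubles the size of each value, which is worth keeping in mind when reasoning about the VARCHAR(MAX) argument of the UDF.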

Next, you instantiate the protobuf defined classes and execute the actual deserialization process using the protobuf library method ParseFromString:

# Instantiate class
trade_event = trade_pb2.Trade()

# Deserialize into class
trade_event.ParseFromString(binary_data)

After deserialization populates the object, you can convert it to other formats. In our case, we serialize into JSON format, so that Amazon Redshift ingests the JSON content in a single field of type SUPER. Note that ListFields returns only the fields that are set, so in proto3 a field holding its default value (such as 0 or an empty string) is omitted from the resulting JSON:

# Serialize into json
elems = trade_event.ListFields()
fields = {}
for elem in elems:
    fields[elem[0].name] = elem[1]
json_elem = json.dumps(fields)

Combining these steps together, the Lambda function should look as follows:

import json

# Import the generated protobuf classes
from custom import trade_pb2  

def lambda_handler(event, context):
    
    results = []
    
    recordSets = event['arguments']
    for recordSet in recordSets:
        for record in recordSet:

            # convert incoming hex data to binary data
            binary_data = bytes.fromhex(record)
            
            # Instantiate class
            trade_event = trade_pb2.Trade()
            
            # Deserialize into class
            trade_event.ParseFromString(binary_data)
            
            # Serialize into json 
            elems = trade_event.ListFields()
            fields = {}
            for elem in elems:
                fields[elem[0].name] = elem[1]
            json_elem = json.dumps(fields)

            # Append to results            
            results.append(json_elem)
    
    print('OK')
    
    return json.dumps({"success": True,"num_records": len(results),"results": results})
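
The handler above assumes every record deserializes cleanly. A minimal sketch of one way to report failures follows, assuming the Lambda UDF response contract of success, error_msg, num_records, and results. The decode_record helper is hypothetical and only stands in for the protobuf step so that the sketch runs without trade_pb2.

```python
import json


def decode_record(hex_record):
    # Hypothetical stand-in for the protobuf step: decode the hex text and
    # report the payload size, so this sketch runs without trade_pb2.
    return json.dumps({"num_bytes": len(bytes.fromhex(hex_record))})


def lambda_handler(event, context):
    results = []
    try:
        # Amazon Redshift batches rows into the "arguments" list.
        for record_set in event["arguments"]:
            for record in record_set:
                results.append(decode_record(record))
    except Exception as exc:
        # Report the failure instead of returning a partial result set.
        return json.dumps({"success": False, "error_msg": str(exc)})
    return json.dumps(
        {"success": True, "num_records": len(results), "results": results}
    )
```

Whether to fail the whole batch or emit a placeholder for the bad record is a design choice; this sketch fails the batch so that malformed input is visible to the caller.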

Batch mode

In the preceding code sample, Amazon Redshift is calling our function in batch mode, meaning that a number of records are sent during a single Lambda function call. More specifically, Amazon Redshift is batching records into the arguments property of the request. Therefore, you must loop through the incoming array of data and apply your deserialization logic per record. At the time of writing, this behavior is internal to Amazon Redshift and can’t be configured or controlled through a configuration option. The Amazon Redshift streaming consumer reads the records added to the message queue since its previous read. The following is a sample of the payload the Lambda handler function receives:

{
     "user": "IAMR:Admin",
     "cluster": "arn:aws:redshift:*********************************",
     "database": "dev",
     "external_function": "fn_lambda_protobuf_to_json",
     "query_id": 5583858,
     "request_id": "17955ee8-4637-42e6-897c-5f4881db1df5",
     "arguments": [
         [
             "088a1112087374723a3231383618c806200128093217323032342d30332d32302031303a34363a33382e363932"
         ],
         [
             "08a74312087374723a3836313518f83c200728093217323032342d30332d32302031303a34363a33382e393031"
         ],
         [
             "08b01e12087374723a3338383818f73d20f8ffffffffffffffff0128053217323032342d30332d32302031303a34363a33392e303134"
         ]
     ],
     "num_records": 3
}
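
To make the hex strings in the arguments array less opaque, the following sketch decodes the first sample record by hand using the protobuf wire format, without the protobuf library. It is an illustration only and handles just the two wire types that trade.proto needs (varint and length-delimited). The FIELD_NAMES mapping is copied from the schema earlier in this post, and the function names are hypothetical.

```python
FIELD_NAMES = {1: "userId", 2: "userName", 3: "volume",
               4: "pair", 5: "action", 6: "TimeStamp"}


def read_varint(buf, pos):
    """Read one base-128 varint starting at pos; return (value, next_pos)."""
    value, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos
        shift += 7


def decode_trade(buf):
    """Decode a serialized Trade message into a dict keyed by field name."""
    fields, pos = {}, 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:  # varint: the int32 fields
            value, pos = read_varint(buf, pos)
            if value >= 1 << 63:  # negatives travel as 64-bit two's complement
                value -= 1 << 64
        elif wire_type == 2:  # length-delimited: the string fields
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length].decode("utf-8")
            pos += length
        else:
            raise ValueError(f"wire type {wire_type} not handled in this sketch")
        fields[FIELD_NAMES.get(field_number, field_number)] = value
    return fields


sample_hex = ("088a1112087374723a3231383618c806200128093217323032342d"
              "30332d32302031303a34363a33382e363932")
print(decode_trade(bytes.fromhex(sample_hex)))
```

In practice the generated trade_pb2 classes do this work; the sketch only shows what the Lambda function recovers from each hex string.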

Insights from ingested data

With your data stored in Amazon Redshift after the deserialization process, you can now execute queries against your streaming data and directly gain insights. In this section, we present some sample queries to illustrate functionality and behavior.

Examine lag query

To examine the difference between the most recent timestamp value of our streaming source vs. the current date/time (wall clock), we calculate the most recent point in time at which we ingested data. Because streaming data is expected to flow into the system continuously, this metric also reveals the ingestion lag between our streaming source and Amazon Redshift.

select top 1      
      (GETDATE() - kafka_timestamp) as ingestion_lag 
from     
      trade_events 
order by
      kafka_timestamp desc

Examine content query: Fraud detection on an incoming stream

By applying the query functionality available in Amazon Redshift, we can discover behavior hidden in our data in real time. With the following query, we try to match opposite trade volumes played by different users during the last 5 minutes that result in a zero sum game and could support a potential fraud detection concept:

select  
json_data.volume, 
LISTAGG(json_data.userid::int, ', ') as users, 
LISTAGG(json_data.pair::int, ', ') as pairs 
from     
        trade_events 
where      
        trade_events.kafka_timestamp >= DATEADD(minute, -5, GETDATE()) 
group by      
        json_data.volume 
having      
        sum(json_data.pair) = 0  
and min(abs(json_data.pair)) = max(abs(json_data.pair)) 
and count(json_data.pair) > 1

This query is a rudimentary example of how we can use live data to protect systems from fraudsters.
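
The HAVING conditions in the query can be easier to follow when written out procedurally. The following sketch applies the same three conditions to an in-memory list of trades. It assumes the list has already been limited to the last 5 minutes, and the function name and sample records are hypothetical.

```python
from collections import defaultdict


def find_zero_sum_groups(trades):
    """Group trades by volume and keep groups whose pair values cancel out."""
    by_volume = defaultdict(list)
    for trade in trades:
        by_volume[trade["volume"]].append(trade)

    matches = {}
    for volume, group in by_volume.items():
        pairs = [t["pair"] for t in group]
        magnitudes = [abs(p) for p in pairs]
        if (
            len(pairs) > 1                          # count(pair) > 1
            and sum(pairs) == 0                     # sum(pair) = 0
            and min(magnitudes) == max(magnitudes)  # same |pair| in the group
        ):
            matches[volume] = [t["userid"] for t in group]
    return matches


sample_trades = [
    {"userid": 1, "volume": 500, "pair": 3},
    {"userid": 2, "volume": 500, "pair": -3},
    {"userid": 3, "volume": 700, "pair": 2},
    {"userid": 4, "volume": 700, "pair": -1},
    {"userid": 5, "volume": 900, "pair": 4},
]
print(find_zero_sum_groups(sample_trades))
```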

For a more comprehensive example, see Near-real-time fraud detection using Amazon Redshift Streaming Ingestion with Amazon Kinesis Data Streams and Amazon Redshift ML. In this use case, an Amazon Redshift ML model for anomaly detection is trained using the incoming Amazon Kinesis Data Streams data that is streamed into Amazon Redshift. After sufficient training (for example, 90% accuracy for the model is achieved), the trained model is put into inference mode for inferencing decisions on the same incoming credit card data.

Examine content query: Join with non-streaming data

Having our protobuf records streaming in Amazon Redshift makes it possible to join streaming with non-streaming data. A typical example is combining incoming trades with user information data already recorded in the system. In the following query, we join the incoming stream of trades with user information, like email, to get a list of possible alert targets:

select   
    user_info.email 
from     
    trade_events
inner join    
    user_info 
on user_info.userId = trade_events.json_data.userid 
where     
    trade_events.json_data.volume > 1000 
and trade_events.kafka_timestamp >= DATEADD(minute, -5, GETDATE())

Conclusion

The ability to effectively analyze and derive insights from data streams, regardless of their format, is crucial for data analytics. Although protobuf offers compelling advantages for efficient data serialization and transmission, its binary nature can pose challenges and perhaps impact performance when it comes to analytics workloads. The solution outlined in this post provides a robust and scalable framework for organizations seeking to gain valuable insights, detect anomalies, and make data-driven decisions with agility, even in scenarios where high-throughput and low-latency processing is crucial. By using Amazon Redshift Streaming Ingestion in conjunction with Lambda functions, organizations can seamlessly ingest, deserialize, and query protobuf data streams, enabling near real-time analysis and insights.

For more information about Amazon Redshift Streaming Ingestion, refer to Streaming ingestion to a materialized view.


About the authors

Konstantinos Tzouvanas is a Senior Enterprise Architect on AWS, specializing in data science and AI/ML. He has extensive experience in optimizing real-time decision-making in High-Frequency Trading (HFT) and applying machine learning to genomics research. Known for leveraging generative AI and advanced analytics, he delivers practical, impactful solutions across industries.

Marios Parthenios is a Senior Solutions Architect working with Small and Medium Businesses across Central and Eastern Europe. He empowers organizations to build and scale their cloud solutions with a particular focus on Data Analytics and Generative AI workloads. He enables businesses to harness the power of data and artificial intelligence to drive innovation and digital transformation.

Pavlos Kaimakis is a Senior Solutions Architect at AWS who helps customers design and implement business-critical solutions. With extensive experience in product development and customer support, he focuses on delivering scalable architectures that drive business value. Outside of work, Pavlos is an avid traveler who enjoys exploring new destinations and cultures.

John Mousa is a Senior Solutions Architect at AWS. He helps power and utilities and healthcare and life sciences customers as part of the regulated industries team in Germany. John has interest in the areas of service integration, microservices architectures, as well as analytics and data lakes. Outside of work, he loves to spend time with his family and play video games.

Optimize traffic costs of Amazon MSK consumers on Amazon EKS with rack awareness

Post Syndicated from Austin Groeneveld original https://aws.amazon.com/blogs/big-data/optimize-traffic-costs-of-amazon-msk-consumers-on-amazon-eks-with-rack-awareness/

Are you incurring significant cross Availability Zone traffic costs when running an Apache Kafka client in containerized environments on Amazon Elastic Kubernetes Service (Amazon EKS) that consume data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) topics?

If you’re not familiar with Apache Kafka’s rack awareness feature, we strongly recommend starting with the blog post on how to Reduce network traffic costs of your Amazon MSK consumers with rack awareness for an in-depth explanation of the feature and how Amazon MSK supports it.

Although the solution described in that post uses an Amazon Elastic Compute Cloud (Amazon EC2) instance deployed in a single Availability Zone to consume messages from an Amazon MSK topic, modern cloud-native architectures demand more dynamic and scalable approaches. Amazon EKS has emerged as a leading platform for deploying and managing distributed applications. The dynamic nature of Kubernetes introduces unique implementation challenges compared to static client deployments. In this post, we walk you through a solution for implementing rack awareness in consumer applications that are dynamically deployed across multiple Availability Zones using Amazon EKS.

Here’s a quick recap of some key Apache Kafka terminology from the referenced blog. An Apache Kafka client consumer will register to read against a topic. A topic is the logical data structure that Apache Kafka organizes data into. A topic is segmented into a single or many partitions. Partitions are the unit of parallelism in Apache Kafka. Amazon MSK provides high availability by replicating each partition of a topic across brokers in different Availability Zones. Because there are replicas of each partition that reside across the different brokers that make up your MSK cluster, Amazon MSK also tracks whether a replica partition is in sync with the most recent data for that partition. This means there is one partition that Amazon MSK recognizes as containing the most up-to-date data, and this is known as the leader partition. The collection of replicated partitions is called in-sync replicas. This list of in-sync replicas is used internally when the cluster needs to elect a new leader partition if the current leader were to become unavailable.

When consumer applications read from a topic, the Apache Kafka protocol facilitates a network exchange to determine which broker currently has the leader partition that the consumer needs to read from. This means that the consumer could be told to read from a broker in a different Availability Zone than itself, leading to cross-zone traffic charges in your AWS account. To help optimize this cost, Amazon MSK supports the rack awareness feature, which lets clients ask an MSK cluster for a replica partition to read from within their own Availability Zone, even if it isn’t the current leader partition. The cluster accomplishes this by checking for an in-sync replica on a broker within the same Availability Zone as the consumer.
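
The selection behavior described above can be modeled in a few lines. This is a simplified sketch of the idea, not the broker’s actual implementation: the Replica class and choose_replica function are hypothetical, and the rack value is assumed to be the broker’s Availability Zone ID.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Replica:
    broker_id: int
    rack: str           # assumed to hold the broker's Availability Zone ID
    in_sync: bool
    is_leader: bool = False


def choose_replica(replicas: List[Replica], client_rack: Optional[str]) -> Replica:
    """Prefer an in-sync replica in the client's rack; otherwise use the leader."""
    leader = next(r for r in replicas if r.is_leader)
    if client_rack:
        for replica in replicas:
            if replica.in_sync and replica.rack == client_rack:
                return replica
    return leader


partition = [
    Replica(broker_id=1, rack="use1-az1", in_sync=True, is_leader=True),
    Replica(broker_id=2, rack="use1-az2", in_sync=True),
    Replica(broker_id=3, rack="use1-az4", in_sync=False),
]
print(choose_replica(partition, "use1-az2").broker_id)
print(choose_replica(partition, "use1-az4").broker_id)
```

A client that sets no rack, or whose zone has no in-sync replica, falls back to the leader in this model, which matches the resilience behavior discussed later in this post.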

The challenge with Kafka clients on Amazon EKS

In Amazon EKS, the underlying units of compute are EC2 instances that are abstracted as Kubernetes nodes. The nodes are organized into node groups for ease of management, scaling, and grouping of applications on certain EC2 instance types. As a best practice for resilience, the nodes in a node group are spread across multiple Availability Zones. Amazon EKS uses the underlying Amazon EC2 metadata about the Availability Zone that the node is located in, and it injects that information into the node’s metadata during node configuration. In particular, the Availability Zone ID (AZ ID) is injected into the node metadata.

When an application is deployed in a Kubernetes Pod on Amazon EKS, it goes through a process of binding to a node that meets the pod’s requirements. As shown in the following diagram, when you deploy client applications on Amazon EKS, the pod for the application can be bound to a node with available capacity in any Availability Zone. Also, the pod doesn’t automatically inherit the Availability Zone information from the node that it’s bound to, a piece of information necessary for rack awareness. The following architecture diagram illustrates Kafka consumers running on Amazon EKS without rack awareness.

AWS Cloud architecture showing MSK brokers, EKS pods, and EC2 instances in three Availability Zones

To set the client configuration for rack awareness, the pod needs to know what Availability Zone it’s located in, dynamically, as it is bound to a node. During its lifecycle, the same pod can be evicted from the node it was bound to previously and moved to a node in a different Availability Zone, if the matching criteria permit that. Making the pod aware of its Availability Zone dynamically sets the rack awareness parameter client.rack during the initialization of the application container that is encapsulated in the pod.

After rack awareness is enabled on the MSK cluster, what happens if the broker in the same Availability Zone as the client (hosted on Amazon EKS or elsewhere) becomes unavailable? The Apache Kafka protocol is designed to support a distributed data storage system. Assuming customers follow the best practice of implementing a replication factor > 1, Apache Kafka can dynamically reroute the consumer client to the next available in-sync replica on a different broker. This resilience remains consistent even after implementing nearest replica fetching, or rack awareness. Enabling rack awareness optimizes the networking exchange to prefer a partition within the same Availability Zone, but it doesn’t compromise the consumer’s ability to operate if the nearest replica is unavailable.

In this post, we walk you through an example that uses the Kubernetes metadata label topology.k8s.aws/zone-id, which Amazon EKS assigns to each node, together with Kyverno, an open source policy engine. We deploy a Kyverno policy that mutates pods in the binding state to inject the node’s AZ ID into the pod’s metadata as an annotation, as depicted in the following diagram. The container then exposes the annotated AZ ID through an environment variable, and the container’s postStart lifecycle hook uses that variable to generate the Kafka client configuration file with the rack awareness setting. The following architecture diagram illustrates Kafka consumers running on Amazon EKS with rack awareness.

AWS architecture with MSK, EKS, Kyverno, and EC2 across three Availability Zones, detailing topology
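
The last step of that flow, turning an environment variable into a client configuration file, might look like the following sketch. The variable name NODE_AZ_ID and the function name are hypothetical, and the two security properties are assumptions that reflect the IAM authentication used in this walkthrough. Only client.rack is the setting that rack awareness depends on.

```python
import os


def build_client_properties(environ=os.environ):
    """Return Kafka client properties text, adding client.rack when the AZ ID is known."""
    lines = [
        "security.protocol=SASL_SSL",   # assumed: IAM authentication over TLS
        "sasl.mechanism=AWS_MSK_IAM",
    ]
    az_id = environ.get("NODE_AZ_ID", "").strip()  # hypothetical variable name
    if az_id:
        # Without this line the consumer fetches from the partition leader.
        lines.append(f"client.rack={az_id}")
    return "\n".join(lines) + "\n"


print(build_client_properties({"NODE_AZ_ID": "use1-az2"}))
```

Omitting client.rack when the variable is empty keeps the consumer working, at the cost of possible cross-zone reads.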

Solution walkthrough

Prerequisites

For this walkthrough, we use AWS CloudShell to run the scripts that are provided inline as you progress. For a smooth experience, before getting started, make sure to have kubectl and eksctl installed and configured in the AWS CloudShell environment, following the installation instructions for Linux (amd64). Helm must also be installed in AWS CloudShell, using the instructions for Linux.

Also, check if the envsubst tool is installed in your CloudShell environment by invoking:

which envsubst

If the tool isn’t installed, you can install it using the command:

sudo dnf -y install gettext-devel

We also assume you already have an MSK cluster deployed in an Amazon Virtual Private Cloud (VPC) in three Availability Zones with the name MSK-AZ-Aware. In this walkthrough, we use AWS Identity and Access Management (IAM) authentication for client access control to the MSK cluster. If you’re using a cluster in your account with a different name, replace the instances of MSK-AZ-Aware in the instructions.

We follow the same MSK cluster configuration mentioned in the Rack Awareness blog mentioned previously, with some modifications. (Ensure you’ve set replica.selector.class = org.apache.kafka.common.replica.RackAwareReplicaSelector for the reasons discussed there). In our configuration, we add one line: num.partitions = 6. Although not mandatory, this ensures that topics that are automatically created will have multiple partitions to support clearer demonstrations in subsequent sections.

Finally, we use the Amazon MSK Data Generator with the following configuration:

{
    "name": "msk-data-generator",
    "config": {
        "connector.class": "com.amazonaws.mskdatagen.GeneratorSourceConnector",
        "genkp.MSK-AZ-Aware-Topic.with": "#{Internet.uuid}",
        "genv.MSK-AZ-Aware-Topic.product_id.with": "#{number.number_between '101','200'}",
        "genv.MSK-AZ-Aware-Topic.quantity.with": "#{number.number_between '1','5'}",
        "genv.MSK-AZ-Aware-Topic.customer_id.with": "#{number.number_between '1','5000'}"
    }
}

Running the MSK Data Generator with this configuration will automatically create a six-partition topic named MSK-AZ-Aware-Topic on our cluster for us, and it will push data to that topic. To follow along with the walkthrough, we recommend and assume that you deploy the MSK Data Generator to create the topic and populate it with simulated data.

Create the EKS cluster

The first step is to create an EKS cluster in the same Amazon VPC subnets as the MSK cluster. If your MSK cluster has a different name than suggested, change the environment variable MSK_CLUSTER_NAME. You can also change the Amazon EKS cluster name by changing EKS_CLUSTER_NAME.

The environment variables that we define here are used throughout the walkthrough.

The last command in the following script updates the kubeconfig with an entry for the EKS cluster:

AWS_ACCOUNT=$(aws sts get-caller-identity --output text --query Account)
export AWS_ACCOUNT
export AWS_REGION=${AWS_DEFAULT_REGION}
export MSK_CLUSTER_NAME=MSK-AZ-Aware
export EKS_CLUSTER_NAME=EKS-AZ-Aware
export EKS_CLUSTER_SIZE=3
export K8S_VERSION=1.32
export POD_ID_VERSION=1.3.5
 
MSK_BROKER_SG=$(aws kafka list-clusters \
  --query  'ClusterInfoList[?ClusterName==`'${MSK_CLUSTER_NAME}'`].BrokerNodeGroupInfo.SecurityGroups'  \
  --output text | xargs)
export MSK_BROKER_SG

MSK_BROKER_CLIENT_SUBNETS=$(aws kafka list-clusters \
  --query  'ClusterInfoList[?ClusterName==`'${MSK_CLUSTER_NAME}'`].BrokerNodeGroupInfo.ClientSubnets'  \
  --output text | xargs)
export MSK_BROKER_CLIENT_SUBNETS
 
VPC_ID=$(aws ec2 describe-subnets \
  --subnet-ids "$(echo "${MSK_BROKER_CLIENT_SUBNETS}" | cut -d' ' -f1)" \
  --query 'Subnets[0].VpcId' \
  --output text)
export VPC_ID

EKS_SUBNETS=$(echo ${MSK_BROKER_CLIENT_SUBNETS} | sed 's/ \+/,/g')
export EKS_SUBNETS

# Create a minimal config file for encrypted node volumes
cat > eks-config.yaml << EOF
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: ${EKS_CLUSTER_NAME}
  region: ${AWS_REGION}
  version: "${K8S_VERSION}"
vpc:
  id: "${VPC_ID}"
  subnets:
    public:
$(for subnet in ${MSK_BROKER_CLIENT_SUBNETS}; do
  AZ=$(aws ec2 describe-subnets --subnet-ids "$subnet" --query 'Subnets[0].AvailabilityZone' --output text)
  echo "      $AZ: { id: $subnet }"
done)
nodeGroups:
  - name: ng1
    instanceType: m5.xlarge
    desiredCapacity: ${EKS_CLUSTER_SIZE}
    minSize: ${EKS_CLUSTER_SIZE}
    maxSize: ${EKS_CLUSTER_SIZE}
    securityGroups:
      attachIDs: ["${MSK_BROKER_SG}"]
    volumeSize: 100
    volumeType: gp3
    volumeEncrypted: true
EOF

eksctl create cluster -f eks-config.yaml

aws eks update-kubeconfig \
  --region "${AWS_REGION}" \
  --name ${EKS_CLUSTER_NAME}

Next, you need to create an IAM policy, MSK-AZ-Aware-Policy, to allow access from the Amazon EKS pods to the MSK cluster. Note here that we’re using MSK-AZ-Aware as the cluster name.

Create a file, msk-az-aware-policy.json, with the IAM policy template:

cat > msk-az-aware-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:DescribeClusterDynamicConfiguration",
                "kafka-cluster:AlterClusterDynamicConfiguration"
            ],
            "Resource": [
                "arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT}:cluster/${MSK_CLUSTER_NAME}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT}:topic/${MSK_CLUSTER_NAME}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT}:group/${MSK_CLUSTER_NAME}/*"
            ]
        }
    ]
}
EOF

To create the IAM policy, use the following command. It first replaces the placeholders in the policy file with values from relevant environment variables, and then creates the IAM policy:

envsubst < msk-az-aware-policy.json | \
xargs -0 -I {} aws iam create-policy \
            --policy-name MSK-AZ-Aware-Policy \
            --policy-document {}
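
For readers who want to see what the substitution step does, the following sketch mimics it with Python’s string.Template on a shortened, hypothetical excerpt of the policy. The account ID and Region are placeholder values.

```python
import json
from string import Template

# Shortened excerpt of the policy template, using the same ${VAR} placeholders.
policy_template = Template(
    '{"Resource": ["arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT}:'
    'cluster/${MSK_CLUSTER_NAME}/*"]}'
)

# Placeholder values; in the walkthrough these come from environment variables.
values = {
    "AWS_REGION": "us-east-1",
    "AWS_ACCOUNT": "111122223333",
    "MSK_CLUSTER_NAME": "MSK-AZ-Aware",
}

rendered = policy_template.substitute(values)
policy = json.loads(rendered)  # confirms the rendered document is valid JSON
print(policy["Resource"][0])
```

Unlike envsubst, which replaces an unset variable with an empty string, Template.substitute raises KeyError for a missing value, which can surface a forgotten export earlier.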

Configure EKS Pod Identity

Amazon EKS Pod Identity offers a simplified experience for obtaining IAM permissions for pods on Amazon EKS. This requires installing the Amazon EKS Pod Identity Agent add-on on the EKS cluster:

eksctl create addon \
  --cluster ${EKS_CLUSTER_NAME} \
  --name eks-pod-identity-agent \
  --version ${POD_ID_VERSION}

Confirm that the add-on has been installed and its status is ACTIVE and that the status of all the pods associated with the add-on is Running.

eksctl get addon \
  --cluster ${EKS_CLUSTER_NAME} \
  --region "${AWS_REGION}" \
  --name eks-pod-identity-agent -o json

kubectl get pods \
  -n kube-system \
  -l app.kubernetes.io/instance=eks-pod-identity-agent

After you’ve installed the add-on, you need to create a pod identity association between a Kubernetes service account and an IAM role that carries the IAM policy created earlier:

eksctl create podidentityassociation \
  --namespace kafka-ns \
  --service-account-name kafka-sa \
  --role-name EKS-AZ-Aware-Role \
  --permission-policy-arns arn:aws:iam::"${AWS_ACCOUNT}":policy/MSK-AZ-Aware-Policy \
  --cluster ${EKS_CLUSTER_NAME} \
  --region "${AWS_REGION}"

Install Kyverno

Kyverno is an open source policy engine for Kubernetes that allows for validation, mutation, and generation of Kubernetes resources using policies written in YAML, thus simplifying the enforcement of security and compliance requirements. You need to install Kyverno to dynamically inject metadata into the Amazon EKS pods as they enter the binding state to inform them of Availability Zone ID.

In AWS CloudShell, create a file named kyverno-values.yaml. This file defines the Kubernetes RBAC permissions that allow Kyverno’s Admission Controller to read Amazon EKS node metadata, because the default settings in Kyverno v1.13 and later don’t allow this:

cat > kyverno-values.yaml << EOF
admissionController:
  rbac:
    clusterRole:
      extraResources:
        - apiGroups:
            - ""
          resources:
            - "nodes"
          verbs:
            - get
            - list
            - watch
EOF

After this file is created, you can install Kyverno using Helm, passing the values file created in the previous step:

helm repo add kyverno https://kyverno.github.io/kyverno/
helm repo update

helm install kyverno kyverno/kyverno \
  -n kyverno \
  --create-namespace \
  --version 3.3.7 \
  -f kyverno-values.yaml

Starting with Kyverno v1.13, the Admission Controller is configured to ignore AdmissionReview requests for pods in binding state. Change this by editing the Kyverno ConfigMap:

kubectl -n kyverno edit configmap kyverno

The kubectl edit command opens the ConfigMap in the default editor configured in your environment (in our case, Vim on Linux).

As highlighted in the following screenshot, [Pod/binding,*,*] should be removed from the resourceFilters field for the Kyverno Admission Controller to process AdmissionReview requests for pods in binding state.

Kubernetes YAML configuration detailing Kyverno policy resource filters and cluster roles

If Linux VIM is your default editor, you can delete the entry using VIM command 18x, meaning delete (or cut) 18 characters from the current cursor position. Save the modified configuration using the VIM command :wq, meaning write (or save) the file and quit.

After deleting, the resourceFilters field should look similar to the following screenshot.

Kubernetes YAML configuration with ReplicaSet resource filter highlighted for Kyverno policy management

If you have a different editor configured in your environment, follow the appropriate steps to achieve a similar outcome.
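
For scripted setups, the same change can be sketched without an interactive editor. The filter below only removes the [Pod/binding,*,*] entry from text arriving on standard input. Piping the live ConfigMap through it, as in the trailing comment, rests on an untested assumption about how resourceFilters is serialized in your Kyverno version, so review the result before applying it. The function name strip_binding_filter and the shortened sample value are illustrative:

```shell
# Illustrative filter: delete the [Pod/binding,*,*] entry (and any spaces
# that follow it) from whatever text arrives on stdin.
strip_binding_filter() {
  sed 's|\[Pod/binding,\*,\*][[:space:]]*||g'
}

# Demonstration on a shortened, made-up resourceFilters value:
printf '%s\n' '[Event,*,*] [Pod/binding,*,*] [ReplicaSet,*,*]' | strip_binding_filter

# Possible use against the live ConfigMap (assumption, verify first):
# kubectl -n kyverno get configmap kyverno -o yaml | strip_binding_filter | kubectl apply -f -
```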

Configure Kyverno policy

You need to configure the policy that will make the pods rack aware. This policy is adapted from the suggested approach in the Kyverno blog post, Assigning Node Metadata to Pods. Create a new file with the name kyverno-inject-node-az-id.yaml:

cat > kyverno-inject-node-az-id.yaml  << EOF
apiVersion: kyverno.io/v2beta1
kind: ClusterPolicy
metadata:
  name: inject-node-az-id
spec:
  background: false
  rules:
    - name: inject-node-az-id
      match:
        any:
        - resources:
            kinds:
            - Pod/binding
      context:
      - name: node
        variable:
          jmesPath: request.object.target.name
          default: ''
      - name: node_az_id
        apiCall:
          urlPath: "/api/v1/nodes/{{node}}"
          jmesPath: "metadata.labels.\"topology.k8s.aws/zone-id\" || 'empty'"
      mutate:
        patchStrategicMerge:
          metadata:
            annotations:
              node_az_id: "{{ node_az_id }}"
EOF

This policy instructs Kyverno to watch for pods in binding state. After Kyverno receives the AdmissionReview request for a pod, it sets the variable node to the name of the node to which the pod is being bound. It then sets a second variable, node_az_id, to the Availability Zone ID by calling the Kubernetes API at /api/v1/nodes/{{node}} and reading the node label topology.k8s.aws/zone-id (falling back to the string empty if the label is missing). Finally, it defines a mutate rule to inject the obtained AZ ID into the pod’s metadata as an annotation node_az_id.

After you’ve created the file, apply the policy using the following command:

kubectl apply -f kyverno-inject-node-az-id.yaml

Deploy a pod without rack awareness

Now let’s visualize the problem statement. To do this, connect to one of the EKS pods and check how it interacts with the MSK cluster when you run a Kafka consumer from the pod.

First, get the bootstrap string of the MSK cluster. Look up the Amazon Resource Name (ARN) of the MSK cluster:

MSK_CLUSTER_ARN=$(
    aws kafka list-clusters \
      --query 'ClusterInfoList[?ClusterName==`'${MSK_CLUSTER_NAME}'`].ClusterArn' \
      --output text)
export MSK_CLUSTER_ARN

Using the cluster ARN, you can get the bootstrap string with the following command:

BOOTSTRAP_SERVER_LIST=$(
    aws kafka get-bootstrap-brokers \
        --cluster-arn "${MSK_CLUSTER_ARN}" \
        --query 'BootstrapBrokerStringSaslIam' \
        --output text)
export BOOTSTRAP_SERVER_LIST
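
The consumer logs later in this post show the brokers answering on port 9098, the port used for IAM access control in this setup. Before embedding the bootstrap string in a manifest, you can sanity-check it with a small sketch like the following. The function name check_bootstrap_port is illustrative, and the default port is an assumption taken from those logs:

```shell
# Illustrative check: every broker in a comma-separated bootstrap string
# should end in the expected port (default 9098, as seen in the logs).
check_bootstrap_port() {
  list=$1
  port=${2:-9098}
  [ -n "$list" ] || { echo "Bootstrap string is empty" >&2; return 1; }
  # Split on commas, one broker per line, and test each suffix.
  printf '%s\n' "$list" | tr ',' '\n' | while IFS= read -r broker; do
    case $broker in
      *:"$port") ;;   # expected port, nothing to do
      *) echo "Unexpected endpoint: $broker" >&2; exit 1 ;;
    esac
  done
}

# Possible use:
# check_bootstrap_port "${BOOTSTRAP_SERVER_LIST}" || echo "Check the MSK authentication settings"
```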

Create a new file named kafka-no-az.yaml:

cat > kafka-no-az.yaml << EOF
apiVersion: v1
kind: Namespace
metadata:
 name: kafka-ns
---
apiVersion: v1
kind: ServiceAccount
metadata:
 name: kafka-sa
 namespace: kafka-ns
automountServiceAccountToken: false
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-no-az
  namespace: kafka-ns
  labels:
    app: kafka-no-az
  annotations:
    node_az_id: ''
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka-no-az
  template:
    metadata:
      labels:
        app: kafka-no-az
    spec:
      serviceAccountName: kafka-sa
      containers:
      - image: bitnami/kafka:3.8.0
        name: kafka-no-az
        command: ["/bin/sh", "-ec", "while :; do echo '.'; sleep 5 ; done"]
        env:
        - name: BootstrapServerString
          value: ${BOOTSTRAP_SERVER_LIST}
        - name: MSK_TOPIC
          value: "MSK-AZ-Aware-Topic"
        - name: KAFKA_HOME
          value: /opt/bitnami/kafka
        - name: KAFKA_BIN
          value: /opt/bitnami/kafka/bin
        - name: KAFKA_CONFIG
          value: /opt/bitnami/kafka/config
        - name: KAFKA_LIBS
          value: /opt/bitnami/kafka/libs
        - name: KAFKA_LOG4J_OPTS
          value: "-Dlog4j.configuration=file:/opt/bitnami/kafka/config/log4j.properties"
        lifecycle:
          postStart:
            exec:
              command: 
              - "sh"
              - "-c"
              - |
                export KAFKA_HOME=/opt/bitnami/kafka
                export KAFKA_BIN=\${KAFKA_HOME}/bin
                export KAFKA_CONFIG=\${KAFKA_HOME}/config
                cat > \${KAFKA_CONFIG}/client.properties << EOF1
                security.protocol=SASL_SSL
                sasl.mechanism=AWS_MSK_IAM
                sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
                sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
                EOF1
                
                cat >> \${KAFKA_CONFIG}/log4j.properties << EOF2
                #
                # Enable logging of Kafka Client to stderr
                #
                log4j.rootLogger=WARN, stderr
                log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractFetch=DEBUG
                log4j.appender.stderr=org.apache.log4j.ConsoleAppender
                log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
                log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
                log4j.appender.stderr.Target=System.err
                EOF2
                cd \${KAFKA_HOME}/libs
                /usr/bin/curl -sS -L https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar --output \${KAFKA_LIBS}/aws-msk-iam-auth-2.2.0-all.jar
EOF

This pod manifest doesn’t make use of the Availability Zone ID injected into the metadata annotation and hence doesn’t add client.rack to the client.properties configuration.

Deploy the pods using the following command:

kubectl apply -f kafka-no-az.yaml

Run the following command to confirm that the pods have been deployed and are in the Running state:

kubectl -n kafka-ns get pods

Select a pod ID from the output of the previous command, and connect to it using:

kubectl -n kafka-ns exec -it POD_ID -- sh

Run the Kafka consumer:

"${KAFKA_BIN}"/kafka-console-consumer.sh \
  --bootstrap-server  "${BootstrapServerString}" \
  --consumer.config  "${KAFKA_CONFIG}"/client.properties \
  --topic "${MSK_TOPIC}" \
  --from-beginning > /tmp/non-rack-aware-consumer.log 2>&1 &

This command runs the consumer in the background and writes all of its output to the file non-rack-aware-consumer.log. There’s a lot of information in those logs, and we encourage you to open them and take a deeper look. Next, examine the EKS pod in action. To do this, run the following command to filter the file for the most recent fetch requests sent to the MSK cluster. You’ll notice a handful of meaningful logs to review as the consumer accesses various partitions of the Kafka topic:

grep -E "DEBUG.*Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-[0-9]+" /tmp/non-rack-aware-consumer.log | tail -5

Observe your log output, which should look similar to the following:

[2025-03-12 23:59:05,308] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-3 at position FetchPosition{offset=100, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,308] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-0 at position FetchPosition{offset=83, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,542] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-5 at position FetchPosition{offset=100, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,542] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-2 at position FetchPosition{offset=107, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,720] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-4 at position FetchPosition{offset=84, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,720] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-1 at position FetchPosition{offset=85, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,811] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-3 at position FetchPosition{offset=100, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-12 23:59:05,811] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-24102] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-0 at position FetchPosition{offset=83, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)

You’ve now connected to a specific pod in the EKS cluster and run a Kafka consumer to read from the MSK topic without rack awareness. Remember that this pod is running within a single Availability Zone.

Reviewing the log output, you find rack: values as use1-az2, use1-az4, and use1-az6 as the pod makes calls to different partitions of the topic. These rack values represent the Availability Zone IDs that our brokers are running within. This means that our EKS pod is creating networking connections to brokers across three different Availability Zones, which would be accruing networking charges in our account.
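
To see the spread across Availability Zones at a glance, you can summarize the fetch destinations instead of reading individual lines. The following is a minimal sketch, assuming the log format shown above. The function name fetch_racks is illustrative:

```shell
# Illustrative summary: count fetch requests per destination rack.
# Reads AbstractFetch DEBUG lines on stdin and prints "<count> <rack>".
fetch_racks() {
  # The greedy .* after "to node" skips ahead to the last "rack:" on the
  # line, so the captured value is the rack of the broker being fetched from.
  sed -n 's/.* to node .*rack: \([^)]*\)).*/\1/p' |
    sort | uniq -c | awk '{ print $1, $2 }'
}

# Possible use inside the pod:
# fetch_racks < /tmp/non-rack-aware-consumer.log
```

Without rack awareness, you would expect this summary to list every Availability Zone ID that hosts a partition leader.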

Also notice that you have no way to check which node, and therefore Availability Zone, this EKS pod is running in. You can observe in the logs that it’s calling to MSK brokers in different Availability Zones, but there is no way to know which broker is in the same Availability Zone as the EKS pod you’ve connected to. Delete the deployment when you’re done:

kubectl -n kafka-ns delete -f kafka-no-az.yaml

Deploy a pod with rack awareness

Now that you have experienced the consumer behavior without rack awareness, you need to inject the Availability Zone ID to make your pods rack-aware.

Create a new file named kafka-az-aware.yaml:

cat > kafka-az-aware.yaml << EOF
apiVersion: v1
kind: Namespace
metadata:
 name: kafka-ns
---
apiVersion: v1
kind: ServiceAccount
metadata:
 name: kafka-sa
 namespace: kafka-ns
automountServiceAccountToken: false
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-az-aware
  namespace: kafka-ns
  labels:
    app: kafka-az-aware
  annotations:
    node_az_id: ''
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka-az-aware
  template:
    metadata:
      labels:
        app: kafka-az-aware
    spec:
      serviceAccountName: kafka-sa
      containers:
      - image: bitnami/kafka:3.8.0
        name: kafka-az-aware
        command: ["/bin/sh", "-ec", "while :; do echo '.'; sleep 5 ; done"]
        env:
        - name: BootstrapServerString
          value: ${BOOTSTRAP_SERVER_LIST}
        - name: MSK_TOPIC
          value: "MSK-AZ-Aware-Topic"
        - name: KAFKA_HOME
          value: /opt/bitnami/kafka
        - name: KAFKA_BIN
          value: /opt/bitnami/kafka/bin
        - name: KAFKA_CONFIG
          value: /opt/bitnami/kafka/config
        - name: KAFKA_LIBS
          value: /opt/bitnami/kafka/libs
        - name: KAFKA_LOG4J_OPTS
          value: "-Dlog4j.configuration=file:/opt/bitnami/kafka/config/log4j.properties"
        - name: NODE_AZ_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.annotations['node_az_id']
        lifecycle:
          postStart:
            exec:
              command: 
              - "sh"
              - "-c"
              - |
                export KAFKA_HOME=/opt/bitnami/kafka
                export KAFKA_BIN=\${KAFKA_HOME}/bin
                export KAFKA_CONFIG=\${KAFKA_HOME}/config
                cat > \${KAFKA_CONFIG}/client.properties << EOF1
                security.protocol=SASL_SSL
                sasl.mechanism=AWS_MSK_IAM
                sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
                sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
                EOF1
                if [ -n "\$NODE_AZ_ID" ]
                then
                  echo "client.rack=\$NODE_AZ_ID" >> \${KAFKA_CONFIG}/client.properties
                fi
                
                cat >> \${KAFKA_CONFIG}/log4j.properties << EOF2
                #
                # Enable logging of Kafka Client to stderr
                #
                log4j.rootLogger=WARN, stderr
                log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractFetch=DEBUG
                log4j.appender.stderr=org.apache.log4j.ConsoleAppender
                log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
                log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
                log4j.appender.stderr.Target=System.err
                EOF2
                
                /usr/bin/curl -sS -L https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar --output \${KAFKA_LIBS}/aws-msk-iam-auth-2.2.0-all.jar
EOF

As you can observe, the pod manifest defines an environment variable NODE_AZ_ID, assigning it the value from the pod’s own metadata annotation node_az_id that was injected by Kyverno. The manifest then uses the pod’s postStart lifecycle script to add client.rack into the client.properties configuration, setting it equal to the value in the environment variable NODE_AZ_ID.
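
The conditional at the heart of that postStart script can be sketched as a stand-alone function. This version deliberately differs from the manifest in one respect: it also skips the literal value empty, which the Kyverno policy injects when the node label is missing. The function name add_client_rack is illustrative:

```shell
# Illustrative stand-alone version of the postStart logic: append client.rack
# only when a usable AZ ID was injected.
add_client_rack() {
  props_file=$1
  az_id=$2
  case $az_id in
    ''|empty) return 0 ;;   # nothing injected, or the policy's fallback value
  esac
  echo "client.rack=$az_id" >> "$props_file"
}

# Possible use inside the container:
# add_client_rack "${KAFKA_CONFIG}/client.properties" "${NODE_AZ_ID}"
```

Leaving client.rack out when no AZ ID is available keeps the consumer on its default behavior of fetching from partition leaders.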

Deploy the pods using the following command:

kubectl apply -f kafka-az-aware.yaml

Run the following command to confirm that the pods have been deployed and are in the Running state:

kubectl -n kafka-ns get pods

Verify that the Availability Zone IDs have been injected into the pods:

for pod in $(kubectl -n kafka-ns get pods --field-selector=status.phase==Running -o=name | grep "pod/kafka-az-aware-" | xargs)
do
  kubectl -n kafka-ns get "$pod" -o yaml | grep "node_az_id:"
done

Your output should look similar to:

node_az_id: use1-az2
node_az_id: use1-az4
node_az_id: use1-az6

Or:

AWS CloudShell showing Kafka namespace pods and node assignments in Kubernetes cluster

Select a pod ID from the output of the get pods command and open a shell in it:

kubectl -n kafka-ns exec -it POD_ID -- sh

The loop above prints the node_az_id values in the same order as the pods listed by the get pods command. Use this ordering to work out which Availability Zone your pod is running in so you can compare it to the log output later.

After you’ve connected to your pod, run the Kafka consumer:

"${KAFKA_BIN}"/kafka-console-consumer.sh \
  --bootstrap-server  "${BootstrapServerString}" \
  --consumer.config  "${KAFKA_CONFIG}"/client.properties \
  --topic "${MSK_TOPIC}" \
  --from-beginning > /tmp/rack-aware-consumer.log 2>&1 &

Similar to before, this command runs the consumer in the background and writes all of its output to the file rack-aware-consumer.log. You create a new file so there’s no overlap between the Kafka consumers you’ve run. There’s a lot of information in those logs, and we encourage you to open them and take a deeper look. If you want to see the rack awareness of your EKS pod in action, run the following command to filter the file for the most recent fetch requests sent to the MSK cluster. You can observe a handful of meaningful logs to review here as the consumer accesses various partitions of the Kafka topic:

grep -E "DEBUG.*Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-[0-9]+" /tmp/rack-aware-consumer.log | tail -5

Observe your log output, which should look similar to the following:

[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-5 at position FetchPosition{offset=527, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-4 at position FetchPosition{offset=509, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-3 at position FetchPosition{offset=527, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-2 at position FetchPosition{offset=522, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-1.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 1 rack: use1-az4)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-1 at position FetchPosition{offset=533, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-3.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 3 rack: use1-az2)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-03-13 00:47:51,695] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-86303] Added read_uncommitted fetch request for partition MSK-AZ-Aware-Topic-0 at position FetchPosition{offset=520, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6)], epoch=0}} to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch)

For each log line, you can now observe two rack: values. The first rack: value shows the rack of the current partition leader, and the second shows the rack of the broker that the fetch request is sent to.

For example, look at MSK-AZ-Aware-Topic-5. The leader is identified as rack: use1-az4, but the fetch request is sent to use1-az6, as indicated by to node b-2.mskazaware.hxrzlh.c6.kafka.us-east-1.amazonaws.com:9098 (id: 2 rack: use1-az6) (org.apache.kafka.clients.consumer.internals.AbstractFetch).

You’ll notice something similar in all other log lines. The fetch is always to the broker in use1-az6, which maps to our expectation, given the pod we connected to was in this Availability Zone.
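
To make the comparison between leader and fetch target explicit for every partition, you can reduce each log line to the two rack values. The following is a minimal sketch, assuming the log format shown above. The function name fetch_report is illustrative:

```shell
# Illustrative report: for each fetch line print the partition, the leader's
# rack, and the rack actually fetched from, flagging non-leader fetches.
fetch_report() {
  sed -n 's/.*for partition \([^ ]*\) at position .*rack: \([^)]*\))], epoch.* to node .*rack: \([^)]*\)).*/\1 \2 \3/p' |
    awk '{ print $1, "leader=" $2, "fetch=" $3, ($2 == $3 ? "(leader)" : "(follower)") }'
}

# Possible use inside the pod:
# fetch_report < /tmp/rack-aware-consumer.log | sort -u
```

With rack awareness working, every line should report the same fetch rack, and partitions whose leader sits in another Availability Zone should be marked as follower fetches.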

Congratulations! You’re consuming from the closest replica on Amazon EKS.

Clean up

Delete the deployment when finished:

kubectl -n kafka-ns delete -f kafka-az-aware.yaml

To delete the EKS Pod Identity association:

eksctl delete podidentityassociation \
  --cluster ${EKS_CLUSTER_NAME} \
  --namespace kafka-ns \
  --service-account-name kafka-sa

To delete the IAM policy:

aws iam delete-policy \
  --policy-arn arn:aws:iam::"${AWS_ACCOUNT}":policy/MSK-AZ-Aware-Policy

To delete the EKS cluster:

eksctl delete cluster -n ${EKS_CLUSTER_NAME} --disable-nodegroup-eviction

If you followed along with this post using the Amazon MSK Data Generator, be sure to delete your deployment so it’s no longer attempting to generate and send data after you delete the rest of your resources.

Cleanup depends on which deployment option you used. To read more about the deployment options and the resources created for the Amazon MSK Data Generator, refer to Getting Started in the GitHub repository.

Creating an MSK cluster was a prerequisite of this post, and if you’d like to clean up the MSK cluster as well, you can use the following command:

aws kafka delete-cluster --cluster-arn "${MSK_CLUSTER_ARN}"

There is no additional cost for using AWS CloudShell, but if you’d like to delete your shell, refer to Delete a shell session home directory in the AWS CloudShell User Guide.

Conclusion

Apache Kafka nearest replica fetching, or rack awareness, is a strategic cost-optimization technique. By implementing it for Amazon MSK consumers on Amazon EKS, you can significantly reduce cross-zone traffic costs while maintaining robust, distributed streaming architectures. Open source tools such as Kyverno can simplify complex configuration challenges and drive meaningful savings. The solution we’ve demonstrated provides a powerful, repeatable approach to dynamically injecting Availability Zone information into Kubernetes pods, optimizing Kafka consumer routing, and reducing data transfer costs.

Additional resources

To learn more about rack awareness with Amazon MSK, refer to Reduce network traffic costs of your Amazon MSK consumers with rack awareness.


About the authors

Austin Groeneveld is a Streaming Specialist Solutions Architect at Amazon Web Services (AWS), based in the San Francisco Bay Area. In this role, Austin is passionate about helping customers accelerate insights from their data using the AWS platform. He is particularly fascinated by the growing role that data streaming plays in driving innovation in the data analytics space. Outside of his work at AWS, Austin enjoys watching and playing soccer, traveling, and spending quality time with his family.

Farooq Ashraf is a Senior Solutions Architect at AWS, specializing in SaaS, Generative AI, and MLOps. He is passionate about blending multi-tenant SaaS concepts with cloud services to innovate scalable solutions for the digital enterprise, and has several blog posts and workshops to his credit.

Near real-time baggage operational insights for airlines using Amazon Kinesis Data Streams

Post Syndicated from Subhash Sharma original https://aws.amazon.com/blogs/big-data/near-real-time-baggage-operational-insights-for-airlines-using-amazon-kinesis-data-streams/

To provide a seamless travel experience, aviation enterprises must streamline baggage handling to be as efficient as possible. Traditional baggage analytics systems often struggle with adaptability, real-time insights, data integrity, operational costs, and security, limiting their effectiveness in dynamic environments. Real-time analytics can help in several aspects, such as improving staffing decisions, baggage rerouting, payload planning, and predictive maintenance of Internet of Things (IoT) sensors and belt loaders.

In this post, we explore a framework developed by IBM to modernize baggage analytics using Amazon Web Services (AWS) managed services such as Amazon Kinesis Data Streams, Amazon DynamoDB Streams, Amazon Managed Service for Apache Flink, Amazon QuickSight, Amazon Q in QuickSight, AWS Glue, Amazon SageMaker, and Amazon Aurora within a serverless architecture. This approach delivers significant cost savings, enhanced scalability, and improved performance while providing better security and operational efficiency to meet the evolving needs of airlines. Before diving into the solution’s architecture, we first examine the traditional baggage analytics process and the need for modernization.

Importance of baggage analytics

Baggage management is a process that starts at baggage check-in and ends with the passenger claiming their baggage in a happy path scenario. The following figure explains the high-level baggage management process and respective key performance indicators (KPI). The illustration highlights the critical role of payload planning (part 1), baggage loading (part 2), and below wing payload closeout (part 3) in the flight departure process, all of which directly impact the flight on-time departure metric (part 4). Enhancing the KPIs associated with these essential steps is vital for airlines to optimize operations.

Figure 1: Baggage analytics KPIs

Common KPIs for baggage loading include baggage handling time, turnaround time impact, mishandled baggage rate, baggage accuracy rate, and baggage loading error rate. Similarly, the baggage check-in process plays a crucial role in enhancing the passenger experience. Analyzing variations in this metric across different stations and time periods provides valuable insights for identifying potential bottlenecks and improving efficiency.

Airlines can measure performance KPIs using the following business process metrics:

  • Wait times – Wait times are the duration that a process step is waiting on an upstream dependency and are an important factor affecting the overall wait time. Analytics can help identify the potential areas (for example, stations, bag rooms, pier locations, belt loaders, or baggage types) where the processes and system can be fine-tuned to improve the overall wait time.
  • Error rate – Error rate is the frequency with which a process step produces errors or defects. Within these processes, errors are usually a result of data inconsistencies across multiple systems, manual data entries because of system unavailability or limited aircraft turn-around time, and inconsistencies between payload planning rules and loading procedures. Analytics can help classify these errors among system availability issues, outdated rules, inconsistent data between systems, and other factors. The classification can help prioritize fine-tuning and removing redundancies across systems, rules, and data.
  • Rework time – Rework time is time spent on correcting errors or defects. It can be improved but can’t be avoided, considering last-minute baggage, wheelchairs, ski equipment, and ship or aircraft changes that result in a new payload plan. Analytics can help classify the type, time, and frequency of rework activities across stations, staff members, baggage types, and scenarios related to flight delays and ship changes.
  • Cycle time – Cycle time is the time it takes to complete the process. You can improve the payload planning process cycle time by automating the payload distribution process. To do so, you need to identify and improve the time taken by the payload planning, loading, and closeout processes to reduce the complete departure process cycle time. In many cases, you can improve cycle time by adjusting the processes and adding extra resources, such as workforce, or in other cases by introducing automation. Analytics can identify these time-consuming steps and can be extended to use predictive models to apply mitigation strategies.

Traditional baggage analytics

As explained in the following figure, the traditional baggage handling solution uses monolithic databases with several upstream and downstream dependencies. Upstream dependencies include bag, flight, and passenger event feeds that deliver real-time changes to flights, checked bags, and passenger itineraries. Downstream dependencies include staffing and customer notifications. The core application interfaces include belt loaders, IoT devices, kiosks, handheld scanners, and web applications for monitoring and reporting. The airline typically stores the reports in the operational database referred to in the diagram as baggage handling (relational database), retaining historical data spanning multiple years, and makes them available to all personnel on the airline’s network. The traditional approach to baggage analytics entails nightly processing of data batches into an enterprise data warehouse (EDW) to generate performance metrics related to airlines’ baggage handling processes.

Figure 2: Traditional baggage analytics

Need for modernization

Modernizing baggage analytics is crucial for airlines to achieve growth and enhance operational efficiency. Key factors influencing the modernization are as follows:

  • Inefficiencies in near real-time decision-making – Current systems can’t process and analyze data in real time, leading to delayed responses to operational issues. Integration and data silos hinder insights, preventing proactive decision-making on baggage handling, routing, and anomaly detection.
  • Limitations of traditional ETL solutions – Legacy extract, transform, and load (ETL) processes are batch-driven, slow, and resource-intensive, making them unsuitable for dynamic airline operations. High maintenance costs and frequent failures reduce system reliability and availability.
  • Challenges in proactive anomaly detection and resolution during irregular operations – Airlines struggle to anticipate baggage issues during irregular operations, such as flight delays and weather disruptions. Without predictive analytics, preemptive actions remain a challenge in optimizing staffing, reducing mishandled baggage, and enhancing operational efficiency.

Solution

The modernization of baggage operations must include breaking down the monolithic database into distinct databases based on business capabilities to address performance bottlenecks. Business capabilities can be described as fundamental abilities or competencies that a business possesses and that enable it to achieve its objectives and deliver value to its customers.

As explained in the following figure, the business capabilities for baggage management can be defined as baggage acceptance (check-in), baggage loading, baggage offloading, baggage tracking, baggage mishandling and claims, baggage rerouting, and more [part 1]. The solution proposes Amazon DynamoDB for an operational database across all baggage management capabilities. DynamoDB global tables provide 99.999% availability with near-zero Recovery Time Objective (RTO) and Recovery Point Objective (RPO), which is crucial for mission-critical baggage handling systems. More details related to baggage operational database modernization can be found at Enhance the reliability of airlines’ mission-critical baggage handling using Amazon DynamoDB in the AWS Database Blog.

The proposed logical solution for baggage operational analytics suggests segregating operational data from historical data, referred to in the diagram as baggage analytics and historical reporting database, to enhance efficiency and alleviate the burden on the operational database [part 3].

Figure 3: Modern baggage analytics

The solution further uses a streaming architecture for the ongoing transfer of data from the operational database to the baggage analytics and historical reporting database [part 2]. This approach aims to facilitate near real-time analytics. The key features of a robust streaming architecture include:

  • Low-latency processing to enable near real-time updates
  • Scalability and elasticity to handle dynamic workloads efficiently
  • Fault tolerance and durability to promote data reliability with replication
  • The ability for multiple consumers to process the same data in parallel at full speed without bottlenecks or interference
  • Exactly-once processing to avoid duplication and maintain data integrity
  • Ability to replay messages
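Replay and exactly-once results interact: when a stream is replayed, a consumer can see the same record again. One common way to keep results correct is to make processing idempotent. The following is a minimal sketch of that idea, assuming each event carries a unique `event_id` (a hypothetical field) and using an in-memory set where a real system would need a durable store.

```python
class IdempotentBagCounter:
    """Counts bag scan events once per event ID, even if records are replayed."""

    def __init__(self):
        self._seen = set()          # IDs already applied (in memory, for illustration)
        self.scans_per_flight = {}  # flight -> number of distinct scan events

    def process(self, event):
        """Apply the event; return False if it was a duplicate and was skipped."""
        event_id = event["event_id"]
        if event_id in self._seen:
            return False
        self._seen.add(event_id)
        flight = event["flight"]
        self.scans_per_flight[flight] = self.scans_per_flight.get(flight, 0) + 1
        return True
```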

Real-time streaming on AWS Cloud

The solution can use either Kinesis Data Streams or DynamoDB Streams to process change data capture (CDC) events within milliseconds. For further information, refer to Streaming options for change data capture and Choose the right change data capture strategy for your Amazon DynamoDB applications.

In this architecture, Kinesis Data Streams is selected to enable fan-out to multiple downstream consumers, extended data retention, and integration with Amazon Managed Service for Apache Flink. Amazon Managed Service for Apache Flink performs stateful stream processing—such as windowed aggregation, filtering, and anomaly detection—before passing data to DynamoDB or Aurora for further analytical aggregation and reporting. Although DynamoDB Streams could also have been used, Kinesis Data Streams provides greater flexibility and throughput for the scale of event processing required here. Additionally, Kinesis Data Streams data retention allows message replays for improved reliability and analysis.
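To make the idea of windowed aggregation and anomaly detection concrete, the following plain-Python sketch groups events into fixed, non-overlapping (tumbling) windows and flags windows whose count exceeds a threshold. It is not Apache Flink code; the event shape (`ts` in epoch seconds, `station`), the 60-second window, and the threshold are assumptions chosen for illustration.

```python
from collections import Counter

WINDOW_SECONDS = 60  # assumed window size for illustration


def tumbling_window_counts(events, window_seconds=WINDOW_SECONDS):
    """Count events per (window_start, station) using fixed, non-overlapping windows."""
    counts = Counter()
    for e in events:
        # Align each timestamp to the start of the window that contains it.
        window_start = e["ts"] - (e["ts"] % window_seconds)
        counts[(window_start, e["station"])] += 1
    return dict(counts)


def flag_anomalies(counts, threshold):
    """Return the (window_start, station) keys whose count exceeds the threshold."""
    return sorted(key for key, n in counts.items() if n > threshold)
```

A stream processor additionally manages state, late-arriving events, and checkpointing, which this sketch leaves out.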

Baggage analytics on AWS Cloud

The solution will use Amazon Simple Storage Service (Amazon S3) for structured and unstructured data storage and Amazon Aurora PostgreSQL-Compatible Edition for relational aggregations. Aurora is well-suited for handling complex aggregations across multiple dimensions (such as month, year, station, and shift) with efficient indexing and SQL functions optimized for reporting. Its relational capabilities support analytical queries needed for performance metrics while providing scalability and efficiency.
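The kind of multi-dimensional aggregation described here can be sketched with a single GROUP BY query. The example below uses Python's built-in `sqlite3` module purely as a stand-in so that it runs anywhere; the `bag_events` table, its columns, and the sample rows are hypothetical. On PostgreSQL, `date_trunc('month', ...)` would be a more natural way to derive the month than the string slicing used here.

```python
import sqlite3

# In-memory database as a stand-in for the reporting database (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE bag_events (station TEXT, shift TEXT, event_date TEXT, mishandled INTEGER)"
)
conn.executemany(
    "INSERT INTO bag_events VALUES (?, ?, ?, ?)",
    [
        ("DFW", "AM", "2024-05-01", 0),
        ("DFW", "AM", "2024-05-02", 1),
        ("DFW", "PM", "2024-05-02", 1),
        ("ORD", "AM", "2024-06-10", 0),
    ],
)

QUERY = """
SELECT station, shift, substr(event_date, 1, 7) AS month,
       COUNT(*) AS bags, SUM(mishandled) AS mishandled
FROM bag_events
GROUP BY station, shift, month
ORDER BY station, shift, month
"""


def monthly_metrics(connection):
    """Return one row per (station, shift, month) with bag and mishandled counts."""
    return connection.execute(QUERY).fetchall()
```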

The following figure explains the high-level cloud architecture for baggage analytics using AWS services.

Figure 4: Near real-time baggage analytics architecture on AWS

The solution can support the following analytics:

  • Interactive and investigative analytics which can produce charts and graphs and discover patterns and anomalies in the baggage data used by product owners. The solution proposes using Amazon QuickSight, which is an interactive tool. Additionally, the solution proposes Amazon Q in QuickSight for natural language queries using a chat-based interface. Amazon QuickSight can be configured using an AWS Glue crawler to automatically discover and extract metadata from various data stores such as Amazon S3 and Amazon Aurora and catalog it in a centralized repository. Amazon QuickSight can be configured to use Amazon Athena to read the data catalog.
  • Predictive analytics used by data scientists involves analyzing historical data to predict future events or behaviors. It uses statistical algorithms and machine learning (ML) techniques to forecast outcomes. The proposed solution is to use a SageMaker notebook to perform predictive analytics on baggage data.

Conclusion

Cloud-based solutions such as Kinesis Data Streams, Athena, and QuickSight revolutionize baggage analytics with scalable, cost-effective infrastructure. By integrating real-time data streaming, analysis, and visualization, they eliminate data silos and enable data-driven decision-making. This modernization optimizes processes, proactively resolving issues to minimize passenger disruptions. Embracing cloud-powered analytics isn’t just a necessity but a strategic step toward greater efficiency, resilience, and customer satisfaction. With this solution, airlines can enhance preemptive issue resolution in baggage operations. Real-time analytics enables better workforce planning, allowing airlines to predict staffing needs at departure and arrival stations, reducing labor costs while ensuring smooth operations. Additionally, data-driven insights help identify inefficiencies during irregular operations, enabling informed decisions for traffic diversion and process optimization.

Check out more AWS Partners or contact an AWS Representative to know how we can help accelerate your business.

Further reading

IBM Consulting is an AWS Premier Tier Services Partner that helps customers who use AWS to harness the power of innovation and drive their business transformation. They are recognized as a Global Systems Integrator (GSI) for over 22 competencies, including travel and hospitality consulting. For more information, please contact an IBM Representative.


About the authors

Neeraj Kaushik is an Open Group Certified Distinguished Architect at IBM with two decades of experience in client-facing delivery roles. His experience spans several industries, including travel and transportation, banking, retail, education, healthcare, and anti-human trafficking. As a trusted advisor, he works directly with client executives and architects on business strategy to define a technology roadmap. As a hands-on Chief Architect, AWS Professional Certified Solution Architect, AWS Certified Machine Learning Specialist, and Natural Language Processing Expert, he has led multiple complex cloud modernization programs and AI initiatives.

Jay Pandya is a Senior Partner Solutions Architect in the Global Systems Integrator (GSI) team at Amazon Web Services (AWS). He has over 30 years of IT experience and is helping and providing guidance to AWS GSI partners to build, design, and architect agile, scalable, highly available, and secure solutions on AWS. Outside of the office, Jay enjoys spending time with his family and traveling, and he is an aviation enthusiast and avid sports and Formula 1 fan.

Vijay Gokarn is a Senior Solution Architect at IBM with extensive experience across industries including financial services, healthcare, industrial, retail, and travel and hospitality. He leads complex AWS transformation initiatives, drawing on his hands-on expertise as an AWS Certified Solutions Architect Associate. Vijay specializes in serverless architectures, event-driven systems, and enterprise modernization. As a skilled architect and team leader, he has delivered impactful solutions in cloud modernization, digital banking, and intelligent automation. His passion lies in bridging business strategy with technical execution to drive scalable digital transformation.

Subhash Sharma is a Sr. Partner Solutions Architect at AWS. He has more than 25 years of experience in delivering distributed, scalable, highly available, and secured software products using Microservices, AI/ML, the Internet of Things (IoT), and Blockchain using a DevSecOps approach. In his spare time, Subhash likes to spend time with family and friends, hike, walk on the beach, and watch TV.

Overcome your Kafka Connect challenges with Amazon Data Firehose

Post Syndicated from Swapna Bandla original https://aws.amazon.com/blogs/big-data/overcome-your-kafka-connect-challenges-with-amazon-data-firehose/

Apache Kafka is a popular open source distributed streaming platform that is widely used in the AWS ecosystem. It’s designed to handle real-time, high-throughput data streams, making it well-suited for building real-time data pipelines to meet the streaming needs of modern cloud-based applications.

For AWS customers who want to run Apache Kafka but don’t want to worry about the undifferentiated heavy lifting involved with self-managing their Kafka clusters, Amazon Managed Streaming for Apache Kafka (Amazon MSK) offers fully managed Apache Kafka. This means Amazon MSK provisions your servers, configures your Kafka clusters, replaces servers when they fail, orchestrates server patches and upgrades, makes sure clusters are architected for high availability, makes sure data is durably stored and secured, sets up monitoring and alarms, and runs scaling to support load changes. With a managed service, you can spend your time developing and running streaming event applications.

For applications to use data sent to Kafka, you need to write, deploy, and manage application code that consumes data from Kafka.

Kafka Connect is an open-source component of the Kafka project that provides a framework for connecting with external systems such as databases, key-value stores, search indexes, and file systems from your Kafka clusters. On AWS, our customers commonly write and manage connectors using the Kafka Connect framework to move data out of their Kafka clusters into persistent storage, like Amazon Simple Storage Service (Amazon S3), for long-term storage and historical analysis.

At scale, customers need to programmatically manage their Kafka Connect infrastructure for consistent deployments when updates are required, as well as the code for error handling, retries, compression, or data transformation as it is delivered from your Kafka cluster. However, this introduces a need for investment into the software development lifecycle (SDLC) of this management software. Although the SDLC is a cost-effective and time-efficient process to help development teams build high-quality software, for many customers, this process is not desirable for their data delivery use case, particularly when they could dedicate more resources towards innovating for other key business differentiators. Beyond SDLC challenges, many customers face fluctuating data streaming throughput. For instance:

  • Online gaming businesses experience throughput variations based on game usage
  • Video streaming applications see changes in throughput depending on viewership
  • Traditional businesses have throughput fluctuations tied to consumer activity

Striking the right balance between resources and workload can be challenging. Under-provisioning can lead to consumer lag, processing delays, and potential data loss during peak loads, hampering real-time data flows and business operations. On the other hand, over-provisioning results in underutilized resources and unnecessary high costs, making the setup economically inefficient for customers. Even the action of scaling up your infrastructure introduces additional delays because resources need to be provisioned and acquired for your Kafka Connect cluster.

Even when you can estimate aggregated throughput, predicting throughput per individual stream remains difficult. As a result, to achieve smooth operations, you might resort to over-provisioning your Kafka Connect resources (CPU) for your streams. This approach, though functional, might not be the most efficient or cost-effective solution.

Customers have been asking for a fully serverless solution that not only manages resource allocation but also shifts the cost model so that they pay only for the data they deliver from the Kafka topic, instead of for underlying resources that require constant monitoring and management.

In September 2023, we announced a new integration between Amazon MSK and Amazon Data Firehose, allowing builders to deliver data from their MSK topics to their destination sinks with a fully managed, serverless solution. With this integration, you no longer need to develop and manage your own code to read, transform, and write your data to your sink using Kafka Connect. Data Firehose abstracts away the retry logic required when reading data from your MSK cluster and delivering it to the desired sink, as well as infrastructure provisioning, because it can scale out and scale in automatically to adjust to the volume of data to transfer. There are no provisioning or maintenance operations required on your side.

At release, the checkpoint time to start consuming data from the MSK topic was the creation time of the Firehose stream. Data Firehose couldn’t start reading from other points on the data stream. This caused challenges for several different use cases.

For customers that are setting up a mechanism to sink data from their cluster for the first time, all data in the topic older than the timestamp of Firehose stream creation would need another way to be persisted. For example, customers using Kafka Connect connectors were limited in using Data Firehose because they wanted to sink all the data from the topic to their sink, but Data Firehose couldn’t read data from earlier than the timestamp of Firehose stream creation.

For other customers that were running Kafka Connect and needed to migrate from their Kafka Connect infrastructure to Data Firehose, this required some extra coordination. With the release functionality of Data Firehose, you couldn’t point your Firehose stream to a specific point on the source topic, so a migration required stopping data ingest to the source MSK topic and waiting for Kafka Connect to sink all the data to the destination. Only then could you create the Firehose stream and restart the producers so that the Firehose stream could consume new messages from the topic. This added non-trivial overhead to the migration effort when cutting over from an existing Kafka Connect infrastructure to a new Firehose stream.

To address these challenges, we’re happy to announce a new feature in the Data Firehose integration with Amazon MSK. You can now specify the Firehose stream to either read from the earliest position on the Kafka topic or from a custom timestamp to begin reading from your MSK topic.

In the first post of this series, we focused on managed data delivery from Kafka to your data lake. In this post, we extend the solution to choose a custom timestamp for your MSK topic to be synced to Amazon S3.

Overview of Data Firehose integration with Amazon MSK

Data Firehose integrates with Amazon MSK to offer a fully managed solution that simplifies the processing and delivery of streaming data from Kafka clusters into data lakes stored on Amazon S3. With just a few clicks, you can continuously load data from your desired Kafka clusters to an S3 bucket in the same account, eliminating the need to develop or run your own connector applications. The following are some of the key benefits to this approach:

  • Fully managed service – Data Firehose is a fully managed service that handles the provisioning, scaling, and operational tasks, allowing you to focus on configuring the data delivery pipeline.
  • Simplified configuration – With Data Firehose, you can set up the data delivery pipeline from Amazon MSK to your sink with just a few clicks on the AWS Management Console.
  • Automatic scaling – Data Firehose automatically scales to match the throughput of your Amazon MSK data, without the need for ongoing administration.
  • Data transformation and optimization – Data Firehose offers features like JSON to Parquet/ORC conversion and batch aggregation to optimize the delivered file size, simplifying data analytical processing workflows.
  • Error handling and retries – Data Firehose automatically retries data delivery in case of failures, with configurable retry durations and backup options.
  • Offset select option – With Data Firehose, you can select the starting position for the MSK delivery stream to be delivered within a topic from three options:
    • Firehose stream creation time – This allows you to deliver data starting from Firehose stream creation time. When migrating from Kafka Connect to Data Firehose, if you have an option to pause the producer, you can consider this option.
    • Earliest – This allows you to deliver data starting from MSK topic creation time. You can choose this option if you’re setting a new delivery pipeline with Data Firehose from Amazon MSK to Amazon S3.
    • At timestamp – This option allows you to provide a specific start date and time in the topic from where you want the Firehose stream to read data. The time is in your local time zone. You can choose this option if you prefer not to stop your producer applications while migrating from Kafka Connect to Data Firehose. You can refer to the Python script and steps provided later in this post to derive the timestamp for the latest events in your topic that were consumed by Kafka Connect.

The following are benefits of the new timestamp selection feature with Data Firehose:

  • You can select the starting position of the MSK topic, not just from the point that the Firehose stream is created, but from any point from the earliest timestamp of the topic.
  • You can replay the MSK stream delivery if required, for example, in testing scenarios, by selecting a specific timestamp to start from.
  • When migrating from Kafka Connect to Data Firehose, gaps or duplicates can be managed by selecting the starting timestamp for Data Firehose delivery from the point where Kafka Connect delivery ended. Because the new custom timestamp feature isn’t monitoring Kafka consumer offsets per partition, the timestamp you select for your Kafka topic should be a few minutes before the timestamp at which you stopped Kafka Connect. The earlier the timestamp you select, the more duplicate records you will have downstream. The closer the timestamp to the time of Kafka Connect stopping, the higher the likelihood of data loss if certain partitions have fallen behind. Be sure to select a timestamp appropriate to your requirements.

Overview of solution

We discuss two scenarios to stream data.

In Scenario 1, we migrate to Data Firehose from Kafka Connect with the following steps:

  1. Derive the latest timestamp from MSK events that Kafka Connect delivered to Amazon S3.
  2. Create a Firehose delivery stream with Amazon MSK as the source and Amazon S3 as the destination with the topic starting position as At timestamp.
  3. Query Amazon S3 to validate the data loaded.

In Scenario 2, we create a new data pipeline from Amazon MSK to Amazon S3 with Data Firehose:

  1. Create a Firehose delivery stream with Amazon MSK as the source and Amazon S3 as the destination with the topic starting position as Earliest.
  2. Query Amazon S3 to validate the data loaded.

The solution architecture is depicted in the following diagram.

Prerequisites

You should have the following prerequisites:

  • An AWS account and access to the following AWS services:
  • An MSK provisioned or MSK serverless cluster with topics created and data streaming to it. The sample topic used in this post is order.
  • An EC2 instance configured to use as a Kafka admin client. Refer to Create an IAM role for instructions to create the client machine and IAM role that you will need to run commands against your MSK cluster.
  • An S3 bucket for delivering data from Amazon MSK using Data Firehose.
  • Kafka Connect to deliver data from Amazon MSK to Amazon S3 if you want to migrate from Kafka Connect (Scenario 1).

Migrate to Data Firehose from Kafka Connect

To reduce duplicates and minimize data loss, you need to configure your custom timestamp for Data Firehose to read events as close as possible to the timestamp of the oldest committed offset that Kafka Connect reported. You can follow the steps in this section to visualize how the timestamps of each committed offset will vary by partition across the topic you want to read from. This is for demonstration purposes and doesn’t scale as a solution for workloads with a large number of partitions.

Sample data was generated for demonstration purposes by following the instructions referenced in the following GitHub repo. We set up a sample producer application that generates clickstream events to simulate users browsing and performing actions on an imaginary ecommerce website.

To derive the latest timestamp from MSK events that Kafka Connect delivered to Amazon S3, complete the following steps:

  1. From your Kafka client, query Amazon MSK to retrieve the Kafka Connect consumer group ID:
    ./kafka-consumer-groups.sh --bootstrap-server $bs --list --command-config client.properties

  2. Stop Kafka Connect.
  3. Query Amazon MSK for the latest offset and associated timestamp for the consumer group belonging to Kafka Connect.

You can use the get_latest_offsets.py Python script from the following GitHub repo as a reference to get the timestamp associated with the latest offsets for your Kafka Connect consumer group. To enable authentication and authorization for a non-Java client with an IAM authenticated MSK cluster, refer to the following GitHub repo for instructions on installing the aws-msk-iam-sasl-signer-python package for your client.

python3 get_latest_offsets.py --broker-list $bs --topic-name "order" --consumer-group-id "connect-msk-serverless-connector-090224" --aws-region "eu-west-1"

Note the earliest timestamp across all the partitions.
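Once you have a timestamp for each partition, picking the value to enter in Data Firehose is a small calculation: take the minimum and subtract a safety margin. The sketch below assumes the per-partition timestamps are available as a dictionary of epoch milliseconds (a hypothetical input shape, not necessarily what the script prints) and uses a five-minute margin as an arbitrary stand-in for "a few minutes".

```python
from datetime import datetime, timedelta, timezone


def firehose_start_time(partition_ts_ms, safety_margin=timedelta(minutes=5)):
    """Pick a start time: earliest committed-offset timestamp minus a safety margin.

    partition_ts_ms maps partition number -> timestamp (epoch milliseconds) of the
    last record that Kafka Connect committed on that partition.
    """
    if not partition_ts_ms:
        raise ValueError("no partition timestamps supplied")
    earliest_ms = min(partition_ts_ms.values())
    earliest = datetime.fromtimestamp(earliest_ms / 1000, tz=timezone.utc)
    # A larger margin means more duplicates downstream; a smaller one risks gaps.
    return earliest - safety_margin
```

The function returns a UTC value; calling `.astimezone()` on the result converts it to the local time zone if that is what you need to enter.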

Create a data pipeline from Amazon MSK to Amazon S3 with Data Firehose

The steps in this section are applicable to both scenarios. Complete the following steps to create your data pipeline:

  1. On the Data Firehose console, choose Firehose streams in the navigation pane.
  2. Choose Create Firehose stream.
  3. For Source, choose Amazon MSK.
  4. For Destination, choose Amazon S3.
  5. For Source settings, browse to the MSK cluster and enter the topic name you created as part of the prerequisites.
  6. Configure the Firehose stream starting position based on your scenario:
    1. For Scenario 1, set Topic starting position as At Timestamp and enter the timestamp you noted in the previous section.
    2. For Scenario 2, set Topic starting position as Earliest.
  7. For Firehose stream name, leave the default generated name or enter a name of your preference.
  8. For Destination settings, browse to the S3 bucket created as part of the prerequisites to stream data.

Within this S3 bucket, by default, a folder structure with YYYY/MM/dd/HH will be automatically created. Data is delivered into the HH subfolder that corresponds to the timestamp at which Data Firehose ingests it into Amazon S3.

  9. Under Advanced settings, you can choose to create the default IAM role for all the permissions that Data Firehose needs or choose an existing IAM role that has the policies that Data Firehose needs.
  10. Choose Create Firehose stream.
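If you prefer to script these console steps, the same stream can in principle be created through the Firehose `CreateDeliveryStream` API. The following sketch only builds the request and doesn't call AWS. All ARNs and names passed in are placeholders, the `PRIVATE` connectivity setting is an assumption, and the use of the Unix epoch in `ReadFromTimestamp` to mean Earliest reflects our reading of the API reference, so verify it against the current documentation before relying on it.

```python
from datetime import datetime, timezone

# Assumption: passing the Unix epoch as ReadFromTimestamp selects the Earliest position.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def build_msk_stream_request(name, cluster_arn, topic, source_role_arn,
                             bucket_arn, delivery_role_arn, read_from=None):
    """Build keyword arguments for firehose.create_delivery_stream (MSK to S3)."""
    source = {
        "MSKClusterARN": cluster_arn,
        "TopicName": topic,
        "AuthenticationConfiguration": {
            "RoleARN": source_role_arn,
            "Connectivity": "PRIVATE",  # assumed; "PUBLIC" is the other option
        },
    }
    if read_from is not None:
        # Omitting the key leaves the default (Firehose stream creation time).
        source["ReadFromTimestamp"] = read_from
    return {
        "DeliveryStreamName": name,
        "DeliveryStreamType": "MSKAsSource",
        "MSKSourceConfiguration": source,
        "ExtendedS3DestinationConfiguration": {
            "RoleARN": delivery_role_arn,
            "BucketARN": bucket_arn,
        },
    }
```

With boto3 installed and credentials configured, the result could be passed as `boto3.client("firehose").create_delivery_stream(**request)`, using the noted timestamp for Scenario 1 or `EPOCH` for Scenario 2.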

On the Amazon S3 console, you can verify the data streamed to the S3 folder according to your chosen offset settings.
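When checking the bucket programmatically, it helps to compute the prefix where a given delivery should land. The following minimal sketch derives the default YYYY/MM/dd/HH prefix, which Data Firehose evaluates in UTC, from a timezone-aware timestamp. The function name is hypothetical.

```python
from datetime import datetime, timezone


def default_firehose_prefix(delivered_at):
    """Return the default YYYY/MM/dd/HH/ prefix (UTC) for a delivery timestamp."""
    if delivered_at.tzinfo is None:
        raise ValueError("delivered_at must be timezone-aware")
    return delivered_at.astimezone(timezone.utc).strftime("%Y/%m/%d/%H/")
```

The returned string can be used as the `Prefix` argument when listing objects, for example with the boto3 S3 client's `list_objects_v2` call.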

Clean up

To avoid incurring future charges, delete the resources you created as part of this exercise if you’re not planning to use them further.

Conclusion

Data Firehose provides a straightforward way to deliver data from Amazon MSK to Amazon S3, enabling you to save costs and reduce latency to seconds. To try Data Firehose with Amazon S3, refer to the Delivery to Amazon S3 using Amazon Data Firehose lab.


About the Authors

Swapna Bandla is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. Swapna has a passion for understanding customers’ data and analytics needs and empowering them to develop cloud-based well-architected solutions. Outside of work, she enjoys spending time with her family.

Austin Groeneveld is a Streaming Specialist Solutions Architect at Amazon Web Services (AWS), based in the San Francisco Bay Area. In this role, Austin is passionate about helping customers accelerate insights from their data using the AWS platform. He is particularly fascinated by the growing role that data streaming plays in driving innovation in the data analytics space. Outside of his work at AWS, Austin enjoys watching and playing soccer, traveling, and spending quality time with his family.

Building serverless event streaming applications with Amazon MSK and AWS Lambda

Post Syndicated from Tarun Rai Madan original https://aws.amazon.com/blogs/big-data/building-serverless-event-streaming-applications-with-amazon-msk-and-aws-lambda/

As organizations build modern applications with event-driven architectures (EDA), they often seek solutions that minimize infrastructure management overhead while maximizing developer productivity. Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS Lambda together provide a serverless, scalable, and cost-efficient platform for real-time event-driven processing.

In this post, we describe how you can simplify your event-driven application architecture using AWS Lambda with Amazon MSK. We demonstrate how to configure Lambda as a consumer for Kafka topics, including a cross-account setup and how to optimize price and performance for these applications.

Why use Lambda with Amazon MSK?

Customers building event-driven applications have several key priorities when it comes to their architecture choices. They typically seek to reduce their operational overhead by using Amazon Web Services (AWS) to handle the complex, underlying infrastructure components so their teams can focus on core business logic. Additionally, developers prefer a streamlined experience that minimizes the need for repetitive boilerplate code, enabling them to be more productive and focus on creating value. Furthermore, these customers want to achieve both scalability and cost-effectiveness without the burden of managing compute infrastructure directly. Lambda integration with Amazon MSK effectively addresses these requirements, delivering a comprehensive solution that combines the benefits of serverless computing with managed Kafka services. For example, an ecommerce company can use Amazon MSK to collect real-time clickstream data from its website and process those events using AWS Lambda. With this integration, they can trigger Lambda functions to update recommendation models, send personalized offers, or analyze user behavior instantly—without provisioning or managing servers. The key benefits of using Lambda with Amazon MSK include:

  1. Simplicity through native integration – AWS Lambda offers native integration with Amazon MSK through a connector resource called event source mapping. You can use this integration to directly associate a Kafka topic—whether it’s on Amazon MSK or a self-managed Kafka cluster—as an event source for a Lambda function without writing custom consumer logic. With just a few configuration steps, event source mapping handles partition assignment, offset tracking, and parallelized batch processing under the hood. It uses the Kafka consumer group protocol to distribute topic partitions across multiple concurrent Lambda invocations, supports batch windowing, and enables at-least-once delivery semantics. Moreover, it automatically commits offsets upon successful function execution while handling retries and dead-letter queue (DLQ) routing for failed records, significantly reducing the operational overhead traditionally associated with Kafka consumers.
  2. Auto scaling and throughput controls – When using AWS Lambda with Amazon MSK through event source mapping, Lambda automatically scales by assigning a dedicated event poller per Kafka partition, enabling parallel, partition-based processing. This allows the system to elastically handle varying traffic without manual intervention. For advanced control, provisioned concurrency pre-initializes Lambda execution environments, eliminating cold starts and delivering consistent low-latency performance. Additionally, with provisioned event source mapping, you can configure the minimum and maximum number of Kafka pollers, providing precise control over throughput and concurrency. This is ideal for applications with unpredictable traffic patterns or strict latency requirements.
  3. Cost-effectiveness – AWS Lambda uses a pay-per-use model in which you only pay for compute time and number of invocations. When integrated with Amazon MSK, there are no charges for idle time, making it ideal for bursty or low-frequency Kafka workloads. You can further optimize costs by tuning batch size and batch window settings. For mission-critical workloads, provisioned concurrency provides consistent performance with controlled pricing.
  4. Event filtering – AWS Lambda supports event filtering for Amazon MSK event sources, which means you can process only the Kafka records that match specific criteria. This reduces unnecessary function invocations and optimizes your function costs. You can define up to five filters per event source mapping (with the option to request an increase to ten). Each filter uses a JSON-based pattern to specify the conditions a record must meet to be processed. Filters can be applied using the AWS Management Console, AWS Command Line Interface (AWS CLI), or AWS Serverless Application Model (AWS SAM) templates. For more details and examples, refer to the AWS Lambda documentation on event filtering with Amazon MSK.
  5. Handling Availability Zone outage for your consumer – Amazon MSK enables high availability for your Kafka brokers by distributing them across multiple Availability Zones within a Region. To maintain high availability across your application, you similarly need a consumer that offers high availability. AWS Lambda offers high availability and resilience by running your consumer functions across multiple Availability Zones in a Region. This means that even if one Availability Zone experiences an outage, your Lambda function will continue to operate in other healthy Availability Zones. While Lambda manages security patching and Availability Zone failure scenarios, you can focus on your application logic.
  6. Cross-account event processing – Cross-account connectivity between AWS Lambda and Amazon MSK allows a Lambda function in one AWS account to consume data from an MSK cluster in another account using MSK multi-VPC private connectivity powered by AWS PrivateLink. This setup is particularly beneficial for organizations that centralize Kafka infrastructure while maintaining separate accounts for different applications or teams.
  7. Support for JSON, Avro, Protobuf, and schema registries – AWS Lambda supports Kafka events in JSON, Avro, and Protobuf formats through event source mapping. It integrates with AWS Glue Schema Registry, Confluent Cloud Schema Registry, and self-managed Confluent Schema Registry, enabling native schema validation, filtering, and deserialization without custom code.

How Lambda processes messages from your Kafka topic

Lambda uses event source mappings to process records from Amazon MSK by actively polling Kafka topics through event pollers that invoke Lambda functions with batches of records. These mappings are Lambda managed resources designed for high-throughput, stream-based processing. By default, Lambda detects the OffsetLag for all partitions in your Kafka topic and automatically scales pollers based on traffic. For high-throughput applications, you can enable provisioned mode to define minimum and maximum pollers, and your event source mapping auto scales between the minimum and maximum defined values. In the provisioned mode, each poller can process up to 5 MBps and supports concurrent Lambda invocations.

After Lambda processes each batch, it commits the offsets of the messages in that batch. If your function returns an error for a message in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire. You can send records that fail all retry attempts to an on-failure destination for later processing. To maintain ordered processing within a partition, Lambda limits the maximum event pollers to the number of partitions in the topic. When setting up Kafka as a Lambda event source, you can specify a consumer group ID to let Lambda join an existing Kafka consumer group. If other consumers are active in that group, Lambda will receive only part of the topic’s messages. If the group exists, Lambda starts from the group’s committed offset, ignoring the StartingPosition. The following diagram illustrates this flow.

Walkthrough: Build a serverless Kafka app with AWS Lambda

Follow these steps to build a serverless application that consumes messages from an MSK cluster using AWS Lambda:

  1. Create an Amazon MSK cluster. Use the AWS Management Console or AWS CLI to create your MSK cluster. When the cluster is up, create your Kafka topic(s). For detailed instructions, refer to the Amazon MSK documentation.
  2. Create a Lambda function using the AWS Management Console or the AWS CLI. To learn more about creating a Lambda function, refer to Create your first Lambda function. The Lambda function’s execution role needs to have the following permissions:
    1. Access to connect to your MSK cluster
    2. Permissions to manage elastic network interfaces in your VPC
  3. To connect Lambda to Amazon MSK as a consumer, set up event source mapping to link your MSK topic with the Lambda function. This allows Lambda to automatically poll for new messages and process them. Follow the guide on how to configure event source mapping.

For reference, configuring event source mapping involves three steps:

  1. Network setup – In the default event source mapping mode, you need to configure a networking setup using a PrivateLink endpoint or NAT gateway for event source mapping to invoke Lambda functions. In provisioned mode, no networking configuration is needed (and you don’t incur the cost of networking components).
  2. Event source mapping parameter configuration – This involves setting the parameters that the event source mapping needs to poll messages from your Kafka cluster: the MSK cluster, topic name, consumer group ID, authentication method, and, optionally, a schema registry and a scaling mode for provisioned throughput. You can also configure batch size, batch window, and event filtering for your event source mapping.
  3. Access permissions – This involves granting the permissions needed to access the required AWS resources: permissions for the function to execute its code, permissions for the event source mapping to access your MSK cluster, and permissions for Lambda to access your VPC resources.

The following screenshot shows the console setup for configuring Amazon MSK event source mapping, including the Amazon MSK trigger related fields.

The following screenshot shows event poller configuration.

The following screenshot shows additional settings you can use, depending on your use case.

Optimizing AWS Lambda for stream processing with Amazon MSK

When building real-time data processing pipelines with Amazon MSK and AWS Lambda, it’s important to tune your setup for both performance and cost-efficiency. Lambda offers powerful serverless compute capabilities, but to get the most out of it in a streaming context, you need to make a few key optimizations:

  1. Enable provisioned concurrency for low-latency processing – For latency-sensitive workloads, cold starts can introduce unwanted delays. By enabling provisioned concurrency, you can pre-warm a specified number of Lambda instances so they’re always ready to handle traffic immediately. This eliminates cold starts and provides consistent response times, which is crucial for latency-critical use cases.
  2. Enable provisioned mode for event source mapping for high-throughput processing – For Kafka workloads with stringent throughput requirements, activate the provisioned mode. The optimal configuration of minimum and maximum event pollers for your Kafka event source mapping depends on your application’s performance requirements. Start with the default minimum event pollers to baseline the performance profile and adjust event pollers based on observed message processing patterns and your application’s performance requirements. For workloads with spiky traffic and strict performance needs, increase the minimum event pollers to handle sudden surges. You can fine-tune the minimum event pollers by evaluating your desired throughput, your observed throughput, which depends on factors such as the ingested messages per second and average payload size, and using the throughput capacity of one event poller (up to 5 MB/s) as reference. To maintain ordered processing within a partition, Lambda caps the maximum event pollers at the number of partitions in the topic.
  3. Optimize message batching using size and windowing – By integrating Lambda with Amazon MSK, you can control how messages are batched before they’re sent to your function. Tuning parameters such as batch size (the number of records per invocation: 1–10,000 records) and maximum batching window (how long to wait for a full batch: 0–300 seconds) can significantly impact performance. Larger batches mean fewer invocations, which reduces overhead and improves throughput. However, it’s important to strike a balance—too large a batch or window might introduce unwanted processing delays. Monitor your stream’s behavior and adjust these settings based on throughput requirements and acceptable latency.
  4. Apply filters to reduce unnecessary invocations – Not every record in your Kafka topic might require processing. To avoid unnecessary Lambda invocations (and associated costs), apply filtering logic directly when configuring the event source mapping. With Lambda, you can define filter criteria (up to five filters by default, or ten with a quota increase) so that only relevant records trigger your function. This helps reduce compute time, minimize noise, and optimize your budget, especially when dealing with high-throughput topics with mixed content. For Amazon MSK, Lambda commits offsets for matched and unmatched messages after successfully invoking the function.
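
The poller sizing guidance in step 2 can be sketched as a quick calculation. The following is a rough estimate under stated assumptions, not an official sizing formula: it treats 1 MB as 1,000,000 bytes, uses the 5 MB/s per-poller figure as a ceiling, and keeps at least one poller. The class and method names are hypothetical. The cap at the partition count reflects the ordering constraint described above.

```java
final class PollerEstimate {
    // Per-poller ceiling quoted in this post for provisioned mode (up to 5 MB/s).
    private static final double POLLER_MB_PER_SECOND = 5.0;

    private PollerEstimate() {}

    /**
     * Estimates a starting value for minimum event pollers.
     * Assumption: 1 MB = 1,000,000 bytes, and at least one poller is always kept.
     */
    static int estimateMinimumPollers(double messagesPerSecond,
                                      double averagePayloadBytes,
                                      int partitions) {
        double megabytesPerSecond = messagesPerSecond * averagePayloadBytes / 1_000_000.0;
        int needed = (int) Math.ceil(megabytesPerSecond / POLLER_MB_PER_SECOND);
        needed = Math.max(1, needed);
        // Pollers cannot exceed the number of partitions in the topic.
        return Math.min(needed, partitions);
    }

    public static void main(String[] args) {
        // 20,000 messages/s at 2,000 bytes each is 40 MB/s on a 12-partition topic.
        System.out.println(estimateMinimumPollers(20_000, 2_000, 12));
    }
}
```

Treat the result as a baseline only, and adjust it based on the throughput you actually observe.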

Conclusion

By combining Amazon MSK with AWS Lambda, you can seamlessly build modern, serverless event-driven applications. This integration eliminates the need to manage consumer groups, compute infrastructure, or scaling logic so teams can focus on delivering business value faster.

Whether you’re integrating Kafka into microservices, transforming data pipelines, or building reactive applications, Lambda with Amazon MSK is a powerful and flexible serverless solution. For detailed documentation on how to configure Lambda with Amazon MSK, refer to the AWS Lambda Developer Guide. For more serverless learning resources, visit Serverless Land.


About the Authors

Tarun Rai Madan is a Principal Product Manager at Amazon Web Services (AWS). He specializes in serverless technologies and leads product strategy to help customers achieve accelerated business outcomes with event-driven applications, using services like AWS Lambda, AWS Step Functions, Apache Kafka, and Amazon SQS/SNS. Prior to AWS, he was an engineering leader in the semiconductor industry, and led development of high-performance processors for wireless, automotive, and data center applications.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS with over 25 years of experience in the IT industry. He collaborates with AWS customers worldwide to architect and implement sophisticated data streaming solutions that address complex business challenges. As an expert in distributed computing, Sayem specializes in designing large-scale distributed systems architecture for maximum performance and scalability. He has a keen interest and passion for distributed architecture, which he applies to designing enterprise-grade solutions at internet scale.

Introducing AWS Lambda native support for Avro and Protobuf formatted Apache Kafka events

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/introducing-aws-lambda-native-support-for-avro-and-protobuf-formatted-apache-kafka-events/

AWS Lambda now provides native support for Apache Avro and Protocol Buffers (Protobuf) formatted events with Apache Kafka event source mapping (ESM) when using Provisioned Mode. The support allows you to validate your schema with popular schema registries. This allows you to use and filter the more efficient binary event formats and share data using schema in a centralized and consistent way. This blog post shows how you can use Lambda to process Avro and Protobuf formatted events from Kafka topics using schema registry integration.

This new capability works with Amazon Managed Streaming for Apache Kafka (Amazon MSK), Confluent Cloud, and self-managed Kafka clusters. To get started, update your existing Kafka ESM to Provisioned Mode and add schema registry configuration, or create a new ESM in Provisioned Mode with schema registry integration enabled.

Avro and Protobuf

Many organizations use Avro and Protobuf formats with Apache Kafka because these binary serialization formats offer advantages over JSON. They provide 50-80% smaller message sizes, faster serialization and deserialization performance, robust schema evolution capabilities, and strong typing across multiple programming languages.

Working with these formats in Lambda functions previously necessitated custom code. Developers needed to implement schema registry clients, handle authentication and caching, write format-specific deserialization logic, and manage schema evolution scenarios.

What’s new

Lambda’s Kafka Event Source Mapping (ESM) now provides built-in integration with AWS Glue Schema Registry, Confluent Cloud Schema Registry, and self-managed Confluent Schema Registry. When you configure schema registry settings for your Kafka ESM, the service automatically validates incoming JSON Schema, Avro, and Protobuf records against their registered schema. This moves complex schema registry integration logic from your application layer to the managed Lambda service.

You can build your function with Kafka’s open-source ConsumerRecords interface using Powertools for AWS Lambda to get your Avro or Protobuf generated business objects directly. Optionally, you can choose to receive your records in JSON format. In that case, your function receives clean, validated JSON data regardless of the original serialization format, which removes the need for custom deserialization code in your Lambda functions. This also allows you to create Kafka consumers across multiple programming languages.

Powertools for AWS Lambda is a developer toolkit that provides specific support for Java, .NET, Python, and TypeScript, maintaining consistency with existing Kafka development patterns. You can directly access business objects without custom deserialization code.

You can also set up filtering rules to discard irrelevant JSON, Avro, or Protobuf formatted events before function invocations, which can improve processing performance and reduce costs.

How schema validation works

When you configure schema registry integration for your Kafka ESM, you specify the registry endpoint, authentication details, and which event fields (key, value, or both) to validate. The ESM polls your Kafka topics for records as usual but now performs additional processing before invoking your Lambda function.

For each incoming event, the ESM extracts the schema ID embedded in the serialized data and fetches the corresponding schema from your configured registry. This process happens transparently, with schema definitions cached for up to 24 hours to optimize performance. The ESM identifies the format of your events using schema metadata and validates the event structure. Based on your configuration, it either keeps the original binary data or deserializes it to JSON, then sends the result to your function for processing.


Figure 1: Kafka processing flow diagram.

The ESM handles schema evolution automatically. When producers begin using new schema versions, the service detects the updated schema IDs and fetches the latest definitions from your registry. This makes sure that your functions always receive properly deserialized data without requiring code changes.
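
To make the schema ID extraction step more concrete, the following sketch parses the framing that Confluent serializers prepend to each message: one magic byte (0), then a 4-byte big-endian schema ID, then the serialized payload. This illustrates only the Confluent wire format. AWS Glue Schema Registry serializers use a different header, and this post doesn't describe how the ESM implements the step internally. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;
    // 1 magic byte + 4-byte schema ID.
    private static final int HEADER_LENGTH = 5;

    private ConfluentWireFormat() {}

    /** Returns the schema ID embedded in a Confluent-framed message. */
    static int schemaId(byte[] serialized) {
        if (serialized == null || serialized.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message is shorter than the 5-byte header");
        }
        // ByteBuffer reads in big-endian order by default, matching the wire format.
        ByteBuffer buffer = ByteBuffer.wrap(serialized);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte");
        }
        return buffer.getInt();
    }

    /**
     * Returns the bytes that follow the header.
     * For Protobuf, Confluent adds message-index bytes after the schema ID;
     * this sketch does not parse them.
     */
    static byte[] payload(byte[] serialized) {
        schemaId(serialized); // Validates the header before slicing.
        return Arrays.copyOfRange(serialized, HEADER_LENGTH, serialized.length);
    }

    public static void main(String[] args) {
        byte[] message = {0, 0, 0, 0, 42, 10, 20};
        System.out.println("schema ID = " + schemaId(message));
        System.out.println("payload bytes = " + payload(message).length);
    }
}
```

With the built-in integration, you don't need code like this in your function. It is shown only to clarify what "schema ID embedded in the serialized data" means.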

Event record format

As a part of the ESM schema registry configuration, you need to specify Event Record Format, which Lambda uses to deliver validated records to your function. The schema registry configuration supports SOURCE and JSON.

SOURCE preserves the original binary format of the data as a base64-encoded string with producer-appended schema-id removed. This allows direct conversion to Avro or Protobuf objects so that you can use Kafka’s ConsumerRecords interface for a Kafka-like experience. Use this format when working with strongly typed languages or when you need to maintain the full capabilities of Avro or Protobuf schemas. Then, you can use any Avro or Protobuf deserializer to convert raw bytes to your business object. Powertools provides native support for this deserialization.

With JSON, the ESM deserializes the data ready for direct use in languages with native JSON support. Use this when you don’t need to preserve the original binary format or work with generated classes. You can also use Powertools to convert the base64 to your business object. See the documentation for payload formats and deserialization behavior.
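
If you choose not to use Powertools, the first step in either format is decoding the base64 string. The following minimal sketch assumes that the record value reaches your function as a base64-encoded string, as the descriptions above imply, and that a JSON-format value is UTF-8 text. The class and method names are hypothetical, and the sample value is made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class KafkaValueDecoder {
    private KafkaValueDecoder() {}

    /** SOURCE format: returns the raw bytes to hand to an Avro or Protobuf deserializer. */
    static byte[] decodeBytes(String base64Value) {
        return Base64.getDecoder().decode(base64Value);
    }

    /** JSON format: returns the decoded JSON text (assumed to be UTF-8). */
    static String decodeJson(String base64Value) {
        return new String(decodeBytes(base64Value), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulate a record value by encoding a small JSON document.
        String encoded = Base64.getEncoder()
                .encodeToString("{\"zip\":\"2000\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeJson(encoded));
    }
}
```

For the SOURCE format, you would pass the result of decodeBytes to your Avro or Protobuf deserializer instead of converting it to text.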

If you configure filtering rules, then they operate on the JSON-formatted events after deserialization. This upstream filtering prevents unnecessary Lambda invocations for events that don’t match your processing criteria, directly reducing your compute costs.

Configuration and setup

To use this feature, you must enable Provisioned Mode for your Kafka ESM, which provides the dedicated compute resources needed for schema registry integration.

You can configure the integration through the AWS Management Console, AWS Command Line Interface (AWS CLI), AWS SDKs, or infrastructure as code (IaC) tools such as the AWS Serverless Application Model (AWS SAM) or AWS Cloud Development Kit (AWS CDK).

Your schema registry configuration includes the registry endpoint URL, authentication method (AWS Identity and Access Management (IAM) for AWS Glue Schema Registry, or Basic Auth, SASL/SCRAM, or mTLS for Confluent registries), and validation settings. You specify which event attributes to validate and optionally define filtering rules using standard Lambda event filtering syntax.

For error handling, configure Lambda failure destinations where events that fail schema validation or deserialization are sent. This makes sure that problematic events don’t disappear silently but are routed to other services such as Amazon Simple Queue Service (Amazon SQS), Amazon Simple Notification Service (Amazon SNS), and Amazon S3 for debugging and analysis.

Seeing the new features in action

There are a number of Serverless Patterns that you can use to process Kafka streams using Lambda. This example uses the Java pattern.

Deploy a sample Amazon MSK cluster

To set up an Amazon MSK cluster, follow the instructions in the GitHub repo and create a new AWS CloudFormation stack using the MSKAndKafkaClientEC2.yaml template file. The stack creates the Amazon MSK cluster, along with a client Amazon EC2 instance, to manage the Kafka cluster. There are costs involved when running this infrastructure.

  1. Connect to the EC2 instance using EC2 Instance Connect.
  2. Check that the Kafka topic is created by checking the contents of the kafka_topic_creator_output.txt file.
    cat kafka_topic_creator_output.txt

  3. The file should contain the text: “Created topic MskIamJavaLambdaTopic.”

Deploy the Glue schema registry and consumer Lambda function

The EC2 instance contains the software needed to deploy the schema registry and Lambda function.

  1. Change directory to the pattern directory.
    cd serverless-patterns/msk-lambda-iam-java-sam
  2. Build the application using AWS SAM.
    sam build
  3. To deploy your application for the first time, run the following in the EC2 instance shell:
    sam deploy --capabilities CAPABILITY_IAM --no-confirm-changeset \
    	--no-disable-rollback --region $AWS_REGION --stack-name msk-lambda-schema-avro-java-sam --guided

  4. You can accept all the defaults by hitting Enter. You can browse to the AWS Glue schema registry console and view the ContactSchema definition:
    {
      "type": "record",
      "name": "Contact",
      "fields": [
        {"name": "firstname", "type": "string"},
        {"name": "lastname", "type": "string"},
        {"name": "company", "type": "string"},
        {"name": "street", "type": "string"},
        {"name": "city", "type": "string"},
        {"name": "county", "type": "string"},
        {"name": "state", "type": "string"},
        {"name": "zip", "type": "string"},
        {"name": "homePhone", "type": "string"},
        {"name": "cellPhone", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "website", "type": "string"}
      ]
    }
    

    The consumer Lambda function ESM is configured for Provisioned Mode.

  5. View the ESM configuration from the Lambda console for the Lambda function name prefixed with msk-lambda-schema-avro-ja-LambdaMSKConsumer.
  6. Choose the MSK Lambda trigger which opens the Triggers pane under Configuration.
    Figure 2: View Lambda ESM schema configuration
  7. The configuration specifies using the Event record format SOURCE so your function can use Kafka’s native open-source ConsumerRecords interface. Powertools then deserializes the payload.
  8. The schema validation attribute is VALUE.
  9. The ESM filter configuration only processes the records that match zip codes of 2000.
  10. In your function code, use the open-source Kafka ConsumerRecords interface by including Powertools for AWS Lambda as a dependency. ConsumerRecords provides metadata about Kafka records and gives you direct access to your Avro or Protobuf generated business objects without requiring any additional deserialization code.
package com.amazonaws.services.lambda.samples.events.msk;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class AvroKafkaHandler implements RequestHandler<ConsumerRecords<String, Contact>, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroKafkaHandler.class);

    @Override
    @Logging(logEvent = true)
    @Deserialization(type = DeserializationType.KAFKA_AVRO)
    public String handleRequest(ConsumerRecords<String, Contact> records, Context context) {
        LOGGER.info("=== AvroKafkaHandler called ===");
        LOGGER.info("Event object: {}", records);
        LOGGER.info("Number of records: {}", records.count());
        
        for (ConsumerRecord<String, Contact> record : records) {
            LOGGER.info("Processing record - Topic: {}, Partition: {}, Offset: {}", 
                       record.topic(), record.partition(), record.offset());
            LOGGER.info("Record key: {}", record.key());
            LOGGER.info("Record value: {}", record.value());
            
            if (record.value() != null) {
                Contact contact = record.value();
                LOGGER.info("Contact details - firstName: {}, zip: {}", 
                           contact.getFirstname(), contact.getZip());
            }
        }
        
        LOGGER.info("=== AvroKafkaHandler completed ===");
        return "OK";
    }
}

Produce and consume records

The pattern includes a LambdaMSKProducerJava function that sends messages to Kafka.

  1. Invoke the function from the Lambda console or CLI within the EC2 instance.
    sam remote invoke LambdaMSKProducerJavaFunction --region $AWS_REGION \
    	--stack-name msk-lambda-schema-avro-java-sam

  2. You can view the Producer logs to see the 10 records produced. The consumer Lambda function processes the records.
  3. View the consumer Lambda function logs using the Amazon CloudWatch logs console or CLI within the EC2 instance.
    sam logs --name LambdaMSKConsumerJavaFunction \
    	--stack-name msk-lambda-schema-avro-java-sam --region $AWS_REGION

The Lambda function processes and logs only the records that match the configured ESM filter. The Avro binary data is deserialized using Powertools for AWS Lambda. You should see the function logs showing each record processed with the decoded keys and values.


Figure 3: Lambda consumer logs showing Avro processing

Cleaning up

You can clean up the example Lambda function by running the sam delete command.

sam delete

If you created the Amazon MSK cluster and EC2 client instance, then navigate to the CloudFormation console, choose the stack, and choose Delete.

Performance and cost considerations

Schema validation and deserialization can add processing time before your function invocation. However, this overhead is typically minimal when compared to the benefits. ESM caching minimizes schema registry API calls. Using filtering allows you to reduce costs, depending on how effectively your filtering rules eliminate irrelevant events. This feature simplifies the operational overhead of managing schema registry integration code so teams can focus on business logic rather than infrastructure concerns.

Error handling and monitoring

If schema registries become temporarily unavailable, then cached schemas allow event processing to continue until the registry is available again. Authentication failures generate error messages with automatic retry logic. Schema evolution happens seamlessly as Lambda automatically detects and fetches new versions.

If events fail validation or deserialization, they are routed to your configured failure destinations. For Amazon SQS and Amazon SNS destinations, the service sends metadata about the failure. For Amazon S3 destinations, both metadata and the original serialized payload are included for detailed analysis.

You can use standard Lambda monitoring, with additional CloudWatch metrics providing visibility into schema validation success rates, registry API usage, and filtering effectiveness.

Conclusion

AWS Lambda now supports Avro and Protobuf formats for Kafka event processing in Provisioned Mode for Kafka ESM. This enables schema validation, event filtering, and integration with Amazon MSK, Confluent, and self-managed Kafka clusters. Whether you’re building new Kafka applications or migrating existing consumers to Lambda, this native schema registry integration streamlines processing pipelines.

For more information about the Lambda Kafka integration capabilities, see the learning guide and the Lambda ESM documentation. To learn about Lambda pricing, including Provisioned Mode costs, visit the Lambda pricing page.

For more serverless learning resources, visit Serverless Land.

Secure access to a cross-account Amazon MSK cluster from Amazon MSK Connect using IAM authentication

Post Syndicated from Venkata Sai Mahesh Swargam original https://aws.amazon.com/blogs/big-data/secure-access-to-a-cross-account-amazon-msk-cluster-from-amazon-msk-connect-using-iam-authentication/

Amazon Managed Streaming for Apache Kafka (MSK) Connect is a fully managed, scalable, and highly available service that enables the streaming of data between Apache Kafka and other data systems. Amazon MSK Connect is built on top of Kafka Connect, an open-source framework that provides a standard way to connect Kafka with external data systems. Kafka Connect supports a variety of connectors, which are used to stream data in and out of Kafka. MSK Connect extends the capabilities of Kafka Connect by providing a managed service with added security features, straightforward configuration, and automatic scaling capabilities, enabling businesses to focus on their data streaming needs without the overhead of managing the underlying infrastructure.

In some use cases, you might need to use an MSK cluster in one AWS account, but MSK Connect is located in a separate account. In this post, we demonstrate how to create a connector to achieve this use case. At the time of writing, MSK Connect connectors can be created only for MSK clusters that have AWS Identity and Access Management (IAM) role-based authentication or no authentication. We demonstrate how to implement IAM authentication after establishing network connectivity. IAM provides enhanced security measures, making sure your systems are protected against unauthorized access.

Solution overview

The connector can be configured for a variety of purposes, such as sinking data to an Amazon Simple Storage Service (Amazon S3) bucket, tracking source database changes, or serving as a migration tool, such as MirrorMaker2 on MSK Connect, to transfer data from a source cluster to a target cluster that is located in a different account.

The following diagram illustrates a use case using Debezium and Amazon S3 source connectors.

The following diagram illustrates using S3 Sink and migration to a cross-account failover cluster using a MirrorMaker connector deployed on MSK Connect.

The launch of multi-VPC private connectivity (powered by AWS PrivateLink) and cluster policy support for MSK clusters simplifies the connectivity of Kafka clients to brokers. By enabling this feature on the MSK cluster, you can use the cluster-based policy to manage all access control centrally in one place. In this post, we cover the process of enabling this feature on the source MSK cluster.

We don’t fully use the multi-VPC connectivity that this feature provides, because it requires different bootstrap URLs with port numbers (14001:3) that MSK Connect doesn’t support as of this writing. Instead, we explore a secure network connectivity solution that uses private connectivity patterns, as detailed in How Goldman Sachs builds cross-account connectivity to their Amazon MSK clusters with AWS PrivateLink.

Connecting to a cross-account MSK cluster from MSK Connect involves the following steps.

Steps to configure the MSK cluster in Account A:

  1. Enable the multi-VPC private connectivity (AWS PrivateLink) feature for the IAM authentication scheme that is enabled on your MSK cluster.
  2. Configure the cluster policy to allow a cross-account connector.
  3. Implement one of the preceding network connectivity patterns according to your use case to establish the connectivity with the Account B VPC and make network changes accordingly.

Steps to configure the MSK connector in Account B:

  1. Create an MSK connector in private subnets using the AWS Command Line Interface (AWS CLI).
  2. Verify the network connectivity from Account A and make network changes accordingly.
  3. Check the destination service to verify the incoming data.

Prerequisites

To follow along with this post, you should have an MSK cluster in one AWS account and MSK Connect in a separate account.

Set up the MSK cluster in Account A:

In this post, we only show the important steps that are required to enable the multi-VPC feature on an MSK cluster:

  1. Create a provisioned MSK cluster in Account A’s VPC with the following considerations, which are required for the multi-VPC feature:
    • Cluster version must be 2.7.1 or higher.
    • Instance type must be m5.large or higher.
    • Authentication should be IAM (you must not enable unauthenticated access for this cluster).
  2. After you create the cluster, go to the Networking settings section of your cluster and choose Edit. Then choose Turn on multi-VPC connectivity.

  3. Select IAM role-based authentication and choose Turn on selection.

It might take around 30 minutes to enable. This step is required to enable the cluster policy feature that allows the cross-account connector to access the MSK cluster.

  4. After it has been enabled, scroll down to Security settings and choose Edit cluster policy.
  5. Define your cluster policy and choose Save changes.

  6. You can define either a Basic or an Advanced cluster policy. The Basic option allows only the CreateVpcConnection, GetBootstrapBrokers, DescribeCluster, and DescribeClusterV2 actions, which are required for creating cross-VPC connectivity to your cluster. However, the MSK connector requires more actions, so use the Advanced option. The policy should be as follows:
    {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "AWS": "Connector-AccountId"
            },
            "Action": [
                "kafka:CreateVpcConnection",
                "kafka:GetBootstrapBrokers",
                "kafka:DescribeCluster",
                "kafka:DescribeClusterV2",
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:CreateTopic",
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:<region>:<Cluster-AccountId>:cluster/<cluster-name>/<uuid>",
                "arn:aws:kafka:<region>:<Cluster-AccountId>:topic/<cluster-name>/<uuid>/<specific-topic-name>",
                "arn:aws:kafka:<region>:<Cluster-AccountId>:group/<cluster-name>/<uuid>/<specific-group-name>"
            ]
        }]
    }

You might need to modify the preceding permissions to limit access to your resources (topics, groups). You can also restrict access to a specific connector by specifying the connector's IAM role as the principal, or specify the account ID to allow all connectors in that account.
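Because the same cluster, topic, and group ARNs recur in several policies in this walkthrough, it can help to generate the policy document instead of editing JSON by hand. The following is a minimal sketch, not part of the original setup: the function name `build_cluster_policy` and all sample values (account IDs, cluster name, UUID, topic, group) are hypothetical placeholders.

```python
import json


def build_cluster_policy(region, cluster_account_id, cluster_name, cluster_uuid,
                         principal, topic="*", group="*"):
    """Return the Advanced cluster policy shown above as a Python dict.

    `principal` is either the connector account ID or the ARN of the
    connector's IAM role (the narrower of the two options).
    """
    base = f"arn:aws:kafka:{region}:{cluster_account_id}"
    suffix = f"{cluster_name}/{cluster_uuid}"
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": principal},
            "Action": [
                "kafka:CreateVpcConnection",
                "kafka:GetBootstrapBrokers",
                "kafka:DescribeCluster",
                "kafka:DescribeClusterV2",
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:CreateTopic",
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup",
            ],
            "Resource": [
                f"{base}:cluster/{suffix}",
                f"{base}:topic/{suffix}/{topic}",
                f"{base}:group/{suffix}/{group}",
            ],
        }],
    }


if __name__ == "__main__":
    # All values below are made-up placeholders for illustration.
    policy = build_cluster_policy(
        region="us-east-1",
        cluster_account_id="111122223333",
        cluster_name="demo-cluster",
        cluster_uuid="abcd-1234",
        principal="444455556666",
        topic="orders",
        group="s3-sink",
    )
    print(json.dumps(policy, indent=4))
```

The printed JSON can be pasted into the Advanced policy editor. Passing a specific topic and group name rather than the `*` defaults keeps the policy scoped to the resources the connector actually uses.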

Now the cluster is ready. However, you still need to verify network connectivity between the cross-account connector VPC and the MSK cluster VPC.

If you’re using VPC peering or Transit Gateway while connecting to MSK Connect either from cross-account or the same account, do not configure your connector to reach the peered VPC resources with IPs in the following CIDR ranges (for more details, see Connecting from connectors):

  • 10.99.0.0/16
  • 192.168.0.0/16
  • 172.21.0.0/16
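A quick way to check whether a VPC or subnet CIDR collides with these ranges is Python's standard `ipaddress` module. This is a small sketch for planning purposes; the helper name `conflicting_ranges` is our own and not part of any AWS tooling.

```python
import ipaddress

# The three ranges listed above that connectors should not target
# in a peered VPC.
RESERVED_RANGES = [
    ipaddress.ip_network(cidr)
    for cidr in ("10.99.0.0/16", "192.168.0.0/16", "172.21.0.0/16")
]


def conflicting_ranges(cidr_or_ip):
    """Return the reserved ranges that overlap the given CIDR block or IP."""
    # strict=False tolerates host bits, so "10.99.4.7/24" is accepted.
    network = ipaddress.ip_network(cidr_or_ip, strict=False)
    return [str(r) for r in RESERVED_RANGES if r.overlaps(network)]


if __name__ == "__main__":
    for candidate in ("10.192.0.0/16", "10.99.4.0/24", "192.168.1.10"):
        print(candidate, "->", conflicting_ranges(candidate) or "no conflict")
```

An empty result means the block does not overlap any of the three ranges. A broad block such as 10.0.0.0/8 is reported as conflicting because it contains 10.99.0.0/16.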

In the MSK cluster security group, make sure you allow inbound traffic on port 9098 from the Account B network resources, and update the subnets according to your network connectivity pattern.

Set up the MSK connector in Account B:

In this section, we demonstrate how to use the S3 Sink connector. However, you can use a different connector according to your use case and make the changes accordingly.

  1. Create an S3 bucket (or use an existing bucket).
  2. Make sure that the VPC that you’re using in this account has a security group and private subnets. If your connector for MSK Connect needs access to the internet, refer to Enable internet access for Amazon MSK Connect.
  3. Verify the network connectivity between Account A and Account B by using the telnet command to the broker endpoints with port 9098.
  4. Create an S3 VPC endpoint.
  5. Create a connector plugin according to your connector plugin provider (Confluent or Lenses). Make a note of the custom plugin Amazon Resource Name (ARN) to use in a later step.
  6. Create an IAM role for your connector to allow access to your S3 bucket and the MSK cluster.
    • The IAM role’s trust relationship should be as follows:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Principal": {
                      "Service": "kafkaconnect.amazonaws.com"
                  },
                  "Action": "sts:AssumeRole"
              }
          ]
      }

    • Add the following S3 access policy to your IAM role:
      {
          "Version": "2012-10-17",
          "Statement": [{
              "Effect": "Allow",
              "Action": [
                  "s3:ListAllMyBuckets",
                  "s3:ListBucket",
                  "s3:GetBucketLocation",
                  "s3:DeleteObject",
                  "s3:PutObject",
                  "s3:GetObject",
                  "s3:AbortMultipartUpload",
                  "s3:ListMultipartUploadParts",
                  "s3:ListBucketMultipartUploads"
              ],
              "Resource": [
                  "arn:aws:s3:::<destination-bucket>",
                  "arn:aws:s3:::<destination-bucket>/*"
              ],
              "Condition": {
                  "StringEquals": {
                      "aws:SourceVpc": "vpc-xxxx"
                  }
              }
          }]
      }

    • The following policy contains the actions required by the connector:
      {
      "Version": "2012-10-17",
      "Statement": [
         {
              "Effect": "Allow",
              "Action": [
                  "kafka-cluster:Connect",
                  "kafka-cluster:DescribeCluster",
                  "kafka-cluster:ReadData",
                  "kafka-cluster:DescribeTopic",
                  "kafka-cluster:WriteData",
                  "kafka-cluster:CreateTopic",
                  "kafka-cluster:AlterGroup",
                  "kafka-cluster:DescribeGroup"
              ],
              "Resource": [
                  "arn:aws:kafka:<region>:<Cluster-AccountId>:cluster/<cluster-name>/<uuid>",
                  "arn:aws:kafka:<region>:<Cluster-AccountId>:topic/<cluster-name>/<uuid>/<specific-topic-name>",
                  "arn:aws:kafka:<region>:<Cluster-AccountId>:group/<cluster-name>/<uuid>/<specific-group-name>"
              ]
          }
      ]
      }

You might need to modify the preceding permissions to limit access to your resources (topics, groups).

Finally, it’s time to create the MSK connector. Because the Amazon MSK console doesn’t allow viewing MSK clusters in other accounts, we show you how to use the AWS CLI instead. We also use basic Amazon S3 configuration for testing purposes. You might need to modify the configuration according to your connector’s use case.

  1. Create a connector using the AWS CLI with the following command with the required parameters of the connector, along with Account A’s MSK cluster broker endpoints:
    aws kafkaconnect create-connector \
    --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" \
    --connector-configuration \
    "connector.class=io.confluent.connect.s3.S3SinkConnector, \
    s3.region=<region>, \
    schema.compatibility=NONE, \
    flush.size=2, \
    tasks.max=1, \
    topics=<MSK-Cluster-topic>, \
    security.protocol=SASL_SSL, \
    s3.compression.type=gzip, \
    format.class=io.confluent.connect.s3.format.json.JsonFormat, \
    sasl.mechanism=AWS_MSK_IAM, \
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required, \
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler, \
    value.converter=org.apache.kafka.connect.storage.StringConverter, \
    storage.class=io.confluent.connect.s3.storage.S3Storage, \
    s3.bucket.name=<s3-bucket-name>, \
    timestamp.extractor=Record, \
    key.converter=org.apache.kafka.connect.storage.StringConverter" \
    --connector-name "Connector-name" \
    --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "<broker-strings>:9098","vpc": {"securityGroups": ["sg-0b36a015789f859a3"],"subnets": ["subnet-07950da1ebb8be6d8","subnet-026a729668f3f9728"]}}}' \
    --kafka-cluster-client-authentication "authenticationType=IAM" \
    --kafka-cluster-encryption-in-transit "encryptionType=TLS" \
    --kafka-connect-version "2.7.1" \
    --log-delivery workerLogDelivery='{cloudWatchLogs={enabled=true,logGroup="<MSKConnect-log-group-name>"}}' \
    --plugins "customPlugin={customPluginArn=<Custom-Plugin-ARN>,revision=1}" \
    --service-execution-role-arn "<IAM-role-ARN>"

  2. After you create the connector, connect the producer to your topic and insert data into it. In the following code, we use a Kafka client to insert data for testing purposes:
    bin/kafka-console-producer.sh --broker-list <broker-string> --producer.config client.properties --topic <topic-name>

If everything is set up correctly, you should see the data in your destination S3 bucket. If not, check the troubleshooting tips in the following section.
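The long `--connector-configuration` string in the create-connector command is easy to mistype. The AWS CLI generally accepts map parameters either in shorthand or as JSON, so one option is to assemble the settings as a dictionary and serialize them. The sketch below mirrors the settings from the command; the function name `s3_sink_config` and the sample region, topic, and bucket are hypothetical. It writes `sasl.jaas.config` with the trailing semicolon used in standard MSK IAM client properties.

```python
import json


def s3_sink_config(region, topic, bucket, flush_size=2, tasks_max=1):
    """Return the S3 Sink connector settings from this post as a dict.

    All values are strings because connector configuration is a
    string-to-string map.
    """
    return {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "s3.region": region,
        "s3.bucket.name": bucket,
        "topics": topic,
        "tasks.max": str(tasks_max),
        "flush.size": str(flush_size),
        "schema.compatibility": "NONE",
        "s3.compression.type": "gzip",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "timestamp.extractor": "Record",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "AWS_MSK_IAM",
        "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }


if __name__ == "__main__":
    # Placeholder values; substitute your own Region, topic, and bucket.
    config = s3_sink_config("us-east-1", "orders", "my-destination-bucket")
    print(json.dumps(config, indent=2))
```

The JSON output can be saved to a file and passed to the CLI in place of the shorthand string, which avoids having to escape commas and semicolons inside values.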

Troubleshooting tips

After deploying the connector, if it’s in the CREATING state on the connector details page, access the Amazon CloudWatch log group specified in your connector creation request. Review the logs for any errors. If no errors are found, wait for the connector to complete its creation process.

Additionally, make sure the IAM roles have their required permissions, and check the security groups and NACLs for proper connectivity between VPCs.
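If telnet is not available on the host you are testing from, a TCP connection attempt with Python's standard `socket` module gives the same signal. This is a minimal sketch: the helper name `port_is_reachable` is our own, and the broker hostname in the usage example is a placeholder. A successful connection only shows that routing, security groups, and NACLs allow traffic on the port; it does not validate IAM permissions.

```python
import socket


def port_is_reachable(host, port=9098, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


if __name__ == "__main__":
    # Placeholder endpoint; replace with one of your broker hostnames.
    broker = "b-1.example-cluster.kafka.us-east-1.amazonaws.com"
    state = "reachable" if port_is_reachable(broker) else "NOT reachable"
    print(f"{broker}:9098 is {state}")
```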

Clean up

When you’re done testing this solution, clean up any unwanted resources to avoid ongoing charges.

Conclusion

In this post, we demonstrated how to create an MSK connector when you need to use an MSK cluster in one AWS account, but MSK Connect is located in a separate account. This architecture includes an S3 Sink connector for demonstration purposes, but it can accommodate other types of sink and source connectors. Additionally, this architecture focuses solely on IAM authenticated connectors. If an unauthenticated connector is desired, the multi-VPC connectivity (PrivateLink) and cluster policy components can be ignored. The remaining process, which involves creating a network connection between the account VPCs, remains the same.

Try out the solution for yourself, and let us know your questions and feedback in the comments section.

Check out more AWS Partners or contact an AWS Representative to learn how we can help accelerate your business.


About the Author

Venkata Sai Mahesh Swargam is a Cloud Engineer at AWS in Hyderabad. He specializes in Amazon MSK and Amazon Kinesis services. Mahesh is dedicated to helping customers by providing technical guidance and solving issues related to their Amazon MSK architectures. In his free time, he enjoys being with family and traveling around the world.

Build a secure serverless streaming pipeline with Amazon MSK Serverless, Amazon EMR Serverless and IAM

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/build-a-secure-serverless-streaming-pipeline-with-amazon-msk-serverless-amazon-emr-serverless-and-iam/

The exponential growth and vast volume of streaming data have made it a vital resource for organizations worldwide. To unlock its full potential, real-time analytics are essential for extracting actionable insights. Derived from a wide range of sources, including social media, Internet of Things (IoT) sensors, and user interactions, streaming data empowers businesses to respond promptly to emerging trends and events, make informed decisions, and stay ahead of the competition.

Streaming applications commonly use Apache Kafka for data ingestion and Apache Spark Structured Streaming for processing. However, integrating and securing these components poses considerable challenges for users. The complexity of managing certificates, keystores, and TLS configurations to connect Spark Streaming to Kafka brokers demands specialized expertise. A managed, serverless framework would greatly simplify this process, alleviating the need for manual configuration and streamlining the integration of these critical components.

To simplify the management and security of traditional streaming architectures, you can use Amazon Managed Streaming for Apache Kafka (Amazon MSK). This fully managed service simplifies data ingestion and processing. Amazon MSK Serverless alleviates the need for cluster management and scaling, and further enhances security by integrating AWS Identity and Access Management (IAM) for authentication and authorization. This consolidated approach replaces the complex certificate and key management required by TLS client authentication through AWS Certificate Manager, streamlining operations and bolstering data protection. For instance, when a client attempts to write data to the cluster, MSK Serverless verifies both the client’s identity and its permissions using IAM.

For efficient data processing, you can use Amazon EMR Serverless with a Spark application built on the Spark Structured Streaming framework, enabling near real-time data processing. This setup seamlessly handles large volumes of data from MSK Serverless, using IAM authentication for secure and swift data processing.

This post demonstrates a comprehensive, end-to-end solution for processing data from MSK Serverless using an EMR Serverless Spark Streaming job, secured with IAM authentication. Additionally, it demonstrates how to query the processed data using Amazon Athena, providing a seamless and integrated workflow for data processing and analysis. This solution enables near real-time querying of the latest data processed from MSK Serverless and EMR Serverless using Athena, providing instant insights and analytics.

Solution overview

The following diagram illustrates the architecture that you implement through this post.

The workflow consists of the following steps:

  1. The architecture begins with an MSK Serverless cluster set up with IAM authentication. An Amazon Elastic Compute Cloud (Amazon EC2) instance runs a Python script producer.py that acts as a data producer, sending sample data to a Kafka topic within the cluster.
  2. The Spark Streaming job retrieves data from the Kafka topic, stores it in Amazon Simple Storage Service (Amazon S3), and creates a corresponding table in the AWS Glue Data Catalog. As it continuously consumes data from the Kafka topic, the job stays up-to-date with the latest streaming data. With checkpointing enabled, the job tracks processed records, allowing it to resume from where it left off in case of a failure, providing seamless data processing.
  3. To analyze this data, users can use Athena, a serverless query service. Athena enables interactive SQL-based exploration of data directly in Amazon S3 without the need for complex infrastructure management.

Prerequisites

Before getting started, make sure you have the following:

  • An active AWS account with billing enabled
  • An IAM user with administrator access (AdministratorAccess policy) or specific permissions to create and manage resources such as a virtual private cloud (VPC), subnet, security group, IAM roles, NAT gateway, internet gateway, EC2 client, MSK Serverless, EMR Serverless, Amazon EMR Studio, and S3 buckets
  • Sufficient VPC capacity in your chosen AWS Region

Although using an IAM user with administrator access will work, it’s recommended to follow the principle of least privilege in production environments by creating custom IAM policies with only the necessary permissions. The IAM user we create has the AdministratorAccess policy attached to it. However, you might not need such elevated access.

For this post, we create the solution resources in the us-east-2 Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Create MSK Serverless and EMR Serverless resources

The vpc-msk-emr-serverless-studio.yaml stack creates a VPC, subnet, security group, IAM roles, NAT gateway, internet gateway, EC2 client, MSK Serverless, EMR Serverless, EMR Studio, and S3 buckets. To create the solution resources, complete the following steps:

  1. Launch the stack vpc-msk-emr-serverless-studio using the CloudFormation template:
  2. Provide the parameter values as listed in the following table.

Parameter | Description | Sample value
EnvironmentName | An environment name that is prefixed to resource names. | msk-emr-serverless-pipeline
InstanceType | Amazon MSK client EC2 instance type. | t2.micro
LatestAmiId | Latest AMI ID of Amazon Linux 2023 for the EC2 instance. You can use the default value. | /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-6.1-x86_64
VpcCIDR | IP range (CIDR notation) for this VPC. | 10.192.0.0/16
PublicSubnet1CIDR | IP range (CIDR notation) for the public subnet in the first Availability Zone. | 10.192.10.0/24
PublicSubnet2CIDR | IP range (CIDR notation) for the public subnet in the second Availability Zone. | 10.192.11.0/24
PrivateSubnet1CIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone. | 10.192.20.0/24
PrivateSubnet2CIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone. | 10.192.21.0/24

The stack creation process can take approximately 10 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

Next, you set up the data ingestion to the Kafka topic from the Kafka EC2 instance.

Produce records to Kafka topic

Complete the following steps to set up data ingestion:

  1. On the Amazon EC2 console, go to the EC2 instance that you created using the CloudFormation template.
  2. Log in to the EC2 instance using Session Manager, a capability of AWS Systems Manager.
  3. Choose the instance msk-emr-serverless-blog and then choose Connect.
  4. Create a Kafka topic in MSK Serverless from the EC2 instance.
    1. In the following commands, replace the placeholder with the MSKBootstrapServers value from the CloudFormation stack output:
      $ sudo su - ec2-user
      $ BS=<your-msk-serverless-endpoint (e.g.) boot-xxxxxx.yy.kafka-serverless.us-east-2.amazonaws.com:9098>

    2. Run the following command on the EC2 instance to create a topic called sales_data_topic:
      /home/ec2-user/kafka_2.12-2.8.1/bin/kafka-topics.sh \
      --bootstrap-server $BS \
      --command-config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
      --create --topic sales_data_topic \
      --partitions 10

      The command prints the following confirmation:
      Created topic sales_data_topic.

The Kafka client is already installed in the ec2-user home directory (/home/ec2-user), together with the MSK IAM authentication JAR. A client configuration file with the IAM authentication properties has also been created at /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties.

The following code shows the contents of client.properties:

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

  5. Run the following command to produce records to the Kafka topic using the syntheticSalesDataProducer.py Python script on the EC2 instance. Update the Region accordingly.
nohup python3 -u syntheticSalesDataProducer.py --num_records 1000 \
--sales_data_topic sales_data_topic --bootstrap_server $BS \
--region=us-east-2 > syntheticSalesDataProducer.log &

Understanding Amazon MSK IAM authentication with EMR Serverless

Amazon MSK IAM authentication enables secure authentication and authorization for Kafka clusters (MSK Serverless) using IAM roles. When integrating with EMR Serverless Spark Streaming, Amazon MSK IAM authentication allows Spark jobs to access Kafka topics securely, using IAM roles for fine-grained access control. This provides secure data processing and streaming.

IAM policy configuration

To enable EMR Serverless jobs to authenticate with an MSK Serverless cluster using IAM, you need to attach specific Kafka-related IAM permissions to the EMR Serverless job execution role. These permissions allow the job to perform essential operations on the Kafka cluster, topics, and consumer groups. The following IAM policy must be attached to the EMR Serverless job execution role to enable the necessary permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:<AWS-REGION>:<ACCOUNTID>:cluster/<SERVERLESS_CLUSTER_NAME>/<ID>"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "kafka-cluster:CreateTopic",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:<AWS-REGION>:<ACCOUNTID>:topic/<SERVERLESS_CLUSTER_NAME>/*/*"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:<AWS-REGION>:<ACCOUNTID>:group/<SERVERLESS_CLUSTER_NAME>/*/*"
            ],
            "Effect": "Allow"
        }
    ]
}

This code refers to the following actions:

  • Connect, DescribeCluster – Required to initiate a secure connection and obtain metadata
  • DescribeTopic, ReadData, WriteData – Enables data consumption and production
  • CreateTopic (optional) – Allows dynamic topic creation
  • AlterGroup, DescribeGroup – Needed for consumer group management in streaming jobs

These permissions make sure that the Spark Streaming job can securely authenticate and interact with MSK Serverless resources using its IAM role.
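When a streaming job fails with an authorization error, a useful first check is whether the execution role's policy lists every action named above. The following sketch compares a policy document against that list. The helper `missing_actions` is hypothetical and deliberately simple: it looks only at Allow statements and action names (with `*` and `?` wildcards), and does not evaluate Resource scoping or Deny statements the way IAM does.

```python
from fnmatch import fnmatchcase

# Actions the streaming job needs, per the list above
# (CreateTopic is optional, so it is left out).
REQUIRED_ACTIONS = {
    "kafka-cluster:Connect",
    "kafka-cluster:DescribeCluster",
    "kafka-cluster:DescribeTopic",
    "kafka-cluster:ReadData",
    "kafka-cluster:WriteData",
    "kafka-cluster:AlterGroup",
    "kafka-cluster:DescribeGroup",
}


def missing_actions(policy, required=REQUIRED_ACTIONS):
    """Return the required actions that no Allow statement grants."""
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a single statement is also valid JSON
        statements = [statements]
    granted = []
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        granted.extend([actions] if isinstance(actions, str) else actions)
    # IAM action names are case-insensitive, so compare in lowercase.
    return {
        action for action in required
        if not any(fnmatchcase(action.lower(), g.lower()) for g in granted)
    }


if __name__ == "__main__":
    partial = {"Statement": [{"Effect": "Allow",
                              "Action": ["kafka-cluster:Connect"],
                              "Resource": "*"}]}
    print(sorted(missing_actions(partial)))
```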

Required dependencies

To enable Amazon MSK IAM authentication in Spark (especially on EMR Serverless), specific JAR dependencies must be included in your Spark Streaming job using sparkSubmitParameters:

  • spark-sql-kafka-0-10_2.12 – This is the Kafka connector for Spark Structured Streaming. It provides the DataFrame API to read from and write to Kafka.
  • aws-msk-iam-auth – This JAR provides the IAM authentication mechanism required to connect to MSK Serverless using the AWS_MSK_IAM SASL mechanism.

You can include these dependencies directly by specifying them in the --packages argument when submitting the EMR Serverless job. For example:

--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,software.amazon.msk:aws-msk-iam-auth:2.2.0

When the job is submitted, EMR Serverless will automatically download these JARs from Maven Central (or another configured repository) at runtime. You don’t need to bundle them manually unless offline usage or specific versions are required.
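If you maintain several jobs, building the `sparkSubmitParameters` string from structured values helps keep package versions consistent. The following sketch shows one way to do that; the helper name `spark_submit_parameters` is our own, and the configuration values in the usage example are taken from the commands in this post.

```python
def spark_submit_parameters(conf, packages):
    """Build a sparkSubmitParameters string from Spark conf and Maven packages.

    conf:     mapping of Spark property name to value
    packages: list of Maven coordinates (group:artifact:version)
    """
    parts = [f"--conf {key}={value}" for key, value in conf.items()]
    if packages:
        # --packages takes a single comma-separated list of coordinates.
        parts.append("--packages " + ",".join(packages))
    return " ".join(parts)


if __name__ == "__main__":
    params = spark_submit_parameters(
        conf={
            "spark.executor.cores": 2,
            "spark.executor.memory": "5g",
        },
        packages=[
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1",
            "software.amazon.msk:aws-msk-iam-auth:2.2.0",
        ],
    )
    print(params)
```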

Spark Streaming job configuration for Amazon MSK IAM authentication

In your Spark Streaming application, configure the Kafka source with SASL properties to enable IAM based authentication. The following code shows the relevant configuration:

topic_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", topic_input)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol","SASL_SSL")
    .option("kafka.sasl.mechanism","AWS_MSK_IAM")
    .option("kafka.sasl.jaas.config","software.amazon.msk.auth.iam.IAMLoginModule required;")
    .option("kafka.sasl.client.callback.handler.class","software.amazon.msk.auth.iam.IAMClientCallbackHandler")
    .load()
    .selectExpr("CAST(value AS STRING)")
    )

Key properties include:

  • kafka.security.protocol = SASL_SSL – Enables encrypted communication over SSL with SASL authentication
  • kafka.sasl.mechanism = AWS_MSK_IAM – Tells Kafka to use the IAM based SASL mechanism
  • kafka.sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required; – Specifies the login module provided by AWS for IAM integration
  • kafka.sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler – Handles the actual signing and authentication using the IAM role

With these settings, Spark uses the IAM credentials attached to the EMR Serverless job execution role to authenticate to MSK Serverless without needing additional credentials, certificates, or secrets.
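The same properties can be collected in one dictionary so that every stream in a job uses identical authentication settings. This is a sketch under the assumption that you pass the dictionary to the reader's `options` method; the function name `msk_iam_kafka_options` is hypothetical.

```python
def msk_iam_kafka_options(bootstrap_servers, topic, starting_offsets="earliest"):
    """Return Kafka source options for MSK IAM authentication as a dict."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
        # Encrypted transport with SASL authentication.
        "kafka.security.protocol": "SASL_SSL",
        # IAM-based SASL mechanism provided by aws-msk-iam-auth.
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }


# Usage inside a Spark job (requires an existing SparkSession named `spark`):
#   topic_df = (spark.readStream
#       .format("kafka")
#       .options(**msk_iam_kafka_options(kafka_bootstrap_servers, topic_input))
#       .load()
#       .selectExpr("CAST(value AS STRING)"))
```

Keeping the options in one place also makes it easier to reuse them for a Kafka sink, which takes the same `kafka.`-prefixed security properties.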

Data processing using an EMR Serverless streaming job with Amazon MSK IAM authentication

Complete the following steps to run a Spark Streaming job to process the data from MSK Serverless:

  1. Submit the Spark Streaming job to EMR Serverless using the AWS Command Line Interface (AWS CLI), which is already installed on the EC2 instance.
  2. Log in to the EC2 instance using Session Manager. Choose the instance msk-emr-serverless-blog and then choose Connect.
  3. Run the following command to submit the streaming job. Provide the parameters from the CloudFormation stack output.
sudo su - ec2-user

aws emr-serverless start-job-run \
--application-id <APPLICATION ID> \
--execution-role-arn <EXECUTION ROLE ARN> \
--mode 'STREAMING' \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<EMR BLOG SCRIPT BUCKET>/emr_pyspark_streaming_script/pysparkStreamingBlog.py",
"entryPointArguments":["--topic_input","sales_data_topic","--kafka_bootstrap_servers","<BOOTSTRAP URL WITH PORT>","--output_s3_path","s3://<EMR STREAMING OUTPUT BUCKET>/output/sales-order-data/","--checkpointLocation","s3://<EMR STREAMING OUTPUT BUCKET>/checkpointing/checkpoint-sales-order-data/","--database_name","emrblog","--table_name","sales_order_data"],
"sparkSubmitParameters": "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.executor.cores=2 --conf spark.executor.memory=5g --conf spark.driver.cores=2 --conf spark.driver.memory=5g --conf spark.executor.instances=5 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,software.amazon.msk:aws-msk-iam-auth:2.2.0"
}}'
  4. After you submit the job, log in to EMR Studio using the URL in the EmrServerlessStudioURL value from the CloudFormation stack output.
  5. In the navigation pane, choose Applications under Serverless.
  6. Choose the application ID in the EmrServerlessSparkApplicationID value from the CloudFormation stack output.
  7. On the Streaming job runs tab, verify that the job has been submitted and wait for it to begin running.
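The `--job-driver` argument in the command above is a JSON document, and quoting it by hand in a shell is error-prone. One alternative is to build it in Python and print it. The sketch below is illustrative only: the helper name `build_job_driver` and the bucket names in the usage example are hypothetical placeholders.

```python
import json


def build_job_driver(entry_point, arguments, spark_submit_parameters):
    """Return the sparkSubmit job driver structure as a dict.

    arguments is a mapping such as {"topic_input": "sales_data_topic"},
    flattened to ["--topic_input", "sales_data_topic", ...].
    """
    flat_arguments = []
    for name, value in arguments.items():
        flat_arguments.extend([f"--{name}", str(value)])
    return {
        "sparkSubmit": {
            "entryPoint": entry_point,
            "entryPointArguments": flat_arguments,
            "sparkSubmitParameters": spark_submit_parameters,
        }
    }


if __name__ == "__main__":
    # Bucket names are placeholders; use the values from your stack output.
    driver = build_job_driver(
        entry_point="s3://my-script-bucket/emr_pyspark_streaming_script/pysparkStreamingBlog.py",
        arguments={
            "topic_input": "sales_data_topic",
            "database_name": "emrblog",
            "table_name": "sales_order_data",
        },
        spark_submit_parameters="--conf spark.executor.cores=2",
    )
    print(json.dumps(driver))
```

The printed JSON can be written to a file and supplied to the CLI, which avoids nested shell quoting.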

Validate the data in Athena

After the EMR Serverless Spark Streaming job has run and created the table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, open the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database emrblog that the streaming job created.
  4. To validate the data, run the following query:
SELECT 
    DATE_TRUNC('minute', date) AS minute_window, 
    ROUND(SUM(total_amount), 2) AS total_amount
FROM 
    emrblog.sales_order_data
WHERE 
    DATE_TRUNC('day', date) = CURRENT_DATE
GROUP BY 
    DATE_TRUNC('minute', date)
ORDER BY 
    minute_window DESC;

Clean up

To clean up your resources, complete the following steps:

  1. Log in to EMR Studio using the URL from the EmrServerlessStudioURL value in the CloudFormation stack output.
  2. In the navigation pane, choose Applications under Serverless.
  3. Choose the application ID from the EmrServerlessSparkApplicationID value in the CloudFormation stack output.
  4. On the Streaming job runs tab, select the job that has been running and cancel the job run.
  5. On the AWS CloudFormation console, delete the CloudFormation stack vpc-msk-emr-serverless-studio.

Conclusion

In this post, we showcased a serverless pipeline for streaming data with IAM authentication, empowering you to focus on deriving insights from your analytics. You can customize the EMR Serverless Spark Streaming code to apply transformations and filters, so only valid data is loaded into Amazon S3. This solution combines the power of EMR Serverless Spark Streaming with MSK Serverless, securely integrated through IAM authentication. Now you can streamline your streaming processes without the complexity of managing Amazon MSK and Amazon EMR Spark Streaming integrations.


About the Authors

Shubham Purwar is an AWS Analytics Specialist Solution Architect. He helps organizations unlock the full potential of their data by designing and implementing scalable, secure, and high-performance analytics solutions on the AWS platform. With deep expertise in AWS analytics services, he collaborates with customers to uncover their distinct business requirements and create customized solutions that deliver actionable insights and drive business growth. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS, specialized in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.

Prashanthi Chinthala is a Cloud Engineer (DIST) at AWS. She helps customers overcome EMR challenges and develop scalable data processing and analytics pipelines on AWS.

Build multi-Region resilient Apache Kafka applications with identical topic names using Amazon MSK and Amazon MSK Replicator

Post Syndicated from Subham Rakshit original https://aws.amazon.com/blogs/big-data/build-multi-region-resilient-apache-kafka-applications-with-identical-topic-names-using-amazon-msk-and-amazon-msk-replicator/

Resilience has always been a top priority for customers running mission-critical Apache Kafka applications. Amazon Managed Streaming for Apache Kafka (Amazon MSK) is deployed across multiple Availability Zones and provides resilience within an AWS Region. However, mission-critical Kafka deployments require cross-Region resilience to minimize downtime during service impairment in a Region. With Amazon MSK Replicator, you can build multi-Region resilient streaming applications to provide business continuity, share data with partners, aggregate data from multiple clusters for analytics, and serve global clients with reduced latency. This post explains how to use MSK Replicator for cross-cluster data replication and details the failover and failback processes while keeping the same topic name across Regions.

MSK Replicator overview

Amazon MSK offers two cluster types: Provisioned and Serverless. Provisioned clusters support two broker types: Standard and Express. With the introduction of Amazon MSK Express brokers, you can now deploy MSK clusters that reduce recovery time by up to 90% while delivering consistent performance. Express brokers provide up to 3 times the throughput per broker and scale up to 20 times faster compared to Standard brokers running Kafka. MSK Replicator works with both broker types in Provisioned clusters, as well as with Serverless clusters.

MSK Replicator supports an identical topic name configuration, which retains topic names in both active-active and active-passive replication. This avoids the risk of infinite replication loops commonly associated with third-party or open source replication tools. When deploying an active-passive cluster architecture for regional resilience, where one cluster handles live traffic and the other acts as a standby, an identical topic configuration simplifies the failover process. Applications can transition to the standby cluster without reconfiguration because topic names remain consistent across the source and target clusters.

To set up an active-passive deployment, you have to enable multi-VPC connectivity for the MSK cluster in the primary Region and deploy an MSK Replicator in the secondary Region. The replicator will consume data from the primary Region’s MSK cluster and asynchronously replicate it to the secondary Region. You connect the clients initially to the primary cluster but fail over the clients to the secondary cluster in the case of primary Region impairment. When the primary Region recovers, you deploy a new MSK Replicator to replicate data back from the secondary cluster to the primary. You need to stop the client applications in the secondary Region and restart them in the primary Region.

Because replication with MSK Replicator is asynchronous, there is a possibility of duplicate data in the secondary cluster. During a failover, consumers might reprocess some messages from Kafka topics. To address this, deduplication should occur on the consumer side, such as by using an idempotent downstream system like a database.
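To illustrate consumer-side deduplication, the following sketch uses an in-memory dictionary as a stand-in for an idempotent downstream system, such as a database table with a unique key. It assumes each message carries a stable unique identifier (here called `record_id`), which is an assumption about your data model rather than something MSK Replicator provides. The class and function names are hypothetical.

```python
class IdempotentSink:
    """In-memory stand-in for a downstream store keyed on a unique record ID."""

    def __init__(self):
        self.rows = {}

    def upsert(self, record_id, payload):
        """Write the record; return True only the first time the ID is seen."""
        is_new = record_id not in self.rows
        # Re-writing an existing ID leaves exactly one row, so a replayed
        # message cannot create a duplicate.
        self.rows[record_id] = payload
        return is_new


def process(messages, sink):
    """Apply (record_id, payload) pairs and count the first-time writes."""
    return sum(sink.upsert(record_id, payload) for record_id, payload in messages)


if __name__ == "__main__":
    sink = IdempotentSink()
    before_failover = [("order-1", {"amount": 10}), ("order-2", {"amount": 20})]
    # After failover the consumer re-reads order-2 and then continues.
    after_failover = [("order-2", {"amount": 20}), ("order-3", {"amount": 30})]
    print(process(before_failover, sink), "new records before failover")
    print(process(after_failover, sink), "new records after failover")
    print(len(sink.rows), "rows in the sink")
```

Because writes are keyed on the record ID, reprocessing a message after failover overwrites the same row instead of adding a second one. A real implementation would typically rely on the database's own upsert or unique-constraint semantics.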

In the next sections, we demonstrate how to deploy MSK Replicator in an active-passive architecture with identical topic names. We provide a step-by-step guide for failing over to the secondary Region during a primary Region impairment and failing back when the primary Region recovers. For an active-active setup, refer to Create an active-active setup using MSK Replicator.

Solution overview

In this setup, we deploy a primary MSK Provisioned cluster with Express brokers in the us-east-1 Region. To provide cross-Region resilience for Amazon MSK, we establish a secondary MSK cluster with Express brokers in the us-east-2 Region and replicate topics from the primary MSK cluster to the secondary cluster using MSK Replicator. This configuration provides high resilience within each Region by using Express brokers, and cross-Region resilience is achieved through an active-passive architecture, with replication managed by MSK Replicator.

The following diagram illustrates the solution architecture.

The primary Region MSK cluster handles client requests. If clients can’t communicate with the MSK cluster because of a primary Region impairment, you need to fail over the clients to the secondary MSK cluster. The producer writes to the customer topic in the primary MSK cluster, and the consumer with the group ID msk-consumer reads from the same topic. As part of the active-passive setup, we configure MSK Replicator to use identical topic names, making sure that the customer topic remains consistent across both clusters without requiring changes from the clients. The entire setup is deployed within a single AWS account.

In the next sections, we describe how to set up a multi-Region resilient MSK cluster using MSK Replicator and also show the failover and failback strategy.

Provision an MSK cluster using AWS CloudFormation

We provide AWS CloudFormation templates to provision the resources for this walkthrough in each Region.

The templates create the virtual private cloud (VPC), subnets, and the MSK Provisioned cluster with Express brokers within the VPC, configured with AWS Identity and Access Management (IAM) authentication, in each Region. They also create a Kafka client Amazon Elastic Compute Cloud (Amazon EC2) instance, where we can use the Kafka command line to create and view a Kafka topic and produce and consume messages to and from the topic.

Configure multi-VPC connectivity in the primary MSK cluster

After the clusters are deployed, you need to enable the multi-VPC connectivity in the primary MSK cluster deployed in us-east-1. This will allow MSK Replicator to connect to the primary MSK cluster using multi-VPC connectivity (powered by AWS PrivateLink). Multi-VPC connectivity is only required for cross-Region replication. For same-Region replication, MSK Replicator uses an IAM policy to connect to the primary MSK cluster.

MSK Replicator connects to both the primary and secondary MSK clusters using IAM authentication only. Therefore, although other Kafka clients can continue to use SASL/SCRAM or mTLS authentication, IAM authentication has to be enabled on both clusters for MSK Replicator to work.

To enable multi-VPC connectivity, complete the following steps:

  1. On the Amazon MSK console, navigate to the MSK cluster.
  2. On the Properties tab, under Network settings, choose Turn on multi-VPC connectivity on the Edit dropdown menu.

  3. For Authentication type, select IAM role-based authentication.
  4. Choose Turn on selection.

Enabling multi-VPC connectivity is a one-time setup and it can take approximately 30–45 minutes depending on the number of brokers. After this is enabled, you need to provide the MSK cluster resource policy to allow MSK Replicator to talk to the primary cluster.

  5. Under Security settings, choose Edit cluster policy.
  6. Select Include Kafka service principal.
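To make the cluster policy step more concrete, the following Python sketch builds a resource policy document that grants the Kafka service principal access to the source cluster. The action list reflects our understanding of what MSK Replicator needs and is an assumption; compare it with the policy the console generates when you select Include Kafka service principal before relying on it. The cluster ARN in the example is hypothetical.

```python
import json


def build_replicator_cluster_policy(cluster_arn: str) -> str:
    """Return a cluster resource policy (JSON string) for the Kafka service principal.

    The actions below are an assumption based on the MSK Replicator prerequisites;
    verify them against the console-generated policy.
    """
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": ["kafka.amazonaws.com"]},
                "Action": [
                    "kafka:CreateVpcConnection",
                    "kafka:GetBootstrapBrokers",
                    "kafka:DescribeClusterV2",
                ],
                "Resource": cluster_arn,
            }
        ],
    }
    return json.dumps(policy, indent=2)


if __name__ == "__main__":
    # Hypothetical ARN, for illustration only.
    arn = "arn:aws:kafka:us-east-1:111122223333:cluster/dr-test-primary/EXAMPLE"
    print(build_replicator_cluster_policy(arn))
```

If you automate this step, the resulting string could be supplied as the policy when calling the Amazon MSK PutClusterPolicy API for the primary cluster.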

Now that the cluster is enabled to receive requests from MSK Replicator using PrivateLink, we need to set up the replicator.

Create an MSK Replicator

Complete the following steps to create an MSK Replicator:

  1. In the secondary Region (us-east-2), open the Amazon MSK console.
  2. Choose Replicators in the navigation pane.
  3. Choose Create replicator.
  4. Enter a name and optional description.

  5. In the Source cluster section, provide the following information:
    1. For Cluster region, choose us-east-1.
    2. For MSK cluster, enter the Amazon Resource Name (ARN) for the primary MSK cluster.

For a cross-Region setup, the primary cluster appears disabled unless multi-VPC connectivity is enabled and the cluster resource policy is configured on the primary MSK cluster. After you choose the primary cluster, the console automatically selects the subnets associated with the primary cluster. Security groups are not required because the primary cluster’s access is governed by the cluster resource policy.

Next, you select the target cluster. The target cluster Region defaults to the Region where the MSK Replicator is created. In this case, it’s us-east-2.

  6. In the Target cluster section, provide the following information:
    1. For MSK cluster, enter the ARN of the secondary MSK cluster. This automatically selects the cluster subnets and the security group associated with the secondary cluster.
    2. For Security groups, choose any additional security groups.

Make sure that the security groups have outbound rules to allow traffic to your secondary cluster’s security groups. Also make sure that your secondary cluster’s security groups have inbound rules that accept traffic from the MSK Replicator security groups provided here.

Now let’s provide the MSK Replicator settings.

  7. In the Replicator settings section, enter the following information:
    1. For Topics to replicate, keep the default value, which replicates all topics from the primary to the secondary cluster.
    2. For Replication starting position, choose Earliest to replicate all events from the start of the source topics.
    3. For Copy settings, select Keep the same topic names so that topic names in the secondary cluster are identical to those in the primary cluster.

This makes sure that the MSK clients don’t need to add a prefix to the topic names.

  8. For this example, keep the Consumer group replication setting as default and set Target compression type to None.

Also, MSK Replicator will automatically create the required IAM policies.

  9. Choose Create to create the replicator.

Deploying the replicator takes around 15–20 minutes. When the deployment completes, the replicator status changes to RUNNING.
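For readers who prefer to script the deployment, the console choices above map roughly onto a CreateReplicator request. The following Python sketch only builds the request dictionary; it does not call AWS. The field names follow our understanding of the Amazon MSK CreateReplicator API and should be checked against the API reference. All ARNs, subnet IDs, security group IDs, and the replicator name are hypothetical, and unlike the console flow, the API expects you to supply a service execution role.

```python
from typing import Dict, List


def build_create_replicator_request(
    name: str,
    source_arn: str,
    source_subnets: List[str],
    target_arn: str,
    target_subnets: List[str],
    target_security_groups: List[str],
    role_arn: str,
) -> Dict:
    """Build a CreateReplicator request mirroring the console settings in this post."""
    return {
        "ReplicatorName": name,
        "ServiceExecutionRoleArn": role_arn,  # assumption: role created beforehand
        "KafkaClusters": [
            {   # source (primary) cluster: access governed by the cluster policy
                "AmazonMskCluster": {"MskClusterArn": source_arn},
                "VpcConfig": {"SubnetIds": source_subnets},
            },
            {   # target (secondary) cluster
                "AmazonMskCluster": {"MskClusterArn": target_arn},
                "VpcConfig": {
                    "SubnetIds": target_subnets,
                    "SecurityGroupIds": target_security_groups,
                },
            },
        ],
        "ReplicationInfoList": [
            {
                "SourceKafkaClusterArn": source_arn,
                "TargetKafkaClusterArn": target_arn,
                "TargetCompressionType": "NONE",
                "TopicReplication": {
                    "TopicsToReplicate": [".*"],               # all topics
                    "StartingPosition": {"Type": "EARLIEST"},  # Earliest
                    "TopicNameConfiguration": {"Type": "IDENTICAL"},  # same names
                },
                "ConsumerGroupReplication": {
                    "ConsumerGroupsToReplicate": [".*"],       # all groups
                },
            }
        ],
    }


if __name__ == "__main__":
    request = build_create_replicator_request(
        name="dr-test-replicator",
        source_arn="arn:aws:kafka:us-east-1:111122223333:cluster/primary/EXAMPLE",
        source_subnets=["subnet-aaaa1111"],
        target_arn="arn:aws:kafka:us-east-2:111122223333:cluster/secondary/EXAMPLE",
        target_subnets=["subnet-bbbb2222"],
        target_security_groups=["sg-cccc3333"],
        role_arn="arn:aws:iam::111122223333:role/example-replicator-role",
    )
    print(request["ReplicationInfoList"][0]["TopicReplication"])
```

With boto3, a dictionary like this could be passed as keyword arguments to the `create_replicator` method of a `kafka` client created in the secondary Region.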

Configure the MSK client for the primary cluster

Complete the following steps to configure the MSK client:

  1. On the Amazon EC2 console, navigate to the EC2 instance of the primary Region (us-east-1) and connect to the EC2 instance dr-test-primary-KafkaClientInstance1 using Session Manager, a capability of AWS Systems Manager.

After you have logged in, you need to configure the primary MSK cluster bootstrap address to create a topic and publish data to the cluster. You can get the bootstrap address for IAM authentication on the Amazon MSK console under View Client Information on the cluster details page.

  2. Configure the bootstrap address with the following code:
sudo su - ec2-user

export BS_PRIMARY=<<MSK_BOOTSTRAP_ADDRESS>>
  3. Configure the client configuration for IAM authentication to talk to the MSK cluster:
echo -n "security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
" > /home/ec2-user/kafka/config/client_iam.properties

Create a topic and produce and consume messages to the topic

Complete the following steps to create a topic, produce messages to it, and consume messages from it:

  1. Create a customer topic:
/home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server=$BS_PRIMARY \
--create --replication-factor 3 --partitions 3 \
--topic customer \
--command-config=/home/ec2-user/kafka/config/client_iam.properties
  2. Create a console producer to write to the topic:
/home/ec2-user/kafka/bin/kafka-console-producer.sh \
--bootstrap-server=$BS_PRIMARY --topic customer \
--producer.config=/home/ec2-user/kafka/config/client_iam.properties
  3. Produce the following sample text to the topic:
This is a customer topic
This is the 2nd message to the topic.
  4. Press Ctrl+C to exit the console prompt.
  5. Create a consumer with group.id msk-consumer to read all the messages from the beginning of the customer topic:
/home/ec2-user/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server=$BS_PRIMARY --topic customer --from-beginning \
--consumer.config=/home/ec2-user/kafka/config/client_iam.properties \
--consumer-property group.id=msk-consumer

This will consume both sample messages from the topic.

  6. Press Ctrl+C to exit the console prompt.

Configure the MSK client for the secondary MSK cluster

Go to the EC2 instance in the secondary Region (us-east-2) and follow the previously mentioned steps to configure an MSK client. The only difference from the previous steps is that you use the bootstrap address of the secondary MSK cluster. Set the environment variable BS_SECONDARY to the bootstrap address of the secondary Region MSK cluster.

Verify replication

After the client is configured to talk to the secondary MSK cluster using IAM authentication, verify that the customer topic has been replicated. Because the MSK Replicator is now running, the topic should appear when you list the topics in the secondary cluster:

/home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server=$BS_SECONDARY \
--list --command-config=/home/ec2-user/kafka/config/client_iam.properties

The topic name is customer without any prefix.

By default, MSK Replicator replicates the details of all the consumer groups. Because you used the default configuration, you can use the following command to verify whether the consumer group ID msk-consumer is also replicated to the secondary cluster:

/home/ec2-user/kafka/bin/kafka-consumer-groups.sh --bootstrap-server=$BS_SECONDARY \
--list --command-config=/home/ec2-user/kafka/config/client_iam.properties

Now that we have verified the topic is replicated, let’s understand the key metrics to monitor.

Monitor replication

Monitoring MSK Replicator is important to make sure that data is replicated with low latency. The lower the replication latency, the less data is at risk of loss if an unplanned failure occurs. Some important MSK Replicator metrics to monitor are ReplicationLatency, MessageLag, and ReplicatorThroughput. For a detailed list, see Monitor replication.

To understand how many bytes are processed by MSK Replicator, monitor the metric ReplicatorBytesInPerSec. This metric indicates the average number of bytes processed by the replicator per second. Data processed by MSK Replicator consists of all data that MSK Replicator receives, which includes both the data replicated to the target cluster and the data filtered out by MSK Replicator. This metric is applicable if you use Keep the same topic names in the MSK Replicator copy settings. During a failback scenario, MSK Replicator starts to read from the earliest offset and replicates records from the secondary back to the primary. Depending on the retention settings, some data might already exist in the primary cluster. To prevent duplicates, MSK Replicator processes the data but automatically filters out duplicate data.

Fail over clients to the secondary MSK cluster

If clients can’t connect to the primary MSK cluster, or they receive unexpected produce and consume errors, the primary MSK cluster might be impacted by an unexpected event in the primary Region. You might also notice a sudden spike in replication latency. If the latency continues to rise, it could indicate a Regional impairment in Amazon MSK. To verify this, you can check the AWS Health Dashboard, though status updates might be delayed. After you identify signs of a Regional impairment in Amazon MSK, prepare to fail over the clients to the secondary Region.

For critical workloads, we recommend not taking a dependency on control plane actions for failover. To mitigate this risk, you could implement a pilot light deployment, where essential components of the stack are kept running in a secondary Region and scaled up when the primary Region is impaired. Alternatively, for faster and smoother failover with minimal downtime, a hot standby approach is recommended. This involves pre-deploying the entire stack in a secondary Region so that, in a disaster recovery scenario, the pre-deployed clients can be quickly activated in the secondary Region.
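One way to turn "the latency continues to rise" into something checkable is to look for a sustained increase across consecutive samples of the ReplicationLatency metric. The following Python sketch illustrates the idea on a plain list of numbers. The function name, the window size, and the sample values are hypothetical, and the result is meant as a prompt to investigate, not as an automatic failover trigger.

```python
from typing import Sequence


def latency_rising(samples: Sequence[float], window: int = 3) -> bool:
    """Return True if each of the last `window` intervals shows a strict increase.

    `samples` is a chronological series of replication latency readings,
    for example one data point per minute.
    """
    if len(samples) < window + 1:
        return False  # not enough data to judge a trend
    recent = samples[-(window + 1):]
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))


if __name__ == "__main__":
    steady = [110.0, 95.0, 120.0, 105.0, 100.0]       # normal fluctuation
    climbing = [100.0, 120.0, 400.0, 900.0, 2500.0]   # sustained rise
    print(latency_rising(steady), latency_rising(climbing))
```

In practice you would combine a signal like this with client-side error rates and the AWS Health Dashboard before deciding to fail over.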

Failover process

To perform the failover, you first need to stop the clients pointed to the primary MSK cluster. However, for the purpose of the demo, we are using console producer and consumers, so our clients are already stopped.

In a real failover scenario, using primary Region clients to communicate with the secondary Region MSK cluster is not recommended, as it breaches fault isolation boundaries and leads to increased latency. To simulate the failover using the preceding setup, let’s start a producer and consumer in the secondary Region (us-east-2). For this, run a console producer in the EC2 instance (dr-test-secondary-KafkaClientInstance1) of the secondary Region.

The following diagram illustrates this setup.

Complete the following steps to perform a failover:

  1. Create a console producer using the following code:
/home/ec2-user/kafka/bin/kafka-console-producer.sh \
--bootstrap-server=$BS_SECONDARY --topic customer \
--producer.config=/home/ec2-user/kafka/config/client_iam.properties
  2. Produce the following sample text to the topic:
This is the 3rd message to the topic.
This is the 4th message to the topic.

Now, let’s create a console consumer. It’s important to make sure the consumer group ID is exactly the same as the consumer attached to the primary MSK cluster. For this, we use the group.id msk-consumer to read the messages from the customer topic. This simulates that we are bringing up the same consumer attached to the primary cluster.

  3. Create a console consumer with the following code:
/home/ec2-user/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server=$BS_SECONDARY --topic customer --from-beginning \
--consumer.config=/home/ec2-user/kafka/config/client_iam.properties \
--consumer-property group.id=msk-consumer

Although the consumer is configured to read all the data from the earliest offset, it only consumes the last two messages produced by the console producer. This is because MSK Replicator has replicated the consumer group details along with the offsets read by the consumer with the consumer group ID msk-consumer. The console consumer with the same group.id mimics the behavior of a consumer that has failed over to the secondary Amazon MSK cluster.
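The behavior described above follows from how a Kafka consumer chooses its starting position: a committed offset for the group takes precedence, and the reset policy that --from-beginning sets (earliest) applies only when no valid committed offset exists. The following Python sketch models that rule. The function and the offset values are illustrative assumptions and do not represent the actual offsets in the secondary cluster.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "earliest",
) -> int:
    """Model where a consumer group starts reading a partition."""
    # A valid committed offset always wins over the reset policy.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise fall back to the configured reset policy.
    return log_start if auto_offset_reset == "earliest" else log_end


if __name__ == "__main__":
    # Illustrative partition with four records (offsets 0..3, log end = 4).
    # Group offsets were replicated after the first two records were read.
    print(starting_offset(committed=2, log_start=0, log_end=4))
    # A brand-new group with no committed offset starts from the beginning.
    print(starting_offset(committed=None, log_start=0, log_end=4))
```

This is also why the consumer group ID must match exactly: a different group.id has no replicated offsets, so the consumer would fall back to the reset policy and reread the topic.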

Fail back clients to the primary MSK cluster

Failing back clients to the primary MSK cluster is the common pattern in an active-passive scenario after the service in the primary Region has recovered. Before we fail back clients to the primary MSK cluster, it’s important to sync the primary MSK cluster with the secondary MSK cluster. For this, we need to deploy another MSK Replicator in the primary Region, configured to read from the earliest offset of the secondary MSK cluster and write to the primary cluster with the same topic name. The MSK Replicator copies the data from the secondary MSK cluster to the primary MSK cluster. Although the MSK Replicator is configured to start from the earliest offset, it does not duplicate the data already present in the primary MSK cluster. It automatically filters out the existing messages and only writes back the new data produced in the secondary MSK cluster while the primary MSK cluster was down. The replication step from secondary to primary isn’t required if you don’t have a business requirement to keep the data the same across both clusters.

After the MSK Replicator is up and running, monitor its MessageLag metric, which indicates how many messages are yet to be replicated from the secondary MSK cluster to the primary MSK cluster. Wait for MessageLag to come down close to 0. Then stop the producers writing to the secondary MSK cluster and restart them against the primary MSK cluster. Let the consumers continue to read from the secondary MSK cluster until their MaxOffsetLag metric reaches 0, which makes sure that they have processed all the messages from the secondary MSK cluster. By this time, MessageLag should be 0, because no producer is writing to the secondary cluster and MSK Replicator has replicated all messages from the secondary cluster to the primary cluster. At this point, start the consumers with the same group.id in the primary Region. You can then delete the MSK Replicator created to copy messages from the secondary to the primary cluster. Make sure that the previously existing MSK Replicator is in RUNNING status and successfully replicating messages from the primary to the secondary cluster. You can confirm this by checking the ReplicatorThroughput metric, which should be greater than 0.
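The ordering in the failback procedure matters, so it can help to express it as a small decision function. The following Python sketch takes the current MessageLag and MaxOffsetLag readings plus two flags describing where the clients are, and returns the next action. The function name, the returned strings, and the `near_zero` threshold used to interpret "close to 0" are assumptions made for illustration.

```python
WAIT_REPLICATION = "wait: replicator is still catching up"
MOVE_PRODUCERS = "move producers to the primary cluster"
WAIT_CONSUMERS = "wait: consumers are still draining the secondary cluster"
WAIT_FINAL_COPY = "wait: replicator is copying the last records"
START_CONSUMERS = "start consumers on the primary cluster"
DELETE_REPLICATOR = "delete the failback replicator"


def next_failback_step(
    message_lag: int,
    max_offset_lag: int,
    producers_moved: bool,
    consumers_moved: bool,
    near_zero: int = 10,  # hypothetical threshold for "close to 0"
) -> str:
    """Return the next failback action for the given metric readings."""
    if not producers_moved:
        # Producers move only once the secondary-to-primary copy has nearly caught up.
        return WAIT_REPLICATION if message_lag > near_zero else MOVE_PRODUCERS
    if not consumers_moved:
        if max_offset_lag > 0:
            return WAIT_CONSUMERS   # consumers must finish the secondary backlog
        if message_lag > 0:
            return WAIT_FINAL_COPY  # every record must reach the primary first
        return START_CONSUMERS
    return DELETE_REPLICATOR


if __name__ == "__main__":
    print(next_failback_step(5000, 120, producers_moved=False, consumers_moved=False))
    print(next_failback_step(0, 0, producers_moved=True, consumers_moved=False))
```

Before the final step, you would still confirm separately that the original primary-to-secondary replicator is in RUNNING status.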

Failback process

To simulate a failback, you first need to enable multi-VPC connectivity in the secondary MSK cluster (us-east-2) and add a cluster policy for the Kafka service principal like we did before.

Deploy the MSK Replicator in the primary Region (us-east-1) with the source MSK cluster pointed to us-east-2 and the target cluster pointed to us-east-1. Configure Replication starting position as Earliest and Copy settings as Keep the same topic names.

The following diagram illustrates this setup.

After the MSK Replicator is in RUNNING status, let’s verify that no duplicates are introduced while replicating the data from the secondary to the primary MSK cluster.

Run a console consumer without the group.id in the EC2 instance (dr-test-primary-KafkaClientInstance1) of the primary Region (us-east-1):

/home/ec2-user/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server=$BS_PRIMARY --topic customer --from-beginning \
--consumer.config=/home/ec2-user/kafka/config/client_iam.properties

This should show the four messages without any duplicates. Although in the consumer we specify to read from the earliest offset, MSK Replicator makes sure the duplicate data isn’t replicated back to the primary cluster from the secondary cluster.

This is a customer topic
This is the 2nd message to the topic.
This is the 3rd message to the topic.
This is the 4th message to the topic.
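For topics with more than a handful of records, checking for duplicates by eye is impractical. As a rough sketch, the following Python function counts repeated lines in captured consumer output. It assumes that each record has a unique payload, which holds for this demo but not for every workload; the function name is hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable


def find_duplicates(records: Iterable[str]) -> Dict[str, int]:
    """Return each payload that appears more than once, with its count."""
    counts = Counter(records)
    return {payload: n for payload, n in counts.items() if n > 1}


if __name__ == "__main__":
    consumed = [
        "This is a customer topic",
        "This is the 2nd message to the topic.",
        "This is the 3rd message to the topic.",
        "This is the 4th message to the topic.",
    ]
    # An empty result means no payload was seen more than once.
    print(find_duplicates(consumed))
```

You could feed it the console consumer output saved to a file, reading one record per line.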

You can now point the clients to start producing to and consuming from the primary MSK cluster.

Clean up

At this point, you can tear down the MSK Replicator deployed in the primary Region.

Conclusion

This post explored how to enhance Kafka resilience by setting up a secondary MSK cluster in another Region and synchronizing it with the primary cluster using MSK Replicator. We demonstrated how to implement an active-passive disaster recovery strategy while maintaining consistent topic names across both clusters. We provided a step-by-step guide for configuring replication with identical topic names and detailed the processes for failover and failback. Additionally, we highlighted key metrics to monitor and outlined actions to provide efficient and continuous data replication.

For more information, refer to What is Amazon MSK Replicator? For a hands-on experience, try out the Amazon MSK Replicator Workshop. We encourage you to try out this feature and share your feedback with us.


About the Author

Subham Rakshit is a Senior Streaming Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build streaming architectures so they can get value from analyzing their streaming data. His two little daughters keep him occupied most of the time outside work, and he loves solving jigsaw puzzles with them. Connect with him on LinkedIn.