Tag Archives: Amazon Kinesis

Amazon FSx for NetApp ONTAP now integrates with Amazon S3 for seamless data access

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/amazon-fsx-for-netapp-ontap-now-integrates-with-amazon-s3-for-seamless-data-access/

Today, we’re announcing the ability to access your data in Amazon FSx for NetApp ONTAP file systems using Amazon Simple Storage Service (Amazon S3). With this capability, you can use your enterprise file data to augment generative AI applications with Amazon Bedrock Knowledge Bases for Retrieval Augmented Generation (RAG), train machine learning (ML) models with Amazon SageMaker, generate insights with Amazon S3 integrated third-party services, use comprehensive research capabilities in AI-powered business intelligence (BI) tools such as Amazon Quick Suite, and run analyses using Amazon S3 based cloud-native applications, all while your file data continues to reside in your FSx for NetApp ONTAP file system.

Amazon FSx for NetApp ONTAP is the first fully AWS managed NetApp ONTAP file system in the cloud. With it, you can migrate on-premises applications that rely on NetApp ONTAP or other network-attached storage (NAS) appliances to AWS without having to change how you manage your data. FSx for NetApp ONTAP provides the popular capabilities, high performance, and data management APIs of ONTAP file systems with the added benefits of the AWS Cloud, such as simplified management, on-demand scaling, and seamless integration with other AWS services.

Over the years, AWS has developed a broad range of industry-leading AI, ML, and analytics services and applications that work with data in Amazon S3 that organizations use to innovate faster, discover new insights, and make even better data-driven decisions. However, some organizations want to use these services with their enterprise file data stored in NetApp ONTAP or other NAS appliances.

How to get started
You can create and attach an S3 Access Point to your FSx for ONTAP file system using the Amazon FSx console, the AWS Command Line Interface (AWS CLI), or the AWS SDK.

I have an existing FSx for ONTAP file system, demo-create-s3access, which I created by following the steps in Creating file systems in the FSx for ONTAP documentation. Using the Amazon FSx console, I now choose the file system ID fs-0c45b011a7f071d70 to access the full details of the file system.

I’ll attach the access point to the volume of the file system. I choose the volume vol1 and then select Create S3 Access Point from the Actions dropdown menu.


I enter details such as the access point name, the type of file system user identity, and the network configuration, then choose Create S3 Access Point to finalize the process.


After it’s created, the access point my-s3-accesspoint is ready to allow access to the file data stored in my file system demo-create-s3access from Amazon S3. S3 Access Points are S3 endpoints that can be attached to Amazon FSx volumes and used to perform Amazon S3 object operations.


Using the access point my-s3-accesspoint, I can now bring proprietary data stored in the file system demo-create-s3access to Amazon S3 for use in applications that work with Amazon S3. My file data continues to reside in the FSx for NetApp ONTAP file system and remains accessible through the file protocols.
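As a rough illustration of what "performing S3 object operations" can look like from application code, here is a minimal Python sketch. It assumes the access point alias can be passed wherever a bucket name is expected (as with other S3 access point aliases) and that the list and get operations shown are permitted by your access point policy. The helper names `list_keys` and `read_object` are hypothetical and not part of any AWS SDK.

```python
def list_keys(s3_client, access_point_alias, prefix=""):
    """Return every object key visible through the access point, following pagination."""
    keys = []
    kwargs = {"Bucket": access_point_alias, "Prefix": prefix}
    while True:
        page = s3_client.list_objects_v2(**kwargs)
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
        if not page.get("IsTruncated"):
            return keys
        # Ask for the next page of results.
        kwargs["ContinuationToken"] = page["NextContinuationToken"]


def read_object(s3_client, access_point_alias, key):
    """Read one file from the volume as an S3 object and return its bytes."""
    response = s3_client.get_object(Bucket=access_point_alias, Key=key)
    return response["Body"].read()


# Usage (requires boto3 and AWS credentials; the alias is a placeholder):
#   import boto3
#   s3 = boto3.client("s3")
#   for key in list_keys(s3, "<your-access-point-alias>"):
#       print(key)
```

The functions take the client as a parameter, so the same code can be exercised with a stub before it is pointed at a real file system.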

For the walkthrough in this post, I’ll integrate with Quick Suite.

Integrating decades of enterprise file data with the latest AI powered BI tools on AWS
In the Quick Suite Console, in the left navigation pane, I choose Connections, then select Integrations. Before you begin, make sure that you have the correct permissions to the Amazon S3 AWS resource. You can control the AWS resources that Quick Suite can access by following the Amazon Quick Suite user guide.


After I’ve selected the Amazon S3 integration, I enter my Amazon S3 Access Point alias as the S3 bucket URL, leave the rest of the information as default, then choose Create and continue.


I finalize the process by providing the Name and Description of the knowledge base, then choosing Create.


After the knowledge base has been created, it’s automatically synchronized and becomes available for interaction.


I want to learn more about the AWS European Sovereign Cloud, so I’ve updated the file system (accessed through the S3 Access Point my-s3-accesspoin-iyytkgz83djdjj7abn3u711supfgkuse1b-ext-s3alias) with the AWS whitepaper on this topic. In the chat in Amazon Quick Suite, I ask my first question: “do we have any documentation on the europe sovereignty cloud?” To answer my question, the chat agent accesses and analyzes various types of data sources I have permission to use, including uploaded files in my current conversation, spaces I have access to, knowledge bases from my integrations, and more.

When I verify the source, I see that the document I uploaded to my file system is listed as one of the sources.

Other use cases of Amazon S3 Access Points for Amazon FSx for NetApp ONTAP
Earlier, we looked at use cases such as connecting an organization’s proprietary file data to Amazon Quick Suite for advanced business intelligence. Additionally, Amazon S3 Access Points for Amazon FSx for NetApp ONTAP can be used to seamlessly integrate enterprise file data with comprehensive analytics services, such as Amazon Athena for serverless SQL queries or AWS Glue for ETL processing, to name a few.

Amazon S3 Access Points for Amazon FSx for NetApp ONTAP are also suitable for data access from serverless compute workloads that are cloud-native with containerized microservices that require flexible access to shared enterprise datasets, such as configuration files, reference data, content libraries, model artifacts, and application assets.

Now available
You can get started today using the Amazon FSx console, AWS CLI, or AWS SDK to attach Amazon S3 Access Points to your Amazon FSx for NetApp ONTAP file systems. The feature is available in the following AWS Regions: Africa (Cape Town), Asia Pacific (Hong Kong, Hyderabad, Jakarta, Melbourne, Mumbai, Osaka, Seoul, Singapore, Sydney, Tokyo), Canada (Central, Calgary), Europe (Frankfurt, Ireland, London, Milan, Paris, Spain, Stockholm, Zurich), Israel (Tel Aviv), Middle East (Bahrain, UAE), South America (Sao Paulo), US East (N. Virginia, Ohio), and US West (N. California, Oregon). You’re billed by Amazon S3 for the requests and data transfer costs through your S3 Access Point, in addition to your standard Amazon FSx charges. Learn more on the Amazon FSx for NetApp ONTAP pricing page.

PS: Writing a blog post at AWS is always a team effort, even when you see only one name under the post title. In this case, I want to thank Luke Miller for his expertise and generous help with technical guidance, which made this overview possible and comprehensive.

Veliswa Boya.

Medidata’s journey to a modern lakehouse architecture on AWS

Post Syndicated from Mike Araujo original https://aws.amazon.com/blogs/big-data/medidatas-journey-to-a-modern-lakehouse-architecture-on-aws/

This post was co-authored by Mike Araujo, Principal Engineer at Medidata Solutions.

The life sciences industry is transitioning from fragmented, standalone tools towards integrated, platform-based solutions. Medidata, a Dassault Systèmes company, is building a next-generation data platform that addresses the complex challenges of modern clinical research. In this post, we show you how Medidata created a unified, scalable, real-time data platform that serves thousands of clinical trials worldwide with AWS services, Apache Iceberg, and a modern lakehouse architecture.

Challenges with legacy architecture

As the Medidata clinical data repository expanded, the team recognized the shortcomings of the legacy data solution to provide quality data products to their customers across their growing portfolio of data offerings. Several data tenets began to erode. The following diagram shows Medidata’s legacy extract, transform, and load (ETL) architecture.

Built upon a series of scheduled batch jobs, the legacy system proved ill-equipped to provide a unified view of the data across the entire ecosystem. Batch jobs ran at different intervals, often requiring a sufficient degree of scheduling buffer to make sure upstream jobs completed within the expected window. As the data volume expanded, the jobs and their schedules continued to inflate, introducing a latency window between ingestion and processing for dependent consumers. Different consumers operating from various underlying data services further magnified the problem as pipelines had to be continuously built across a variety of data delivery stacks.

The expanding portfolio of pipelines began to overwhelm existing maintenance operations. With more operations, the opportunity for failure expanded and recovery efforts became more complicated. Existing observability systems were inundated with operational data, and identifying the root cause of data quality issues became a multi-day endeavor. Increases in the data volume required scaling considerations across the entire data estate.

Additionally, the proliferation of data pipelines and copies of the data in different technologies and storage systems necessitated expanding access controls with enhanced security features to make sure only the correct users had access to the subset of data to which they were permitted. Making sure access control changes were correctly propagated across all systems added a further layer of complexity to consumers and producers.

Solution overview

With the advent of Clinical Data Studio (Medidata’s unified data management and analytics solution for clinical trials) and Data Connect (Medidata’s data solution for acquiring, transforming, and exchanging electronic health record (EHR) data across healthcare organizations), Medidata introduced a new world of data discovery, analysis, and integration to the life sciences industry powered by open source technologies and hosted on AWS. The following diagram illustrates the solution architecture.

Fragmented batch ETL jobs were replaced by real-time streaming pipelines built on Apache Flink, an open source, distributed engine for stateful processing, running on Amazon Elastic Kubernetes Service (Amazon EKS), a fully managed Kubernetes service. The Flink jobs write to Apache Kafka running in Amazon Managed Streaming for Apache Kafka (Amazon MSK), a streaming data service that manages Kafka infrastructure and operations, before the data lands in Iceberg tables backed by the AWS Glue Data Catalog, a centralized metadata repository for data assets. This collection of Iceberg tables now serves as a central, single source of data that a variety of consumers can access without additional downstream processing, alleviating the need for custom pipelines to satisfy the requirements of downstream consumers. Through these fundamental architectural changes, the team at Medidata solved the issues presented by the legacy solution.

Data availability and consistency

With the introduction of the Flink jobs and Iceberg tables, the team was able to deliver a consistent view of their data across the Medidata data experience. Pipeline latency was reduced from days to minutes, helping Medidata customers realize a 99% performance gain from the data ingestion to the data analytics layers. Due to Iceberg’s interoperability, Medidata users saw the same view of the data regardless of where they viewed that data, minimizing the need for consumer-driven custom pipelines because Iceberg could plug into existing consumers.

Maintenance and durability

Iceberg’s interoperability provided a single copy of the data to satisfy their use cases, so the Medidata team could focus its observation and maintenance efforts on a five-times smaller subset of operations than previously required. Observability was enhanced by tapping into the various metadata components and metrics exposed by Iceberg and the Data Catalog. Quality management transformed from cross-system traces and queries to a single analysis of unified pipelines, with an added benefit of point in time data queries thanks to the Iceberg snapshot feature. Data volume increases are handled with out-of-box scaling supported by the entire infrastructure stack and AWS Glue Iceberg optimization features that include compaction, snapshot retention, and orphan file deletion, which provide a set-and-forget experience for solving a number of common Iceberg frustrations, such as the small file problem, orphan file retention, and query performance.
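To make the point-in-time query benefit more concrete, here is a minimal Python sketch of an Iceberg time travel query. It assumes the tables are queried through Amazon Athena, which this post does not state; Medidata’s consumers may well use other engines. The function names, table name, database, and output location are all hypothetical placeholders.

```python
import re

# Conservative patterns so only plain identifiers and timestamps reach the SQL text.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
_TIMESTAMP = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$")


def time_travel_query(table, timestamp_utc):
    """Build a query that reads the table as of the snapshot current at a UTC timestamp."""
    if not _IDENTIFIER.match(table):
        raise ValueError(f"unexpected table name: {table!r}")
    if not _TIMESTAMP.match(timestamp_utc):
        raise ValueError(f"expected 'YYYY-MM-DD HH:MM:SS', got {timestamp_utc!r}")
    return f"SELECT * FROM {table} FOR TIMESTAMP AS OF TIMESTAMP '{timestamp_utc} UTC'"


def run_query(athena_client, database, output_location, sql):
    """Submit the query and return the execution ID to poll for results."""
    response = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]


# Usage (requires boto3 and AWS credentials; all names are placeholders):
#   import boto3
#   sql = time_travel_query("clinical_events", "2025-01-01 00:00:00")
#   run_query(boto3.client("athena"), "<your-database>", "s3://<your-results-bucket>/", sql)
```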

Security

With Iceberg at the center of its solution architecture, the Medidata team no longer had to spend the time building custom access control layers with enhanced security features at each data integration point. Iceberg on AWS centralizes the authorization layer using familiar systems such as AWS Identity and Access Management (IAM), providing a single and durable control for data access. The data also stays entirely within the Medidata virtual private cloud (VPC), further reducing the opportunity for unintended disclosures.

Conclusion

In this post, we demonstrated how a legacy universe of consumer-driven custom ETL pipelines can be replaced with a scalable, high-performance streaming lakehouse. By putting Iceberg on AWS at the center of data operations, you can have a single source of data for your consumers.

To learn more about Iceberg on AWS, refer to Optimizing Iceberg tables and Using Apache Iceberg on AWS.


About the authors

Mike Araujo

Mike is a Principal Engineer at Medidata Solutions, working on building a next generation data and AI platform for clinical data and trials. By using the power of open source technologies such as Apache Kafka, Apache Flink, and Apache Iceberg, Mike and his team have enabled the delivery of billions of clinical events and data transformations in near real time to downstream consumers, applications, and AI agents. His core skills focus on architecting and building big data and ETL solutions at scale as well as their integration in agentic workflows.

Sandeep Adwankar

Sandeep is a Senior Product Manager at AWS, who has driven feature launches across Amazon SageMaker, AWS Glue, and AWS Lake Formation. He has led initiatives in Amazon S3 Tables analytics, Iceberg compaction strategies, and AWS Glue Iceberg optimizations. His recent work focuses on generative AI and autonomous systems, including the AWS Glue Data Catalog model context protocol and Amazon Bedrock structured knowledge bases. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that accelerate their business outcomes.

Ian Beatty

Ian is a Technical Account Manager at AWS, where he specializes in supporting independent software vendor (ISV) customers in the healthcare and life sciences (HCLS) and financial services industry (FSI) sectors. Based in the Rochester, NY area, Ian helps ISV customers navigate their cloud journey by maintaining resilient and optimized workloads on AWS. With over a decade of experience building on AWS since 2014, he brings deep technical expertise from his previous roles as an AWS Architect and DevSecOps team lead for SaaS ISVs before joining AWS more than 3 years ago.

Ashley Chen

Ashley is a Solutions Architect at AWS based in Washington D.C. She supports independent software vendor (ISV) customers in the healthcare and life sciences industries, focusing on customer enablement, generative AI applications, and container workloads.

Introducing AWS Lambda event source mapping tools in the AWS Serverless MCP Server

Post Syndicated from Ben Freiberg original https://aws.amazon.com/blogs/compute/introducing-aws-lambda-event-source-mapping-tools-in-the-aws-serverless-mcp-server/

Modern serverless applications increasingly rely on event-driven architectures, where AWS Lambda functions process events from various sources like Amazon Kinesis, Amazon DynamoDB Streams, Amazon Simple Queue Service (Amazon SQS), Amazon Managed Streaming for Apache Kafka (Amazon MSK), and self-managed Apache Kafka.

Although event source mappings (ESM) offer a powerful mechanism for integrating AWS Lambda with stream and queue-based sources, configuring them to align with high-level architectural goals can sometimes involve navigating a broad set of options and parameters. Achieving an optimal configuration typically requires mapping developer intent to several technical settings, which can introduce inefficiencies or operational overhead.

In May 2025, AWS launched the AWS Serverless MCP Server, which provided AI-powered assistance for serverless application development, including infrastructure provisioning, deployment automation, and architectural guidance. Building on this foundation, AWS is now expanding the Serverless MCP Server to include specialized ESM tools.

These new dedicated tools in the AWS Serverless Model Context Protocol (MCP) Server combine the power of AI assistance with ESM expertise to enhance how developers build and manage event-driven serverless applications using Lambda. The new ESM tools provide contextual guidance specific to ESM configuration that address the challenges of event-driven development.

This post describes how the new tools under Serverless MCP Server work with AI coding assistants to streamline event source mapping management. Learn how to use this solution to accelerate your event-driven development workflow and build robust, high-performing applications more efficiently.

Overview

An event source mapping is a Lambda resource that reads items from stream and queue-based services and invokes a function with batches of records. Within an event source mapping, resources called event pollers actively poll for new messages and invoke functions. Using ESMs, AWS Lambda functions can automatically consume events from various sources without requiring custom polling infrastructure. Lambda handles the complexity of scaling, batching, filtering, and error handling, helping developers focus on business logic.
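As an illustration of what "invokes a function with batches of records" means in practice, here is a minimal Python sketch of a Lambda handler for a Kinesis event source mapping. It assumes the mapping has partial batch responses enabled (ReportBatchItemFailures) and that each record carries a JSON payload. The `process` function and its `order_id` check are hypothetical stand-ins for business logic.

```python
import base64
import json


def process(payload):
    """Hypothetical business logic; raises on payloads it cannot handle."""
    if "order_id" not in payload:
        raise ValueError("missing order_id")


def handler(event, context):
    """Process a batch in order and report the first record that fails."""
    for record in event["Records"]:
        sequence_number = record["kinesis"]["sequenceNumber"]
        try:
            # Kinesis record data arrives base64-encoded.
            payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
            process(payload)
        except Exception:
            # For stream sources, Lambda retries from the reported sequence
            # number onward, so stop at the first failure.
            return {"batchItemFailures": [{"itemIdentifier": sequence_number}]}
    # An empty list tells Lambda the whole batch succeeded.
    return {"batchItemFailures": []}
```

Returning the failing sequence number lets Lambda checkpoint the records that did succeed instead of retrying the entire batch.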

Navigating ESM configurations

Configuring these mappings optimally, especially for virtual private cloud (VPC)-based sources like Apache Kafka, requires additional understanding of networking, permissions, and performance tuning.

When working with event source mappings, developers need to address several technical considerations. For Kafka event sources using VPC-based Amazon Managed Streaming for Apache Kafka or self-managed Apache Kafka, configurations involve networking setup to enable Lambda access to Kafka topics. Developers must manage bootstrap servers, AWS Identity and Access Management (IAM) permissions, and topic access settings, while also handling authentication including SASL/SCRAM credentials, mTLS certificate management, and Kafka ACL permissions.
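To show how many of these settings end up in a single request, here is a minimal Python sketch that assembles the parameters for the Lambda CreateEventSourceMapping API for a self-managed Kafka cluster using SASL/SCRAM authentication. The helper name and every argument value are hypothetical. The sketch covers only the request shape, not the IAM permissions, Kafka ACLs, or network routes that must also be in place.

```python
def self_managed_kafka_esm_params(function_name, topic, bootstrap_servers,
                                  subnet_ids, security_group_id,
                                  scram_secret_arn, batch_size=100):
    """Build keyword arguments for the Lambda create_event_source_mapping call."""
    # Networking: one entry per subnet, plus the security group Lambda should use.
    access = [{"Type": "VPC_SUBNET", "URI": f"subnet:{subnet_id}"}
              for subnet_id in subnet_ids]
    access.append({"Type": "VPC_SECURITY_GROUP",
                   "URI": f"security_group:{security_group_id}"})
    # Authentication: a Secrets Manager secret holding the SCRAM credentials.
    access.append({"Type": "SASL_SCRAM_512_AUTH", "URI": scram_secret_arn})
    return {
        "FunctionName": function_name,
        "Topics": [topic],
        "StartingPosition": "LATEST",
        "BatchSize": batch_size,
        "SelfManagedEventSource": {
            "Endpoints": {"KAFKA_BOOTSTRAP_SERVERS": list(bootstrap_servers)}
        },
        "SourceAccessConfigurations": access,
    }


# Usage (requires boto3 and AWS credentials; all values are placeholders):
#   import boto3
#   params = self_managed_kafka_esm_params(
#       "my-consumer", "orders", ["broker1.example.com:9096"],
#       ["subnet-aaa", "subnet-bbb"], "sg-ccc", "<your-secret-arn>")
#   boto3.client("lambda").create_event_source_mapping(**params)
```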

Developers need to know how to translate performance requirements, such as processing 1,000 events per second, into specific ESM parameter configurations. Depending on the stream source, this involves determining appropriate batch sizes, parallelization factors, and retry policies while managing iterator age, offset lag and potential timeout issues. Additionally, developers need visibility into configuration effectiveness and other diagnostic information to optimize resource allocation and ensure reliable event processing.
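The translation from a throughput goal to a parameter value can be sketched as back-of-the-envelope arithmetic. The Python example below assumes a Kinesis source, where Lambda processes up to ParallelizationFactor (1 to 10) batches per shard concurrently, and an assumed processing time of 50 ms per event. The function names are hypothetical, and the result is a starting point for tuning, not a recommendation produced by the ESM tools.

```python
def required_concurrency(events_per_second, millis_per_event):
    """Concurrent workers needed to keep up: arrival rate times service time, rounded up."""
    return -(-(events_per_second * millis_per_event) // 1000)


def kinesis_parallelization_factor(concurrency, shard_count, max_factor=10):
    """Smallest ParallelizationFactor whose shard_count * factor covers the concurrency."""
    factor = max(1, -(-concurrency // shard_count))
    if factor > max_factor:
        raise ValueError(
            f"{shard_count} shards cannot supply {concurrency} concurrent batches; "
            "add shards or reduce per-event processing time"
        )
    return factor


# 1,000 events per second at an assumed 50 ms each needs 50 concurrent workers.
workers = required_concurrency(1000, 50)
# Spread across 8 shards, that calls for 7 concurrent batches per shard.
factor = kinesis_parallelization_factor(workers, shard_count=8)
```

The calculation ignores per-invocation overhead and batching windows, so treat the output as a lower bound and validate it against iterator age under real load.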

Dedicated event source mapping tools

The new ESM tools in the open source AWS Serverless MCP Server address these challenges by providing AI assistants with proven knowledge of event source mapping patterns and best practices. These tools guide developers through the entire ESM lifecycle, from initial setup to optimization and troubleshooting. They also enhance the event-driven development experience by translating the developer’s intent into detailed technical configuration, helping developers express high-level goals such as desired throughput, latency, or reliability requirements. The new tools cover all areas of event source mapping management:

  • Setup and configuration: Developers initialize new event source mapping configurations using AWS Serverless Application Model (AWS SAM) templates, select appropriate event source settings, and configure networking requirements for VPC-based sources like Amazon MSK.
  • Optimization and tuning: As applications evolve, the tools assist with fine-tuning ESM parameters like batch size, batching window, retry policies, and parallelization factors based on performance goals and telemetry data.
  • Troubleshooting and diagnostics: Specialized tools diagnose ESM connectivity issues, analyze Amazon CloudWatch Logs and metrics, and recommend solutions for common problems like VPC misconfigurations or permission errors.

Event source mapping tools in action

This example walks you through a scenario of creating, optimizing, and troubleshooting an event source mapping for Amazon MSK to demonstrate the capabilities of the new ESM tools.

Prerequisites and installation

To get started, download or update the AWS Serverless MCP Server from GitHub or the Python Package Index (PyPI) and follow the installation instructions. You can use this MCP server with any AI coding assistant of your choice, such as Amazon Q Developer, Cursor, Cline, Kiro, and more.

Add the following code to your MCP client configuration:

{
  "mcpServers": {
    "awslabs.aws-serverless-mcp-server": {
      "command": "uvx",
      "args": [
        "awslabs.aws-serverless-mcp-server@latest"
      ],
      "env": { 
        "AWS_PROFILE": "your-aws-profile",
        "AWS_REGION": "us-east-1",
        "FASTMCP_LOG_LEVEL": "ERROR"
      }
    }
  }
}
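If you prefer to script this step, the following Python sketch merges the same server entry into an existing MCP client configuration file without disturbing other servers already listed there. The location of that file depends on your AI coding assistant, so the path is left as a placeholder, and the helper names are hypothetical.

```python
import json
from pathlib import Path

SERVER_NAME = "awslabs.aws-serverless-mcp-server"


def serverless_mcp_entry(profile, region):
    """Return the server entry shown above for the given AWS profile and Region."""
    return {
        "command": "uvx",
        "args": [f"{SERVER_NAME}@latest"],
        "env": {
            "AWS_PROFILE": profile,
            "AWS_REGION": region,
            "FASTMCP_LOG_LEVEL": "ERROR",
        },
    }


def add_server(config_path, profile, region):
    """Add or replace the Serverless MCP Server entry, keeping other servers intact."""
    path = Path(config_path)
    config = json.loads(path.read_text()) if path.exists() else {}
    config.setdefault("mcpServers", {})[SERVER_NAME] = serverless_mcp_entry(profile, region)
    path.write_text(json.dumps(config, indent=2) + "\n")
    return config


# Usage (the path is a placeholder; check your assistant's documentation):
#   add_server("<path-to-your-mcp-config>.json", "your-aws-profile", "us-east-1")
```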

The Serverless MCP Server incorporates built-in guardrails to ensure secure and controlled development. By default, the server operates in a read-only mode, allowing only non-mutating actions. With this safety-first approach, you can explore ESM capabilities and architectural patterns while preventing unintended changes to your applications or infrastructure.

Creating and configuring an event source mapping

Imagine you want to set up a Lambda function to process events from an Amazon MSK cluster. Start by prompting your AI assistant:

Create a new Kafka cluster and a VPC named <your-vpc-name> in <your-aws-region>. The cluster should be in the VPC’s private subnets. Then, create a Lambda function to consume from the stream within the same VPC cluster. Prefix all created resources with <your-prefix>.

AI prompt to create a new Kafka cluster and ESM

The agent uses the esm_guidance tool to receive tailored guidance based on your use case and performance requirements. The tool analyzes your intent and provides step-by-step instructions for setting up the ESM with optimal configurations.

In addition to deployment and initialization scripts and supporting documentation, the assistant generates properly configured IAM policies and security group rules for accessing the cluster. The assistant then validates the ESM parameters against AWS limits and best practices.

Next, you want to understand the networking requirements:

My Kafka cluster is in a VPC. What networking configuration do I need for Lambda to access it?

AI assistant prompt for setting up Kafka ESM networking connectivity

The Serverless MCP Server provides specialized guidance for VPC-based Kafka configurations using the esm_guidance tool with guidance_type="networking". This guidance provides detailed information about subnet requirements, security group rules, and NAT gateway setup, and it validates your network topology for reliable connectivity.

Optimizing event source mapping performance

After your ESM is running, you notice that processing latency is higher than expected. You can ask for optimization guidance:

I have an ESM with UUID <your-esm-uuid> in <your-aws-region>. My target throughput is between 10 MB/s and 100 MB/s. Please update my ESM configuration to meet these throughput requirements while optimizing the cost of the event pollers.

AI prompt to optimize ESM throughput

The server uses the esm_optimize tool to analyze your current configuration and provide optimization recommendations. The tool supports three main actions:

  • Analysis mode: (action="analyze") Analyzes configuration tradeoffs for your optimization targets (throughput, latency, cost, failure rate)
  • Validation mode: (action="validate") Validates your ESM configuration against AWS limits and event source restrictions
  • Template generation: (action="generate_template") Creates updated AWS SAM templates with optimized configurations

You can use this tool to get guidance on your event source mapping configurations for Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams. Here are two examples:

I have a Kinesis stream with 100 shards receiving 100 MB/s of data. My Lambda function processes each record in about 50ms. Currently, my ESM has ParallelizationFactor=1 and BatchSize=100, but I’m seeing high iterator age (over 60 seconds) during peak times. How should I optimize my ESM configuration to reduce processing latency and handle the throughput?

AI prompt to optimize Kinesis ESM throughput

I have an SQS standard queue that receives 50,000 messages per hour during peak times. Each message takes about 2 seconds to process. My current ESM configuration has BatchSize=10 and no ScalingConfig set. I’m seeing message delays during peak hours. How should I optimize my ESM configuration for better throughput while keeping costs reasonable?

The tool generates updated AWS Serverless Application Model (AWS SAM) templates with the recommended configurations, making it easy to apply the changes through your deployment pipeline. However, it always requires explicit user confirmation before any deployment.
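For the SQS example prompt above, the underlying arithmetic can be sketched in a few lines of Python. The sketch estimates the steady-state concurrency that the stated load implies and places it in the ScalingConfig of an UpdateEventSourceMapping request. It assumes messages within a batch are processed one after another. The function names are hypothetical, and the numbers are an illustration rather than output from the esm_optimize tool.

```python
def sqs_maximum_concurrency(messages_per_hour, seconds_per_message):
    """Steady-state concurrent invocations needed, clamped to the allowed 2-1000 range."""
    # Arrival rate (messages/second) times processing time, rounded up.
    needed = -(-(messages_per_hour * seconds_per_message) // 3600)
    return min(1000, max(2, needed))


def sqs_update_params(esm_uuid, messages_per_hour, seconds_per_message, batch_size=10):
    """Build keyword arguments for the Lambda update_event_source_mapping call."""
    return {
        "UUID": esm_uuid,
        "BatchSize": batch_size,
        "ScalingConfig": {
            "MaximumConcurrency": sqs_maximum_concurrency(
                messages_per_hour, seconds_per_message
            )
        },
    }


# 50,000 messages per hour at 2 seconds each works out to 28 concurrent invocations.
params = sqs_update_params("<your-esm-uuid>", 50_000, 2)
```

MaximumConcurrency is a cap, so a value below the computed figure would let a backlog build during peak hours, while a higher value leaves headroom for bursts at the cost of more concurrent invocations.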

Troubleshooting event source mapping issues

When an issue arises, the ESM tools provide diagnostic capabilities. For example, if your ESM stops processing events:

I have a cluster called <your-kafka-cluster-name> and a consumer Lambda function named <your-lambda-function-name> in <your-aws-region>. Please investigate why my ESM (UUID: <your-esm-uuid>) trigger is not working and provide updated configurations to resolve the issue.

AI assistant prompt for investigating an issue with Kafka ESM

The server uses the esm_kafka_troubleshoot tool to provide comprehensive troubleshooting for Apache Kafka clusters. The tool supports two main modes:

  • Diagnostic mode: (issue_type="diagnosis") Analyzes your ESM status and provides diagnostic indicators. This helps identify whether timeouts occur before or after reaching Kafka brokers. It categorizes issues into specific types for targeted resolution.
  • Resolution mode: Provides step-by-step resolution guidance for specific issues.

AI prompt to start debugging an issue with a Kafka ESM

The tool automatically detects your event source type and provides tailored guidance. It validates VPC connectivity, examines IAM permissions, checks security group configurations, and analyzes CloudWatch Logs to provide a detailed diagnosis report with specific remediation steps.
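If you want to check the same first-level signals by hand, here is a minimal Python sketch that reads the state fields returned by the Lambda GetEventSourceMapping API. It is not how the esm_kafka_troubleshoot tool is implemented. The `summarize_esm` helper is hypothetical, and treating a LastProcessingResult that starts with "PROBLEM" as a failure indicator is a heuristic assumption.

```python
def summarize_esm(lambda_client, esm_uuid):
    """Collect the status fields that usually point to where an ESM is stuck."""
    response = lambda_client.get_event_source_mapping(UUID=esm_uuid)
    state = response.get("State")
    last_result = response.get("LastProcessingResult")
    return {
        "state": state,
        "state_transition_reason": response.get("StateTransitionReason"),
        "last_processing_result": last_result,
        # Heuristic: flag mappings that are not enabled or that report a problem.
        "needs_attention": state != "Enabled"
        or str(last_result or "").startswith("PROBLEM"),
    }


# Usage (requires boto3 and AWS credentials; the UUID is a placeholder):
#   import boto3
#   print(summarize_esm(boto3.client("lambda"), "<your-esm-uuid>"))
```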

Key benefits

The event source mapping tools in the AWS Serverless MCP Server provide unique advantages over traditional event source mapping configuration approaches:

  • AI-powered configuration translation: The tools translate high-level developer intent (such as process 1,000 events per second) into specific ESM parameters like batch size, parallelization factor, and batching window.
  • Complete infrastructure-as-code generation: Unlike generic AWS CLI tools that provide individual commands, ESM tools generate complete AWS SAM templates, initialization scripts, cleanup scripts, and validation scripts for end-to-end automation.
  • Proactive network validation: For VPC-based event sources like Amazon MSK or self-managed Kafka, the tools validate network topology, security group rules, and connectivity before deployment, preventing common silent failures.
  • Context-aware troubleshooting: The diagnostic tools correlate ESM status, CloudWatch metrics, VPC configuration, and IAM permissions to provide comprehensive root cause analysis with specific remediation steps.

New tools available in the Serverless MCP Server

The event source mapping tools are designed to minimize trust permission prompts by using a small set of primary tools that internally call specialized functions. The tools can be classified into three main categories:

  • esm_guidance: This tool provides comprehensive guidance on creating and configuring event source mappings for all event sources (DynamoDB, Kinesis, Kafka, SQS). It handles setup, networking guidance, and troubleshooting based on the guidance_type parameter. The tool automatically generates AWS SAM templates, IAM policies, and security group configurations.
  • esm_optimize: This advanced optimization tool analyzes configuration tradeoffs, validates ESM settings, and generates AWS SAM templates for performance tuning. It supports three actions:
    • analyze: Provides configuration tradeoff analysis for failure rate, latency, throughput, and cost optimization
    • validate: Validates ESM configurations against AWS limits and event source restrictions
    • generate_template: Creates AWS SAM templates with optimized configurations
  • esm_kafka_troubleshoot: This specialized troubleshooting tool for Kafka ESM issues supports both Amazon MSK and self-managed Apache Kafka clusters. It also provides diagnostic capabilities and step-by-step resolution guidance for connectivity, authentication, and network issues.

The primary tools internally call specialized helper functions that generate IAM policies, security groups, and scaling and concurrency configurations, and that validate configurations.

Visit the Serverless MCP Server documentation for the full list of tools and resources.

Best practices and considerations

When building event-driven applications with the AWS Serverless MCP Server, start by using its guidance tools for architectural decisions. The server helps you choose appropriate event sources, understand networking requirements, and configure optimal settings based on your performance goals.

For Kafka-based ESMs, pay special attention to VPC configuration. Use the server’s network troubleshooting tools to validate connectivity before deployment. The server can detect common issues like missing NAT gateways, incorrect security group rules, or subnet routing problems.

Monitor your event source mappings continuously using the server’s diagnostic tools. Set up alerts for key metrics like iterator age, error rates, and throttling. The server can help you interpret these metrics and recommend configuration adjustments to maintain optimal performance.

Conclusion

The new event source mapping tools in the open-source AWS Serverless MCP Server simplify event source mapping management throughout the development lifecycle, from initial setup to ongoing optimization and troubleshooting. By combining AI assistance with ESM expertise, they help developers build and deploy event-driven applications more efficiently while avoiding common configuration pitfalls.

As organizations continue to adopt event-driven serverless computing, tools that simplify ESM management and accelerate delivery become increasingly valuable.

To get started, visit the GitHub repository and explore the documentation. Share your experiences and suggestions through the GitHub repository to improve the MCP server’s capabilities and help shape the future of AI-assisted event-driven development.

For more serverless learning resources, visit Serverless Land.

Amazon Kinesis Data Streams now supports 10x larger record sizes: Simplifying real-time data processing

Post Syndicated from Sumant Nemmani original https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-streams-now-supports-10x-larger-record-sizes-simplifying-real-time-data-processing/

Today, AWS announced that Amazon Kinesis Data Streams now supports record sizes up to 10MiB – a tenfold increase from the previous limit. With this launch, you can now publish intermittent larger data payloads on your data streams while continuing to use existing Kinesis Data Streams APIs in your applications without additional effort. This launch is accompanied by a 2x increase in the maximum PutRecords request size from 5MiB to 10MiB, simplifying data pipelines and reducing operational overhead for IoT analytics, change data capture, and generative AI workloads.

In this post, we explore Amazon Kinesis Data Streams large record support, including key use cases, configuration of maximum record sizes, throttling considerations, and best practices for optimal performance.

Real world use cases

As data volumes grow and use cases evolve, we’ve seen increasing demand for supporting larger record sizes in streaming workloads. Previously, when you needed to process records larger than 1MiB, you had two options:

  • Split large records into multiple smaller records in producer applications and reassemble them in consumer applications
  • Store large records in Amazon Simple Storage Service (Amazon S3) and send only metadata through Kinesis Data Streams

Both these approaches are useful, but they add complexity to data pipelines, requiring additional code, increasing operational overhead, and complicating error handling and debugging, particularly when customers need to stream large records intermittently.
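
To make the first workaround concrete, the following is a minimal sketch of producer-side splitting and consumer-side reassembly. The envelope fields (message_id, index, total), the chunk size, and the function names are illustrative assumptions, not part of any Kinesis API.

```python
import uuid

# Hypothetical chunk size: stays under the previous 1 MiB record limit,
# leaving headroom for the partition key and envelope fields.
CHUNK_BYTES = 900 * 1024


def split_payload(payload: bytes, chunk_bytes: int = CHUNK_BYTES) -> list[dict]:
    """Split one large payload into ordered chunks that share a message ID."""
    message_id = str(uuid.uuid4())
    pieces = [payload[i:i + chunk_bytes]
              for i in range(0, len(payload), chunk_bytes)] or [b""]
    return [
        {"message_id": message_id, "index": i, "total": len(pieces), "data": piece}
        for i, piece in enumerate(pieces)
    ]


def reassemble(chunks: list[dict]) -> bytes:
    """Rebuild the payload from chunks in any order; raise if one is missing."""
    total = chunks[0]["total"]
    by_index = {c["index"]: c["data"] for c in chunks}
    if set(by_index) != set(range(total)):
        raise ValueError("missing chunks")
    return b"".join(by_index[i] for i in range(total))
```

Even this small sketch needs ordering, completeness checks, and error handling on the consumer side, which is the kind of overhead the larger record size is meant to reduce.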

This enhancement improves the ease of use and reduces operational overhead for customers handling intermittent data payloads across various industries and use cases. In the IoT analytics domain, connected vehicles and industrial equipment are generating increasing volumes of sensor telemetry data, with the size of individual telemetry records occasionally exceeding the previous 1MiB limit in Kinesis. This required customers to implement complex workarounds, such as splitting large records into multiple smaller ones or storing the large records separately and only sending metadata through Kinesis. Similarly, in database change data capture (CDC) pipelines, large transaction records can be produced, especially during bulk operations or schema changes. In the machine learning and generative AI space, workflows are increasingly requiring the ingestion of larger payloads to support richer feature sets and multi-modal data types like audio and images. The increased Kinesis record size limit from 1MiB to 10MiB limits the need for these types of complex workarounds, simplifying data pipelines and reducing operational overhead for customers in IoT, CDC, and advanced analytics use cases. Customers can now more easily ingest and process these intermittent large data records using the same familiar Kinesis APIs.

How it works

To start processing larger records:

  1. Update your stream’s maximum record size limit (maxRecordSize) through the AWS Console, AWS CLI, or AWS SDKs.
  2. Continue using the same PutRecord and PutRecords APIs for producers.
  3. Continue using the same GetRecords or SubscribeToShard APIs for consumers.

Your stream will be in Updating status for a few seconds before being ready to ingest larger records.

Getting started

To start processing larger records with Kinesis Data Streams, you can update the maximum record size by using the AWS Management Console, CLI or SDK.

On the AWS Management Console:

  1. Navigate to the Kinesis Data Streams console.
  2. Choose your stream and select the Configuration tab.
  3. Choose Edit (next to Maximum record size).
  4. Set your desired maximum record size (up to 10MiB).
  5. Save your changes.

Note: This setting only adjusts the maximum record size for this Kinesis data stream. Before increasing this limit, verify that all downstream applications can handle larger records.

Most common consumers, such as the Kinesis Client Library (starting with version 2.x), Amazon Data Firehose delivery to Amazon S3, and AWS Lambda, support processing records larger than 1 MiB. To learn more, refer to the Amazon Kinesis Data Streams documentation for large records.

You can also update this setting using the AWS CLI:

aws kinesis update-max-record-size \
  --stream-arn <stream-arn> \
  --max-record-size-in-ki-b 5000

Or using the AWS SDK:

import boto3

client = boto3.client('kinesis')
response = client.update_max_record_size(
    StreamARN='arn:aws:kinesis:us-west-2:123456789012:stream/my-stream',
    MaxRecordSizeInKiB=5000
)
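
Because the setting is expressed in KiB, a small conversion helper can avoid unit mistakes. The following is a sketch only: the helper name is hypothetical, and the bounds are assumptions taken from the limits stated in this post (the previous 1 MiB limit and the new 10 MiB maximum).

```python
KIB_PER_MIB = 1024

# Assumed bounds, based on the limits stated in this post.
MIN_KIB = 1 * KIB_PER_MIB
MAX_KIB = 10 * KIB_PER_MIB


def mib_to_kib(size_mib: float) -> int:
    """Convert a size in MiB to a KiB value for the maximum record size setting."""
    size_kib = int(size_mib * KIB_PER_MIB)
    if not MIN_KIB <= size_kib <= MAX_KIB:
        raise ValueError(f"{size_kib} KiB is outside {MIN_KIB}-{MAX_KIB} KiB")
    return size_kib


print(mib_to_kib(5))        # 5120
print(mib_to_kib(10))       # 10240
print(5000 / KIB_PER_MIB)   # 4.8828125
```

The last line shows that the value 5000 used in the examples above corresponds to slightly less than 5 MiB.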

Throttling and best practices for optimal performance

Individual shard throughput limits of 1MiB/s for writes and 2MiB/s for reads remain unchanged with support for larger record sizes. To work with large records, let’s understand how throttling works. In a stream, each shard has a throughput capacity of 1 MiB per second. To accommodate large records, each shard temporarily bursts up to 10MiB/s, eventually averaging out to 1MiB per second. To help visualize this behavior, think of each shard having a capacity tank that refills at 1MiB per second. After sending a large record (for example, a 10MiB record), the tank begins refilling immediately, allowing you to send smaller records as capacity becomes available. This capacity to support large records is continuously refilled into the stream. The rate of refilling depends on the size of the large records, the size of the baseline record, the overall traffic pattern, and your chosen partition key strategy. When you process large records, each shard continues to process baseline traffic while leveraging its burst capacity to handle these larger payloads.
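
The capacity tank analogy can be sketched as a simple token bucket. This is a simplified mental model under stated assumptions (a 10 MiB tank refilled at 1 MiB per second), not the actual throttling algorithm used by Kinesis Data Streams; the class and method names are hypothetical.

```python
class ShardTank:
    """Simplified token-bucket model of one shard's write capacity, in MiB."""

    def __init__(self, burst_mib: float = 10.0, refill_mib_per_s: float = 1.0):
        self.burst_mib = burst_mib
        self.refill = refill_mib_per_s
        self.level = burst_mib  # start with a full tank

    def advance(self, seconds: float) -> None:
        """Refill the tank for the elapsed time, capped at the burst size."""
        self.level = min(self.burst_mib, self.level + seconds * self.refill)

    def try_write(self, size_mib: float) -> bool:
        """Return True if the write is accepted, False if it would be throttled."""
        if size_mib > self.level:
            return False
        self.level -= size_mib
        return True


tank = ShardTank()
print(tank.try_write(10.0))  # True: the full tank absorbs one 10 MiB record
print(tank.try_write(0.5))   # False: the tank is now empty
tank.advance(1.0)            # one second of refill adds 1 MiB
print(tank.try_write(0.5))   # True: smaller records fit as capacity returns
print(tank.try_write(10.0))  # False: another 10 MiB record has to wait
```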

To illustrate how Kinesis Data Streams handles different proportions of large records, let’s examine the results of a simple test. For our test configuration, we set up a producer that sends data to an on-demand stream (defaults to 4 shards) at a rate of 50 records per second. The baseline records are 10KiB in size, while large records are 2MiB each. We conducted multiple test cases by progressively increasing the proportion of large records from 1% to 5% of the total stream traffic, along with a baseline case containing no large records. To ensure consistent testing conditions, we distributed the large records uniformly over time. For example, in the 1% scenario, we sent one large record for every 100 baseline records. The following graph shows the results:

In the graph, horizontal annotations indicate throttling occurrence peaks. The baseline scenario, represented by the blue line, shows minimal throttling events. As the proportion of large records increases from 1% to 5%, we observe an increase in the rate at which your stream throttles your data, with a notable acceleration in throttling events between the 2% and 5% scenarios. This test demonstrates how Kinesis Data Streams manages an increasing proportion of large records.
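
A rough back-of-the-envelope calculation helps explain this trend. The sketch below uses the test parameters described above and assumes, for simplicity, that traffic is spread perfectly evenly over the 4 initial shards and that the stream does not scale during the test; real behavior depends on the partition keys and on on-demand scaling.

```python
BASELINE_KIB = 10        # baseline record size
LARGE_KIB = 2 * 1024     # large record size (2 MiB)
RECORDS_PER_S = 50
SHARDS = 4               # assumed fixed for this estimate


def avg_kib_per_shard(large_fraction: float) -> float:
    """Average write rate per shard, assuming an even spread across shards."""
    large = RECORDS_PER_S * large_fraction * LARGE_KIB
    small = RECORDS_PER_S * (1 - large_fraction) * BASELINE_KIB
    return (large + small) / SHARDS


for pct in (0, 1, 2, 5):
    rate = avg_kib_per_shard(pct / 100)
    print(f"{pct}% large records: about {rate:.0f} KiB/s per shard")
```

Under these assumptions, the average per-shard rate is roughly 125, 380, 635, and 1,399 KiB/s for the 0%, 1%, 2%, and 5% cases. Only the 5% case exceeds the 1 MiB/s (1,024 KiB/s) per-shard write limit on average, which is consistent with the sharp rise in throttling between 2% and 5%.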

We recommend maintaining large records at 1-2% of your total record count for optimal performance. In production environments, actual stream behavior varies based on three key factors: the size of baseline records, the size of large records, and the frequency at which large records appear in the stream. We recommend that you test with your demand pattern to determine the specific behavior.

With on-demand streams, when the incoming traffic exceeds 500 KB/s per shard, Kinesis splits the shard within 15 minutes. The parent shard’s hash key values are redistributed evenly across child shards. Kinesis automatically scales the stream to increase the number of shards, enabling distribution of large records across a larger number of shards depending on the partition key strategy employed.
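
To illustrate how a partition key relates to a shard’s hash key range, the following sketch maps a key to a 128-bit integer with MD5 (the hash function Kinesis Data Streams uses for partition keys) and splits a parent range into two equal child ranges. The function names and the sample key are hypothetical.

```python
import hashlib

MAX_HASH_KEY = 2 ** 128 - 1


def hash_key(partition_key: str) -> int:
    """Map a partition key to a 128-bit integer using MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def split_evenly(start: int, end: int) -> tuple[tuple[int, int], tuple[int, int]]:
    """Split a parent hash key range into two equally sized child ranges."""
    mid = (start + end) // 2
    return (start, mid), (mid + 1, end)


left, right = split_evenly(0, MAX_HASH_KEY)
key = hash_key("patient-42")
child = "left" if left[0] <= key <= left[1] else "right"
print(f"key hashes into the {child} child range")
```

Records whose partition keys all hash into one child range still land on a single shard after a split, which is why the partition key strategy matters for large records.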

For optimal performance with large records:

  1. Use a random partition key strategy to distribute large records evenly across shards.
  2. Implement backoff and retry logic in producer applications.
  3. Monitor shard-level metrics to identify potential bottlenecks.
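
The first two recommendations can be sketched together as follows. This is an illustration under assumptions: put_fn is a hypothetical callable that wraps your actual PutRecord call, and Throttled stands in for a throttling error such as ProvisionedThroughputExceededException. Random partition keys give up per-key ordering, so use them only when ordering is not required.

```python
import random
import time
import uuid


class Throttled(Exception):
    """Stand-in for a throttling error raised by the real put call."""


def put_with_backoff(put_fn, data: bytes, max_attempts: int = 5,
                     base_delay_s: float = 0.1, sleep=time.sleep) -> int:
    """Call put_fn(partition_key, data), retrying throttled writes.

    Returns the number of attempts used.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        # A fresh random key per attempt spreads records across shards.
        partition_key = str(uuid.uuid4())
        try:
            put_fn(partition_key, data)
            return attempt
        except Throttled:
            if attempt == max_attempts:
                raise
            # Exponential backoff with full jitter before the next attempt.
            sleep(random.uniform(0, base_delay_s * 2 ** (attempt - 1)))
```

The sleep parameter is injectable so the retry logic can be exercised in tests without real delays.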

If you need to continuously stream large records, consider using Amazon S3 to store payloads and send only metadata references to the stream. Refer to Processing large records with Amazon Kinesis Data Streams for more information.

Conclusion

Amazon Kinesis Data Streams now supports record sizes up to 10MiB, a tenfold increase from the previous 1MiB limit. This enhancement simplifies data pipelines for IoT analytics, change data capture, and AI/ML workloads by eliminating the need for complex workarounds. You can continue using existing Kinesis Data Streams APIs without additional code changes and benefit from increased flexibility in handling intermittent large payloads.

  • For optimal performance, we recommend maintaining large records at 1-2% of total record count.
  • For best results with large records, implement a uniformly distributed partition key strategy to evenly distribute records across shards, include backoff and retry logic in producer applications, and monitor shard-level metrics to identify potential bottlenecks.
  • Before increasing the maximum record size, verify that all downstream applications and consumers can handle larger records.

We’re excited to see how you’ll leverage this capability to build more powerful and efficient streaming applications. To learn more, visit the Amazon Kinesis Data Streams documentation.


About the authors

Sumant Nemmani

Sumant is a product manager for Amazon Kinesis Data Streams. He is passionate about learning from customers and enjoys helping them unlock value with AWS. Outside of work, he spends time making music with his band Project Mishram, exploring history and food while traveling, and listening to long-form podcasts on technology and history.

Umesh Chaudhari

Umesh is a Sr. Streaming Solutions Architect at AWS. He works with customers to design and build real-time data processing systems. He has extensive working experience in software engineering, including architecting, designing, and developing data analytics systems. Outside of work, he enjoys traveling, following tech trends

Pratik Patel

Pratik is a Sr. Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices and proactively keep customers’ AWS environments operationally healthy.

Building a real-time ICU patient analytics pipeline with AWS Lambda event source mapping

Post Syndicated from Priyanka Chaudhary original https://aws.amazon.com/blogs/big-data/building-a-real-time-icu-patient-analytics-pipeline-with-aws-lambda-event-source-mapping/

In hospital intensive care units (ICUs), continuous patient monitoring is critical. Medical devices generate vast amounts of real-time data on vital signs such as heart rate, blood pressure, and oxygen saturation. The key challenge lies in early detection of patient deterioration through vital sign trending. Healthcare teams must process thousands of data points daily per patient to identify concerning patterns, a task crucial for timely intervention and potentially life-saving care.

AWS Lambda event source mapping can help in this scenario by automatically polling data streams and triggering functions in real-time without additional infrastructure management. By using AWS Lambda for real-time processing of sensor data and storing aggregated results in secure data structures designed for large analytic datasets called Iceberg tables in Amazon Simple Storage Service (Amazon S3) buckets, medical teams can achieve both immediate alerting capabilities and gain long-term analytical insights, enhancing their ability to provide timely and effective care.

In this post, we demonstrate how to build a serverless architecture that processes real-time ICU patient monitoring data using Lambda event source mapping for immediate alert generation and data aggregation, followed by persistent storage in Amazon S3 with an Iceberg catalog for comprehensive healthcare analytics. The solution demonstrates how to handle high-frequency vital sign data, implement critical threshold monitoring, and create a scalable analytics platform that can grow with your healthcare organization’s needs and help monitor sensor alert fatigue in the ICU.

Architecture

The following architecture diagram illustrates a real-time ICU patient analytics system.

Arch diagram

In this architecture, real-time patient monitoring data from hospital ICU sensors is ingested into AWS IoT Core, which then streams the data into Amazon Kinesis Data Streams. Two Lambda functions consume this streaming data concurrently for different purposes, both using Lambda event source mapping integration with Kinesis Data Streams. The first Lambda function uses the filtering feature of event source mapping to detect critical health events where SpO2 (blood oxygen saturation) levels fall below 90%, immediately triggering notifications to caregivers through Amazon Simple Notification Service (Amazon SNS) for rapid response. The second Lambda function employs the tumbling window feature of event source mapping to aggregate sensor data over 10-minute time intervals. This aggregated data is then systematically stored in S3 buckets in Apache Iceberg format for historical analysis and reporting. The entire pipeline operates in a serverless manner, providing scalable, real-time processing of critical healthcare data while maintaining both immediate alerting capabilities and long-term data storage for analytics.

Amazon S3, with its support for the Apache Iceberg table format, enables healthcare organizations to efficiently store and query large volumes of time-series patient data. This solution allows for complex analytical queries across historical patient data while maintaining high performance and cost efficiency.

Prerequisites

To implement the solution provided in this post, you should have the following:

  • An active AWS account
  • IAM permissions to deploy CloudFormation templates and provision AWS resources
  • Python installed on your machine to run the ICU patient sensor data simulator code

Deploy a real-time ICU patient analytics pipeline using CloudFormation

You use AWS CloudFormation templates to create the resources for a real-time data analytics pipeline.

  1. To get started, sign in to the console as an account user and select the appropriate Region.
  2. Download the CloudFormation template and launch it in the Region where you want to host the Lambda functions.
  3. Choose Next.
  4. On the Specify stack details page, enter a Stack name (for example, IoTHealthMonitoring).
  5. For Parameters, enter the following:
    1. IoTTopic: Enter the MQTT topic for your IoT devices (for example, icu/sensors).
    2. EmailAddress: Enter an email address for receiving notifications.
  6. Wait for the stack creation to complete. This process might take 5-10 minutes.
  7. After the CloudFormation stack completes, the following resources are created:
    1. An AWS IoT Core rule that captures data from the specified IoTTopic topic and routes it to the Kinesis data stream.
    2. A Kinesis data stream for ingesting IoT sensor data.
    3. Two Lambda functions:
      • FilterSensorData: Monitors critical health metrics and sends alerts.
      • AggregateSensorData: Aggregates sensor data in 10-minute windows.
    4. An Amazon DynamoDB table (NotificationTimestamps) to store notification timestamps for rate limiting alerts.
    5. An Amazon SNS topic and subscription to send email notifications for critical patient conditions.
    6. An Amazon Data Firehose delivery stream to deliver processed data to Amazon S3 using Iceberg format.
    7. Amazon S3 buckets to store sensor data.
    8. Amazon Athena and AWS Glue resources for the database and an Iceberg table for querying aggregated data.
    9. AWS Identity and Access Management (IAM) roles and policies to support required permissions for AWS IoT rules, Lambda functions, and Data Firehose streams.
    10. Amazon CloudWatch log groups to record activity for Data Firehose and the Lambda functions.

Solution walkthrough

Now that you’ve deployed the solution, let’s review a functional walkthrough. First, simulate patient vital signs data and send it to AWS IoT Core using the following Python code on your local machine. To run this code successfully, ensure you have the necessary IAM permissions to publish messages to the IoT topic in the AWS account where the solution is deployed.

import boto3
import json
import random
import time
# AWS IoT Data client
iot_data_client = boto3.client(
    'iot-data',
    region_name='us-west-2'
)
# IOT Topic to publish
topic = 'icu/sensors'
# Fixed set of patient IDs
patient_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print("Infinite sensor data simulation...")
try:
    while True:
        for patient_id in patient_ids:
            # Generate sensor data
            message = {
                "patient_id": patient_id,
                "timestamp": int(time.time()),
                "spo2": random.randint(91, 99),
                "heart_rate": random.randint(60, 100),
                "temperature_f": round(random.uniform(97.0, 100.0), 1)
            }
            # Publish to topic
            response = iot_data_client.publish(
                topic=topic,
                qos=1,
                payload=json.dumps(message)
            )
            print(f"Published: {message}")
        # Wait 30 seconds before next round
        print("Sleeping for 30 seconds...\n")
        time.sleep(30)
except KeyboardInterrupt:
    print("\nSimulation stopped by user.")

The following is the format of a sample ICU sensor message produced by the simulator.

{
    "patient_id": 1,
    "timestamp": 1683000000,
    "spo2": 85,
    "heart_rate": 75,
    "temperature_f": 98.6
}

Data is published to the icu/sensors IoT topic every 30 seconds for 10 different patients, creating a continuous stream of ICU patient monitoring data. Messages published to AWS IoT Core are passed to Kinesis Data Streams using the following message routing rule deployed by our solution.

Two Lambda functions consume data from Kinesis Data Streams concurrently, both using the Lambda event source mapping integration with Kinesis Data Streams.

Event source mapping

Lambda event source mapping automatically triggers Lambda functions in response to data changes from supported event sources like Amazon DynamoDB Streams, Amazon Kinesis Data Streams, Amazon Simple Queue Service (Amazon SQS), Amazon MQ, and Amazon Managed Streaming for Apache Kafka. This serverless integration works by having Lambda poll these sources for new records, which are then processed in configurable batch sizes ranging from 1 to 10,000 records. When new data is detected, Lambda automatically invokes the function synchronously, handling the scaling automatically based on the workload. The service supports at-least-once delivery and provides robust error handling through retry policies and dead-letter queues for failed events. Event source mappings can be fine-tuned through various parameters such as batch windows, maximum record age, and retry attempts, making them highly adaptable to different use cases. This feature is particularly valuable in event-driven architectures, so that customers can focus on business logic while AWS manages the complexities of event processing, scaling, and reliability.
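
The tuning parameters mentioned above map to arguments of the Lambda CreateEventSourceMapping API. The following sketch only builds the argument dictionary; the values are illustrative assumptions and are not the settings used by the CloudFormation template in this post. The helper name build_esm_config is hypothetical.

```python
def build_esm_config(function_name: str, stream_arn: str) -> dict:
    """Build keyword arguments for creating a Kinesis event source mapping."""
    return {
        "FunctionName": function_name,
        "EventSourceArn": stream_arn,
        "StartingPosition": "LATEST",
        "BatchSize": 100,                      # records per invocation (1 to 10,000)
        "MaximumBatchingWindowInSeconds": 5,   # wait up to 5 s to fill a batch
        "MaximumRecordAgeInSeconds": 3600,     # skip records older than 1 hour
        "MaximumRetryAttempts": 3,             # retries before a batch is abandoned
    }


config = build_esm_config(
    "FilterSensorData",
    "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream",
)
# With boto3 installed and credentials configured, this could be passed as:
#   boto3.client("lambda").create_event_source_mapping(**config)
print(config["BatchSize"])
```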

Event source mapping uses tumbling windows and filtering to process and analyze data.

Tumbling windows

Tumbling windows in Lambda event processing enable data aggregation in fixed, non-overlapping time intervals, where each event belongs to exactly one window. This is ideal for time-based analytics and periodic reporting. When combined with event source mapping, this approach allows efficient batch processing of events within defined time periods (for example, 10-minute windows), enabling calculations such as average vital signs or cumulative fluid intake and output while optimizing function invocations and resource usage.
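
Conceptually, assigning an event to a fixed, non-overlapping window is integer arithmetic on its timestamp. The sketch below illustrates the idea for 10-minute windows; it is a conceptual model with a hypothetical function name, and the way Lambda aligns actual window boundaries may differ.

```python
WINDOW_SECONDS = 600  # 10-minute tumbling window


def window_bounds(epoch_seconds: int, size: int = WINDOW_SECONDS) -> tuple[int, int]:
    """Return the [start, end) bounds of the window that contains the timestamp."""
    start = epoch_seconds - (epoch_seconds % size)
    return start, start + size


print(window_bounds(1683000123))  # (1683000000, 1683000600)
print(window_bounds(1683000600))  # (1683000600, 1683001200)
```

Because the end bound is exclusive, a timestamp that falls exactly on a boundary belongs to the next window, so each event lands in exactly one window.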

When you configure an event source mapping between Kinesis Data Streams and a Lambda function, use the Tumbling Window Duration setting, which appears in the trigger configuration in the Lambda console. The solution you deployed using the CloudFormation template includes the AggregateSensorData Lambda function, which uses a 10-minute tumbling window configuration. Depending on the volume of messages flowing through the Amazon Kinesis stream, the AggregateSensorData function can be invoked multiple times for each 10-minute window, sequentially, with the following attributes in the event supplied to the function.

  • Window start and end: The beginning and ending timestamps for the current tumbling window.
  • State: An object containing the state returned from the previous window, which is initially empty. The state object can contain up to 1 MB of data.
  • isFinalInvokeForWindow: Indicates if this is the last invocation for the tumbling window. This only occurs once per window period.
  • isWindowTerminatedEarly: A window ends early only if the state exceeds the maximum allowed size of 1 MB.
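
To see how these attributes interact, the following toy handler counts records across the invocations of one window, with a small local driver that mimics the sequence of events. The driver, the emitted list, and the batch contents are hypothetical test scaffolding, not Lambda behavior.

```python
emitted = []  # totals "published" at the end of each window (stand-in for a sink)


def count_handler(event, context=None):
    """Toy tumbling-window handler that counts records across invocations."""
    state = dict(event.get("state") or {})
    state["count"] = state.get("count", 0) + len(event["Records"])
    if event.get("isFinalInvokeForWindow") or event.get("isWindowTerminatedEarly"):
        emitted.append(state["count"])
        return {"state": {}}    # start the next window with empty state
    return {"state": state}     # carry state to the next invocation


def simulate_window(batches):
    """Feed batches to the handler locally, as one window's invocations."""
    state = {}
    for i, records in enumerate(batches):
        event = {
            "Records": records,
            "window": {"start": "2025-05-29T20:50:00Z", "end": "2025-05-29T21:00:00Z"},
            "state": state,
            "isFinalInvokeForWindow": i == len(batches) - 1,
            "isWindowTerminatedEarly": False,
        }
        state = count_handler(event)["state"]
    return state


simulate_window([["r1", "r2"], ["r3"], ["r4", "r5", "r6"]])
print(emitted)  # [6]
```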

In a tumbling window, there is a series of Lambda invocations in the following pattern:

AggregateSensorData Lambda code snippet:

import base64
import json
import os

import boto3

def handler(event, context):
    
    state_across_window = event['state']
    # Iterate through each record and decode the base64 data
    for record in event['Records']:
        encoded_data = record['kinesis']['data']
        partition_key = record['kinesis']['partitionKey']
        decoded_bytes = base64.b64decode(encoded_data)
        decoded_str = decoded_bytes.decode('utf-8')
        decoded_json = json.loads(decoded_str)
        # Create the partition_key attribute if it does not exist in state
        if partition_key not in state_across_window:
            state_across_window[partition_key] = {"min_spo2": decoded_json['spo2'], "max_spo2": decoded_json['spo2'], "avg_spo2": decoded_json['spo2'], "sum_spo2": decoded_json['spo2'], "min_heart_rate": decoded_json['heart_rate'], "max_heart_rate": decoded_json['heart_rate'], "avg_heart_rate": decoded_json['heart_rate'], "sum_heart_rate": decoded_json['heart_rate'], "min_temperature_f": decoded_json['temperature_f'], "max_temperature_f": decoded_json['temperature_f'], "avg_temperature_f": decoded_json['temperature_f'], "sum_temperature_f": decoded_json['temperature_f'], "record_count": 1}
        else:
            min_spo2 = state_across_window[partition_key]['min_spo2'] if state_across_window[partition_key]['min_spo2'] < decoded_json['spo2'] else decoded_json['spo2']
            max_spo2 = state_across_window[partition_key]['max_spo2'] if state_across_window[partition_key]['max_spo2'] > decoded_json['spo2'] else decoded_json['spo2']
            sum_spo2 = state_across_window[partition_key]['sum_spo2'] + decoded_json['spo2']
            min_heart_rate = state_across_window[partition_key]['min_heart_rate'] if state_across_window[partition_key]['min_heart_rate'] < decoded_json['heart_rate'] else decoded_json['heart_rate']
            max_heart_rate = state_across_window[partition_key]['max_heart_rate'] if state_across_window[partition_key]['max_heart_rate'] > decoded_json['heart_rate'] else decoded_json['heart_rate']
            sum_heart_rate = state_across_window[partition_key]['sum_heart_rate'] + decoded_json['heart_rate']
            
            min_temperature_f = state_across_window[partition_key]['min_temperature_f'] if state_across_window[partition_key]['min_temperature_f'] < decoded_json['temperature_f'] else decoded_json['temperature_f']
            max_temperature_f = state_across_window[partition_key]['max_temperature_f'] if state_across_window[partition_key]['max_temperature_f'] > decoded_json['temperature_f'] else decoded_json['temperature_f']
            sum_temperature_f = state_across_window[partition_key]['sum_temperature_f'] + decoded_json['temperature_f']
            
            record_count = state_across_window[partition_key]['record_count'] + 1
            avg_spo2 = sum_spo2/record_count
            avg_heart_rate = sum_heart_rate/record_count
            avg_temperature_f = sum_temperature_f/record_count
            
            state_across_window[partition_key] = {"min_spo2": min_spo2, "max_spo2": max_spo2, "avg_spo2": avg_spo2, "sum_spo2": sum_spo2, "min_heart_rate": min_heart_rate, "max_heart_rate": max_heart_rate, "avg_heart_rate": avg_heart_rate, "sum_heart_rate": sum_heart_rate, "min_temperature_f": min_temperature_f, "max_temperature_f": max_temperature_f, "avg_temperature_f": avg_temperature_f, "sum_temperature_f": sum_temperature_f, "record_count": record_count}
        
    # Determine if the window is final (window end)
    is_final_window = event.get('isFinalInvokeForWindow', False)
    # Determine if the window is terminated (window ended early)
    is_terminated_window = event.get('isWindowTerminatedEarly', False)
    window_start = event['window']['start']
    window_end = event['window']['end']
    if is_final_window or is_terminated_window:
        firehose_client = boto3.client('firehose')
        firehose_stream = os.environ['FIREHOSE_STREAM_NAME']
        for key, value in state_across_window.items():
            value['patient_id'] = key
            value['window_start'] = window_start
            value['window_end'] = window_end
            
            firehose_client.put_record(
                DeliveryStreamName= firehose_stream,
                Record={'Data': json.dumps(value) }
            )
        
        return {
            "state": {},
            "batchItemFailures": []
        }
    else:
        print(f"interim call for window: ws: {window_start} we: {window_end}")
        return {
            "state": state_across_window,
            "batchItemFailures": []
        }
  • The first invocation contains an empty state object in the event. The function returns a state object containing custom attributes that are specific to the custom logic in the aggregation.
  • The second invocation contains the state object provided by the first Lambda invocation. This function returns an updated state object with new aggregated values. Subsequent invocations follow this same sequence. Following is a sample of the aggregated state, which can be supplied to subsequent Lambda invocations within the same 10-minute tumbling window.
{
    "min_spo2": 88,
    "max_spo2": 90,
    "avg_spo2": 89.2,
    "sum_spo2": 625,
    "min_heart_rate": 21,
    "max_heart_rate": 22,
    "avg_heart_rate": 21.1,
    "sum_heart_rate": 148,
    "min_temperature_f": 90,
    "max_temperature_f": 91,
    "avg_temperature_f": 90.1,
    "sum_temperature_f": 631,
    "record_count": 7,
    "patient_id": "44",
    "window_start": "2025-05-29T20:51:00Z",
    "window_end": "2025-05-29T20:52:00Z"
}
  • The final invocation in the tumbling window has the isFinalInvokeForWindow flag set to true. It contains the state returned by the most recent Lambda invocation and is responsible for passing the aggregated state messages to the Data Firehose stream, which delivers the data to the Amazon S3 bucket in the Iceberg data format.
  • After the aggregated data is sent to Amazon S3, you can query the data using Athena.
Query: SELECT * FROM "cfdb_<<Database>>"."table_<<Table>>"

Sample result of the preceding Athena query:

Event source mapping with filtering

Lambda event source mapping with filtering optimizes data processing from sources like Amazon Kinesis by applying JSON pattern filtering before function invocation. This is demonstrated in the ICU patient monitoring solution, where the system filters for SpO2 readings from Kinesis Data Streams that are below 90%. Instead of processing all incoming data, the filtering capability is used to selectively process only critical readings, significantly reducing costs and processing overhead. The solution uses DynamoDB for state management, tracking low SpO2 events through a schema combining PatientID and timestamp-based keys within defined monitoring windows.
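
A filter for low SpO2 readings could look like the following sketch. The pattern uses the Lambda event filtering syntax for Kinesis records, where the data key refers to the decoded JSON payload. The exact pattern deployed by the CloudFormation template is not shown in this post, so treat this as an assumption; the matches function is a hypothetical local stand-in for testing.

```python
import json

# Event pattern evaluated against the decoded Kinesis record payload.
pattern = {"data": {"spo2": [{"numeric": ["<", 90]}]}}

# Shape of the FilterCriteria setting on an event source mapping.
filter_criteria = {"Filters": [{"Pattern": json.dumps(pattern)}]}


def matches(message: dict, threshold: int = 90) -> bool:
    """Local stand-in that mirrors what the pattern above selects."""
    value = message.get("spo2")
    return isinstance(value, (int, float)) and value < threshold


print(matches({"patient_id": 1, "spo2": 85}))  # True
print(matches({"patient_id": 1, "spo2": 95}))  # False
```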

This state-aware implementation balances clinical urgency with operational efficiency by sending immediate Amazon SNS notifications when critical conditions are first detected while implementing a 15-minute alert suppression window to prevent alert fatigue among healthcare providers. By maintaining state across multiple Lambda invocations, the system helps ensure rapid response to potentially life-threatening situations while minimizing unnecessary notifications for the same patient condition. The integration of Lambda event filtering, DynamoDB state management, and reliable alert delivery provided by Amazon SNS creates a robust, scalable healthcare monitoring solution that exemplifies how AWS services can be strategically combined to address complex requirements while balancing technical efficiency with clinical effectiveness.
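
The suppression rule itself is a small piece of logic that can be tested in isolation. The sketch below uses an in-memory dictionary as a stand-in for the NotificationTimestamps table; the AlertGate class and should_notify function are hypothetical names used for illustration.

```python
FIFTEEN_MINUTES = 15 * 60  # suppression window in seconds


def should_notify(last_sent, now: int, window_s: int = FIFTEEN_MINUTES) -> bool:
    """Return True if no alert was sent for this patient within the window."""
    return last_sent is None or now - last_sent >= window_s


class AlertGate:
    """In-memory stand-in for the notification timestamp table."""

    def __init__(self):
        self.last_sent = {}

    def record_low_spo2(self, patient_id: str, now: int) -> bool:
        """Return True if an alert should be sent, and remember when."""
        if should_notify(self.last_sent.get(patient_id), now):
            self.last_sent[patient_id] = now
            return True
        return False


gate = AlertGate()
print(gate.record_low_spo2("1", 0))     # True: first alert for patient 1
print(gate.record_low_spo2("1", 300))   # False: suppressed within 15 minutes
print(gate.record_low_spo2("1", 900))   # True: the window has elapsed
```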

Filter sensor data Lambda code snippet:

import base64
import json
import os
import time

import boto3

sns_client = boto3.client('sns')
dynamodb = boto3.resource('dynamodb')
table_name = os.environ['DYNAMODB_TABLE']
sns_topic_arn = os.environ['SNS_TOPIC_ARN']
table = dynamodb.Table(table_name)
FIFTEEN_MINUTES = 15 * 60  # 15 minutes in seconds
def handler(event, context):
    for record in event['Records']:
        print(f"Aggregated event: {record}")
        encoded_data = record['kinesis']['data']
        partition_key = record['kinesis']['partitionKey']
        decoded_bytes = base64.b64decode(encoded_data)
        decoded_str = decoded_bytes.decode('utf-8')
        # Check last notification timestamp from DynamoDB
        try:
            response = table.get_item(Key={'partition_key': partition_key})
            item = response.get('Item')
            now = int(time.time())
            if item:
                last_sent = item.get('timestamp', 0)
                if now - last_sent < FIFTEEN_MINUTES:
                    print(f"Notification for {partition_key} skipped (sent recently)")
                    continue
            # Send SNS Notification
            sns_response = sns_client.publish(
                TopicArn=sns_topic_arn,
                Message=f"Patient SpO2 below 90 percentage event information: {decoded_str}",
                Subject=f"Low SpO2 detected for patient ID {partition_key}"
            )
            print("Message sent to SNS! MessageId:", sns_response['MessageId'])
            # Update DynamoDB with current timestamp and TTL
            table.put_item(Item={
                'partition_key': partition_key,
                'timestamp': now,
                'ttl': now + FIFTEEN_MINUTES + 60  # Add extra buffer to TTL
            })
        except Exception as e:
            print("Error processing event:", e)
            return {
                'statusCode': 500,
                'body': json.dumps('Error processing event')
            }
    return {
        'statusCode': 200,
        'body': {}
    }

To generate an alert notification through the deployed solution, update the preceding simulator code by setting the SpO2 value to less than 90 and run it again. Within 1 minute, you should receive an alert notification at the email address you provided during stack creation. The following image is an example of an alert notification generated by the deployed solution.

Clean up

To avoid ongoing costs after completing this tutorial, delete the CloudFormation stack that you deployed earlier in this post. This will remove most of the AWS resources created for this solution. You might need to manually delete objects created in Amazon S3, because CloudFormation won’t remove non-empty buckets during stack deletion.

Conclusion

As demonstrated in this post, you can build a serverless real-time analytics pipeline for healthcare monitoring by using AWS IoT Core, Amazon S3 buckets with Iceberg format, and Amazon Kinesis Data Streams integration with AWS Lambda event source mapping. This architectural approach eliminates the need for complex code while enabling rapid critical patient care alerts and data aggregation for analysis using Lambda. The solution is particularly valuable for healthcare organizations looking to modernize their patient monitoring systems with real-time capabilities. The architecture can be extended to handle various medical devices and sensor data streams, making it adaptable for different healthcare monitoring scenarios. This post presents one implementation approach, and organizations adopting this solution should ensure the architecture and code meet their specific application performance, security, privacy, and regulatory compliance needs.

If this post helps you or inspires you to solve a problem, we would love to hear about it!


About the authors

Nihar Sheth

Nihar is a Senior Product Manager on the AWS Lambda team at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals.

Pratik Patel

Pratik is a Senior Technical Account Manager and streaming analytics specialist. He works with AWS customers, providing ongoing support and technical guidance to help them plan and build solutions using best practices, and proactively helps keep customers’ AWS environments operationally healthy.

Priyanka Chaudhary

Priyanka is a Senior Solutions Architect at AWS. She specializes in data lake and analytics services and helps many customers in this area. As a Solutions Architect, she plays a crucial role in guiding strategic customers through their cloud journey by designing scalable and secure cloud solutions. Outside of work, she loves spending time with friends and family, watching movies, and traveling.

Build a streaming data mesh using Amazon Kinesis Data Streams

Post Syndicated from Felix John original https://aws.amazon.com/blogs/big-data/build-a-streaming-data-mesh-using-amazon-kinesis-data-streams/

Organizations face an ever-increasing need to process and analyze data in real time. Traditional batch processing methods no longer suffice in a world where instant insights and immediate responses to market changes are crucial for maintaining competitive advantage. Streaming data has emerged as the cornerstone of modern data architectures, helping businesses capture, process, and act upon data as it’s generated.

As customers move from batch to real-time processing for streaming data, organizations are facing another challenge: scaling data management across the enterprise, because the centralized data platform can become the bottleneck. Data mesh for streaming data has emerged as a solution to address this challenge, building on the following principles:

  • Distributed domain-driven architecture – Moving away from centralized data teams to domain-specific ownership
  • Data as a product – Treating data as a first-class product with clear ownership and quality standards
  • Self-serve data infrastructure – Enabling domains to manage their data independently
  • Federated data governance – Following global standards and policies while allowing domain autonomy

A streaming mesh applies these principles to real-time data movement and processing. This mesh is a modern architectural approach that enables real-time data movement across decentralized domains. It provides a flexible, scalable framework for continuous data flow while maintaining the data mesh principles of domain ownership and self-service capabilities. A streaming mesh represents a modern approach to data integration and distribution, breaking down traditional silos and helping organizations create more dynamic, responsive data ecosystems.

AWS provides two primary solutions for streaming ingestion and storage: Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Kinesis Data Streams. These services are key to building a streaming mesh on AWS. In this post, we explore how to build a streaming mesh using Kinesis Data Streams.

Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at scale. The service can continuously capture gigabytes of data per second from hundreds of thousands of sources, making it ideal for building streaming mesh architectures. Key features include automatic scaling, on-demand provisioning, built-in security controls, and the ability to retain data for up to 365 days for replay purposes.

Benefits of a streaming mesh

A streaming mesh can deliver the following benefits:

  • Scalability – Organizations can scale from processing thousands to millions of events per second using managed scaling capabilities such as Kinesis Data Streams on-demand, while maintaining transparent operations for both producers and consumers.
  • Speed and architectural simplification – Streaming mesh enables real-time data flows, alleviating the need for complex orchestration and extract, transform, and load (ETL) processes. Data is streamed directly from source to consumers as it’s produced, simplifying the overall architecture. This approach replaces intricate point-to-point integrations and scheduled batch jobs with a streamlined, real-time data backbone. For example, instead of running nightly batch jobs to synchronize inventory data of physical goods across regions, a streaming mesh allows for instant inventory updates across all systems as sales occur, significantly reducing architectural complexity and latency.
  • Data synchronization – A streaming mesh captures source system changes one time and enables multiple downstream systems to independently process the same data stream. For instance, a single order processing stream can simultaneously update inventory systems, shipping services, and analytics platforms while maintaining replay capability, minimizing redundant integrations and providing data consistency.

The following personas have distinct responsibilities in the context of a streaming mesh:

  • Producers – Producers are responsible for generating and emitting data products into the streaming mesh. They have full ownership over the data products they generate and must make sure these data products adhere to predefined data quality and format standards. Additionally, producers are tasked with managing the schema evolution of the streaming data, while also meeting service level agreements for data delivery.
  • Consumers – Consumers are responsible for consuming and processing data products from the streaming mesh. They rely on the data products provided by producers to support their applications or analytics needs.
  • Governance – Governance is responsible for maintaining both the operational health and security of the streaming mesh platform. This includes managing scalability to handle changing workloads, enforcing data retention policies, and optimizing resource usage for efficiency. They also oversee security and compliance, enforcing proper access control, data encryption, and adherence to regulatory standards.

The streaming mesh establishes a common platform that enables seamless collaboration between producers, consumers, and governance teams. By clearly defining responsibilities and providing self-service capabilities, it removes traditional integration barriers while maintaining security and compliance. This approach helps organizations break down data silos and achieve more efficient, flexible data utilization across the enterprise.

A streaming mesh architecture consists of two key constructs: stream storage and the stream processor. Stream storage serves all three key personas (governance, producers, and consumers) by providing a reliable, scalable, on-demand platform for data retention and distribution.

The stream processor is essential for consumers reading and transforming the data. Kinesis Data Streams integrates seamlessly with various processing options. AWS Lambda can read from a Kinesis data stream through event source mapping, which is a Lambda resource that reads items from the stream and invokes a Lambda function with batches of records. Other processing options include the Kinesis Client Library (KCL) for building custom consumer applications, Amazon Managed Service for Apache Flink for complex stream processing at scale, Amazon Data Firehose, and more. To learn more, refer to Read data from Amazon Kinesis Data Streams.

This combination of storage and flexible processing capabilities supports the diverse needs of multiple personas while maintaining operational simplicity.

Common access patterns for building a streaming mesh

When building a streaming mesh, you should consider data ingestion, governance, access control, storage, schema control, and processing. When implementing the components that make up the streaming mesh, you must properly address the needs of the personas defined in the previous section: producer, consumer, and governance. A key consideration in streaming mesh architectures is the fact that producers and consumers can also exist outside of AWS entirely. In this post, we examine the key scenarios illustrated in the following diagram. Although the diagram has been simplified for clarity, it highlights the most important scenarios in a streaming mesh architecture:

  • External sharing – This involves producers or consumers outside of AWS
  • Internal sharing – This involves producers and consumers within AWS, potentially across different AWS accounts or AWS Regions

Overview of internal and external sharing

Building a streaming mesh on a self-managed streaming solution that facilitates internal and external sharing can be challenging because producers and consumers require the appropriate service discovery, network connectivity, security, and access control to be able to interact with the mesh. This can involve implementing complex networking solutions such as VPN connections with authentication and authorization mechanisms to support secure connectivity. In addition, you must consider the access pattern of the consumers when building the streaming mesh.

The following are common access patterns:

  • Shared data access with replay – This pattern allows multiple (standard or enhanced fan-out) consumers to access the same data stream as well as the ability to replay data as needed. For example, a centralized log stream might serve various teams: security operations for threat detection, IT operations for system troubleshooting, or development teams for debugging. Each team can access and replay the same log data for their specific needs.
  • Messaging filtering based on rules – In this pattern, you must filter the data stream, and consumers are only reading a subset of the data stream. The filtering is based on predefined rules at the column or row level.
  • Fan-out to subscribers without replay – This pattern is designed for real-time distribution of messages to multiple subscribers or consumers. The messages are delivered under at-most-once semantics and can be dropped or deleted after consumption. The subscribers can’t replay the events. The data is consumed by services such as AWS AppSync or other GraphQL-based APIs using WebSockets.

The following diagram illustrates these access patterns.

Streaming mesh patterns

Build a streaming mesh using Kinesis Data Streams

When building a streaming mesh that involves internal and external sharing, you can use Kinesis Data Streams. This service offers a built-in API layer that delivers secure and highly available HTTP/S endpoints accessible through the Kinesis API. Producers and consumers can securely write and read from the Kinesis Data Streams endpoints using the AWS SDK, the Amazon Kinesis Producer Library (KPL), or Kinesis Client Library (KCL), alleviating the need for custom REST proxies or additional API infrastructure.

Security is inherently integrated through AWS Identity and Access Management (IAM), supporting fine-grained access control that can be centrally managed. You can also use attribute-based access control (ABAC) with stream tags assigned to Kinesis Data Streams resources for managing access control to the streaming mesh, because ABAC is particularly helpful in complex and scaling environments. Because ABAC is attribute-based, it enables dynamic authorization for data producers and consumers in real time, automatically adapting access permissions as organizational and data requirements evolve. In addition, Kinesis Data Streams provides built-in rate limiting, request throttling, and burst handling capabilities.

In the following sections, we revisit the previously mentioned common access patterns for consumers in the context of a streaming mesh and discuss how to build the patterns using Kinesis Data Streams.

Shared data access with replay

Kinesis Data Streams has built-in support for the shared data access with replay pattern. The following diagram illustrates this access pattern, focusing on same-account, cross-account, and external consumers.

Shared access with replay

Governance

When you create your data mesh with Kinesis Data Streams, you should create a data stream with the appropriate number of provisioned shards or on-demand mode based on your throughput needs. On-demand mode should be considered for more dynamic workloads. Note that message ordering can only be guaranteed at the shard level.
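
To see why ordering holds only within a shard, it helps to look at how a partition key selects a shard. Kinesis Data Streams hashes the partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The following is a minimal sketch that assumes the hash key space is split evenly across shards, which is not necessarily the case after resharding. The function names and sample keys are illustrative and not part of any AWS library.

```python
import hashlib

HASH_KEY_SPACE = 2 ** 128  # MD5 yields a 128-bit hash key


def hash_key(partition_key: str) -> int:
    """Return the 128-bit integer hash key for a partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index_for_key(partition_key: str, shard_count: int) -> int:
    """Map a partition key to a shard index, assuming evenly split ranges."""
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    range_size = HASH_KEY_SPACE // shard_count
    # The last shard absorbs any remainder of the key space.
    return min(hash_key(partition_key) // range_size, shard_count - 1)


if __name__ == "__main__":
    # Records with the same partition key always land on the same shard,
    # so their relative order is preserved; different keys may not share one.
    for key in ("order-1001", "order-1002", "order-1001"):
        print(key, "-> shard", shard_index_for_key(key, shard_count=4))
```

If consumers depend on the order of related events, producers should give those events the same partition key.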

You can configure a data retention period of up to 365 days. The default retention period is 24 hours and can be modified using the Kinesis Data Streams API. This way, the data is retained for the specified retention period and can be replayed by the consumers. Note that there is an additional fee for data retention beyond the default 24 hours.
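
As a rough sketch of the retention change, the following helper converts days to hours, validates the value against the 24-hour default and 365-day maximum, and calls the `IncreaseStreamRetentionPeriod` API through boto3. The stream name is a hypothetical placeholder, and the SDK is imported lazily so the validation logic can be exercised without AWS credentials.

```python
DEFAULT_RETENTION_HOURS = 24
MAX_RETENTION_HOURS = 365 * 24  # 8,760 hours


def retention_hours(days: int) -> int:
    """Convert a retention period in days to hours and validate the range."""
    hours = days * 24
    if not DEFAULT_RETENTION_HOURS <= hours <= MAX_RETENTION_HOURS:
        raise ValueError("retention must be between 1 and 365 days")
    return hours


def extend_retention(stream_name: str, days: int, client=None) -> int:
    """Raise the retention period of a stream; returns the new value in hours."""
    hours = retention_hours(days)
    if client is None:
        import boto3  # lazy import: only needed when actually calling AWS

        client = boto3.client("kinesis")
    client.increase_stream_retention_period(
        StreamName=stream_name, RetentionPeriodHours=hours
    )
    return hours


if __name__ == "__main__":
    # "orders-stream" is a hypothetical stream name; only the conversion runs here.
    print("7 days =", retention_hours(7), "hours")
```

This call only increases retention. Lowering it again uses the separate `DecreaseStreamRetentionPeriod` API.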

To enhance network security, you can use interface VPC endpoints. They make sure the traffic between your producers and consumers residing in your virtual private cloud (VPC) and your Kinesis data streams remains private and doesn’t traverse the internet. To provide cross-account access to your Kinesis data stream, you can use resource policies or cross-account IAM roles. Resource-based policies are directly attached to the resource that you want to share access to, such as the Kinesis data stream, and a cross-account IAM role in one AWS account delegates specific permissions, such as read access to the Kinesis data stream, to another AWS account. At the time of writing, Kinesis Data Streams doesn’t support cross-Region access.
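
The resource-based policy option can be sketched as follows. The helper builds a read-only policy for a consumer account and attaches it with the `PutResourcePolicy` API. The account IDs, stream ARN, and the choice of actions are assumptions for illustration, so adjust them to the minimum your consumers actually need.

```python
import json


def read_only_stream_policy(stream_arn: str, consumer_account_id: str) -> dict:
    """Build a resource policy granting another account read access to a stream."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountRead",
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{consumer_account_id}:root"},
                "Action": [
                    "kinesis:DescribeStreamSummary",
                    "kinesis:ListShards",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                ],
                "Resource": stream_arn,
            }
        ],
    }


def attach_policy(stream_arn: str, policy: dict, client=None) -> None:
    """Attach the policy to the stream (requires AWS credentials)."""
    if client is None:
        import boto3  # lazy import: only needed when actually calling AWS

        client = boto3.client("kinesis")
    client.put_resource_policy(ResourceARN=stream_arn, Policy=json.dumps(policy))


if __name__ == "__main__":
    # Placeholder ARN and account IDs, not real resources.
    arn = "arn:aws:kinesis:us-east-1:111122223333:stream/orders"
    print(json.dumps(read_only_stream_policy(arn, "444455556666"), indent=2))
```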

Kinesis Data Streams enforces quotas at the shard and stream level to prevent resource exhaustion and maintain consistent performance. Combined with shard-level Amazon CloudWatch metrics, these quotas help identify hot shards and prevent noisy neighbor scenarios that could impact overall stream performance.

Producer

You can build producer applications using the AWS SDK or the KPL. Using the KPL can simplify writing because it provides built-in functions such as aggregation, retry mechanisms, per-shard rate limiting, and increased throughput. However, the KPL can incur an additional processing delay. You should consider integrating Kinesis Data Streams with the AWS Glue Schema Registry to centrally discover, control, and evolve schemas and make sure produced data is continuously validated by a registered schema.

You must make sure your producers can securely connect to the Kinesis API whether from inside or outside the AWS Cloud. Your producer can potentially live in the same AWS account, across accounts, or outside of AWS entirely. Typically, you want your producers to be as close as possible to the Region where your Kinesis data stream is running to minimize latency. You can enable cross-account access by attaching a resource-based policy to your Kinesis data stream that grants producers in other AWS accounts permission to write data. At the time of writing, the KPL doesn’t support specifying a stream Amazon Resource Name (ARN) when writing to a data stream. You must use the AWS SDK to write to a cross-account data stream (for more details, see Share your data stream with another account). There are also limitations for cross-Region support if you want to produce data to Kinesis Data Streams from Data Firehose in a different Region using the direct integration.
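
A cross-account write with the AWS SDK might look like the following sketch, which addresses the stream by ARN instead of by name. The ARN, payload fields, and partition key are hypothetical. The client must be created in the stream's Region, and the stream's resource policy must allow the caller to write.

```python
import json


def build_put_record_request(stream_arn: str, payload: dict, partition_key: str) -> dict:
    """Build PutRecord parameters that address the stream by ARN."""
    return {
        "StreamARN": stream_arn,
        "Data": json.dumps(payload).encode("utf-8"),
        "PartitionKey": partition_key,
    }


def send(request: dict, client=None) -> dict:
    """Send the record (requires AWS credentials and write permission)."""
    if client is None:
        import boto3  # lazy import: only needed when actually calling AWS

        client = boto3.client("kinesis")
    return client.put_record(**request)


if __name__ == "__main__":
    # Placeholder ARN in another account; nothing is sent here.
    request = build_put_record_request(
        "arn:aws:kinesis:us-east-1:111122223333:stream/orders",
        {"orderId": "12345", "status": "PLACED"},
        partition_key="12345",
    )
    print(request["StreamARN"], len(request["Data"]), "bytes")
```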

To securely access the Kinesis data stream, producers need valid credentials. Credentials should not be stored directly in the client application. Instead, you should use IAM roles to provide temporary credentials using the AssumeRole API through AWS Security Token Service (AWS STS). For producers outside of AWS, you can also consider AWS IAM Roles Anywhere to obtain temporary credentials in IAM. Importantly, only the minimum permissions that are required to write to the stream should be granted. With ABAC support for Kinesis Data Streams, specific API actions can be allowed or denied when the tag on the data stream matches the tag defined on the IAM role principal.
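
To make the tag-matching idea concrete, here is a sketch of an identity-based policy that allows writes only when a stream tag equals the same tag on the calling principal. The tag key `domain` and the broad resource pattern are assumptions for illustration. Verify which Kinesis actions honor resource tag conditions before relying on a policy like this.

```python
import json


def abac_write_policy(tag_key: str = "domain") -> dict:
    """Build an IAM policy allowing writes when stream and principal tags match."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "WriteWhenTagsMatch",
                "Effect": "Allow",
                "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
                "Resource": "arn:aws:kinesis:*:*:stream/*",
                "Condition": {
                    "StringEquals": {
                        # The stream's tag must equal the caller's principal tag.
                        f"aws:ResourceTag/{tag_key}": f"${{aws:PrincipalTag/{tag_key}}}"
                    }
                },
            }
        ],
    }


if __name__ == "__main__":
    print(json.dumps(abac_write_policy(), indent=2))
```

With this shape, onboarding a new producer means tagging its role rather than editing the policy.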

Consumer

You can build consumers using the KCL or AWS SDK. The KCL can simplify reading from Kinesis data streams because it automatically handles complex tasks such as checkpointing and load balancing across multiple consumers. This shared access pattern can be implemented using standard as well as enhanced fan-out consumers. In the standard consumption mode, the read throughput is shared by all consumers reading from the same shard. The maximum throughput for each shard is 2 MBps. Records are delivered to the consumers in a pull model over HTTP using the GetRecords API. Alternatively, with enhanced fan-out, consumers can use the SubscribeToShard API with data pushed over HTTP/2 for lower-latency delivery. For more details, see Develop enhanced fan-out consumers with dedicated throughput.

Both consumption methods allow consumers to specify the shard and sequence number from which to start reading, enabling data replay from different points within the retention period. Keep in mind that standard consumers share the read limit of each shard, so use enhanced fan-out when possible. KCL 2.0 or later uses enhanced fan-out by default, and you must specifically set the retrieval mode to POLLING to use the standard consumption model. Regarding connectivity and access control, you should closely follow what is already suggested for the producer side.
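
For a standard (polling) consumer built on the AWS SDK, replay comes down to choosing the starting position when requesting a shard iterator. The following sketch builds `GetShardIterator` parameters for three starting points: a known sequence number, a point in time, or the oldest retained record. The stream name, shard ID, and helper name are illustrative.

```python
from datetime import datetime, timezone
from typing import Optional


def replay_iterator_request(
    stream_name: str,
    shard_id: str,
    *,
    sequence_number: Optional[str] = None,
    timestamp: Optional[datetime] = None,
) -> dict:
    """Build GetShardIterator parameters for the requested replay position."""
    if sequence_number is not None and timestamp is not None:
        raise ValueError("specify either sequence_number or timestamp, not both")
    request = {"StreamName": stream_name, "ShardId": shard_id}
    if sequence_number is not None:
        request["ShardIteratorType"] = "AT_SEQUENCE_NUMBER"
        request["StartingSequenceNumber"] = sequence_number
    elif timestamp is not None:
        request["ShardIteratorType"] = "AT_TIMESTAMP"
        request["Timestamp"] = timestamp
    else:
        # Start from the oldest record still within the retention period.
        request["ShardIteratorType"] = "TRIM_HORIZON"
    return request


if __name__ == "__main__":
    since = datetime(2025, 1, 1, tzinfo=timezone.utc)
    print(replay_iterator_request("orders", "shardId-000000000000", timestamp=since))
```

The resulting dictionary can be passed as keyword arguments to the boto3 Kinesis client's `get_shard_iterator`, and the returned iterator to `get_records`.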

Messaging filtering based on rules

Although Kinesis Data Streams doesn’t provide built-in filtering capabilities, you can implement this pattern by combining it with Lambda or Managed Service for Apache Flink. For this post, we focus on using Lambda to filter messages.

Governance and producer

Governance and producer personas should follow the best practices already defined for the shared data access with replay pattern, as described in the previous section.

Consumer

You should create a Lambda function that consumes (shared throughput or dedicated throughput) from the stream and create a Lambda event source mapping with your filter criteria. At the time of writing, Lambda supports event source mappings for Amazon DynamoDB, Kinesis Data Streams, Amazon MQ, Managed Streaming for Apache Kafka or self-managed Kafka, and Amazon Simple Queue Service (Amazon SQS). Both the ingested data records and your filter criteria for the data field must be in a valid JSON format for Lambda to properly filter the incoming messages from Kinesis sources.

When using enhanced fan-out, you configure a Kinesis dedicated-throughput consumer to act as the trigger for your Lambda function. Lambda then filters the (aggregated) records and passes only those records that meet your filter criteria.
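
As an illustration of the filter criteria, the following sketch builds the `FilterCriteria` structure for a Kinesis event source mapping. For Kinesis sources the pattern is applied under the `data` key of each record. The field names `region` and `total` are hypothetical and only match if producers emit JSON records with those fields.

```python
import json


def kinesis_filter_criteria(data_pattern: dict) -> dict:
    """Wrap a pattern on the record payload in the FilterCriteria structure."""
    pattern = {"data": data_pattern}
    # Each filter pattern is passed to Lambda as a JSON string.
    return {"Filters": [{"Pattern": json.dumps(pattern)}]}


# Hypothetical rule: only orders from one Region with a total above 100.
criteria = kinesis_filter_criteria(
    {"region": ["eu-central-1"], "total": [{"numeric": [">", 100]}]}
)

if __name__ == "__main__":
    print(json.dumps(criteria, indent=2))
```

The resulting dictionary can be supplied as the `FilterCriteria` argument when creating the event source mapping, for example with the boto3 Lambda client's `create_event_source_mapping`.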

Fan-out to subscribers without replay

When distributing streaming data to multiple subscribers without the ability to replay, Kinesis Data Streams supports an intermediary pattern that’s particularly effective for web and mobile clients needing real-time updates. This pattern introduces an intermediary service to bridge between Kinesis Data Streams and the subscribers, processing records from the data stream (using a standard or enhanced fan-out consumer model) and delivering the data records to the subscribers in real time. Subscribers don’t directly interact with the Kinesis API.

A common approach uses GraphQL gateways such as AWS AppSync, WebSockets API services like the Amazon API Gateway WebSockets API, or other suitable services that make the data available to the subscribers. The data is distributed to the subscribers through networking connections such as WebSockets.

The following diagram illustrates the access pattern of fan-out to subscribers without replay. The diagram displays the managed AWS services AppSync and API Gateway as intermediary consumer options for illustration purposes.

Fan-out without replay

Governance and producer

Governance and producer personas should follow the best practices already defined for the shared data access with replay pattern.

Consumer

This consumption model operates differently from traditional Kinesis consumption patterns. Subscribers connect through networking connections such as WebSockets to the intermediary service and receive the data records in real time without the ability to set offsets, replay historical data, or control data positioning. The delivery follows at-most-once semantics, where messages might be lost if subscribers disconnect, because consumption is ephemeral without persistence for individual subscribers. The intermediary consumer service must be designed for high performance, low latency, and resilient message distribution. Potential intermediary service implementations range from managed services such as AppSync or API Gateway to custom-built solutions like WebSocket servers or GraphQL subscription services. In addition, this pattern requires an intermediary consumer service such as Lambda that reads the data from the Kinesis data stream and immediately writes it to the intermediary service.
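
The intermediary consumer can be sketched as a Lambda-style handler that decodes each Kinesis record and forwards it to the currently connected subscribers. The `send` callable and the subscriber list are assumptions that stand in for whatever the intermediary service provides, such as a WebSocket connection API. The sketch also assumes JSON payloads.

```python
import base64
import json
from typing import Callable, Iterable, List


def decode_kinesis_records(event: dict) -> List[dict]:
    """Decode the base64-encoded JSON payloads of a Lambda Kinesis event."""
    payloads = []
    for record in event.get("Records", []):
        raw = base64.b64decode(record["kinesis"]["data"])
        payloads.append(json.loads(raw))
    return payloads


def fan_out(event: dict, subscribers: Iterable[str], send: Callable[[str, str], None]) -> int:
    """Forward every record to every subscriber; returns successful deliveries."""
    subscriber_ids = list(subscribers)
    delivered = 0
    for payload in decode_kinesis_records(event):
        message = json.dumps(payload)
        for subscriber_id in subscriber_ids:
            try:
                send(subscriber_id, message)
                delivered += 1
            except Exception:
                # At-most-once: a failed delivery is dropped, not retried.
                continue
    return delivered


if __name__ == "__main__":
    sample = {"Records": [{"kinesis": {"data": base64.b64encode(b'{"id": 1}').decode()}}]}
    fan_out(sample, ["conn-a", "conn-b"], lambda sid, msg: print(sid, msg))
```

Dropping failed deliveries instead of retrying is what produces the at-most-once behavior described above.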

Conclusion

This post highlighted the benefits of a streaming mesh. We demonstrated why Kinesis Data Streams is particularly suited to facilitate a secure and scalable streaming mesh architecture for internal as well as external sharing. The reasons include the service’s built-in API layer, comprehensive security through IAM, flexible networking connection options, and versatile consumption models. The streaming mesh patterns demonstrated—shared data access with replay, message filtering, and fan-out to subscribers—showcase how Kinesis Data Streams effectively supports producers, consumers, and governance teams across internal and external boundaries.

For more information on how to get started with Kinesis Data Streams, refer to Getting started with Amazon Kinesis Data Streams. For other posts on Kinesis Data Streams, browse through the AWS Big Data Blog.


About the authors

Felix John

Felix is a Global Solutions Architect and data streaming expert at AWS, based in Germany. He focuses on supporting global automotive & manufacturing customers on their cloud journey. Outside of his professional life, Felix enjoys playing Floorball and hiking in the mountains.

Ali Alemi

Ali is a Principal Streaming Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems which are reliable, secure, efficient, and cost-effective. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

Monitor and debug event-driven applications with new Amazon EventBridge logging

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/monitor-and-debug-event-driven-applications-with-new-amazon-eventbridge-logging/

Starting today, you can use enhanced logging capability in Amazon EventBridge to monitor and debug your event-driven applications with comprehensive logs. These new enhancements help improve how you monitor and troubleshoot event flows.

Here’s how you can find this new capability on the Amazon EventBridge console:

The new observability capabilities address microservices and event-driven architecture monitoring challenges by providing comprehensive event lifecycle tracking. EventBridge now generates detailed log entries every time a matched event against rules is published, delivered to subscribers, or encounters failures and retries.

You gain visibility into the complete event journey with detailed information about successes, failures, and status codes that make identifying and diagnosing issues straightforward. What used to take hours of trial-and-error debugging now takes minutes with detailed event lifecycle tracking and built-in query tools.

Using Amazon EventBridge enhanced observability
Let me walk you through a demonstration that showcases the logging capability in Amazon EventBridge.

I can enable logging for an existing event bus or when creating a new custom event bus. First, I navigate to the EventBridge console and choose Event buses in the left navigation pane. In Custom event bus, I choose Create event bus.

I can see this new capability in the Logs section. I have three options to configure the Log destination: Amazon CloudWatch Logs, Amazon Data Firehose Stream, and Amazon Simple Storage Service (Amazon S3). If I want to stream my logs into a data lake, I can select Amazon Data Firehose Stream. Logs are encrypted in transit with TLS and at rest if a customer-managed key (CMK) is provided for the event bus. CloudWatch Logs supports customer-managed keys, and Data Firehose offers server-side encryption for downstream destinations.

For this demo, I select CloudWatch logs and S3 logs.

I can also choose Log level, from Error, Info, or Trace. I choose Trace and select Include execution data because I need to review the payloads. You need to be mindful as logging payload data may contain sensitive information, and this setting applies to all log destinations you select. Then, I configure two destinations, one each for CloudWatch log group and S3 logs. Then I choose Create.

After logging is enabled, I can start publishing test events to observe the logging behavior.

For the first scenario, I’ve built an AWS Lambda function and configured this Lambda function as a target.

I navigate to my event bus to send a sample event by choosing Send events.

Here’s the payload that I use:

{
  "Source": "ecommerce.orders",
  "DetailType": "Order Placed",
  "Detail": {
    "orderId": "12345",
    "customerId": "cust-789",
    "amount": 99.99,
    "items": [
      {
        "productId": "prod-456",
        "quantity": 2,
        "price": 49.99
      }
    ]
  }
}
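
The same event can also be sent programmatically. The following sketch converts the payload above into an entry for the `PutEvents` API, which expects `Detail` as a JSON string, not a nested object. The helper names are illustrative, and `demo-logging` is the event bus name that appears in the logs later in this post.

```python
import json


def to_put_events_entry(payload: dict, event_bus_name: str) -> dict:
    """Convert the console-style payload into a PutEvents entry."""
    return {
        "Source": payload["Source"],
        "DetailType": payload["DetailType"],
        "Detail": json.dumps(payload["Detail"]),  # must be a JSON string
        "EventBusName": event_bus_name,
    }


def send_event(entry: dict, client=None) -> dict:
    """Publish the entry (requires AWS credentials)."""
    if client is None:
        import boto3  # lazy import: only needed when actually calling AWS

        client = boto3.client("events")
    return client.put_events(Entries=[entry])


payload = {
    "Source": "ecommerce.orders",
    "DetailType": "Order Placed",
    "Detail": {
        "orderId": "12345",
        "customerId": "cust-789",
        "amount": 99.99,
        "items": [{"productId": "prod-456", "quantity": 2, "price": 49.99}],
    },
}
entry = to_put_events_entry(payload, "demo-logging")

if __name__ == "__main__":
    print(json.dumps(entry, indent=2))
```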

After I send the sample event, I can see the logs are available in my S3 bucket.

I can also see the log entries appearing in Amazon CloudWatch Logs. The logs show the event lifecycle, from EVENT_RECEIPT to SUCCESS. Learn more about the complete event lifecycle in the Amazon EventBridge documentation.

Now, let’s evaluate these logs. For brevity, I only include a few logs and have redacted them for readability. Here’s the log from when I triggered the event:

{
    "resource_arn": "arn:aws:events:us-east-1:123:event-bus/demo-logging",
    "message_timestamp_ms": 1751608776896,
    "event_bus_name": "demo-logging",
// REDACTED FOR BREVITY //
    "message_type": "EVENT_RECEIPT",
    "log_level": "TRACE",
    "details": {
        "caller_account_id": "123",
        "source_time_ms": 1751608775000,
        "source": "ecommerce.orders",
        "detail_type": "Order Placed",
        "resources": [],
        "event_detail": "REDACTED FOR BREVITY"
    }
}

Here’s the log when the event was successfully invoked:

{
    "resource_arn": "arn:aws:events:us-east-1:123:event-bus/demo-logging",
    "message_timestamp_ms": 1751608777091,
    "event_bus_name": "demo-logging",
// REDACTED FOR BREVITY //
    "message_type": "INVOCATION_SUCCESS",
    "log_level": "INFO",
    "details": {
// REDACTED FOR BREVITY //
        "total_attempts": 1,
        "final_invocation_status": "SUCCESS",
        "ingestion_to_start_latency_ms": 105,
        "ingestion_to_complete_latency_ms": 183,
        "ingestion_to_success_latency_ms": 183,
        "target_duration_ms": 53,
        "target_response_body": "<REDACTED FOR BREVITY>",
        "http_status_code": 202
    }
}

The additional log entries include rich metadata that makes troubleshooting straightforward. For example, on a successful event, I can see the latency timing from starting to completing the event, duration for the target to finish processing, and HTTP status code.

Debugging failures with complete event lifecycle tracking
The benefit of EventBridge logging becomes apparent when things go wrong. To test failure scenarios, I intentionally misconfigure a Lambda function’s permissions and change the rule to point to a different Lambda function without proper permissions.

The attempt failed with a permanent failure due to missing permissions. The log shows it’s a FIRST attempt that resulted in NO_PERMISSIONS status.

{
    "message_type": "INVOCATION_ATTEMPT_PERMANENT_FAILURE",
    "log_level": "ERROR",
    "details": {
        "rule_arn": "arn:aws:events:us-east-1:123:rule/demo-logging/demo-order-placed",
        "role_arn": "arn:aws:iam::123:role/service-role/Amazon_EventBridge_Invoke_Lambda_123",
        "target_arn": "arn:aws:lambda:us-east-1:123:function:demo-evb-fail",
        "attempt_type": "FIRST",
        "attempt_count": 1,
        "invocation_status": "NO_PERMISSIONS",
        "target_duration_ms": 25,
        "target_response_body": "{\"requestId\":\"a4bdfdc9-4806-4f3e-9961-31559cb2db62\",\"errorCode\":\"AccessDeniedException\",\"errorType\":\"Client\",\"errorMessage\":\"User: arn:aws:sts::123:assumed-role/Amazon_EventBridge_Invoke_Lambda_123/db4bff0a7e8539c4b12579ae111a3b0b is not authorized to perform: lambda:InvokeFunction on resource: arn:aws:lambda:us-east-1:123:function:demo-evb-fail because no identity-based policy allows the lambda:InvokeFunction action\",\"statusCode\":403}",
        "http_status_code": 403
    }
}

The final log entry summarizes the complete failure with timing metrics and the exact error message.

{
    "message_type": "INVOCATION_FAILURE",
    "log_level": "ERROR",
    "details": {
        "rule_arn": "arn:aws:events:us-east-1:123:rule/demo-logging/demo-order-placed",
        "role_arn": "arn:aws:iam::123:role/service-role/Amazon_EventBridge_Invoke_Lambda_123",
        "target_arn": "arn:aws:lambda:us-east-1:123:function:demo-evb-fail",
        "total_attempts": 1,
        "final_invocation_status": "NO_PERMISSIONS",
        "ingestion_to_start_latency_ms": 62,
        "ingestion_to_complete_latency_ms": 114,
        "target_duration_ms": 25,
        "http_status_code": 403
    },
    "error": {
        "http_status_code": 403,
        "error_message": "User: arn:aws:sts::123:assumed-role/Amazon_EventBridge_Invoke_Lambda_123/db4bff0a7e8539c4b12579ae111a3b0b is not authorized to perform: lambda:InvokeFunction on resource: arn:aws:lambda:us-east-1:123:function:demo-evb-fail because no identity-based policy allows the lambda:InvokeFunction action",
        "aws_service": "AWSLambda",
        "request_id": "a4bdfdc9-4806-4f3e-9961-31559cb2db62"
    }
}

The logs provide detailed performance metrics that help identify bottlenecks. The ingestion_to_start_latency_ms: 62 shows the time from event ingestion to starting invocation, while ingestion_to_complete_latency_ms: 114 represents the total time from ingestion to completion. Additionally, target_duration_ms: 25 indicates how long the target service took to respond, helping distinguish between EventBridge processing time and target service performance.
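
As a small worked example, the following sketch derives a latency breakdown from the `details` object of the failure log above. The `non_target_ms` figure is only a rough estimate and assumes the target duration falls entirely within the ingestion-to-complete window. The function and key names in the result are illustrative.

```python
def latency_breakdown(details: dict) -> dict:
    """Split the logged latencies into time before, during, and outside the target call."""
    start = details["ingestion_to_start_latency_ms"]
    complete = details["ingestion_to_complete_latency_ms"]
    target = details["target_duration_ms"]
    return {
        "before_invocation_ms": start,              # ingestion until invocation starts
        "invocation_window_ms": complete - start,   # invocation start until completion
        "target_ms": target,                        # time spent in the target itself
        "non_target_ms": complete - target,         # rough estimate of everything else
    }


if __name__ == "__main__":
    details = {
        "ingestion_to_start_latency_ms": 62,
        "ingestion_to_complete_latency_ms": 114,
        "target_duration_ms": 25,
    }
    print(latency_breakdown(details))
```

For the failure shown above, this gives 52 ms between invocation start and completion, of which 25 ms was spent in the target.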

The error message clearly states what failed (the lambda:InvokeFunction action), why it failed (no identity-based policy allows the action), which role was involved (Amazon_EventBridge_Invoke_Lambda_123), and which specific resource was affected, identified by the Lambda function’s Amazon Resource Name (ARN).

Debugging API Destinations with EventBridge Logging
One use case where I think the EventBridge logging capability will be particularly helpful is debugging issues with API destinations. EventBridge API destinations are HTTPS endpoints that you can invoke as the target of an event bus rule or pipe. They help you route events from your event bus to external systems, software-as-a-service (SaaS) applications, or third-party APIs using HTTPS calls. They use connections to handle authentication and credentials, making it easy to integrate your event-driven architecture with any HTTPS-based service.

API destinations are commonly used to send events to external HTTPS endpoints and debugging failures from the external endpoint can be a challenge. These problems typically stem from changes to the endpoint authentication requirements or modified credentials.

To demonstrate this debugging capability, I intentionally configured an API destination with incorrect credentials in the connection resource.

When I send an event to this misconfigured endpoint, the enhanced logging shows the root cause of this failure.

{
    "resource_arn": "arn:aws:events:us-east-1:123:event-bus/demo-logging",
    "message_timestamp_ms": 1750344097251,
    "event_bus_name": "demo-logging",
    //REDACTED FOR BREVITY//,
    "message_type": "INVOCATION_FAILURE",
    "log_level": "ERROR",
    "details": {
        //REDACTED FOR BREVITY//,
        "total_attempts": 1,
        "final_invocation_status": "SDK_CLIENT_ERROR",
        "ingestion_to_start_latency_ms": 135,
        "ingestion_to_complete_latency_ms": 549,
        "target_duration_ms": 327,
        "target_response_body": "",
        "http_status_code": 400
    },
    "error": {
        "http_status_code": 400,
        "error_message": "Unable to invoke ApiDestination endpoint: The request failed because the credentials included for the connection are not authorized for the API destination."
    }
}

The log provides immediate clarity about the failure. The target_arn shows this involves an API destination, the final_invocation_status indicates SDK_CLIENT_ERROR, and the http_status_code of 400 points to a client-side issue. Most importantly, the error_message explicitly states the cause: Unable to invoke ApiDestination endpoint: The request failed because the credentials included for the connection are not authorized for the API destination.

This complete log sequence provides useful debugging insights because I can see exactly how the event moved through EventBridge — from event receipt, to ingestion, to rule matching, to invocation attempts. This level of detail eliminates guesswork and points directly to the root cause of the issue.
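
When many such entries accumulate, a small helper can pull out the fields discussed above. The following Python sketch is only an illustration: the function name is hypothetical, the record contains just the fields shown in the sample log (the redacted parts are omitted), and treating 5xx codes as server-side is an assumption that the post does not cover.

```python
def summarize_failure(record):
    """Extract the fields that usually explain an INVOCATION_FAILURE entry."""
    details = record.get("details", {})
    error = record.get("error", {})
    status = error.get("http_status_code", details.get("http_status_code"))

    if status is not None and 400 <= status < 500:
        side = "client-side (4xx)"
    elif status is not None and status >= 500:
        side = "server-side (5xx)"  # assumption, not covered in the post
    else:
        side = "unknown"

    return {
        "message_type": record.get("message_type"),
        "status": details.get("final_invocation_status"),
        "http_status_code": status,
        "likely_side": side,
        "attempts": details.get("total_attempts"),
        "error_message": error.get("error_message", ""),
    }


# Only the non-redacted fields from the sample log entry above.
record = {
    "message_type": "INVOCATION_FAILURE",
    "log_level": "ERROR",
    "details": {
        "total_attempts": 1,
        "final_invocation_status": "SDK_CLIENT_ERROR",
        "http_status_code": 400,
    },
    "error": {
        "http_status_code": 400,
        "error_message": "Unable to invoke ApiDestination endpoint: The request "
        "failed because the credentials included for the connection are not "
        "authorized for the API destination.",
    },
}

summary = summarize_failure(record)
for key, value in summary.items():
    print(f"{key}: {value}")
```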

Additional things to know
Here are a couple of things to note:

  • Architecture support – Logging works with all EventBridge features including custom event buses, partner event sources, and API destinations for HTTPS endpoints.
  • Performance impact – Logging operates asynchronously with no measurable impact on event processing latency or throughput.
  • Pricing – You pay standard Amazon S3, Amazon CloudWatch Logs, or Amazon Data Firehose pricing for log storage and delivery. EventBridge logging itself incurs no additional charges. For details, visit the Amazon EventBridge pricing page.
  • Availability – Amazon EventBridge logging capability is available in all AWS Regions where EventBridge is supported.
  • Documentation – For more details, refer to the Amazon EventBridge monitoring and debugging documentation.

Get started with Amazon EventBridge logging capability by visiting the EventBridge console and enabling logging on your event buses.

Happy building!
— Donnie 

Overcome your Kafka Connect challenges with Amazon Data Firehose

Post Syndicated from Swapna Bandla original https://aws.amazon.com/blogs/big-data/overcome-your-kafka-connect-challenges-with-amazon-data-firehose/

Apache Kafka is a popular open source distributed streaming platform that is widely used in the AWS ecosystem. It’s designed to handle real-time, high-throughput data streams, making it well-suited for building real-time data pipelines to meet the streaming needs of modern cloud-based applications.

For AWS customers who want to run Apache Kafka but don’t want to worry about the undifferentiated heavy lifting involved with self-managing their Kafka clusters, Amazon Managed Streaming for Apache Kafka (Amazon MSK) offers fully managed Apache Kafka. This means Amazon MSK provisions your servers, configures your Kafka clusters, replaces servers when they fail, orchestrates server patches and upgrades, makes sure clusters are architected for high availability, makes sure data is durably stored and secured, sets up monitoring and alarms, and runs scaling to support load changes. With a managed service, you can spend your time developing and running streaming event applications.

For applications to use data sent to Kafka, you need to write, deploy, and manage application code that consumes data from Kafka.

Kafka Connect is an open-source component of the Kafka project that provides a framework for connecting with external systems such as databases, key-value stores, search indexes, and file systems from your Kafka clusters. On AWS, our customers commonly write and manage connectors using the Kafka Connect framework to move data out of their Kafka clusters into persistent storage, like Amazon Simple Storage Service (Amazon S3), for long-term storage and historical analysis.

At scale, customers need to programmatically manage their Kafka Connect infrastructure for consistent deployments when updates are required, as well as the code for error handling, retries, compression, or data transformation as it is delivered from your Kafka cluster. However, this introduces a need for investment into the software development lifecycle (SDLC) of this management software. Although the SDLC is a cost-effective and time-efficient process to help development teams build high-quality software, for many customers, this process is not desirable for their data delivery use case, particularly when they could dedicate more resources towards innovating for other key business differentiators. Beyond SDLC challenges, many customers face fluctuating data streaming throughput. For instance:

  • Online gaming businesses experience throughput variations based on game usage
  • Video streaming applications see changes in throughput depending on viewership
  • Traditional businesses have throughput fluctuations tied to consumer activity

Striking the right balance between resources and workload can be challenging. Under-provisioning can lead to consumer lag, processing delays, and potential data loss during peak loads, hampering real-time data flows and business operations. On the other hand, over-provisioning results in underutilized resources and unnecessary high costs, making the setup economically inefficient for customers. Even the action of scaling up your infrastructure introduces additional delays because resources need to be provisioned and acquired for your Kafka Connect cluster.

Even when you can estimate aggregated throughput, predicting throughput per individual stream remains difficult. As a result, to achieve smooth operations, you might resort to over-provisioning your Kafka Connect resources (CPU) for your streams. This approach, though functional, might not be the most efficient or cost-effective solution.

Customers have been asking for a fully serverless solution that will not only manage resource allocation, but also shift the cost model so that they pay only for the data they deliver from the Kafka topic, instead of for underlying resources that require constant monitoring and management.

In September 2023, we announced a new integration between Amazon MSK and Amazon Data Firehose, allowing builders to deliver data from their MSK topics to their destination sinks with a fully managed, serverless solution. With this new integration, you no longer need to develop and manage your own code to read, transform, and write your data to your sink using Kafka Connect. Data Firehose abstracts away the retry logic required when reading data from your MSK cluster and delivering it to the desired sink, as well as infrastructure provisioning, because it can scale out and scale in automatically to adjust to the volume of data to transfer. There are no provisioning or maintenance operations required on your side.

At release, the checkpoint time to start consuming data from the MSK topic was the creation time of the Firehose stream. Data Firehose couldn’t start reading from other points on the data stream. This caused challenges for several different use cases.

For customers that are setting up a mechanism to sink data from their cluster for the first time, all data in the topic older than the timestamp of Firehose stream creation would need another way to be persisted. For example, customers using Kafka Connect connectors were limited in using Data Firehose because they wanted to sink all the data from the topic to their sink, but Data Firehose couldn’t read data from earlier than the timestamp of Firehose stream creation.

For other customers that were running Kafka Connect and needed to migrate from their Kafka Connect infrastructure to Data Firehose, this required some extra coordination. The launch functionality of Data Firehose meant you couldn’t point your Firehose stream to a specific point on the source topic, so a migration required stopping data ingest to the source MSK topic and waiting for Kafka Connect to sink all the data to the destination. You could then create the Firehose stream and restart the producers so that the Firehose stream consumed new messages from the topic. This added non-trivial overhead to the migration effort when cutting over from an existing Kafka Connect infrastructure to a new Firehose stream.

To address these challenges, we’re happy to announce a new feature in the Data Firehose integration with Amazon MSK. You can now configure the Firehose stream to begin reading your MSK topic from either the earliest position on the topic or a custom timestamp.

In the first post of this series, we focused on managed data delivery from Kafka to your data lake. In this post, we extend the solution to choose a custom timestamp for your MSK topic to be synced to Amazon S3.

Overview of Data Firehose integration with Amazon MSK

Data Firehose integrates with Amazon MSK to offer a fully managed solution that simplifies the processing and delivery of streaming data from Kafka clusters into data lakes stored on Amazon S3. With just a few clicks, you can continuously load data from your desired Kafka clusters to an S3 bucket in the same account, eliminating the need to develop or run your own connector applications. The following are some of the key benefits to this approach:

  • Fully managed service – Data Firehose is a fully managed service that handles the provisioning, scaling, and operational tasks, allowing you to focus on configuring the data delivery pipeline.
  • Simplified configuration – With Data Firehose, you can set up the data delivery pipeline from Amazon MSK to your sink with just a few clicks on the AWS Management Console.
  • Automatic scaling – Data Firehose automatically scales to match the throughput of your Amazon MSK data, without the need for ongoing administration.
  • Data transformation and optimization – Data Firehose offers features like JSON to Parquet/ORC conversion and batch aggregation to optimize the delivered file size, simplifying data analytical processing workflows.
  • Error handling and retries – Data Firehose automatically retries data delivery in case of failures, with configurable retry durations and backup options.
  • Offset select option – With Data Firehose, you can select the position in the topic from which the Firehose stream starts delivering data, from three options:
    • Firehose stream creation time – This allows you to deliver data starting from Firehose stream creation time. When migrating from Kafka Connect to Data Firehose, if you have an option to pause the producer, you can consider this option.
    • Earliest – This allows you to deliver data starting from MSK topic creation time. You can choose this option if you’re setting up a new delivery pipeline with Data Firehose from Amazon MSK to Amazon S3.
    • At timestamp – This option allows you to provide a specific start date and time in the topic from where you want the Firehose stream to read data. The time is in your local time zone. You can choose this option if you prefer not to stop your producer applications while migrating from Kafka Connect to Data Firehose. You can refer to the Python script and steps provided later in this post to derive the timestamp for the latest events in your topic that were consumed by Kafka Connect.

The following are benefits of the new timestamp selection feature with Data Firehose:

  • You can select the starting position of the MSK topic, not just from the point that the Firehose stream is created, but from any point from the earliest timestamp of the topic.
  • You can replay the MSK stream delivery if required, for example in testing scenarios, by selecting a different specific timestamp to start from.
  • When migrating from Kafka Connect to Data Firehose, gaps or duplicates can be managed by selecting the starting timestamp for Data Firehose delivery from the point where Kafka Connect delivery ended. Because the new custom timestamp feature isn’t monitoring Kafka consumer offsets per partition, the timestamp you select for your Kafka topic should be a few minutes before the timestamp at which you stopped Kafka Connect. The earlier the timestamp you select, the more duplicate records you will have downstream. The closer the timestamp to the time of Kafka Connect stopping, the higher the likelihood of data loss if certain partitions have fallen behind. Be sure to select a timestamp appropriate to your requirements.
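
The trade-off described in the last point can be illustrated with a short Python sketch. The partition numbers, timestamps, and function name below are hypothetical; the sketch only compares a candidate start timestamp with the timestamp of the last record Kafka Connect delivered from each partition.

```python
from datetime import datetime, timedelta, timezone


def classify_partitions(last_delivered, start_at):
    """Label each partition by what a given start timestamp would cause."""
    outcome = {}
    for partition, delivered_ts in last_delivered.items():
        if start_at <= delivered_ts:
            # Records between start_at and delivered_ts are delivered twice.
            outcome[partition] = "duplicates"
        else:
            # Records between delivered_ts and start_at are never delivered.
            outcome[partition] = "possible gap"
    return outcome


# Hypothetical state at the moment Kafka Connect was stopped (12:00:00 UTC).
stopped_at = datetime(2024, 9, 2, 12, 0, 0, tzinfo=timezone.utc)
last_delivered = {
    0: stopped_at - timedelta(seconds=2),    # keeping up
    1: stopped_at - timedelta(seconds=20),   # slightly behind
    2: stopped_at - timedelta(seconds=150),  # lagging partition
}

# Starting two minutes before the stop time still misses the lagging partition.
outcome = classify_partitions(last_delivered, stopped_at - timedelta(minutes=2))
print(outcome)
```

Moving the start timestamp further back turns the "possible gap" partition into "duplicates" as well, at the cost of more duplicate records on the partitions that were keeping up.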

Overview of solution

We discuss two scenarios to stream data.

In Scenario 1, we migrate to Data Firehose from Kafka Connect with the following steps:

  1. Derive the latest timestamp from MSK events that Kafka Connect delivered to Amazon S3.
  2. Create a Firehose delivery stream with Amazon MSK as the source and Amazon S3 as the destination with the topic starting position as At timestamp.
  3. Query Amazon S3 to validate the data loaded.

In Scenario 2, we create a new data pipeline from Amazon MSK to Amazon S3 with Data Firehose:

  1. Create a Firehose delivery stream with Amazon MSK as the source and Amazon S3 as the destination with the topic starting position as Earliest.
  2. Query Amazon S3 to validate the data loaded.

The solution architecture is depicted in the following diagram.

Prerequisites

You should have the following prerequisites:

  • An AWS account and access to the following AWS services:
  • An MSK provisioned or MSK serverless cluster with topics created and data streaming to it. The sample topic used in this post is order.
  • An EC2 instance configured to use as a Kafka admin client. Refer to Create an IAM role for instructions to create the client machine and IAM role that you will need to run commands against your MSK cluster.
  • An S3 bucket for delivering data from Amazon MSK using Data Firehose.
  • Kafka Connect to deliver data from Amazon MSK to Amazon S3 if you want to migrate from Kafka Connect (Scenario 1).

Migrate to Data Firehose from Kafka Connect

To reduce duplicates and minimize data loss, you need to configure your custom timestamp for Data Firehose to read events as close as possible to the timestamp of the oldest committed offset that Kafka Connect reported. You can follow the steps in this section to visualize how the timestamps of each committed offset will vary by partition across the topic you want to read from. This is for demonstration purposes and doesn’t scale as a solution for workloads with a large number of partitions.

Sample data was generated for demonstration purposes by following the instructions referenced in the following GitHub repo. We set up a sample producer application that generates clickstream events to simulate users browsing and performing actions on an imaginary ecommerce website.

To derive the latest timestamp from MSK events that Kafka Connect delivered to Amazon S3, complete the following steps:

  1. From your Kafka client, query Amazon MSK to retrieve the Kafka Connect consumer group ID:
    ./kafka-consumer-groups.sh --bootstrap-server $bs --list --command-config client.properties

  2. Stop Kafka Connect.
  3. Query Amazon MSK for the latest offset and associated timestamp for the consumer group belonging to Kafka Connect.

You can use the get_latest_offsets.py Python script from the following GitHub repo as a reference to get the timestamp associated with the latest offsets for your Kafka Connect consumer group. To enable authentication and authorization for a non-Java client with an IAM authenticated MSK cluster, refer to the following GitHub repo for instructions on installing the aws-msk-iam-sasl-signer-python package for your client.

python3 get_latest_offsets.py --broker-list $bs --topic-name "order" --consumer-group-id "connect-msk-serverless-connector-090224" --aws-region "eu-west-1"

Note the earliest timestamp across all the partitions.
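
As a minimal sketch of this step, the following Python snippet takes the earliest of the per-partition timestamps and subtracts a safety margin. The input shape (partition number mapped to a Kafka record timestamp in epoch milliseconds), the sample values, the function name, and the two-minute default margin are all assumptions for illustration; the actual output format of get_latest_offsets.py may differ.

```python
from datetime import datetime, timedelta, timezone


def pick_start_timestamp(partition_timestamps_ms, margin=timedelta(minutes=2)):
    """Return the earliest per-partition timestamp minus a safety margin (UTC)."""
    earliest_ms = min(partition_timestamps_ms.values())
    earliest = datetime.fromtimestamp(earliest_ms / 1000, tz=timezone.utc)
    return earliest - margin


# Hypothetical timestamps of the last committed record for each partition.
partition_timestamps_ms = {
    0: 1718900000000,
    1: 1718899940000,  # earliest: this partition was furthest behind
    2: 1718899990000,
}

start = pick_start_timestamp(partition_timestamps_ms)
print("Start (UTC):  ", start.isoformat())
print("Start (local):", start.astimezone().isoformat())
```

The local-time line is included because the At timestamp option takes the time in your local time zone.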

Create a data pipeline from Amazon MSK to Amazon S3 with Data Firehose

The steps in this section are applicable to both scenarios. Complete the following steps to create your data pipeline:

  1. On the Data Firehose console, choose Firehose streams in the navigation pane.
  2. Choose Create Firehose stream.
  3. For Source, choose Amazon MSK.
  4. For Destination, choose Amazon S3.
  5. For Source settings, browse to the MSK cluster and enter the topic name you created as part of the prerequisites.
  6. Configure the Firehose stream starting position based on your scenario:
    1. For Scenario 1, set Topic starting position as At Timestamp and enter the timestamp you noted in the previous section.
    2. For Scenario 2, set Topic starting position as Earliest.
  7. For Firehose stream name, leave the default generated name or enter a name of your preference.
  8. For Destination settings, browse to the S3 bucket created as part of the prerequisites to stream data.

Within this S3 bucket, by default, a folder structure with YYYY/MM/dd/HH will be automatically created. Data is delivered to the hour-level (HH) subfolder that matches the Data Firehose to Amazon S3 ingestion timestamp.

  9. Under Advanced settings, you can choose to create the default IAM role for all the permissions that Data Firehose needs or choose an existing IAM role that has the policies that Data Firehose needs.
  10. Choose Create Firehose stream.

On the Amazon S3 console, you can verify the data streamed to the S3 folder according to your chosen offset settings.
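
To know which folder to look in, you can compute the default YYYY/MM/dd/HH prefix for a given ingestion time. The following Python sketch assumes the default prefix is evaluated in UTC and that no custom prefix is configured; the function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def default_prefix(ingested_at):
    """Build the YYYY/MM/dd/HH/ prefix for a timezone-aware ingestion time."""
    return ingested_at.astimezone(timezone.utc).strftime("%Y/%m/%d/%H/")


# An ingestion time expressed in UTC.
print(default_prefix(datetime(2024, 9, 2, 14, 5, tzinfo=timezone.utc)))

# The same function with a local time five hours behind UTC: the prefix
# rolls over to the next day once the time is converted to UTC.
local_time = datetime(2024, 9, 2, 23, 30, tzinfo=timezone(timedelta(hours=-5)))
print(default_prefix(local_time))
```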

Clean up

To avoid incurring future charges, delete the resources you created as part of this exercise if you’re not planning to use them further.

Conclusion

Data Firehose provides a straightforward way to deliver data from Amazon MSK to Amazon S3, enabling you to save costs and reduce latency to seconds. To try Data Firehose with Amazon S3, refer to the Delivery to Amazon S3 using Amazon Data Firehose lab.


About the Authors

Swapna Bandla is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. Swapna has a passion for understanding customers’ data and analytics needs and empowering them to develop cloud-based well-architected solutions. Outside of work, she enjoys spending time with her family.

Austin Groeneveld is a Streaming Specialist Solutions Architect at Amazon Web Services (AWS), based in the San Francisco Bay Area. In this role, Austin is passionate about helping customers accelerate insights from their data using the AWS platform. He is particularly fascinated by the growing role that data streaming plays in driving innovation in the data analytics space. Outside of his work at AWS, Austin enjoys watching and playing soccer, traveling, and spending quality time with his family.

Announcing end-of-support for Amazon Kinesis Client Library 1.x and Amazon Kinesis Producer Library 0.x effective January 30, 2026

Post Syndicated from Minu Hong original https://aws.amazon.com/blogs/big-data/announcing-end-of-support-for-amazon-kinesis-client-library-1-x-and-amazon-kinesis-producer-library-0-x-effective-january-30-2026/

Amazon Kinesis Client Library (KCL) 1.x and Amazon Kinesis Producer Library (KPL) 0.x will reach end-of-support on January 30, 2026. Accordingly, these versions will enter maintenance mode on April 17, 2025. During maintenance mode, AWS will provide updates only for critical bug fixes and security issues. Major versions in maintenance mode will not receive updates for new features or feature enhancements.

KCL is a software library that simplifies building applications to process streaming data from Amazon Kinesis Data Streams. KCL handles complex tasks such as load balancing across multiple consumer workers, responding to worker failures, checkpointing processed records, and adapting to throughput changes.

KPL is a software library that helps developers write producer applications that achieve high-throughput data ingestion into Kinesis Data Streams. KPL manages record batching, aggregation, and retry logic to maximize throughput and optimize resource usage while simplifying the producer application development process.

The following table outlines the level of support for each phase of the major version lifecycle of KCL and KPL.

Major versions | Version Lifecycle Phase | Start Date | End Date | Support Level
KCL 1.x | General Availability | 12/19/2013 | 4/16/2025 | During this phase, the major version is fully supported. AWS provides regular minor and patch version releases that include support for new features or API updates for Kinesis Data Streams, as well as bug and security fixes.
KPL 0.x | General Availability | 6/2/2015 | 4/16/2025 | During this phase, the major version is fully supported. AWS provides regular minor and patch version releases that include support for new features or API updates for Kinesis Data Streams, as well as bug and security fixes.
KCL 1.x, KPL 0.x | Maintenance mode | 4/17/2025 | 1/29/2026 | AWS is limiting patch version releases to address critical bug fixes and security issues only. The major version will not receive updates for new features or APIs of Kinesis Data Streams.
KCL 1.x, KPL 0.x | End-of-support | 1/30/2026 | N/A | The major version will no longer receive updates or releases. Previously published releases will continue to be available through public package managers and the code will remain on GitHub.
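
If you track dependency lifecycles in tooling, the dates in the table can be encoded directly. The following Python sketch uses a hypothetical function name and assumes the date you pass is on or after the General Availability start date of the library in question.

```python
from datetime import date

# Phase boundaries taken from the table above.
MAINTENANCE_START = date(2025, 4, 17)
END_OF_SUPPORT = date(2026, 1, 30)


def support_phase(on):
    """Return the lifecycle phase of KCL 1.x and KPL 0.x on a given date."""
    if on < MAINTENANCE_START:
        return "General Availability"
    if on < END_OF_SUPPORT:
        return "Maintenance mode"
    return "End-of-support"


for day in (date(2025, 4, 16), date(2025, 4, 17), date(2026, 1, 30)):
    print(day.isoformat(), support_phase(day))
```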

Impact on non-Java KCL Libraries

The maintenance mode and end-of-support dates apply to the following non-Java KCL 1.x libraries:

Migrating from KCL 1.x or KPL 0.x

If you’re using KCL 1.x or KPL 0.x, we recommend migrating to the latest versions (KCL 3.x and KPL 1.x). When migrating from KCL 1.x to 3.x, you will need to update interfaces and security credential providers in your application. For migrations from KPL 0.x to 1.x, you can upgrade your current KPL application without any change in your data processing logic. To learn more, refer to the following resources:

Summary

KCL 1.x and KPL 0.x will reach end-of-support on January 30, 2026 and enter maintenance mode on April 17, 2025. If you need assistance or have feedback, reach out to AWS support. You can also open an issue in the KCL GitHub repository or KPL GitHub repository.


About the author

Minu Hong is a Senior Product Manager for Amazon Kinesis Data Streams at AWS. He is passionate about understanding customer challenges around streaming data and developing optimized solutions for them. Outside of work, Minu enjoys traveling, playing tennis, skiing, and cooking.

Amazon Web Services named a Leader in the 2024 Gartner Magic Quadrant for Data Integration Tools

Post Syndicated from William Vambenepe original https://aws.amazon.com/blogs/big-data/amazon-web-services-named-a-leader-in-the-2024-gartner-magic-quadrant-for-data-integration-tools/

Amazon Web Services (AWS) has been recognized as a Leader in the 2024 Gartner Magic Quadrant for Data Integration Tools. We were positioned in the Challengers Quadrant in 2023.

This recognition, we feel, reflects our ongoing commitment to innovation and excellence in data integration, demonstrating our continued progress in providing comprehensive data management solutions.

The Gartner Magic Quadrant evaluates 20 data integration tool vendors based on two axes—Ability to Execute and Completeness of Vision. This evaluation, we feel, critically examines vendors’ capabilities to address key service needs, including data engineering, operational data integration, modern data architecture delivery, and enabling less-technical data integration across various deployment models.

Discover, prepare, and integrate all your data at any scale

AWS Glue is a fully managed, serverless data integration service that simplifies data preparation and transformation across diverse data sources. With its comprehensive suite of tools, AWS Glue allows users to build and manage data pipelines efficiently, without requiring extensive infrastructure management expertise.

Given the diverse data integration needs of customers, AWS offers a robust data integration system through multiple services including Amazon EMR, Amazon Athena, Amazon Managed Workflows for Apache Airflow (Amazon MWAA), Amazon Managed Streaming for Apache Kafka (MSK), Amazon Kinesis, and others. Many thousands of customers across various industries are using these services to transform, operationalize, and manage their data across data lakes and data warehouses.

We have embarked on a journey to unify the broad range of AWS data processing, analytics, and AI capabilities, starting with the announcement of Amazon SageMaker Unified Studio at re:Invent 2024. This includes the data integration capabilities mentioned above, with support for both structured and unstructured data. With an integrated experience for data workers, SageMaker Unified Studio provides an environment where users can collaborate and build faster. It supports model development, generative AI, data processing, and SQL analytics, all accelerated by Amazon Q Developer—our most capable generative AI assistant for software development. The Unified Studio provides unified access to all data sources, whether stored in data lakes, data warehouses, or third-party and federated sources, with robust governance and enterprise-grade security built-in.

Review the Gartner Magic Quadrant

Access a complimentary copy of the full report to see why Gartner positioned AWS as a Leader, and dive deep into the strengths and cautions of AWS. We believe our recognition as a Leader in the Gartner Magic Quadrant is a testament to delivering innovations for our customers.

MQ

Gartner does not endorse any vendor, product or service depicted in its research publications and does not advise technology users to select only those vendors with the highest ratings or other designation. Gartner research publications consist of the opinions of Gartner’s research organization and should not be construed as statements of fact. Gartner disclaims all warranties, expressed or implied, with respect to this research, including any warranties of merchantability or fitness for a particular purpose.

GARTNER is a registered trademark and service mark of Gartner and Magic Quadrant is a registered trademark of Gartner, Inc. and/or its affiliates in the U.S. and internationally and are used herein with permission. All rights reserved. This graphic was published by Gartner, Inc. as part of a larger research document and should be evaluated in the context of the entire document. The Gartner document is available upon request from here.


About the authors

William Vambenepe is Director of Product Management at AWS, where he leads the Product Management, Solutions Engineering, and UX Design team for data processing services (Amazon EMR, AWS Glue, Athena, Amazon MWAA), SageMaker Unified Studio, and SageMaker Catalog. Prior to AWS, William worked at Google (6 years building and growing the Data and Analytics product portfolio for Google Cloud, and 5 years as Product Management Director for Google Search). He had previously held software engineering leadership roles at Oracle and HP. William holds an Engineering degree from Ecole Centrale Paris, a graduate Diploma in Computer Science from Cambridge University, and a Masters in Engineering Management from Stanford University.

Santosh Chandrachood has been with AWS for over 8 years and helped build, launch, and scale a variety of AWS services. Currently, Santosh is Director and service leader for Data Processing, managing Amazon EMR, Athena, AWS Glue, and Managed Workflows for Apache Airflow (Amazon MWAA). Santosh also led AWS Data Integration as the General Manager. Before joining AWS, Santosh led engineering teams in networking, storage, and data infrastructure areas.

Serverless ICYMI Q4 2024

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/serverless-icymi-q4-2024/

Welcome to the 27th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. At the end of a quarter, we share the most recent product launches, feature enhancements, blog posts, webinars, live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, check out what happened in Q2 here.

Calendar showing October through December 2024

2024 Q4 calendar

Serverless at re:Invent 2024

AWS re:Invent 2024 had 60,000 in-person attendees and 400,000 online viewers for the keynotes. The conference delivered 1,900 sessions from 3,500 speakers and included 546 AWS service and feature announcements.

The serverless content consisted of two tracks: Serverless (SVS) and App Integration (API). These tracks included 70 unique sessions and attracted nearly 11,000 attendees. Serverlesspresso, the coffee shop powered by serverless technology, operated in two locations during the event: the Expo Hall and the certification lounge.

Crowd of people standing around the AWS re:Invent expo hall waiting to order coffee at the Serverlesspresso booth.

Serverlesspresso booth in the expo hall

Videos are available on Serverless Land YouTube.

AWS Lambda and Amazon Elastic Container Service (Amazon ECS) 10-year anniversary

AWS marked significant milestones in serverless computing, celebrating 10 years of AWS Lambda and Amazon ECS. Lambda now serves over 1.5 million monthly customers and processes tens of trillions of requests each month. Amazon ECS launches more than 2.4 billion container tasks weekly and is used by over 65% of new AWS container customers.

AWS is commemorating this anniversary with insights from AWS Serverless Heroes, product leads, principal engineers, and AWS leadership sharing their perspectives on serverless evolution and future directions. These stories and insights are available at https://aws.amazon.com/serverless/10th-anniversary/.

AWS Lambda

The AWS Lambda team has spent a significant amount of time improving the Lambda development experience. Several enhancements have been made in the console as well as the local development experience.

Screen capture of the new AWS Lambda console with Code-OSS

Code-OSS as the new AWS Lambda inline editor

Lambda has launched a significant upgrade to its console by integrating Code-OSS, the open-source version of Visual Studio Code, delivering a familiar development experience directly in the cloud. The new Lambda Code Editor supports viewing larger function packages up to 50 MB, features a split-screen interface for simultaneous code editing and testing, and includes built-in Amazon Q Developer AI assistance for real-time coding suggestions. This enhancement comes at no additional cost and prioritizes accessibility with features like screen reader support and keyboard navigation. The update bridges the gap between cloud and local development by simplifying the process of downloading function code and AWS SAM templates, ultimately providing developers with a more streamlined and familiar serverless development experience. Watch the video explaining the changes in detail.

Additionally, the Lambda console enhances developer experience with two new features: a built-in CloudWatch Metrics Insights dashboard that surfaces key function metrics, and CloudWatch Logs Live Tail support for real-time log streaming and analysis, enabling faster troubleshooting without leaving the Lambda environment.

Screen capture of the new top 10 functions in the new AWS Lambda console

Top 10 Functions

Lambda now supports native JSON structured logging for .NET managed runtime applications, improving log searchability and analysis capabilities without requiring manual configuration of logging libraries.

Lambda has expanded its runtime support by adding Python 3.13 and Node.js 22 as both managed runtimes and container base images, providing access to the latest language features and ensuring long-term support through October 2029 and April 2027, respectively.

Lambda SnapStart capability is now available for Python and .NET runtimes, delivering sub-second startup performance for latency-sensitive applications by caching initialized execution environments.

Diagram of how SnapStart works compared to not having SnapStart

SnapStart support comparison

New CloudWatch metrics for Lambda Event Source Mappings provide enhanced visibility into event processing states for Amazon Simple Queue Service (SQS), Amazon Kinesis, and Amazon DynamoDB event sources, helping customers monitor and troubleshoot event processing issues.

Lambda introduces Provisioned Mode for Kafka event source mappings, allowing customers to optimize throughput by configuring dedicated event polling resources for applications with stringent performance requirements.

Finally, Lambda introduces an enhanced local development experience through the AWS Toolkit for Visual Studio Code, streamlining the serverless application development workflow. The update features a new Application Builder interface that guides developers through environment setup, offers sample applications, and provides quick-action buttons for common tasks like build, deploy, and invoke operations. Developers can now efficiently iterate on their code with features such as configurable build settings, step-through debugging, and the ability to sync local changes quickly to the cloud or perform full deployments. The toolkit integrates with AWS Infrastructure Composer for visual application building and includes comprehensive local testing capabilities with shareable test events. This enhancement simplifies the Lambda development process by enabling developers to author, test, debug, and deploy serverless applications without leaving their preferred IDE environment.

Screen capture of the getting started experience for serverless in a local IDE

Local IDE getting started

Amazon ECS and AWS Fargate

AWS enhances observability for containerized applications with CloudWatch Application Signals for Amazon ECS, adding infrastructure metrics correlation to existing traces and logs monitoring, enabling operators to identify and resolve performance issues across their application stack.

Amazon ECS adds service revision and deployment history tracking, allowing customers to monitor changes, track ongoing deployments, and debug deployment failures for long-running applications deployed after October 25, 2024.

A graph explaining the flow for service order and history

Service revisions and deployment history

Amazon ECS expands testing capabilities by supporting network fault injection experiments on AWS Fargate through AWS Fault Injection Service, enabling developers to verify application resilience using six different types of fault injection actions, including network disruptions and resource stress testing.

Amazon EventBridge

Amazon EventBridge announces significant performance improvements, reducing P99 end-to-end latency by up to 94%, from 2,235 ms to 129.33 ms, enabling faster event processing for time-sensitive applications like fraud detection and gaming.
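
As a quick check of the arithmetic, the two P99 figures quoted above can be compared directly. This is only a sketch; the variable names are illustrative and not part of any AWS API.

```python
# P99 end-to-end latency figures quoted above, in milliseconds.
before_ms = 2235.0
after_ms = 129.33

# Relative reduction = (before - after) / before.
reduction = (before_ms - after_ms) / before_ms

# Equivalent speedup factor = before / after.
speedup = before_ms / after_ms

print(f"reduction: {reduction:.1%}")  # reduction: 94.2%
print(f"speedup: {speedup:.1f}x")     # speedup: 17.3x
```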

Amazon EventBridge and AWS Step Functions now integrate with private APIs through AWS PrivateLink and Amazon VPC Lattice, enabling secure connectivity between cloud and on-premises applications without custom networking code.

Screen capture of the Amazon EventBridge create connection screen showing the new Private option

Connections to Private APIs

EventBridge API destinations introduces proactive OAuth token refresh for public and private authorization endpoints, helping prevent delays and errors by automatically refreshing tokens before expiration.

AWS Step Functions

AWS Step Functions introduces the ability to export workflows as CloudFormation or SAM templates directly from the AWS console, enabling repeatable provisioning across accounts. Developers can export and customize templates from existing workflows, and use AWS Infrastructure Composer to visually connect workflows with other AWS resources.

Step Functions also adds Variables and JSONata support to enhance workflow development. Variables allow data assignment and reference between states, simplifying payload management, while JSONata provides advanced data transformation capabilities, including date formatting and mathematical operations. These features reduce the need for custom code and intermediate states, making it easier to build distributed serverless applications. Watch the in-depth video to learn more.

Screen capture of AWS Step Function workflow studio using JSONata and variables in an example

JSONata and variables

Amazon Kinesis

Amazon Kinesis introduces significant updates to its client libraries. The new Kinesis Client Library (KCL) 3.0 reduces compute costs by up to 33% through enhanced load balancing, while the Kinesis Producer Library (KPL) 1.0 improves performance and security. Both libraries now support AWS SDK for Java 2.x and eliminate dependencies on SDK for Java 1.x, enabling seamless upgrades without requiring application code changes.

Screen capture of CPU usage metrics

KCL 3.0 metrics

Amazon MQ

Amazon MQ adds support for AWS PrivateLink, enabling customers to access Amazon MQ API endpoints directly from their VPC through interface VPC endpoints, eliminating the need for internet access and providing enhanced security through AWS’s internal network infrastructure.

Amazon Finch

AWS announces general availability of Linux support for Finch, an open source container development tool that simplifies building, running, and publishing Linux containers across all major operating systems. The release includes support for the Finch Daemon with Docker API compatibility and is available through RPM packages for Amazon Linux 2 and Amazon Linux 2023.

Amazon Simple Queue Service (SQS)

Amazon SQS increases the in-flight message limit for FIFO queues from 20,000 to 120,000 messages, enabling higher concurrent message processing. This enhancement allows customers to scale their receivers and process up to six times more messages simultaneously, provided they have sufficient publish throughput.

Amazon Managed Streaming for Apache Kafka (Amazon MSK)

Amazon MSK now introduces Managed Streaming for Apache Flink blueprints to simplify real-time AI application development. The service enables vector-embedding generation through Amazon Bedrock, streamlining the integration of streaming data with generative AI models. Using a straightforward configuration process, users can generate and index vector embeddings in Amazon OpenSearch, while leveraging LangChain’s data chunking capabilities for enhanced data retrieval efficiency. The service handles all integration aspects between MSK, embedding models, and Amazon OpenSearch vector stores.

AWS Amplify

AWS Amplify launches the Amplify AI kit for Amazon Bedrock, providing fullstack developers with tools to integrate AI capabilities into web applications. The kit includes a customizable React UI component, secure Bedrock access, and context-sharing features, enabling developers to implement chat, search, and summarization functionalities without machine learning expertise.

AWS AppSync

AWS AppSync launches AppSync Events, enabling developers to broadcast real-time data to multiple subscribers through serverless WebSocket APIs. The service eliminates the need to build and manage WebSocket infrastructure while providing secure, scalable event broadcasting capabilities. Developers can create APIs that automatically scale and integrate with services like Amazon EventBridge. The system supports features such as channel namespaces, event handlers, and multiple authorization modes, and is available in all regions where AWS AppSync operates. Users only pay for API operations and real-time connection minutes used.

Screen capture from the AWS AppSync console to create a new Event API.

Creating an AppSync Event API

Amazon API Gateway

Amazon API Gateway now lets customers manage private REST APIs using custom private DNS names. This highly requested feature allows API providers to use user-friendly domain names like private.example.com, while maintaining TLS encryption for security. The implementation process involves creating a private custom domain, configuring certificates through AWS Certificate Manager (ACM), mapping private APIs, and setting resource policies. The feature supports cross-account sharing through AWS Resource Access Manager (AWS RAM) and is now available in all AWS Regions, including AWS GovCloud (US).

Serverless blog posts

October

November

Serverless Office Hours

Image from YouTube from the latest four Serverless Office Hours

Serverless office hours videos

October

November

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Serverless Developer Advocacy team on X (formerly Twitter) to see the latest news, follow conversations, and interact with the team.

And finally, visit Serverless Land for all your serverless needs.

Top 6 game changers from AWS that redefine streaming data

Post Syndicated from Sai Maddali original https://aws.amazon.com/blogs/big-data/top-6-game-changers-from-aws-that-redefine-streaming-data/

Recently, AWS introduced over 50 new capabilities across its streaming services, significantly enhancing performance, scale, and cost-efficiency. Some of these innovations have tripled performance, provided 20 times faster scaling, and reduced failure recovery times by up to 90%. We have made it nearly effortless for customers to bring real-time context to AI applications and lakehouses.

In this post, we discuss the top six game changers that will redefine AWS streaming data.

Amazon MSK Express brokers: Kafka reimagined for AWS

AWS offers Express brokers for Amazon Managed Streaming for Apache Kafka (Amazon MSK)—a transformative breakthrough for customers needing high-throughput Kafka clusters that scale faster and cost less. With Express brokers, we are reimagining Kafka’s compute and storage decoupling to unlock performance and elasticity benefits. Express brokers offer up to three times more throughput than a comparable standard Apache Kafka broker, virtually unlimited storage, instant storage scaling, compute scaling in minutes vs. hours, and 90% faster recovery from failures compared to standard Kafka brokers. Customers can provision capacity in minutes without complex calculations, benefit from preset Kafka configurations, and scale capacity in a few clicks. Express brokers provide the same low-latency performance as standard Kafka, are 100% native Kafka, and offer key Amazon MSK features. There are no storage limits per broker and you only pay for the storage you use. With Express brokers for Amazon MSK, enterprises can expand their Kafka usage to support even more mission-critical use cases, while keeping both operational overhead and overall infrastructure costs low.

Amazon Kinesis Data Streams On-Demand: Scaling new heights

Amazon Kinesis Data Streams On-Demand makes it straightforward for developers to stream gigabytes per second of data without managing capacity or servers. Developers can create a new on-demand data stream or convert an existing data stream to on-demand mode with a single click. Kinesis Data Streams On-Demand now automatically scales to 10 GBps of write throughput and 200 GBps of read throughput per stream, a fivefold increase. Customers get this fivefold increase in scale automatically, without taking any action.

Streaming data to Iceberg tables in lakehouses

Enterprises are embracing lakehouses and open table formats such as Apache Iceberg to unlock value from their data. Amazon Data Firehose now supports seamless integration with Iceberg tables on Amazon Simple Storage Service (Amazon S3). Customers can stream data into Iceberg tables in Amazon S3 without any management overhead. Data Firehose compacts small files, minimizing storage inefficiencies and enhancing read performance. Data Firehose also handles schema changes while in flight, to provide consistency across evolving datasets. Because Data Firehose is fully managed and serverless, it scales seamlessly to handle high throughput streaming workloads, providing reliable and fast delivery of data. This capability also makes it straightforward to stream data stored in MSK topics and Kinesis data streams into Iceberg tables, potentially eliminating the need for custom extract, transform, and load (ETL) pipelines. Customers can now bring the power of real-time data to Iceberg tables without any additional effort, a paradigm shift for businesses. Additionally, Data Firehose serves as a versatile bridge to stream real-time data from MSK clusters and Kinesis data streams into the newly launched Amazon S3 Tables and Amazon SageMaker Lakehouse. This unified approach facilitates more effective data management and analysis, supporting data-driven decision-making across the enterprise.

Unlocking the value of data stored in databases with change replication to Iceberg tables

Delivering database changes into Iceberg tables is emerging as a common pattern. Now in public preview, Data Firehose supports capturing changes made in databases such as PostgreSQL and MySQL and replicating the updates to Iceberg tables on Amazon S3. The integration uses change data capture (CDC) to continuously deliver database updates, eliminating manual processes and reducing operational overhead. Data Firehose automates tasks such as schema alignment and partitioning, making sure tables are optimized for analytics. With this new capability, customers can streamline their end-to-end data pipeline, allowing them to continually feed fresh data into an Iceberg table without needing to build a custom data pipeline.

Real-time context to generative AI applications

Customers tell us they want to gain insights from generative AI by bringing their own data to large language models (LLMs). They want to bring data as it’s generated to pre-trained models for more accurate and up-to-date responses. Amazon MSK provides a blueprint that allows customers to combine the context from real-time data with the powerful LLMs on Amazon Bedrock to generate accurate, up-to-date AI responses without writing custom code. Developers can configure the blueprint to generate vector embeddings using Amazon Bedrock embedding models, then index those embeddings in Amazon OpenSearch Service for data captured and stored in MSK topics. Customers can also improve the efficiency of data retrieval using built-in support for data chunking techniques from LangChain, an open source library, supporting high-quality inputs for model ingestion.

More cost-effective and reliable stream processing

AWS offers the Kinesis Client Library (KCL), an open source library that simplifies the development of stream processing applications with Kinesis Data Streams. With KCL 3.0, customers can reduce compute costs to process streaming data by up to 33% compared to previous KCL versions. KCL 3.0 introduces an enhanced load balancing algorithm that continuously monitors the resource utilization of the stream processing workers and automatically redistributes the load from over-utilized workers to underutilized workers. These changes also enhance scalability and the overall efficiency of processing large volumes of streaming data.

We have also made improvements to Amazon Managed Service for Apache Flink. We offer the latest Flink versions on Amazon Managed Service for Apache Flink for customers to benefit from the latest innovations. Customers can also upgrade their existing applications to use new Flink versions with a new in-place version upgrade feature. Amazon Managed Service for Apache Flink now offers per-second billing, so customers can run their Flink applications for a short period and only pay for what they use, down to the nearest second.
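
To make the idea of utilization-based rebalancing concrete, here is a minimal sketch of moving work from the busiest worker to the idlest one. It is not the KCL 3.0 algorithm, whose internals this post does not describe. The Worker class, the per-lease load numbers, and the tolerance parameter are all hypothetical and exist only for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Worker:
    name: str
    # Hypothetical per-lease load, e.g. CPU percentage attributed to each shard lease.
    leases: dict = field(default_factory=dict)

    @property
    def load(self) -> float:
        return sum(self.leases.values())


def rebalance(workers, tolerance=10.0):
    """Greedily move leases from the busiest worker to the idlest one.

    Stops when the gap between them is within `tolerance`, or when no
    single lease move would narrow the gap. Returns (lease, src, dst) moves.
    """
    moves = []
    while True:
        busiest = max(workers, key=lambda w: w.load)
        idlest = min(workers, key=lambda w: w.load)
        gap = busiest.load - idlest.load
        if gap <= tolerance:
            break
        # Only a lease with 0 < load < gap narrows the gap when moved.
        candidates = {k: v for k, v in busiest.leases.items() if 0 < v < gap}
        if not candidates:
            break
        # Prefer the lease whose load is closest to half the gap.
        lease = min(candidates, key=lambda k: abs(candidates[k] - gap / 2))
        idlest.leases[lease] = busiest.leases.pop(lease)
        moves.append((lease, busiest.name, idlest.name))
    return moves


# Illustrative data: worker "a" is overloaded, worker "b" is nearly idle.
workers = [
    Worker("a", {"shard-1": 40.0, "shard-2": 30.0, "shard-3": 20.0}),
    Worker("b", {"shard-4": 10.0}),
]
moves = rebalance(workers)
print(moves)                                # [('shard-1', 'a', 'b')]
print({w.name: w.load for w in workers})    # {'a': 50.0, 'b': 50.0}
```

Each move strictly reduces the imbalance between the two workers involved, so the loop terminates. A production balancer would also have to account for measurement noise and the cost of moving a lease.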

Conclusion

AWS has made new innovations in data streaming services, bringing compelling value to customers on performance, scalability, elasticity, and ease of use. These advancements empower businesses to use real-time data more effectively, which paves the way for the next generation of data-driven applications and analytics. It is still Day 1!


About the authors

Sai Maddali is a Senior Manager Product Management at AWS who leads the product team for Amazon MSK. He is passionate about understanding customer needs, and using technology to deliver services that empower customers to build innovative applications. Besides work, he enjoys traveling, cooking, and running.

Bill Crew is a Senior Product Marketing Manager. He is the lead marketer for Streaming and Messaging Services at AWS, including Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Managed Service for Apache Flink, Amazon Data Firehose, Amazon Kinesis Data Streams, Amazon MQ, Amazon Simple Queue Service (Amazon SQS), and Amazon Simple Notification Service (Amazon SNS). Besides work, he enjoys collecting vintage vinyl records.

Introducing new Event Source Mapping (ESM) metrics for AWS Lambda

Post Syndicated from Chris McPeek original https://aws.amazon.com/blogs/compute/introducing-new-event-source-mapping-esm-metrics-for-aws-lambda/

This post is written by Tarun Rai Madan, Principal Product Manager – Serverless, and Rajesh Kumar Pandey, Principal Software Engineer – Serverless.

Today, AWS is announcing new opt-in Amazon CloudWatch metrics for AWS Lambda Event Source Mappings that subscribe to Amazon Simple Queue Service (Amazon SQS), Amazon Kinesis, and Amazon DynamoDB event sources. These metrics include PolledEventCount, InvokedEventCount, FilteredOutEventCount, FailedInvokeEventCount, DeletedEventCount, DroppedEventCount, and OnFailureDestinationDeliveredEventCount. The new metrics enable customers to monitor the processing state of events read by Event Source Mappings (ESMs), and help them diagnose processing issues.

Previously, customers found it challenging to monitor the processing state of events read by an ESM. An ESM is a resource that polls events from an event source and invokes a Lambda function. With the new metrics for ESMs, you can count events by their processing state, which includes events that were polled, invoked, filtered out, deleted, dropped, failed, or sent to an on-failure destination.

Overview

Customers building modern event-driven applications use services like SQS, Kinesis, and DynamoDB as fundamental building blocks for developing decoupled architectures, and use a Lambda function as a consumer to benefit from its simplicity, auto-scaling and cost effectiveness. To subscribe to an event source, customers configure a Lambda Event Source Mapping (ESM). An ESM is a fully-managed Lambda resource that runs an event poller which polls, processes (e.g., filters and batches), and delivers the events to a Lambda function. Due to the processing that happens on an ESM, for example, filtering, batching, and delivery to on-failure destinations, events can end up in varying terminal states. As a result, some polled events may not invoke a Lambda function. Previously, the count of polled, filtered, invoked, deleted or dropped events was not visible to customers. This made it challenging for customers to diagnose processing issues with their ESM, resulting from faulty permissions, misconfiguration, or function errors.

What’s new

With today’s announcement, customers can opt in to CloudWatch metrics to monitor the processing state of events that are read by an ESM configured with SQS, Kinesis, or DynamoDB as an event source.

PolledEventCount metric counts the number of events read by an ESM from the event source.

InvokedEventCount metric counts the number of events that invoked your Lambda function. For an event that experiences function errors, this metric may increase the count multiple times for the same polled event, due to retries.

FilteredOutEventCount metric counts the number of events filtered out by your ESM, based on the Filter Criteria defined by you.

FailedInvokeEventCount metric counts the number of events that attempted to invoke a Lambda function, but encountered partial or complete failure.

DeletedEventCount metric counts the events that have been deleted from the SQS queue by Lambda upon successful processing.

DroppedEventCount metric counts the number of events dropped due to event expiry or exhaustion of retry attempts, for Kinesis and DynamoDB ESMs configured with MaximumRecordAgeInSeconds or MaximumRetryAttempts.

OnFailureDestinationDeliveredEventCount metric counts the events sent to an on-failure destination upon reaching the MaximumRecordAgeInSeconds or MaximumRetryAttempts, for ESMs configured with DestinationConfig.

How to use the new ESM metrics

Once an ESM is created and reaches enabled state, it continuously polls the event source for new events. You can monitor the PolledEventCount metric to catch issues with polling due to misconfigured or deleted event source, misconfigured or deleted Lambda function execution role, incorrect permissions, or throttles from the event source. This metric typically increases when there is an increase in traffic in the event source. You can observe the InvokedEventCount metric to catch issues with the Lambda function, and whether the events are properly invoking your Lambda function. In case of Lambda function errors, InvokedEventCount could be more than PolledEventCount due to retries. This metric would also increase when there is an increase in events processed by an ESM. For ESMs that have filter criteria configured, you can monitor the FilteredOutEventCount to count events that were not sent to a Lambda function because they were filtered out per the defined filter criteria.

You can monitor the FailedInvokeEventCount metric to observe the number of events that failed processing when Lambda service tried to invoke your Lambda function. Invocations can fail due to network configuration issues, incorrect permissions, or a deleted Lambda function, version, or alias. If your event source mapping has partial batch responses enabled, this metric includes any event with a non-empty BatchItemFailures in the response. If all events in a batch are successfully processed by your Lambda function, Lambda service emits a 0 value for this metric. You can use the DeletedEventCount metric to ensure that processed events have been successfully deleted from your SQS queue after being processed by the Lambda function. You can use the DroppedEventCount metric to identify issues with message backlogs or misconfigured event expiry rules. You can use the OnFailureDestinationDeliveredEventCount metric to monitor issues such as failed events caused by Lambda function invocation errors.
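
The checks described above can be collected into a small helper. This is a sketch only: the diagnose function and its hint wording are hypothetical, it assumes you have already retrieved the Sum of each metric over the same time window, and the rules are a rough reading of the guidance in this section rather than an official troubleshooting procedure.

```python
def diagnose(sums):
    """Return human-readable hints for one ESM, given summed metric values.

    `sums` maps metric names to their Sum over the same time window.
    Metrics that are missing from the mapping are treated as 0.
    """
    polled = sums.get("PolledEventCount", 0)
    invoked = sums.get("InvokedEventCount", 0)
    failed = sums.get("FailedInvokeEventCount", 0)
    dropped = sums.get("DroppedEventCount", 0)
    on_failure = sums.get("OnFailureDestinationDeliveredEventCount", 0)

    hints = []
    if polled == 0:
        hints.append("nothing polled: check the event source, the execution "
                     "role, permissions, and throttling")
    if invoked > polled:
        hints.append("invoked exceeds polled: function errors are probably "
                     "causing retries")
    if failed > 0:
        hints.append("failed invokes: check network configuration, "
                     "permissions, and that the function, version, or alias exists")
    if dropped > 0:
        hints.append("dropped events: check for backlogs or misconfigured "
                     "event expiry rules")
    if on_failure > 0:
        hints.append("events were routed to the on-failure destination")
    return hints


# Illustrative numbers only, not taken from a real ESM.
sample = {"PolledEventCount": 100, "InvokedEventCount": 130,
          "FailedInvokeEventCount": 30}
for hint in diagnose(sample):
    print("-", hint)
```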

The classification for available Lambda ESM metrics by event source is presented below:

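CloudWatch metric | SQS | DynamoDB | Kinesis Data Streams
PolledEventCount | Yes | Yes | Yes
InvokedEventCount | Yes | Yes | Yes
FilteredOutEventCount | Yes | Yes | Yes
FailedInvokeEventCount | Yes | Yes | Yes
DeletedEventCount | Yes | No | No
DroppedEventCount | No | Yes | Yes
OnFailureDestinationDeliveredEventCount | No | Yes | Yes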

Activating and testing the new ESM metrics

You can enable the new ESM metrics using AWS Lambda Console, AWS Command Line Interface (CLI), Lambda ESM API, AWS SDK, AWS CloudFormation, and AWS Serverless Application Model (SAM). The metrics will be published under the AWS/Lambda namespace and EventSourceMappingUUID dimension in the CloudWatch console. To learn more, see CloudWatch metrics for Lambda.
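
If you prefer to read the metrics programmatically, the namespace and dimension above map onto a CloudWatch GetMetricData request. The sketch below only builds the query list and makes no AWS calls. The build_metric_queries helper, the 60-second period, the Sum statistic, and the placeholder UUID are assumptions chosen for illustration.

```python
ESM_METRICS = [
    "PolledEventCount",
    "InvokedEventCount",
    "FilteredOutEventCount",
    "FailedInvokeEventCount",
    "DeletedEventCount",
    "DroppedEventCount",
    "OnFailureDestinationDeliveredEventCount",
]


def build_metric_queries(esm_uuid, period_seconds=60):
    """Build one GetMetricData query per ESM metric for a single mapping."""
    queries = []
    for name in ESM_METRICS:
        queries.append({
            # Query IDs must start with a lowercase letter.
            "Id": name[0].lower() + name[1:],
            "MetricStat": {
                "Metric": {
                    "Namespace": "AWS/Lambda",
                    "MetricName": name,
                    "Dimensions": [
                        {"Name": "EventSourceMappingUUID", "Value": esm_uuid},
                    ],
                },
                "Period": period_seconds,
                "Stat": "Sum",
            },
        })
    return queries


# Placeholder UUID; substitute the UUID of your own event source mapping.
queries = build_metric_queries("<event-source-mapping-uuid>")
print(len(queries), queries[0]["Id"])  # 7 polledEventCount
```

The resulting list could be passed as the MetricDataQueries argument of a GetMetricData call, along with a start and end time.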

Using AWS CLI

To turn on the new metrics using AWS CLI, use the --metrics-config parameter.

aws lambda create-event-source-mapping \
    --region <region-name> \
    --function-name <function-name> \
    --event-source-arn <event-source-arn> \
    --metrics-config '{"Metrics": ["EventCount"]}'

Using AWS Lambda Console

To turn on the new metrics using AWS Lambda Console, click on “Enable metrics” while adding the trigger for your function.

Figure 1: Enabling ESM metrics in AWS Console

A typical scenario where the new ESM metrics improve observability is an ESM that uses event filtering. To test the ESM metrics, you can deploy a sample Lambda application with Kinesis as an event source using this serverless pattern, which uses event filtering with specific criteria to control which events are sent to Lambda. Use this pattern for both example scenarios: follow the setup guidelines for the pattern, then continue with the tests for each scenario. Running this sample project in your account may incur charges. See AWS Lambda pricing and Amazon Kinesis pricing.

Figure 2: Configuring Lambda function with Kinesis event source

Example scenario 1: ESM metrics with event filtering configured

The following diagram shows the results for the test scenario with Kinesis ESM, where the total polled events, filtered events, invoked events, and failed events are represented by PolledEventCount, FilteredOutEventCount, InvokedEventCount and FailedInvokeEventCount.

Figure 3: ESM metrics for scenario 1

Example Scenario 2: ESM metrics with event filtering and On-Failure Destination configured

Another common scenario is one where you want visibility into the number of events delivered to the Lambda function, the number of events filtered out, and the number of events routed to an on-failure destination upon failure. To test this scenario, follow a setup similar to the one in scenario 1. Create or update the ESM with an on-failure destination, and set MaximumRetryAttempts to 1, as shown below.

aws lambda update-event-source-mapping \
    --uuid <event-source-mapping-uuid> \
    --maximum-retry-attempts 1 \
    --filter-criteria '{"Filters": [{"Pattern": "{\"data\": { \"tire_pressure\": [ { \"numeric\": [ \"<\", 32 ] } ] } }"}]}' \
    --destination-config '{"OnFailure": {"Destination": "<your_SQS_queue_ARN>"}}' \
    --function-name <lambda-function-name>
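
The --filter-criteria value nests one JSON document inside another as an escaped string, which is easy to get wrong by hand. One way to avoid that, sketched here under the assumption that you generate the argument with a short script, is to build both levels with json.dumps. The variable names are illustrative.

```python
import json

# Inner event pattern: match records whose tire_pressure has a value below 32.
pattern = {"data": {"tire_pressure": [{"numeric": ["<", 32]}]}}

# The Pattern field must be a JSON string, so the inner document is serialized first.
filter_criteria = {"Filters": [{"Pattern": json.dumps(pattern)}]}

# Serialize the outer document; wrap the result in single quotes on the shell.
cli_argument = json.dumps(filter_criteria)
print(cli_argument)

# Round-trip check: decoding both levels recovers the original pattern.
decoded = json.loads(json.loads(cli_argument)["Filters"][0]["Pattern"])
print(decoded == pattern)  # True
```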

Publish a sample payload that matches the FilterCriteria defined above. Also generate sample data with different “tire_pressure” values below 32, so that the events match the filter and invoke the Lambda function.

Sample Data:

{
    "time": "2021-11-09 13:32:04",
    "fleet_id": "fleet-452",
    "vehicle_id": "a42bb15c-43eb-11ec-81d3-0242ac130003",
    "lat": 47.616226213162406,
    "lon": -122.33989110734133,
    "speed": 43,
    "odometer": 43519,
    "tire_pressure": [41, 40, 31, 41],
    "weather_temp": 76,
    "weather_pressure": 1013,
    "weather_humidity": 66,
    "weather_wind_speed": 8,
    "weather_wind_dir": "ne"
}
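
To see why this sample record passes the filter, here is a simplified matcher for the numeric rule. It is not Lambda's filtering implementation. It handles only the "numeric" operator, and it assumes that an array-valued field matches when any one of its elements satisfies the rule. All function names are hypothetical.

```python
import operator

# Comparison operators accepted by this sketch's numeric rules.
OPS = {"<": operator.lt, "<=": operator.le, ">": operator.gt,
       ">=": operator.ge, "=": operator.eq}


def numeric_rule_holds(rule, value):
    """rule looks like ["<", 32] or [">", 0, "<=", 100]; every pair must hold."""
    return all(OPS[op](value, bound) for op, bound in zip(rule[0::2], rule[1::2]))


def field_matches(conditions, value):
    """A field matches if any condition holds for any element of the value."""
    values = value if isinstance(value, list) else [value]
    numbers = [v for v in values
               if isinstance(v, (int, float)) and not isinstance(v, bool)]
    return any(
        numeric_rule_holds(cond["numeric"], n)
        for cond in conditions
        if isinstance(cond, dict) and "numeric" in cond
        for n in numbers
    )


def payload_matches(data_pattern, payload):
    """Every field named in the pattern must be present and match."""
    return all(
        key in payload and field_matches(conditions, payload[key])
        for key, conditions in data_pattern.items()
    )


# The "data" part of the filter pattern: tire_pressure below 32.
data_pattern = {"tire_pressure": [{"numeric": ["<", 32]}]}

matching = {"fleet_id": "fleet-452", "tire_pressure": [41, 40, 31, 41]}
non_matching = {"fleet_id": "fleet-452", "tire_pressure": [41, 40, 35, 41]}

print(payload_matches(data_pattern, matching))      # True: 31 is below 32
print(payload_matches(data_pattern, non_matching))  # False: no value is below 32
```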

Once you have published these records to the stream, you should be able to see the CloudWatch metrics under AWS/Lambda namespace with the EventSourceMappingUUID dimension, as shown below. Note that if an event experiences a function error, InvokedEventCount may increase multiple times for the same polled event due to automatic retries.

Figure 4: ESM metrics for scenario 2

Available Now

The new ESM metrics are generally available in all commercial AWS Regions where Lambda is available. Support is also available through AWS Lambda partners like Datadog, Elastic, and Lumigo. The Lambda service sends these new metrics to CloudWatch at no additional cost to you. However, charges apply for CloudWatch metrics at standard CloudWatch metrics pricing for these opt-in metrics, in addition to your AWS Lambda pricing and event source pricing.

Conclusion

With these new CloudWatch metrics, you can gain visibility into the processing state of events that are polled by a Lambda Event Source Mapping (ESM) for queue-based or stream-based applications. This post explains the new metrics PolledEventCount, InvokedEventCount, FilteredOutEventCount, FailedInvokeEventCount, DeletedEventCount, DroppedEventCount, and OnFailureDestinationDeliveredEventCount, and how to use them to troubleshoot event processing issues for Lambda functions. These metrics help you track the invocation requests sent to Lambda via an ESM, monitor any delays or issues in processing, and take corrective actions if required. To learn more about these metrics, see the Lambda Developer Guide.

For more serverless learning resources, visit Serverless Land.

AWS Weekly Roundup: 20 years of AWS News Blog, Express brokers for Amazon MSK, Windows Server 2025 images on EC2, and more (Nov 11, 2024)

Post Syndicated from Channy Yun (윤석찬) original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-20-years-of-aws-news-blog-express-brokers-for-amazon-msk-windows-server-2025-images-on-ec2-and-more-nov-11-2024/

Happy 20th Anniversary of the AWS News Blog! 🎉🥳🎊 On November 9, 2004, Jeff Barr published his first blog post. At the time, he started a personal blog site using TypePad. He wanted to speak to his readers in his own personal voice, not that of the company or team.

On April 29, 2014, we created a new AWS blog site and migrated all posts to that page. There are currently over 4,300 posts on the AWS News Blog, with Jeff contributing over 3,200 of them.

Since December 2016, the AWS News Blog has added new writers, but we still follow Jeff’s leadership principles for AWS News Bloggers in accordance with Day One. What’s unique about the AWS News Blog is that the blog writers get to use new features from the product teams in advance, following the Customer Obsession leadership principle, and focus on walk-throughs of how customers can quickly use them to save time, in line with the Frugality principle.

I am very grateful for Jeff’s fundamental and pivotal role over the past 20 years, and I look forward to the next 20 years!

Last week’s launches
Here are some launches that got my attention:

New Express brokers for Amazon MSK – Express brokers are a new broker type for Amazon MSK Provisioned designed to deliver up to three times more throughput per broker, scale up to 20 times faster, and reduce recovery time by 90 percent as compared to standard Apache Kafka brokers. Express brokers come preconfigured with Kafka best practices by default, support all Kafka APIs, and provide the same low-latency performance, so you can continue using existing client applications without any changes.

New Amazon Kinesis Client Library 3.0 – You can now reduce compute costs to process streaming data by up to 33 percent with Kinesis Client Library (KCL) 3.0, compared to previous KCL versions. KCL 3.0 introduces an enhanced load balancing algorithm that continuously monitors resource utilization of the stream processing workers and automatically redistributes the load from overutilized workers to other underutilized workers. To learn more, read the AWS Big Data Blog post.

Microsoft Windows Server 2025 images on Amazon EC2 – We now support Microsoft Windows Server 2025 with License Included (LI) Amazon Machine Images (AMIs), providing customers with an easy and flexible way to launch the latest version of Windows Server. By running Windows Server 2025 on Amazon EC2, customers can take advantage of the security, performance, and reliability of AWS with the latest Windows Server features. To learn more about running Windows Server 2025 on Amazon EC2, visit Windows Workloads on AWS.

Anthropic’s Claude 3.5 Haiku model in Amazon Bedrock – Claude 3.5 Haiku is the next generation of Anthropic’s fastest model, combining rapid response times with improved reasoning capabilities, making it ideal for tasks that require both speed and intelligence. Claude 3.5 Haiku improves across every skill set and surpasses even Claude 3 Opus, the largest model in Anthropic’s previous generation, on many intelligence benchmarks—including coding. To learn more, read the AWS News Blog post.

Amazon Bedrock Prompt Management GA – You can simplify the creation, testing, versioning, and sharing of prompts in Amazon Bedrock Prompt Management. At general availability, we added new features that provide enhanced options for configuring your prompts and enabling seamless integration for invoking them in your generative AI applications, such as structured prompts and Converse and InvokeModel API integration. To learn more, read the AWS Machine Learning blog post.

Six new synthetic generative voices for Amazon Polly – The generative engine is Amazon Polly’s most advanced text-to-speech (TTS) model and uses generative AI technology. We added six new synthetic female-sounding generative voices: Ayanda (South African English), Léa (French), Lucia (European Spanish), Lupe (American Spanish), Mía (Mexican Spanish), and Vicki (German). This extends the generative engine to thirteen voices and nine locales, giving you more options for highly expressive and engaging voices.

Amazon OpenSearch Service Extended Support – We announced the end of Standard Support and the Extended Support timelines for legacy Elasticsearch and OpenSearch versions. Standard Support ends on Nov 7, 2025, for legacy Elasticsearch versions up to 6.7, Elasticsearch versions 7.1 through 7.8, OpenSearch versions from 1.0 through 1.2, and OpenSearch versions 2.3 through 2.9. With Extended Support, for an incremental flat fee over regular instance pricing, you continue to get critical security updates beyond the end of Standard Support. To learn more, read the AWS Big Data Blog post.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS news
Here are some additional news items that you might find interesting:

CEO’s visit to an AWS data center – Matt Garman, CEO of AWS, had a great time visiting one of our AWS data centers recently, and was able to get a look at the continuous innovation delivered by the team. Of course, it’s no surprise that Amazon’s senior executives visit fulfillment centers, contact centers, and data centers to do real work for customers. AWS data centers are designed for customers in every aspect, for maximum resilience, performance, and energy efficiency.

AWS supports small businesses, creates jobs, sets up sustainability initiatives, and develops educational programs near AWS data centers. Get the latest updates – AWS in your community: Here’s what’s happening near data centers across the US on About Amazon News.

Amazon Q Business at Amazon – I previously shared a story about how Amazon used code transformation in Amazon Q Developer to migrate more than 30,000 old Java applications to Java 17. It saved over 4,500 developer years of effort compared to doing the work manually and saved the company $260 million annually by moving to the latest Java version.

Here is another dogfooding story of Amazon Q Business at Amazon. Amazon built an internal chatbot with Amazon Q Business and it has resolved over 1 million internal Amazon developer questions, reducing time spent churning on manual technical investigations by more than 450,000 hours.

Our team onboarded Amazon Q Business with millions of internal documents and integrated Q Business into the tools our team uses every day. Now, instead of waiting hours for responses to complex technical questions on Q&A boards or Slack channels, developers can get answers in seconds.

TOURCast at PGA TOUR – If you enjoy golf, this news will be of interest to you. The PGA TOUR debuted TOURCast in Japan at the 2024 ZOZO Championship to capture and disseminate better statistical data and bring fans closer to the game, based on a new scoring system called ShotLink, powered by CDW. This marks the first time the PGA TOUR has been able to bring this technology to Asia, leveraging the flexibility and scalability of AWS to overcome unique challenges.


PGA TOUR volunteer setting up GPS equipment on the fairway at the ZOZO Championship that will input specific shot data and feed it back to ShotLink Select Plus. [IMAGE: PGA TOUR]

They’ve completely rebuilt their scoring system over the past two years on a new cloud stack. With AWS cloud, whether data comes from high-tech radar systems, cameras, or manual input, the system processes it all seamlessly.

Upcoming AWS events
Check your calendars and sign up for these AWS events:

AWS GenAI Lofts – AWS GenAI Lofts are about more than just the tech; they bring together startups, developers, investors, and industry experts. Whether you’re looking to gain deep insights or get your questions answered by generative AI pros, our GenAI Lofts have you covered and provide everything you need to start building your next innovation. Join events in São Paulo (through November 20) and Paris (through November 25).

AWS Community Days – Join community-led conferences that feature technical discussions, workshops, and hands-on labs led by expert AWS users and industry leaders from around the world: Jakarta, Indonesia (November 23), Kochi, India (December 14).

AWS re:Invent – You can still register for the annual learning event, taking place December 2–6 in Las Vegas. Surprisingly, Andy Jassy, CEO of Amazon, said he will come back and participate in AWS re:Invent this year. He said, “As always, the priority is to make this a learning event so customers can take nuggets back and change their own customer experiences and businesses. We’ll also have a bunch of goodies for you that we’ll announce and that we think folks will like.” Let’s meet there!

You can browse all upcoming in-person and virtual events.

That’s all for this week. Check back next Monday for another Weekly Roundup!

Channy

This post is part of our Weekly Roundup series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Reduce your compute costs for stream processing applications with Kinesis Client Library 3.0

Post Syndicated from Minu Hong original https://aws.amazon.com/blogs/big-data/reduce-your-compute-costs-for-stream-processing-applications-with-kinesis-client-library-3-0/

Amazon Kinesis Data Streams is a serverless data streaming service that makes it straightforward to capture and store streaming data at any scale. Kinesis Data Streams not only offers the flexibility to use many out-of-box integrations to process the data published to the streams, but also provides the capability to build custom stream processing applications that can be deployed on your compute fleet.

When building custom stream processing applications, developers typically face challenges with managing distributed computing at scale that is required to process high throughput data in real time. This is where Kinesis Client Library (KCL) comes in. Thousands of AWS customers use KCL to operate custom stream processing applications with Kinesis Data Streams without worrying about the complexities of distributed systems. KCL uses Kinesis Data Streams APIs to read data from the streams and handles the heavy lifting of balancing stream processing across multiple workers, managing failovers, and checkpointing processed records. By abstracting away these concerns, KCL allows developers to focus on what matters most—implementing their core business logic for processing streaming data.

As applications process more and more data over time, customers are looking to reduce the compute costs for their stream processing applications. We are excited to launch Kinesis Client Library 3.0, which enables you to reduce your stream processing cost by up to 33% compared to previous KCL versions. KCL 3.0 achieves this with a new load balancing algorithm that continuously monitors the resource utilization of workers and redistributes the load evenly to all workers. This allows you to process the same data with fewer compute resources.

In this post, we discuss load balancing challenges in stream processing using a sample workload, demonstrating how uneven load distribution across workers increases processing costs. We then show how KCL 3.0 addresses this challenge to reduce compute costs, and walk you through how to upgrade from KCL 2.x to 3.0. We also cover other benefits that KCL 3.0 provides, including using the AWS SDK for Java 2.x and removing the dependency on the AWS SDK for Java 1.x. Lastly, we provide a key checklist as you prepare to upgrade your stream processing application to use KCL 3.0.

Load balancing challenges with operating custom stream processing applications

Customers processing real-time data streams typically use multiple compute hosts such as Amazon Elastic Compute Cloud (Amazon EC2) to handle the high throughput in parallel. In many cases, data streams contain records that must be processed by the same worker. For example, a trucking company might use multiple EC2 instances, each running one worker, to process streaming data with real-time location coordinates published from thousands of vehicles. To accurately keep track of routes of vehicles, each truck’s location needs to be processed by the same worker. For such applications, customers specify the vehicle ID as a partition key for every record published to the data stream. Kinesis Data Streams writes data records belonging to the same partition key to a single shard (the base throughput unit of Kinesis Data Streams) so that they can be processed in order.
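To make the partition key behavior concrete, here is a minimal Python sketch. Kinesis Data Streams hashes each partition key with MD5 into a 128-bit integer and places the record in the shard whose hash key range contains that value. The sketch assumes the shards split the hash space into equal contiguous ranges, and the function name and vehicle IDs are illustrative only.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 yields a 128-bit value


def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Return the index of the shard whose hash range contains the key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest, "big")
    # Assumption: shards own equal, contiguous slices of the hash space.
    range_size = HASH_SPACE // shard_count
    return min(hash_value // range_size, shard_count - 1)


if __name__ == "__main__":
    # The same vehicle ID always lands on the same shard.
    for vehicle_id in ["truck-001", "truck-002", "truck-001"]:
        print(vehicle_id, "-> shard", shard_for_key(vehicle_id, 4))
```

Because the mapping depends only on the key, every update from one vehicle reaches the same shard, and therefore the same worker, in order.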

However, data in the stream is often unevenly distributed across shards due to varying traffic associated with partition keys. For instance, some vehicles may send more frequent location updates when operational, whereas others send less frequent updates when idle. With previous KCL versions, each worker in the stream processing application processed an equal number of shards in parallel. As a result, workers processing data-heavy shards might reach their data processing limits, whereas those handling lighter shards remain underutilized. This workload imbalance presents a challenge for customers seeking to optimize their resource utilization and stream processing efficiency.

Let’s look at a sample workload with uneven traffic across shards in the stream to elaborate how this leads to uneven utilization of the compute fleet with KCL 2.6, and why it results in higher costs.

In the sample workload, the producer application publishes 2.5MBps of data across four shards. However, two shards receive 1MBps each and the other two receive 0.25MBps based on the traffic pattern associated with partition keys. In our trucking company example, you can think of it as two shards storing data from actively operating vehicles and the other two shards storing data from idle vehicles. We used three EC2 instances, each running one worker, to process this data with KCL 2.6 for this sample workload.

Initially, the load was distributed across three workers with the CPU utilizations of 50%, 50%, and 25%, averaging 42% (as shown in the following figure in the 12:18–12:29 timeframe). Because the EC2 fleet is under-utilized, we removed one EC2 instance (worker) from the fleet to operate with two workers for better cost-efficiency. However, after we removed the worker (red vertical dotted line in the following figure), the CPU utilization of one EC2 instance went up to almost 100%.

This occurs because KCL 2.6 and earlier versions distribute the load to make sure each worker processes the same number of shards, regardless of throughput or CPU utilization of workers. In this scenario, one worker processed two high-throughput shards, reaching 100% CPU utilization, and another worker handled two low-throughput shards, operating at only 25% CPU utilization.
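The numbers in this scenario can be reproduced with a short Python sketch. It assumes CPU utilization grows linearly with throughput at roughly 50% per 1 MBps, a ratio inferred from the figures reported above rather than a measured constant. The worker names are illustrative.

```python
CPU_PERCENT_PER_MBPS = 50.0  # assumption inferred from the sample workload


def worker_cpu(assignment):
    """Estimate CPU % per worker from {worker: [shard throughput in MBps]}."""
    return {
        worker: sum(shards) * CPU_PERCENT_PER_MBPS
        for worker, shards in assignment.items()
    }


# Three workers, four shards: two workers get one busy shard each,
# and the third gets both quiet shards.
three_workers = {"worker-1": [1.0], "worker-2": [1.0], "worker-3": [0.25, 0.25]}

# Two workers, balanced only by shard count (two shards each).
two_workers_by_count = {"worker-1": [1.0, 1.0], "worker-2": [0.25, 0.25]}

if __name__ == "__main__":
    print(worker_cpu(three_workers))
    print(worker_cpu(two_workers_by_count))
```

Both workers in the second assignment hold two shards, yet one carries eight times the throughput of the other, which is why equal shard counts alone don't give equal utilization.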

Due to this CPU utilization imbalance, the worker compute fleet can’t be scaled down because it can lead to processing delays due to over-utilization of some workers. Even though the entire fleet is under-utilized in aggregate, uneven distribution of the load prevents us from downsizing the fleet. This increases compute costs of the stream processing application.

Next, we explore how KCL 3.0 addresses these load balancing challenges.

Load balancing improvements with KCL 3.0

KCL 3.0 introduces a new load balancing algorithm that monitors CPU utilization of KCL workers and rebalances the stream processing load. When it detects a worker approaching data processing limits or high variance in CPU utilization across workers, it redistributes the load from over-utilized to underutilized workers. This balances the stream processing load across all workers. As a result, you can avoid over-provisioning of capacity due to imbalanced CPU utilization among workers and save costs by right-sizing your compute capacity.

The following figure shows the result for KCL 3.0 with the same simulation settings we had with KCL 2.6.

With three workers, KCL 3.0 initially distributed the load similarly to KCL 2.6, resulting in 42% average CPU utilization (20:35–20:55 timeframe). However, when we removed one worker (marked with the red vertical dotted line), KCL 3.0 rebalanced the load across the two remaining workers by considering the throughput variability in shards, not just equally distributing shards based on the number of shards. As a result, the two workers ended up running at about 65% CPU utilization, allowing us to safely scale down the compute capacity without any performance risk.
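To illustrate why throughput-aware balancing helps, the following Python sketch assigns the four sample shards to two workers with a simple greedy rule: the heaviest shard goes to the currently least-loaded worker. This is not KCL 3.0's actual algorithm, which reacts to measured CPU utilization. It is a stand-in that shows the effect, and it assumes roughly 50% CPU per 1 MBps, inferred from the sample figures.

```python
import heapq

CPU_PERCENT_PER_MBPS = 50.0  # assumption inferred from the sample workload


def balance_by_throughput(shard_mbps, worker_count):
    """Greedy sketch: give the heaviest shard to the least-loaded worker."""
    heap = [(0.0, worker) for worker in range(worker_count)]
    heapq.heapify(heap)
    assignment = {worker: [] for worker in range(worker_count)}
    for mbps in sorted(shard_mbps, reverse=True):
        load, worker = heapq.heappop(heap)
        assignment[worker].append(mbps)
        heapq.heappush(heap, (load + mbps, worker))
    return assignment


if __name__ == "__main__":
    assignment = balance_by_throughput([1.0, 1.0, 0.25, 0.25], worker_count=2)
    for worker, shards in assignment.items():
        print(worker, shards, f"{sum(shards) * CPU_PERCENT_PER_MBPS:.1f}% CPU")
```

Under these assumptions, each worker ends up with one busy and one quiet shard at about 62.5% estimated CPU, close to the roughly 65% observed in the test.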

In this scenario, we were able to reduce the compute fleet size from three workers to two workers, resulting in 33% reduction in compute costs compared to KCL 2.6. Although this is a sample workload, imagine the potential savings you can achieve when streaming gigabytes of data per second with hundreds of EC2 instances processing them! You can realize the same cost saving benefit for your KCL 3.0 applications deployed in containerized environments such as Amazon Elastic Container Service (Amazon ECS), Amazon Elastic Kubernetes Service (Amazon EKS), AWS Fargate, or your own self-managed Kubernetes clusters.

Other benefits in KCL 3.0

In addition to the stream processing cost savings, KCL 3.0 offers several other benefits:

  • Amazon DynamoDB read capacity unit (RCU) reduction – KCL 3.0 reduces the Amazon DynamoDB cost associated with KCL by optimizing read operations on the DynamoDB table storing metadata. KCL uses DynamoDB to store metadata such as shard-worker mapping and checkpoints.
  • Graceful handoff of shards from one worker to another – KCL 3.0 minimizes reprocessing of data when the shard processed by one worker is handed over to another worker during the rebalancing or during deployments. It allows the current worker to complete checkpointing the records that it has processed and the new worker taking over the work from the previous worker to pick up from the latest checkpoint.
  • Removal of the AWS SDK for Java 1.x dependency – KCL 3.0 has completely removed the dependency on the AWS SDK for Java 1.x, aligning with the AWS recommendation to use the latest SDK versions. This change improves overall performance, security, and maintainability of KCL applications. For details regarding AWS SDK for Java 2.x benefits, refer to Use features of the AWS SDK for Java 2.x.

Migrating to KCL 3.0

You may now be wondering how to migrate to KCL 3.0 and what code changes you’ll need to make to take advantage of its benefits. If you’re currently on a KCL 2.x version, you don’t have to make any changes to your application code! Complete the following steps to migrate to KCL 3.0:

  1. Update your Maven (or build environment) dependency to KCL 3.0.
  2. Set the clientVersionConfig to CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X.
  3. Build and deploy your code.

After all KCL workers are updated, KCL 3.0 automatically starts running the new load balancing algorithm to achieve even utilization of the workers. For detailed migration instructions, see Migrating from previous KCL versions.

Key checklists when you choose to use KCL 3.0

We recommend checking the following when you decide to use KCL 3.0 for your stream processing application:

  • Make sure you added proper permissions required for KCL 3.0. KCL 3.0 creates and manages two new metadata tables (worker metrics table, coordinator state table) and a global secondary index on the lease table in DynamoDB. See IAM permissions required for KCL consumer applications for detailed permission settings you need to add.
  • The new load balancing algorithm introduced in KCL 3.0 aims to achieve even CPU utilizations across workers, not an equal number of leases per worker. Setting the maxLeasesForWorker configuration too low may limit the KCL’s ability to balance the workload effectively. If you use the maxLeasesForWorker configuration, consider increasing its value to allow for optimal load distribution.
  • If you use automatic scaling for your KCL application, it’s important to review your scaling policy after upgrading to KCL 3.0. Specifically, if you’re using average CPU utilization as a scaling threshold, you should reassess this value. If you’re conservatively using a higher-than-needed threshold value to make sure your stream processing application won’t have some workers running hot due to the imbalanced load balancing, you might be able to adjust this now. KCL 3.0 introduces improved load balancing, which results in more evenly distributed workloads across workers. After deploying KCL 3.0, monitor your workers’ CPU utilization and see if you can lower your scaling threshold to optimize your resource usage and costs while maintaining performance. This step makes sure you’re taking full advantage of KCL 3.0’s enhanced load balancing capabilities.
  • To gracefully hand off leases, make sure you have implemented a checkpointing logic inside your shutdownRequested() method in the RecordProcessor class. Refer to Step 4 of Migrating from KCL 2.x to KCL 3.x for details.
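The last checklist item can be pictured with a small, library-free Python sketch. The class and method names below are hypothetical stand-ins that only mirror the shape of the KCL callbacks. They are not the KCL API, and the in-memory checkpointer replaces the DynamoDB-backed one that KCL manages.

```python
class InMemoryCheckpointer:
    """Hypothetical stand-in for the checkpoint store."""

    def __init__(self):
        self.sequence_number = None

    def checkpoint(self, sequence_number):
        self.sequence_number = sequence_number


class SketchRecordProcessor:
    """Hypothetical processor; not a KCL class."""

    def __init__(self, checkpointer):
        self.checkpointer = checkpointer
        self.last_processed = None
        self.seen = []

    def process_records(self, records):
        for sequence_number, payload in records:
            self.seen.append(payload)
            self.last_processed = sequence_number

    def shutdown_requested(self):
        # Persist progress before the lease moves to another worker.
        if self.last_processed is not None:
            self.checkpointer.checkpoint(self.last_processed)


def records_after(stream, sequence_number):
    """Return the records that follow the given checkpoint."""
    if sequence_number is None:
        return list(stream)
    return [record for record in stream if record[0] > sequence_number]


stream = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
store = InMemoryCheckpointer()

old_worker = SketchRecordProcessor(store)
old_worker.process_records(stream[:3])
old_worker.shutdown_requested()  # handoff: checkpoint at sequence number 3

new_worker = SketchRecordProcessor(store)
new_worker.process_records(records_after(stream, store.sequence_number))
```

Because the outgoing worker checkpoints during the handoff, the incoming worker resumes at the first unprocessed record instead of reprocessing the first three.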

Conclusion

The release of KCL 3.0 introduces significant enhancements that can help optimize the cost-efficiency and performance of KCL applications. The new load balancing algorithm enables more even CPU utilization across worker instances, potentially allowing for right-sized and more cost-effective stream processing fleets. By following the key checklists, you can take full advantage of KCL 3.0’s features to build efficient, reliable, and cost-optimized stream processing applications with Kinesis Data Streams.


About the Authors

Minu Hong is a Senior Product Manager for Amazon Kinesis Data Streams at AWS. He is passionate about understanding customer challenges around streaming data and developing optimized solutions for them. Outside of work, Minu enjoys traveling, playing tennis, skiing, and cooking.

Pratik Patel is a Senior Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices and proactively helps in keeping customers’ AWS environments operationally healthy.

Priyanka Chaudhary is a Senior Solutions Architect and data analytics specialist. She works with AWS customers as their trusted advisor, providing technical guidance and support in building Well-Architected, innovative industry solutions.

Stream real-time data into Apache Iceberg tables in Amazon S3 using Amazon Data Firehose

Post Syndicated from Diego Garcia Garcia original https://aws.amazon.com/blogs/big-data/stream-real-time-data-into-apache-iceberg-tables-in-amazon-s3-using-amazon-data-firehose/

As businesses generate more data from a variety of sources, they need systems to effectively manage that data and use it for business outcomes—such as providing better customer experiences or reducing costs. We see these trends across many industries—online media and gaming companies providing recommendations and customized advertising, factories monitoring equipment for maintenance and failures, theme parks providing wait times for popular attractions, and many others.

To build such applications, engineering teams are increasingly adopting two trends. First, they’re replacing batch data processing pipelines with real-time streaming, so applications can derive insight and take action within seconds instead of waiting for daily or hourly batch extract, transform, and load (ETL) jobs. Second, because traditional data warehousing approaches are unable to keep up with the volume, velocity, and variety of data, engineering teams are building data lakes and adopting open data formats such as Parquet and Apache Iceberg to store their data. Iceberg brings the reliability and simplicity of SQL tables to Amazon Simple Storage Service (Amazon S3) data lakes. By using Iceberg for storage, engineers can build applications using different analytics and machine learning frameworks such as Apache Spark, Apache Flink, Presto, Hive, or Impala, or AWS services such as Amazon SageMaker, Amazon Athena, AWS Glue, Amazon EMR, Amazon Managed Service for Apache Flink, or Amazon Redshift.

Iceberg is popular for several reasons. First, it’s widely supported by different open-source frameworks and vendors. Second, it allows customers to read and write data concurrently using different frameworks. For example, you can write some records using a batch ETL Spark job and other data from a Flink application at the same time and into the same table. Third, it allows scenarios such as time travel and rollback, so you can run SQL queries on a point-in-time snapshot of your data, or roll back data to a previously known good version. Fourth, it supports schema evolution, so when your applications evolve, you can add new columns to your tables without having to rewrite data or change existing applications. To learn more, see Apache Iceberg.

In this post, we discuss how you can send real-time data streams into Iceberg tables on Amazon S3 by using Amazon Data Firehose. Amazon Data Firehose simplifies the process of streaming data by allowing users to configure a delivery stream, select a data source, and set Iceberg tables as the destination. Once set up, the Firehose stream is ready to deliver data. Firehose is integrated with over 20 AWS services, so you can deliver real-time data from Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka, Amazon CloudWatch Logs, AWS Internet of Things (AWS IoT), AWS WAF, AWS Network Firewall logs, or from your custom applications (by invoking the Firehose API) into Iceberg tables. It’s cost effective because Firehose is serverless: you only pay for the data sent and written to your Iceberg tables. You don’t have to provision anything or pay anything when your streams are idle during nights, weekends, or other non-use hours.

Firehose also simplifies setting up and running advanced scenarios. For example, if you want to route data to different Iceberg tables to have data isolation or better query performance, you can set up a stream to automatically route records into different tables based on what’s in your incoming data and distribute records from a single stream into dozens of Iceberg tables. Firehose automatically scales—so you don’t have to plan for how much data goes into which table—and has built-in mechanisms to handle delivery failures and guarantee exactly once delivery. Firehose supports updating and deleting records in a table based on the incoming data stream, so you can support scenarios such as GDPR and right-to-forget regulations. Because Firehose is fully compatible with Iceberg, you can write data using it and simultaneously use other applications to read and write to the same tables. Firehose integrates with the AWS Glue Data Catalog, so you can use features in AWS Glue such as managed compaction for Iceberg tables.

In the following sections, you’ll learn how to set up Firehose to deliver real-time streams into Iceberg tables to address four different scenarios:

  1. Deliver data from a stream into a single Iceberg table and insert all incoming records.
  2. Deliver data from a stream into a single Iceberg table and perform record inserts, updates, and deletes.
  3. Route records to different tables based on the content of the incoming data by specifying a JSON Query expression.
  4. Route records to different tables based on the content of the incoming data by using a Lambda function.

You will also learn how to query the data you have delivered to Iceberg tables using a standard SQL query in Amazon Athena. All of the AWS services used in these examples are serverless, so you don’t have to provision and manage any infrastructure.

Solution overview

The following diagram illustrates the architecture.

In our examples, we use Kinesis Data Generator, a sample application to generate and publish data streams to Firehose. You can also set up Firehose to use other data sources for your real-time streams. We set up Firehose to deliver the stream into Iceberg tables in the Data Catalog.

Walkthrough

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs. The template performs the following operations:

  • Creates a Data Catalog database for the destination Iceberg tables
  • Creates four tables in the AWS Glue database that are configured to use the Apache Iceberg format
  • Specifies the S3 bucket locations for the destination tables
  • Creates a Lambda function (optional)
  • Sets up an AWS Identity and Access Management (IAM) role for Firehose
  • Creates resources for Kinesis Data Generator

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account. If you don’t have an account, you can create one.

Deploy the solution

The first step is to deploy the required resources into your AWS environment by using a CloudFormation template.

  1. Sign in to the AWS Management Console for CloudFormation.
  2. Choose Launch Stack.
  3. Choose Next.
  4. Leave the stack name as Firehose-Iceberg-Stack, and in the parameters, enter the username and password that you want to use for accessing Kinesis Data Generator.
  5. Go to the bottom of the page and select I acknowledge that AWS CloudFormation might create IAM resources and choose Next.

  6. Review the deployment and choose Submit.

The stack can take 5–10 minutes to complete, after which you can view the deployed stack on the CloudFormation console. The following figure shows the deployed Firehose-Iceberg-stack details.

Before you set up Firehose to deliver streams, you must create the destination tables in the Data Catalog. For the examples discussed here, we use the CloudFormation template to automatically create the tables used in the examples. For your custom applications, you can create your tables using CloudFormation, or by using DDL commands in Athena or Glue. The following is the DDL command for creating a table used in our example:

CREATE TABLE firehose_iceberg_db.firehose_events_1 (
type struct<device: string, event: string, action: string>,
customer_id string,
event_timestamp timestamp,
region string)
LOCATION 's3://firehose-demo-iceberg-<account_id>-<region>/iceberg/events_1'
TBLPROPERTIES (
'table_type'='iceberg',
'write_compression'='zstd'
);

Also note that the four tables that we use in the examples have the same schema, but you can have tables with different schemas in your application.
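If you prefer to create tables from a script rather than the console, the following Python sketch renders the DDL shown above and submits it to Athena with the AWS SDK for Python (Boto3). The helper names, the account ID, and the Region are placeholders for illustration. The bucket naming follows the pattern used by the CloudFormation template, and the sketch assumes your credentials allow Athena and AWS Glue access.

```python
DDL_TEMPLATE = """CREATE TABLE firehose_iceberg_db.{table} (
  type struct<device: string, event: string, action: string>,
  customer_id string,
  event_timestamp timestamp,
  region string)
LOCATION 's3://firehose-demo-iceberg-{account_id}-{region}/iceberg/{prefix}'
TBLPROPERTIES (
  'table_type'='iceberg',
  'write_compression'='zstd'
)"""


def render_ddl(table, prefix, account_id, region):
    """Fill in the table name and S3 location for one Iceberg table."""
    return DDL_TEMPLATE.format(
        table=table, prefix=prefix, account_id=account_id, region=region
    )


def submit_ddl(ddl, output_location):
    """Run the DDL in Athena and return the query execution ID."""
    import boto3  # imported lazily so render_ddl works without the SDK

    athena = boto3.client("athena")
    response = athena.start_query_execution(
        QueryString=ddl,
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]


if __name__ == "__main__":
    # Placeholder account ID and Region; replace with your own values.
    print(render_ddl("firehose_events_1", "events_1", "111122223333", "us-east-1"))
```

Calling `submit_ddl` with the rendered statement and an S3 query result location starts the query asynchronously, so check the execution status before sending data to the table.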

Use case 1: Deliver data from a stream into a single Iceberg table and insert all incoming records

Now that you have set up the source for your data stream and the destination tables, you’re ready to set up Firehose to deliver streams into the Iceberg tables.

Create a Firehose stream:

  1. Go to the Data Firehose console and choose Create Firehose stream.
  2. Select Direct PUT as the Source and Apache Iceberg Tables as the Destination.

This example uses Direct PUT as the source, but the same steps can be applied for other Firehose sources such as Kinesis Data Streams and Amazon MSK.

  1. For the Firehose stream name, enter firehose-iceberg-events-1.
  2. In Destination settings, enable Inline parsing for routing information. Because all records from the stream are inserted into a single destination table, you specify a destination database and table. By default, Firehose inserts all incoming records into the specified destination table.
    1. Database expression: "firehose_iceberg_db"
    2. Table expression: "firehose_events_1"

Include double quotation marks to use the literal value for the database and table name. If you do not use double quotations marks, Firehose assumes that this is a JSON Query expression and will attempt to parse the expression when processing your stream and fail.
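The difference between a quoted literal and an expression can be sketched in a few lines of Python. This is a deliberately simplified stand-in, not how Firehose evaluates JSON Query expressions: it supports only quoted literals and dotted paths such as `.type.event`, a path chosen here purely for illustration, and the function name is hypothetical.

```python
def resolve(expression, record):
    """Return the routing value for one record (simplified sketch)."""
    # A value wrapped in double quotation marks is used as-is.
    if len(expression) >= 2 and expression[0] == '"' and expression[-1] == '"':
        return expression[1:-1]
    # Anything else is treated as a path into the record.
    if not expression.startswith("."):
        raise ValueError(f"cannot evaluate expression: {expression}")
    value = record
    for key in expression[1:].split("."):
        value = value[key]
    return value


record = {
    "type": {"device": "mobile", "event": "firehose_events_1", "action": "update"},
    "customer_id": "42",
}

if __name__ == "__main__":
    print(resolve('"firehose_iceberg_db"', record))  # literal database name
    print(resolve(".type.event", record))            # value read from the record
```

An unquoted `firehose_iceberg_db` is neither a literal nor a valid path in this sketch, so it raises an error, which mirrors the failure described above.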

  1. Go to Buffer hints and reduce the Buffer size to 1 MiB and the Buffer interval to 60 seconds. You can fine-tune these settings for your application.
  2. For Backup settings:
    • Select the S3 bucket created by the CloudFormation template. It has the following structure: s3://firehose-demo-iceberg-<account_id>-<region>
    • For error output prefix enter: error/events-1/

  3. In Advanced settings, enable CloudWatch error logging, and in Existing IAM roles, select the role that starts with Firehose-Iceberg-Stack-FirehoseIamRole-*, created by the CloudFormation template.
  4. Choose Create Firehose stream.
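The buffer hints set in the preceding steps determine when Firehose delivers a batch: it buffers incoming records until either the buffer size or the buffer interval is reached, whichever comes first. The following Python sketch models that rule with a hypothetical class. It checks the hints only when a record arrives, whereas the real service also flushes on a timer, so treat it as an illustration of the trade-off rather than the service's implementation.

```python
class BufferSketch:
    """Hypothetical model of size-or-interval buffering."""

    def __init__(self, max_bytes=1024 * 1024, max_seconds=60.0):
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self.records = []
        self.size = 0
        self.opened_at = None

    def add(self, record, now):
        """Buffer a record; return the batch if a hint was reached, else None."""
        if not self.records:
            self.opened_at = now  # the interval starts with the first record
        self.records.append(record)
        self.size += len(record)
        size_reached = self.size >= self.max_bytes
        interval_reached = now - self.opened_at >= self.max_seconds
        if size_reached or interval_reached:
            batch, self.records, self.size = self.records, [], 0
            return batch
        return None


if __name__ == "__main__":
    buffer = BufferSketch()
    half_mib = b"x" * (512 * 1024)
    print(buffer.add(half_mib, now=0.0) is None)   # still buffering
    print(len(buffer.add(half_mib, now=1.0)))      # size hint reached
```

Smaller values make data visible in the table sooner at the cost of more, smaller writes, which is why the walkthrough lowers both hints for the demo.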

Generate streaming data:

Use Kinesis Data Generator to publish data records into your Firehose stream.

  1. Go to the CloudFormation stack, select the Nested stack for the generator, and choose Outputs.
  2. Choose the KinesisDataGenerator URL and enter the credentials that you defined when deploying the CloudFormation stack.
  3. Select the AWS Region where you deployed the CloudFormation stack and select your Firehose stream.
  4. For template, replace the values on the screen with the following:
    {
    "type": {
    "device": "{{random.arrayElement(["mobile", "desktop", "tablet"])}}",
    "event": "{{random.arrayElement(["firehose_events_1", "firehose_events_2"])}}",
    "action": "update"
    },
    "customer_id": "{{random.number({ "min": 1, "max": 1500})}}",
    "event_timestamp": "{{date.now("YYYY-MM-DDTHH:mm:ss.SSS")}}",
    "region": "{{random.arrayElement(["pdx", "nyc"])}}"
    }

  5. Before sending data, choose Test template to see an example payload.
  6. Choose Send data.
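If you would rather publish records from your own application than from Kinesis Data Generator, a minimal Python sketch with the AWS SDK for Python (Boto3) could look like the following. It builds a payload with the same fields as the template above and sends it with the Firehose `put_record` API. The helper names are illustrative, and the sketch assumes your credentials allow `firehose:PutRecord` on the stream.

```python
import json
import random
from datetime import datetime, timezone


def build_event(rng=random):
    """Build one event with the same shape as the generator template."""
    now = datetime.now(timezone.utc)
    return {
        "type": {
            "device": rng.choice(["mobile", "desktop", "tablet"]),
            "event": rng.choice(["firehose_events_1", "firehose_events_2"]),
            "action": "update",
        },
        "customer_id": str(rng.randint(1, 1500)),
        # Millisecond precision, matching YYYY-MM-DDTHH:mm:ss.SSS
        "event_timestamp": now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3],
        "region": rng.choice(["pdx", "nyc"]),
    }


def send_event(stream_name, event):
    """Send one JSON record to a Direct PUT Firehose stream."""
    import boto3  # imported lazily so build_event works without the SDK

    firehose = boto3.client("firehose")
    return firehose.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": json.dumps(event).encode("utf-8")},
    )


if __name__ == "__main__":
    print(json.dumps(build_event(), indent=2))
```

For example, `send_event("firehose-iceberg-events-1", build_event())` publishes a single record to the stream created in this use case.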

Querying with Athena:

You can query the data you’ve written to your Iceberg tables using different processing engines such as Apache Spark, Apache Flink, or Trino. In this example, we will show you how you can use Athena to query data that you’ve written to Iceberg tables.

  1. Go to the Athena console.
  2. Configure a query result location. You can use the same S3 bucket for this, but add a suffix at the end.
    s3://firehose-demo-iceberg-<account_id>-<region>/athena/

  3. In the query editor, in Tables and views, select the options button next to firehose_events_1 and select Preview Table.

You should be able to see data in the Apache Iceberg tables by using Athena.

With that, you’ve delivered data streams into an Apache Iceberg table using Firehose and run a SQL query against your data.

Now let’s explore the other scenarios. We will follow the same procedure as before for creating the Firehose stream and querying Iceberg tables with Amazon Athena.

Use case 2: Deliver data from a stream into a single Iceberg table and perform record inserts, updates, and deletes

One of the advantages of using Apache Iceberg is that it allows you to perform row-level operations such as updates and deletes on tables in a data lake. Firehose can be set up to automatically apply record update and delete operations in your Iceberg tables.

Things to know:

  • When you apply an update or delete operation through Firehose, the data in Amazon S3 isn’t actually deleted. Instead, a marker record is written according to the Apache Iceberg format specification to indicate that the record is updated or deleted, so subsequent read and write operations get the latest record. If you want to purge (remove the underlying data from Amazon S3) the deleted records, you can use tools developed for purging records in Apache Iceberg.
  • If you attempt to update a record using Firehose and the underlying record doesn’t already exist in the destination table, Firehose will insert the record as a new row.
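The two points above describe what a reader of the table sees after each operation. As a rough mental model only, the following sketch applies the same rules to an in-memory list of rows. It says nothing about how Firehose or Iceberg store data in Amazon S3, and `apply_operation` is a hypothetical helper, not a Firehose API.

```python
def apply_operation(rows, record, operation, unique_key="customer_id"):
    """Apply one insert, update, or delete to a list of rows, in place."""
    if operation not in ("insert", "update", "delete"):
        raise ValueError(f"unsupported operation: {operation}")
    if operation != "insert":
        # Update and delete both retire any existing row with the same unique key.
        rows[:] = [r for r in rows if r[unique_key] != record[unique_key]]
    if operation != "delete":
        # An update for a key that was not present ends up as a plain insert.
        rows.append(record)


rows = []
apply_operation(rows, {"customer_id": "112", "region": "pdx"}, "update")  # no such key yet: inserted
apply_operation(rows, {"customer_id": "112", "region": "nyc"}, "update")  # replaces the row above
```

After these two calls the list holds a single row for customer 112, which mirrors the one-row-per-key result you see when querying the table.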

Create a Firehose stream:

  1. Go to the Amazon Data Firehose console.
  2. Choose Create Firehose stream.
  3. For Source, select Direct PUT. For Destination select Apache Iceberg Tables.
  4. For the Firehose stream name, enter firehose-iceberg-events-2.
  5. In Destination settings, enable Inline parsing for routing information and provide the required values as static values for Database expression and Table expression. Because you want to be able to update records, you also need to specify the Operation expression.
    1. Database expression: "firehose_iceberg_db"
    2. Table expression: "firehose_events_2"
    3. Operation expression: "update"

Include double quotation marks to use the literal value for the database and table name. If you do not use double quotation marks, Firehose assumes that the value is a JSON Query expression, attempts to parse it when processing your stream, and fails.

  6. Because you want to perform update and delete operations, you need to provide the columns in the destination table that will be used as unique keys to identify the record in the destination to be updated or deleted.
    • For DestinationDatabaseName: "firehose_iceberg_db"
    • For DestinationTableName: "firehose_events_2"
    • In UniqueKeys, replace the existing value with: "customer_id"

  7. Change the Buffer hints to 1 MiB and 60 seconds.
  8. In Backup settings, select the same bucket from the stack, but enter the following in the error output prefix:
    error/events-2/

  9. In Advanced settings, enable CloudWatch Error logging, select the existing role of the stack, and create the new Firehose stream.

Use Kinesis Data Generator to publish records into your Firehose stream. You might need to refresh the page or change regions so that it refreshes and shows the newly created delivery stream.

Don’t make any changes to the template and start sending data to the firehose-iceberg-events-2 stream.

Run the following query in Athena to see data in the firehose_events_2 table. Note that you can send updated records for the same unique key (same customer_id value) into your Firehose stream, and Firehose automatically applies record updates in the destination table. Thus, when you query data in Athena, you will see only one record for each unique value of customer_id, even if you have sent multiple updates into your stream.

SELECT customer_id, count(*) 
FROM "firehose_iceberg_db"."firehose_events_2" 
GROUP BY customer_id LIMIT 10;

Use case 3: Route records to different tables based on the content of the incoming data by specifying a JSON Query expression

Until now, you provided the routing and operation information as static values to perform operations on a single table. However, you can specify JSON Query expressions to define how Firehose should retrieve the destination database, destination table, and operation from your incoming data stream, and accordingly route the record and perform the corresponding operation. Based on your specification, Firehose automatically routes and delivers each record into the appropriate destination table and applies the corresponding operation.

Create a Firehose stream:

  1. Go back to the Amazon Data Firehose console.
  2. Choose Create Firehose Stream.
  3. For Source, select Direct PUT. For Destination, select Apache Iceberg Tables.
  4. For the Firehose stream name, enter firehose-iceberg-events-3.
  5. In Destination settings, enable Inline parsing for routing information.
    • For Database expression, provide the same value as before as a static string: “firehose_iceberg_db
    • For Table expression, retrieve this value from the nested incoming record using JSON Query.
      .type.event

    • For Operation expression, we will also retrieve this value from our nested record using JSON Query.
      .type.action

With the preceding JSON Query expressions, Firehose parses incoming events such as the following and gets “firehose_events_3” or “firehose_events_4” as the table name and “update” as the intended operation.

{ "type": {   "device": "tablet",  
"event": "firehose_events_3",   "action": "update" },
"customer_id": "112", "event_timestamp": "2024-10-02T15:46:52.901",
"region": "pdx"}
{ "type": {   "device": "tablet",  
"event": "firehose_events_4",   "action": "update" },
"customer_id": "112", "event_timestamp": "2024-10-02T15:46:52.901",
"region": "pdx"}

  6. Because this is an update operation, you need to configure unique keys for each table. Also, because you want to deliver records to multiple Iceberg tables, you need to provide configurations for each of the two destination tables that records can be written to.
  7. Change the Buffer hints to 1 MiB and 60 seconds.
  8. In Backup settings, select the same bucket from the stack, but in the error output prefix enter the following:
    error/events-3/

  9. In Advanced settings, select the existing IAM role created by the CloudFormation stack and create the new Firehose stream.
  10. In Kinesis Data Generator, refresh the page and select the newly created Firehose stream: firehose-iceberg-events-3

If you query the firehose_events_3 and firehose_events_4 tables using Athena, you should find the data routed to the right tables by Firehose using the routing information retrieved using JSON Query expressions.

The following figure shows the firehose_events_3 table.

The following figure shows the firehose_events_4 table.
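To see what the routing expressions in this use case select from a record, here is a simplified Python stand-in. It handles only the two forms used in this post, a double-quoted literal and a dotted path such as `.type.event`. It is not a JSON Query implementation, and `evaluate` is a hypothetical helper introduced for illustration.

```python
import json


def evaluate(expression: str, record: dict):
    """Resolve a quoted literal or a simple dotted path against one record."""
    if len(expression) >= 2 and expression.startswith('"') and expression.endswith('"'):
        return expression[1:-1]  # quoted: use the literal value as-is
    value = record
    for part in expression.strip(".").split("."):
        value = value[part]  # unquoted: walk the nested keys
    return value


event = json.loads(
    '{"type": {"device": "tablet", "event": "firehose_events_3", "action": "update"},'
    ' "customer_id": "112", "event_timestamp": "2024-10-02T15:46:52.901", "region": "pdx"}'
)

database = evaluate('"firehose_iceberg_db"', event)
table = evaluate(".type.event", event)
operation = evaluate(".type.action", event)
```

Dropping the quotation marks around the database name would make this sketch look for a key named `firehose_iceberg_db` in the record and raise a `KeyError`, which is loosely analogous to the parsing failure described for Firehose.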

Use case 4: Route records to different tables based on the content of the incoming data by using a Lambda function

There might be scenarios where routing information isn’t readily available in the input record. You might want to parse and process incoming records or perform a lookup to determine where to deliver the record and whether to perform an update or delete operation. For such scenarios, you can use a Lambda function to generate the routing information and operation specification. Firehose automatically invokes your Lambda function for a batch of records (with a configurable batch size). You can process incoming records in your Lambda function and provide the routing information and operation in the output of the function. To learn more about how to process Firehose records using Lambda, see Transform source data in Amazon Data Firehose. After invoking your Lambda function, Firehose looks for routing information and operations in the metadata fields (in the following format) provided by your Lambda function.

    "metadata":{
        "otfMetadata":{
            "destinationDatabaseName":"firehose_iceberg_db",
            "destinationTableName":"firehose_events_*",
            "operation":"insert"
        }
    }

So, in this use case, you will explore how you can create custom routing rules based on other values of your records. Specifically, for this use case, you will route all records with a value for Region of ‘pdx’ to table 3 and all records with a region value of ‘nyc’ to table 4.

The CloudFormation template has already created the custom processing Lambda function for you, which has the following code:

import base64
import json
print('Loading function')

def lambda_handler(event, context):
    firehose_records_output = {'records': []}

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        # Process the payload based on region
        payload_json = json.loads(payload)
        region = payload_json.get('region', '')
        firehose_record_output = {}
        if region == 'pdx':
            firehose_record_output['metadata'] = {
                'otfMetadata': {
                    'destinationDatabaseName': 'firehose_iceberg_db',
                    'destinationTableName': 'firehose_events_3',
                    'operation': 'insert'
                }
            }
        elif region == 'nyc':
            firehose_record_output['metadata'] = {
                'otfMetadata': {
                    'destinationDatabaseName': 'firehose_iceberg_db',
                    'destinationTableName': 'firehose_events_4',
                    'operation': 'insert'
                }
            }

        # Create output with proper record ID, output data, result, and metadata
        firehose_record_output['recordId'] = record['recordId']
        firehose_record_output['result'] = 'Ok'
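        # b64encode returns bytes; decode to str so the Lambda response is JSON serializable
        firehose_record_output['data'] = base64.b64encode(payload.encode('utf-8')).decode('utf-8')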
        firehose_records_output['records'].append(firehose_record_output)

    return firehose_records_output
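Before attaching a transformation function to a stream, it can help to exercise it locally. The following sketch builds an event in the shape the handler above reads (`records`, `recordId`, base64-encoded `data`) and decodes whatever the handler returns. The helper names and the `echo_handler` stand-in are hypothetical; substitute your own handler to test real routing logic.

```python
import base64
import json


def make_firehose_event(payloads):
    """Wrap dict payloads the way the handler receives them."""
    return {
        "records": [
            {
                "recordId": str(i),
                "data": base64.b64encode(json.dumps(p).encode("utf-8")).decode("utf-8"),
            }
            for i, p in enumerate(payloads)
        ]
    }


def decode_output(output):
    """Return (recordId, result, destination table or None, payload) per record."""
    json.dumps(output)  # raises TypeError if a value such as bytes cannot be serialized
    decoded = []
    for record in output["records"]:
        routing = record.get("metadata", {}).get("otfMetadata", {})
        payload = json.loads(base64.b64decode(record["data"]))
        decoded.append(
            (record["recordId"], record["result"], routing.get("destinationTableName"), payload)
        )
    return decoded


def echo_handler(event, context):
    """Stand-in transformation that returns every record unchanged."""
    return {
        "records": [
            {"recordId": r["recordId"], "result": "Ok", "data": r["data"]}
            for r in event["records"]
        ]
    }


event = make_firehose_event([{"region": "pdx"}, {"region": "nyc"}])
results = decode_output(echo_handler(event, None))
```

The `json.dumps` check approximates the serialization that Lambda applies to a Python return value, so a handler that returns raw bytes in `data` would surface here as a `TypeError`.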

Configure the Firehose stream:

  1. Go back to the Data Firehose console.
  2. Choose Create Firehose stream.
  3. For Source, select Direct PUT. For Destination, select Apache Iceberg Tables.
  4. For the Firehose stream name, enter firehose-iceberg-events-4.
  5. In Transform records, select Turn on data transformation.
  6. Browse and select the function created by the CloudFormation stack:
    • Firehose-Iceberg-Stack-FirehoseProcessingLambda-*.
    • For Version select $LATEST.
  7. You can leave the Destination Settings as default because the Lambda function will provide the required metadata for routing.
  8. Change the Buffer hints to 1 MiB and 60 seconds.
  9. In Backup settings, select the same S3 bucket from the stack, but in the error output prefix, enter the following:
    error/events-4/

  10. In Advanced settings, select the existing role of the stack and create the new Firehose stream.
  11. In Kinesis Data Generator, refresh the page and select the newly created Firehose stream: firehose-iceberg-events-4.

If you run the following query, you will see that the last records that were inserted into the table are only in the Region of ‘nyc’.

SELECT * FROM "firehose_iceberg_db"."firehose_events_4" 
ORDER BY event_timestamp DESC 
LIMIT 10;

Considerations and limitations

Before using Data Firehose with Apache Iceberg, it’s important to be aware of considerations and limitations. For more information, see Considerations and limitations.

Clean up

To avoid future charges, delete the resources you created in the AWS Glue Data Catalog and the S3 bucket used for storage.

Conclusion

It’s straightforward to set up Firehose streams to deliver streaming records into Apache Iceberg tables in Amazon S3. We hope that this post helps you get started with building some amazing applications without having to worry about writing and managing complex application code or having to manage infrastructure.

To learn more about using Amazon Data Firehose with Apache Iceberg, see the Firehose Developer Guide or try the Immersion day workshop.


About the authors

Diego Garcia Garcia is a Specialist SA Manager for Analytics at AWS. His expertise spans across Amazon’s analytics services, with a particular focus on real-time data processing and advanced analytics architectures. Diego leads a team of specialist solutions architects across EMEA, collaborating closely with customers spanning across multiple industries and geographies to design and implement solutions to their data analytics challenges.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers, helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink.

Phaneendra Vuliyaragoli is a Product Management Lead for Amazon Data Firehose at AWS. In this role, Phaneendra leads the product and go-to-market strategy for Amazon Data Firehose.

Migrate from Amazon Kinesis Data Analytics for SQL to Amazon Managed Service for Apache Flink and Amazon Managed Service for Apache Flink Studio

Post Syndicated from Julian Payne original https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-to-amazon-managed-service-for-apache-flink-and-amazon-managed-service-for-apache-flink-studio/

Amazon Kinesis Data Analytics for SQL is a data stream processing engine that helps you run your own SQL code against streaming sources to perform time series analytics, feed real-time dashboards, and create real-time metrics. AWS has made the decision to discontinue Kinesis Data Analytics for SQL, effective January 27, 2026. In this post, we explain why we plan to end support for Kinesis Data Analytics for SQL, alternative AWS offerings, and how to migrate your SQL queries and workloads.

Overview of Kinesis Data Analytics for SQL

The following diagram illustrates the workflow for using Kinesis Data Analytics for SQL.

Kinesis Data Analytics for SQL has been denoted a legacy offering since 2021 on our marketing pages, the AWS Management Console, and public documentation. In this time, we haven’t added new functionality or expanded Kinesis Data Analytics for SQL to new AWS Regions. However, we continue to actively maintain and patch the offering and support customers using the service. We will continue to undertake these activities.

To help you plan and migrate away from Kinesis Data Analytics for SQL, we will discontinue the offering gradually:

  • From October 15, 2025, you won’t be able to create new Kinesis Data Analytics for SQL applications, but you will be able to run any existing applications as normal.
  • We will delete any remaining customer applications on January 27, 2026. You won’t be able to start or operate your Kinesis Data Analytics for SQL applications and support will no longer be available for Kinesis Data Analytics for SQL from this time.

Overview of Managed Service for Apache Flink and Apache Flink Studio

Kinesis Data Analytics for SQL, which was launched in 2016, predates several popular AWS data stream processing offerings, such as Amazon Managed Service for Apache Flink and Amazon Managed Service for Apache Flink Studio. We have found that customers often want to use these newer offerings over Kinesis Data Analytics for SQL.

Amazon Managed Service for Apache Flink is a serverless, low-latency, highly scalable, and highly available real-time stream processing service. Apache Flink is a distributed open source engine for processing data streams. These managed Flink-based offerings provide functionality not available in Kinesis Data Analytics for SQL and can help you build end-to-end streaming pipelines and maintain the accuracy and timeliness of data. For example, Amazon Managed Service for Apache Flink supports built-in scaling, exactly-once processing semantics, multi-language support (including SQL), over 40 source and destination connectors, durable application state, and more.

We see customers migrating their Kinesis Data Analytics for SQL workloads to take advantage of the advanced features available with managed Flink offerings. Customers running SQL queries typically select Amazon Managed Service for Apache Flink Studio. Amazon Managed Service for Apache Flink Studio allows you to create a notebook, which is a web-based development environment. With notebooks, you get a simple interactive development experience combined with the advanced capabilities provided by Flink. Amazon Managed Service for Apache Flink Studio uses Apache Zeppelin as the notebook, and uses Flink as the stream processing engine. Amazon Managed Service for Apache Flink Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. Notebooks are provisioned quickly and provide a way for you to instantly view and analyze your streaming data. Zeppelin provides your Amazon Managed Service for Apache Flink Studio notebooks with a complete suite of analytics tools, including the following capabilities:

  • Visualizing data
  • Exporting data to files
  • Controlling the output format for straightforward analysis
  • Turning the notebook into a scalable, production application

The following diagram illustrates a common workflow for Managed Service for Apache Flink.

Unlike Kinesis Data Analytics for SQL, Managed Service for Apache Flink adds the following SQL support:

  • Joining stream data between multiple streams in Amazon Kinesis Data Streams, or between a Kinesis data stream and an Amazon Managed Streaming for Apache Kafka (Amazon MSK) topic
  • Real-time visualization of transformed data in a data stream
  • Using Python scripts or Scala programs within the same application
  • Changing offsets of the streaming layer

Another benefit of Amazon Managed Service for Apache Flink is the improved scalability of the solution post-deployment, because you can scale the underlying resources to meet demand. In Kinesis Data Analytics for SQL, scaling is performed by adding more pumps to persuade the application into adding more resources.

Migrate to Managed Service for Apache Flink

For more information about migrating Kinesis Data Analytics for SQL to Amazon Managed Service for Apache Flink Studio, see Migrate from Amazon Kinesis Data Analytics for SQL Applications to Amazon Managed Service for Apache Flink Studio. Additionally, we have provided guidance in our public documentation, including sample code for how to recreate 17 common Kinesis Data Analytics for SQL queries in Amazon Managed Service for Apache Flink Studio, which we will continue to expand over time. We have created step-by-step migration guidance for customers using Amazon Data Firehose as a source, or who want to use user-defined functions in Amazon Managed Service for Apache Flink. We also provide documentation to help customers migrating machine learning workloads from Kinesis Data Analytics for SQL to Amazon Managed Service for Apache Flink.

Conclusion

In this post, we outlined how we plan to discontinue Kinesis Data Analytics for SQL and why we’re taking these steps. We recommend migrating your Kinesis Data Analytics for SQL workloads to Amazon Managed Service for Apache Flink or Apache Flink Studio, and we have provided resources to help you get started with your migration. If you need more help, you can ask questions in re:Post, making sure to tag Kinesis Data Analytics for SQL.


About the author

Julian Payne is a Principal Product Manager at AWS. He is passionate about building products and features to help customers innovate using real-time data processing applications in the cloud. Outside of work he writes and illustrates graphic novels.

Build a real-time streaming generative AI application using Amazon Bedrock, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Streams

Post Syndicated from Felix John original https://aws.amazon.com/blogs/big-data/build-a-real-time-streaming-generative-ai-application-using-amazon-bedrock-amazon-managed-service-for-apache-flink-and-amazon-kinesis-data-streams/

Generative artificial intelligence (AI) has gained a lot of traction in 2024, especially around large language models (LLMs) that enable intelligent chatbot solutions. Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies such as AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon through a single API, along with a broad set of capabilities to help you build generative AI applications with security, privacy, and responsible AI. Use cases around generative AI are vast and go well beyond chatbot applications; for instance, generative AI can be used for analysis of input data such as sentiment analysis of reviews.

Most businesses generate data continuously in real-time. Internet of Things (IoT) sensor data, application log data from your applications, or clickstream data generated by users of your website are only some examples of continuously generated data. In many situations, the ability to process this data quickly (in real-time or near real-time) helps businesses increase the value of insights they get from their data.

One option to process data in real-time is using stream processing frameworks such as Apache Flink. Flink is a framework and distributed processing engine for processing data streams. AWS provides a fully managed service for Apache Flink through Amazon Managed Service for Apache Flink, which enables you to build and deploy sophisticated streaming applications without setting up infrastructure and managing resources.

Data streaming enables generative AI to take advantage of real-time data and provide businesses with rapid insights. This post looks at how to integrate generative AI capabilities when implementing a streaming architecture on AWS using managed services such as Managed Service for Apache Flink and Amazon Kinesis Data Streams for processing streaming data and Amazon Bedrock to utilize generative AI capabilities. We focus on the use case of deriving review sentiment in real-time from customer reviews in online shops. We include a reference architecture and a step-by-step guide on infrastructure setup and sample code for implementing the solution with the AWS Cloud Development Kit (AWS CDK). You can find the code to try it out yourself on the GitHub repo.

Solution overview

The following diagram illustrates the solution architecture. The architecture diagram depicts the real-time streaming pipeline in the upper half and the details on how you gain access to the Amazon OpenSearch Service dashboard in the lower half.

Architecture Overview

The real-time streaming pipeline consists of a producer that is simulated by running a Python script locally that is sending reviews to a Kinesis Data Stream. The reviews are from the Large Movie Review Dataset and contain positive or negative sentiment. The next step is the ingestion to the Managed Service for Apache Flink application. From within Flink, we are asynchronously calling Amazon Bedrock (using Anthropic Claude 3 Haiku) to process the review data. The results are then ingested into an OpenSearch Service cluster for visualization with OpenSearch Dashboards. We directly call the PutRecords API of Kinesis Data Streams within the Python script for the sake of simplicity and to cost-effectively run this example. You should consider using an Amazon API Gateway REST API as a proxy in front of Kinesis Data Streams when using a similar architecture in production, as described in Streaming Data Solution for Amazon Kinesis.
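For readers who want to see what a direct PutRecords call involves, here is a minimal sketch. It is not the producer script from the repository. The field names `reviewId` and `userId` follow the OpenSearch index mapping used later in this post, while the `text` field, the choice of `reviewId` as partition key, and the function names are assumptions made for illustration. `send` needs boto3 and AWS credentials.

```python
import json

MAX_RECORDS_PER_CALL = 500  # PutRecords accepts at most 500 records per request


def to_batches(reviews, batch_size=MAX_RECORDS_PER_CALL):
    """Convert review dicts into PutRecords entries, split into request-sized batches."""
    entries = [
        {"Data": json.dumps(r).encode("utf-8"), "PartitionKey": str(r["reviewId"])}
        for r in reviews
    ]
    return [entries[i:i + batch_size] for i in range(0, len(entries), batch_size)]


def send(stream_name, reviews):
    """Send all reviews to a Kinesis data stream and report partial failures."""
    import boto3  # imported lazily so to_batches works without boto3

    kinesis = boto3.client("kinesis")
    for batch in to_batches(reviews):
        response = kinesis.put_records(StreamName=stream_name, Records=batch)
        if response["FailedRecordCount"]:
            # PutRecords is not all-or-nothing; failed entries need a retry.
            print(f"{response['FailedRecordCount']} records failed in this batch")


sample = [{"reviewId": i, "userId": "user-1", "text": "A fine movie."} for i in range(1201)]
batches = to_batches(sample)
```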

To gain access to the OpenSearch dashboard, we need to use a bastion host that is deployed in the same private subnet within your virtual private cloud (VPC) as your OpenSearch Service cluster. To connect to the bastion host, we use Session Manager, a capability of AWS Systems Manager, which allows us to connect securely without having to open inbound ports. We then use Session Manager to forward the OpenSearch dashboard port to our localhost.

The walkthrough consists of the following high-level steps:

  1. Create the Flink application by building the JAR file.
  2. Deploy the AWS CDK stack.
  3. Set up and connect to OpenSearch Dashboards.
  4. Set up the streaming producer.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Implementation details

This section focuses on the Flink application code of this solution. You can find the code on GitHub. The StreamingJob.java file inside the flink-async-bedrock directory serves as the entry point to the application. The application uses the FlinkKinesisConsumer, which is a connector for reading streaming data from a Kinesis Data Stream. It applies a map transformation to convert each input string into a Review object, resulting in a DataStream<Review> to ease processing.

The Flink application uses Flink’s helper class AsyncDataStream in the StreamingJob.java file to incorporate an asynchronous, external operation into Flink. More specifically, the following code creates an asynchronous data stream by applying the AsyncBedrockRequest function to each element in the inputReviewStream. The application uses unorderedWait to increase throughput and reduce idle time because event ordering is not required. The timeout is set to 25,000 milliseconds to give the Amazon Bedrock API enough time to process long reviews. The maximum concurrency or capacity is limited to 1,000 requests at a time. See the following code:

DataStream<ProcessedReview> processedReviewStream = AsyncDataStream.unorderedWait(inputReviewStream, new AsyncBedrockRequest(applicationProperties), 25000, TimeUnit.MILLISECONDS, 1000).uid("processedReviewStream");
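The three knobs in that call (unordered results, a per-request timeout, and a capacity limit) can be illustrated outside Flink. The following asyncio sketch is an analogy only, not how Flink implements `unorderedWait`: `unordered_wait` and `fake_model_call` are hypothetical names, and a timed-out call simply yields `None` here, which is a simplification.

```python
import asyncio


async def unordered_wait(items, async_fn, timeout_s, capacity):
    """Run async_fn over items with bounded concurrency; emit results as they finish."""
    semaphore = asyncio.Semaphore(capacity)  # at most `capacity` requests in flight

    async def guarded(item):
        async with semaphore:
            try:
                return await asyncio.wait_for(async_fn(item), timeout_s)
            except asyncio.TimeoutError:
                return None  # simplification: swallow the timeout

    tasks = [asyncio.ensure_future(guarded(item)) for item in items]
    results = []
    for finished in asyncio.as_completed(tasks):
        results.append(await finished)  # completion order, not input order
    return results


async def fake_model_call(delay):
    """Stand-in for a remote call that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return delay
```

For example, `asyncio.run(unordered_wait([0.05, 0.01, 0.5], fake_model_call, timeout_s=0.2, capacity=2))` lets the two short calls finish, while the third waits for a free slot and then times out.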

The Flink application initiates asynchronous calls to the Amazon Bedrock API, invoking the Anthropic Claude 3 Haiku foundation model for each incoming event. We use Anthropic Claude 3 Haiku on Amazon Bedrock because it is Anthropic’s fastest and most compact model for near-instant responsiveness. The following code snippet is part of the AsyncBedrockRequest.java file and illustrates how we set up the required configuration to call Anthropic’s Claude Messages API to invoke the model:

@Override
public void asyncInvoke(Review review, final ResultFuture<ProcessedReview> resultFuture) throws Exception {

    // [..]

    JSONObject user_message = new JSONObject()
        .put("role", "user")
        .put("content", "<review>" + reviewText + "</review>");

    JSONObject assistant_message = new JSONObject()
        .put("role", "assistant")
        .put("content", "{");

    JSONArray messages = new JSONArray()
            .put(user_message)
            .put(assistant_message);

    String payload = new JSONObject()
            .put("system", systemPrompt)
            .put("anthropic_version", "bedrock-2023-05-31")
            .put("temperature", 0.0)
            .put("max_tokens", 4096)
            .put("messages", messages)
            .toString();

    InvokeModelRequest request = InvokeModelRequest.builder()
            .body(SdkBytes.fromUtf8String(payload))
            .modelId("anthropic.claude-3-haiku-20240307-v1:0")
            .build();

    CompletableFuture<InvokeModelResponse> completableFuture = client.invokeModel(request)
            .whenComplete((response, exception) -> {
                if (exception != null) {
                    LOG.error("Model invocation failed: " + exception);
                }
            })
            .orTimeout(250000, TimeUnit.MILLISECONDS);

Prompt engineering

The application uses advanced prompt engineering techniques to guide the generative AI model’s responses and provide consistent responses. The following prompt is designed to extract a summary as well as a sentiment from a single review:

String systemPrompt =
     "Summarize the review within the <review> tags "
     + "into a single and concise sentence alongside the sentiment "
     + "that is either positive or negative. Return a valid JSON object with "
     + "following keys: summary, sentiment. "
     + "<example> {\\\"summary\\\": \\\"The reviewer strongly dislikes the movie, "
     + "finding it unrealistic, preachy, and extremely boring to watch.\\\", "
     + "\\\"sentiment\\\": \\\"negative\\\"} "
     + "</example>";

The prompt instructs the Anthropic Claude model to return the extracted sentiment and summary in JSON format. To keep the model’s output consistent and well structured, the prompt combines several prompt engineering techniques. For example, the prompt uses XML tags to provide a clearer structure for Anthropic Claude. Moreover, the prompt contains an example to enhance Anthropic Claude’s performance and guide it to produce the desired output. In addition, the request pre-fills the Assistant message with an opening curly brace, so that Anthropic Claude continues the response as a JSON object. This technique helps provide a consistent output format. See the following code:

JSONObject assistant_message = new JSONObject()
    .put("role", "assistant")
    .put("content", "{");
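One consequence of pre-filling is that the model's reply continues after the pre-filled text, so the opening brace has to be put back before the reply can be parsed as JSON. The Python sketch below mirrors the request body from the Java snippet and shows that parsing step. How the repository's Java code handles the response is not shown in this post, so `build_payload` and `parse_prefilled` are hypothetical helpers, not a description of that code.

```python
import json


def build_payload(system_prompt: str, review_text: str) -> str:
    """Build a Messages API request body with a pre-filled assistant turn."""
    return json.dumps({
        "system": system_prompt,
        "anthropic_version": "bedrock-2023-05-31",
        "temperature": 0.0,
        "max_tokens": 4096,
        "messages": [
            {"role": "user", "content": "<review>" + review_text + "</review>"},
            {"role": "assistant", "content": "{"},  # pre-fill: the reply starts inside a JSON object
        ],
    })


def parse_prefilled(completion_text: str) -> dict:
    """Re-attach the pre-filled '{' and parse the completed JSON object."""
    return json.loads("{" + completion_text)


payload = json.loads(build_payload("Summarize the review.", "Loved it."))
parsed = parse_prefilled('"summary": "The reviewer loved the movie.", "sentiment": "positive"}')
```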

Build the Flink application

The first step is to download the repository and build the JAR file of the Flink application. Complete the following steps:

  1. Clone the repository to your desired workspace:
    git clone https://github.com/aws-samples/aws-streaming-generative-ai-application.git

  2. Move to the correct directory inside the downloaded repository and build the Flink application:
    cd flink-async-bedrock && mvn clean package

Building Jar File

Maven will compile the Java source code and package it in a distributable JAR format in the directory flink-async-bedrock/target/ named flink-async-bedrock-0.1.jar. After you deploy your AWS CDK stack, the JAR file will be uploaded to Amazon Simple Storage Service (Amazon S3) to create your Managed Service for Apache Flink application.

Deploy the AWS CDK stack

After you build the Flink application, you can deploy your AWS CDK stack and create the required resources:

  1. Move to the cdk directory and deploy the stack:
    cd cdk && npm install && cdk deploy

This will create the required resources in your AWS account, including the Managed Service for Apache Flink application, Kinesis Data Stream, OpenSearch Service cluster, and bastion host to quickly connect to OpenSearch Dashboards, deployed in a private subnet within your VPC.

  2. Take note of the output values. The output will look similar to the following:
 ✅  StreamingGenerativeAIStack

✨  Deployment time: 1414.26s

Outputs:
StreamingGenerativeAIStack.BastionHostBastionHostIdC743CBD6 = i-0970816fa778f9821
StreamingGenerativeAIStack.accessOpenSearchClusterOutput = aws ssm start-session --target i-0970816fa778f9821 --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["443"],"localPortNumber":["8157"], "host":["vpc-generative-ai-opensearch-qfssmne2lwpzpzheoue7rkylmi.us-east-1.es.amazonaws.com"]}'
StreamingGenerativeAIStack.bastionHostIdOutput = i-0970816fa778f9821
StreamingGenerativeAIStack.domainEndpoint = vpc-generative-ai-opensearch-qfssmne2lwpzpzheoue7rkylmi.us-east-1.es.amazonaws.com
StreamingGenerativeAIStack.regionOutput = us-east-1
Stack ARN:
arn:aws:cloudformation:us-east-1:<AWS Account ID>:stack/StreamingGenerativeAIStack/3dec75f0-cc9e-11ee-9b16-12348a4fbf87

✨  Total time: 1418.61s
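If you capture the deployment output in a file or variable, the values needed for the next section can be extracted with a few lines of Python. This is a convenience sketch under assumptions: `parse_outputs` and `port_forward_command` are hypothetical helpers, and the output keys (`bastionHostIdOutput`, `domainEndpoint`) are taken from the sample output above.

```python
import json


def parse_outputs(text: str, stack: str = "StreamingGenerativeAIStack") -> dict:
    """Collect 'Stack.key = value' lines from the cdk deploy output."""
    outputs = {}
    for line in text.splitlines():
        line = line.strip()
        if line.startswith(stack + ".") and " = " in line:
            key, value = line.split(" = ", 1)
            outputs[key[len(stack) + 1:]] = value
    return outputs


def port_forward_command(outputs: dict, local_port: str = "8157") -> list:
    """Build the Session Manager port-forwarding command as an argument list."""
    parameters = {
        "portNumber": ["443"],
        "localPortNumber": [local_port],
        "host": [outputs["domainEndpoint"]],
    }
    return [
        "aws", "ssm", "start-session",
        "--target", outputs["bastionHostIdOutput"],
        "--document-name", "AWS-StartPortForwardingSessionToRemoteHost",
        "--parameters", json.dumps(parameters),
    ]


sample_output = """
StreamingGenerativeAIStack.bastionHostIdOutput = i-0970816fa778f9821
StreamingGenerativeAIStack.domainEndpoint = vpc-generative-ai-opensearch-qfssmne2lwpzpzheoue7rkylmi.us-east-1.es.amazonaws.com
StreamingGenerativeAIStack.regionOutput = us-east-1
"""
command = port_forward_command(parse_outputs(sample_output))
```

The resulting list can be passed to `subprocess.run` without shell quoting concerns.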

Set up and connect to OpenSearch Dashboards

Next, you can set up and connect to OpenSearch Dashboards. This is where the Flink application will write the extracted sentiment as well as the summary from the processed review stream. Complete the following steps:

  1. Run the following command in a separate terminal window to establish a connection to OpenSearch from your local workspace. The command can be found in the output named accessOpenSearchClusterOutput.
    • For Mac/Linux, use the following command:
aws ssm start-session --target <BastionHostId> --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["443"],"localPortNumber":["8157"], "host":["<OpenSearchDomainHost>"]}'
    • For Windows, use the following command:
aws ssm start-session ^
    --target <BastionHostId> ^
    --document-name AWS-StartPortForwardingSessionToRemoteHost ^
    --parameters host="<OpenSearchDomainHost>",portNumber="443",localPortNumber="8157"

It should look similar to the following output:

Session Manager CLI

  2. Create the required index in OpenSearch by issuing the following command:
    • For Mac/Linux, use the following command:
curl --location -k --request PUT https://localhost:8157/processed_reviews \
--header 'Content-Type: application/json' \
--data-raw '{
  "mappings": {
    "properties": {
        "reviewId": {"type": "integer"},
        "userId": {"type": "keyword"},
        "summary": {"type": "keyword"},
        "sentiment": {"type": "keyword"},
        "dateTime": {"type": "date"}}}}'
    • For Windows, use the following command:
$url = "https://localhost:8157/processed_reviews"
$headers = @{
    "Content-Type" = "application/json"
}
$body = @{
    "mappings" = @{
        "properties" = @{
            "reviewId" = @{ "type" = "integer" }
            "userId" = @{ "type" = "keyword" }
            "summary" = @{ "type" = "keyword" }
            "sentiment" = @{ "type" = "keyword" }
            "dateTime" = @{ "type" = "date" }
        }
    }
} | ConvertTo-Json -Depth 3
Invoke-RestMethod -Method Put -Uri $url -Headers $headers -Body $body -SkipCertificateCheck
  1. After the session is established, you can open your browser and navigate to https://localhost:8157/_dashboards. Your browser might consider the URL not secure. You can ignore this warning.
  2. Choose Dashboards Management under Management in the navigation pane.
  3. Choose Saved objects in the sidebar.
  4. Import export.ndjson, which can be found in the resources folder within the downloaded repository.

OpenSearch Dashboards Upload

  1. After you import the saved objects, you can navigate to Dashboards under My Dashboard in the navigation pane.

At the moment, the dashboard appears blank because you haven’t uploaded any review data to OpenSearch yet.

Set up the streaming producer

Finally, you can set up the producer that will be streaming review data to the Kinesis Data Stream and ultimately to the OpenSearch Dashboards. The Large Movie Review Dataset was originally published in 2011 in the paper “Learning Word Vectors for Sentiment Analysis” by Andrew L. Maas, Raymond E. Daly, Peter T. Pham, Dan Huang, Andrew Y. Ng, and Christopher Potts. Complete the following steps:

  1. Download the Large Movie Review Dataset here.
  2. After the download is complete, extract the .tar.gz file to retrieve the folder named aclImdb 3 or similar that contains the review data. Rename the review data folder to aclImdb.
  3. Move the extracted dataset to data/ inside the repository that you previously downloaded.

Your repository should look like the following screenshot.

Folder Overview

  1. Modify the DATA_DIR path in producer/producer.py if the review data is named differently.
  2. Move to the producer directory using the following command:
    cd producer

  3. Install the required dependencies and start generating the data:
    pip install -r requirements.txt && python produce.py

The OpenSearch dashboard should be populated after you start generating streaming data and writing it to the Kinesis Data Stream. Refresh the dashboard to view the latest data. The dashboard shows the total number of processed reviews, the sentiment distribution of the processed reviews in a pie chart, and the summary and sentiment for the latest reviews that have been processed.

When you have a closer look at the Flink application, you will notice that the application marks the sentiment field with the value error whenever there is an error with the asynchronous call made by Flink to the Amazon Bedrock API. The Flink application simply filters the correctly processed reviews and writes them to the OpenSearch dashboard.

For robust error handling, you should write any incorrectly processed reviews to a separate output stream and not discard them completely. This separation allows you to handle failed reviews differently than successful ones for simpler reprocessing, analysis, and troubleshooting.
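The separation described above can be sketched in plain Python. This is a minimal illustration, not the Flink application's code: it assumes each processed review is a dictionary whose sentiment field holds the value error when the Amazon Bedrock call failed, and the function name split_reviews is hypothetical. In a Flink job, the failed list would typically be routed to its own output instead of being returned.

```python
from typing import Dict, List, Tuple

# Value the application writes to the sentiment field when the call fails.
ERROR_MARKER = "error"


def split_reviews(reviews: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Return (processed, failed) so that failed reviews are kept, not dropped."""
    processed: List[Dict] = []
    failed: List[Dict] = []
    for review in reviews:
        # Route each review by its sentiment marker.
        if review.get("sentiment") == ERROR_MARKER:
            failed.append(review)
        else:
            processed.append(review)
    return processed, failed
```

Keeping the failed reviews in a separate collection means they can be retried or inspected later without affecting what is written to the dashboard.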

Clean up

When you’re done with the resources you created, complete the following steps:

  1. Stop the Python producer using Ctrl/Command + C.
  2. Destroy your AWS CDK stack by returning to the root folder and running the following command in your terminal:
    cd cdk && cdk destroy

  3. When asked to confirm the deletion of the stack, enter yes.

Conclusion

In this post, you learned how to incorporate generative AI capabilities in your streaming architecture using Amazon Bedrock and Managed Service for Apache Flink using asynchronous requests. We also gave guidance on prompt engineering to derive the sentiment from text data using generative AI. You can build this architecture by deploying the sample code from the GitHub repository.

For more information on how to get started with Managed Service for Apache Flink, refer to Getting started with Amazon Managed Service for Apache Flink (DataStream API). For details on how to set up Amazon Bedrock, refer to Set up Amazon Bedrock. For other posts on Managed Service for Apache Flink, browse through the AWS Big Data Blog.


About the Authors

Felix John is a Solutions Architect and data streaming expert at AWS, based in Germany. He focuses on supporting small and medium businesses on their cloud journey. Outside of his professional life, Felix enjoys playing Floorball and hiking in the mountains.

Michelle Mei-Li Pfister is a Solutions Architect at AWS. She supports customers in the retail and consumer packaged goods (CPG) industries on their cloud journey. She is passionate about topics around data and machine learning.

Build Spark Structured Streaming applications with the open source connector for Amazon Kinesis Data Streams

Post Syndicated from Idan Maizlits original https://aws.amazon.com/blogs/big-data/build-spark-structured-streaming-applications-with-the-open-source-connector-for-amazon-kinesis-data-streams/

Apache Spark is a powerful big data engine used for large-scale data analytics. Its in-memory computing makes it great for iterative algorithms and interactive queries. You can use Apache Spark to process streaming data from a variety of streaming sources, including Amazon Kinesis Data Streams for use cases like clickstream analysis, fraud detection, and more. Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at any scale.

With the new open source Amazon Kinesis Data Streams Connector for Spark Structured Streaming, you can use the newer Spark Data Sources API. It also supports enhanced fan-out for dedicated read throughput and faster stream processing. In this post, we deep dive into the internal details of the connector and show you how to use it to consume and produce records from and to Kinesis Data Streams using Amazon EMR.

Introducing the Kinesis Data Streams connector for Spark Structured Streaming

The Kinesis Data Streams connector for Spark Structured Streaming is an open source connector that supports both provisioned and On-Demand capacity modes offered by Kinesis Data Streams. The connector is built using the latest Spark Data Sources API V2, which uses Spark optimizations. Starting with Amazon EMR 7.1, the connector comes pre-packaged on Amazon EMR on Amazon EKS, Amazon EMR on Amazon EC2, and Amazon EMR Serverless, so you don’t need to build or download any packages. For using it with other Apache Spark platforms, the connector is available as a public JAR file that can be directly referred to while submitting a Spark Structured Streaming job. Additionally, you can download and build the connector from the GitHub repo.

Kinesis Data Streams supports two types of consumers: shared throughput and dedicated throughput. With shared throughput, 2 MB/s of read throughput per shard is shared across consumers. With dedicated throughput, also known as enhanced fan-out, 2 MB/s of read throughput per shard is dedicated to each consumer. This new connector supports both consumer types out of the box without any additional coding, providing you the flexibility to consume records from your streams based on your requirements. By default, this connector uses a shared throughput consumer, but you can configure it to use enhanced fan-out in the configuration properties.
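To make the difference concrete, the following sketch computes the best-case read throughput available to each consumer in both modes. It assumes that shared throughput is split evenly across consumers, which is a simplification, and the function name is hypothetical. The result is in the same unit as the per-shard figure quoted above.

```python
# Per-shard read throughput figure quoted above (2 per second, per shard).
SHARD_READ_THROUGHPUT = 2.0


def per_consumer_read_throughput(
    shards: int, consumers: int, enhanced_fan_out: bool
) -> float:
    """Best-case read throughput per consumer across the whole stream."""
    if shards < 1 or consumers < 1:
        raise ValueError("shards and consumers must be positive")
    total = shards * SHARD_READ_THROUGHPUT
    # Shared: the per-shard throughput is divided among all consumers
    # (assumed here to be an even split).
    # Enhanced fan-out: every consumer gets the full per-shard throughput.
    return total if enhanced_fan_out else total / consumers
```

For a stream with 4 shards and 2 consumers, this gives 4.0 per consumer with shared throughput and 8.0 per consumer with enhanced fan-out.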

You can also use the connector as a sink connector to produce records to a Kinesis data stream. The configuration parameters for using the connector as a source and as a sink differ; for more information, see Kinesis Source Configuration. The connector also supports multiple storage options, including Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), and HDFS, to store checkpoints and provide continuity.

For scenarios where a Kinesis data stream is deployed in an AWS producer account and the Spark Structured Streaming application is in a different AWS consumer account, you can use the connector to do cross-account processing. This requires additional AWS Identity and Access Management (IAM) trust policies to allow the Spark Structured Streaming application in the consumer account to assume the role in the producer account.

You should also consider reviewing the security configuration with your security teams based on your data security requirements.

How the connector works

Consuming records from Kinesis Data Streams using the connector involves multiple steps. The following architecture diagram shows the internal details of how the connector works. A Spark Structured Streaming application consumes records from a Kinesis data stream source and produces records to another Kinesis data stream.

A Kinesis data stream is composed of a set of shards. A shard is a uniquely identified sequence of data records in a stream and provides a fixed unit of capacity. The total capacity of the stream is the sum of the capacity of all of its shards.

A Spark application consists of a driver and a set of executor processes. The Spark driver acts as a coordinator, and the tasks running in executors are responsible for producing and consuming records to and from shards.

The solution workflow includes the following steps:

  1. Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs. At the beginning of a micro-batch run, the driver uses the Kinesis Data Streams ListShards API to determine the latest description of all available shards. The connector exposes a parameter (kinesis.describeShardInterval) to configure the interval between two successive ListShards API calls.
  2. The driver then determines the starting position in each shard. If the application is a new job, the starting position of each shard is determined by kinesis.startingPosition. If it’s a restart of an existing job, the starting position is read from the last record metadata checkpoint in storage (for this post, DynamoDB), and kinesis.startingPosition is ignored.
  3. Each shard is mapped to one task in an executor, which is responsible for reading data. The Spark application automatically creates one task per shard and distributes the tasks across the executors.
  4. The tasks in an executor use either polling mode (shared) or push mode (enhanced fan-out) to get data records from the starting position for a shard.
  5. Spark tasks running in the executors write the processed data to the data sink. In this architecture, we use the Kinesis Data Streams sink to illustrate how the connector writes back to the stream. Executors can write to more than one Kinesis Data Streams output shard.
  6. At the end of each task, the corresponding executor process saves the metadata (checkpoint) about the last record read for each shard in the offset storage (for this post, DynamoDB). This information is used by the driver in the construction of the next micro-batch.
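The driver-side planning in steps 2 and 3 can be sketched as follows. This is a simplified model under stated assumptions, not the connector's implementation: the function plan_micro_batch, the dictionary layout, and the use of AFTER_SEQUENCE_NUMBER to resume from a checkpoint are all illustrative choices.

```python
from typing import Dict, List, Optional


def plan_micro_batch(
    shard_ids: List[str],
    checkpoints: Dict[str, str],
    starting_position: str = "LATEST",
) -> List[Dict[str, Optional[str]]]:
    """Build one read task per shard, resuming from a checkpoint if present."""
    tasks: List[Dict[str, Optional[str]]] = []
    for shard_id in shard_ids:
        last_sequence = checkpoints.get(shard_id)
        if last_sequence is not None:
            # Restarted job: resume after the last checkpointed record.
            # The configured starting position is ignored for this shard.
            task = {
                "shard": shard_id,
                "start": "AFTER_SEQUENCE_NUMBER",
                "sequence": last_sequence,
            }
        else:
            # New job (or shard without a checkpoint): use the configured
            # starting position.
            task = {"shard": shard_id, "start": starting_position, "sequence": None}
        tasks.append(task)
    return tasks
```

Because the plan holds exactly one task per shard, the degree of read parallelism follows the shard count of the source stream.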

Solution overview

The following diagram shows an example architecture of how to use the connector to read from one Kinesis data stream and write to another.

In this architecture, we use the Amazon Kinesis Data Generator (KDG) to generate sample streaming data (random events per country) to a Kinesis Data Streams source. We start an interactive Spark Structured Streaming session and consume data from the Kinesis data stream, and then write to another Kinesis data stream.

We use Spark Structured Streaming to count events per micro-batch window. These events for each country are being consumed from Kinesis Data Streams. After the count, we can see the results.
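As a rough picture of what the count does for a single micro-batch, the following plain Python sketch tallies events per country. It assumes each event is a JSON object whose data field carries the country name, as in the KDG template used later in this post. The function name is hypothetical, and in Spark the same aggregation would be expressed on a streaming DataFrame instead of a Python list.

```python
import json
from collections import Counter
from typing import Dict, Iterable


def count_events_by_country(raw_records: Iterable[str]) -> Dict[str, int]:
    """Count the events of one micro-batch, grouped by the country field."""
    counts: Counter = Counter()
    for raw in raw_records:
        event = json.loads(raw)
        # The "data" field is assumed to hold the country name.
        counts[event["data"]] += 1
    return dict(counts)
```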

Prerequisites

To get started, follow the instructions in the GitHub repo. You need the following prerequisites:

After you deploy the solution using the AWS CDK, you will have the following resources:

  • An EMR cluster with the Kinesis Spark connector installed
  • A Kinesis Data Streams source
  • A Kinesis Data Streams sink

Create your Spark Structured Streaming application

After the deployment is complete, you can access the EMR primary node to start a Spark application and write your Spark Structured Streaming logic.

As we mentioned earlier, you use the new open source Kinesis Spark connector to consume data from Kinesis Data Streams in Amazon EMR. You can find the connector code on the GitHub repo along with examples on how to build and set up the connector in Spark.

In this post, we use Amazon EMR 7.1, where the connector is natively available. If you’re using a release earlier than Amazon EMR 7.1, you can install the connector by running the following code:

cd /usr/lib/spark/jars 
sudo wget https://awslabs-code-us-east-1.s3.amazonaws.com/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.2.1.jar
sudo chmod 755 spark-streaming-sql-kinesis-connector_2.12-1.2.1.jar

Complete the following steps:

  1. On the Amazon EMR console, navigate to the emr-spark-kinesis cluster.
  2. On the Instances tab, select the primary instance and choose the Amazon Elastic Compute Cloud (Amazon EC2) instance ID.

You’re redirected to the Amazon EC2 console.

  1. On the Amazon EC2 console, select the primary instance and choose Connect.
  2. Use Session Manager, a capability of AWS Systems Manager, to connect to the instance.
  3. Because the user that is used to connect is the ssm-user, we need to switch to the Hadoop user:
    sudo su hadoop

  4. Start a Spark shell either using Scala or Python to interactively build a Spark Structured Streaming application to consume data from a Kinesis data stream.

For this post, we use Python for writing to a stream using a PySpark shell in Amazon EMR.

  1. Start the PySpark shell by entering the command pyspark.

Because you already have the connector installed in the EMR cluster, you can now create the Kinesis source.

  1. Create the Kinesis source with the following code:
    kinesis = spark.readStream.format("aws-kinesis") \
        .option("kinesis.region", "<aws-region>") \
        .option("kinesis.streamName", "kinesis-source") \
        .option("kinesis.consumerType", "GetRecords") \
        .option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com") \
        .option("kinesis.startingposition", "LATEST") \
        .load()

For creating the Kinesis source, the following parameters are required:

  • Name of the connector – We use the connector name aws-kinesis
  • kinesis.region – The AWS Region of the Kinesis data stream you are consuming
  • kinesis.streamName – The name of the Kinesis data stream you are consuming
  • kinesis.consumerType – Use GetRecords (standard consumer) or SubscribeToShard (enhanced fan-out consumer)
  • kinesis.endpointUrl – The Regional Kinesis endpoint (for more details, see Service endpoints)
  • kinesis.startingposition – Choose LATEST, TRIM_HORIZON, or AT_TIMESTAMP (refer to ShardIteratorType)

For using an enhanced fan-out consumer, additional parameters are needed, such as the consumer name. The additional configuration can be found in the connector’s GitHub repo.

kinesis_efo = spark \
.readStream \
.format("aws-kinesis") \
.option("kinesis.region", "<aws-region>") \
.option("kinesis.streamName", "kinesis-source") \
.option("kinesis.consumerType", "SubscribeToShard") \
.option("kinesis.consumerName", "efo-consumer") \
.option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com") \
.option("kinesis.startingposition", "LATEST") \
.load()
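Because the two source definitions differ only in the consumer type and the consumer name, you could build the options from one helper. The following is a minimal sketch: the helper name kinesis_source_options is hypothetical, the option keys are the ones used in the preceding examples, and the endpoint pattern is the one shown above, which might not apply to every AWS partition.

```python
from typing import Dict, Optional


def kinesis_source_options(
    region: str,
    stream_name: str,
    consumer_type: str = "GetRecords",
    consumer_name: Optional[str] = None,
    starting_position: str = "LATEST",
) -> Dict[str, str]:
    """Return the option map for a standard or enhanced fan-out source."""
    if consumer_type not in ("GetRecords", "SubscribeToShard"):
        raise ValueError(f"unsupported consumer type: {consumer_type}")
    options = {
        "kinesis.region": region,
        "kinesis.streamName": stream_name,
        "kinesis.consumerType": consumer_type,
        "kinesis.endpointUrl": f"https://kinesis.{region}.amazonaws.com",
        "kinesis.startingposition": starting_position,
    }
    if consumer_type == "SubscribeToShard":
        # Enhanced fan-out needs a consumer name, as noted above.
        if not consumer_name:
            raise ValueError("consumer_name is required for SubscribeToShard")
        options["kinesis.consumerName"] = consumer_name
    return options
```

In the PySpark shell, the result can be passed to the reader with spark.readStream.format("aws-kinesis").options(**kinesis_source_options("<aws-region>", "kinesis-source")).load().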

Deploy the Kinesis Data Generator

Complete the following steps to deploy the KDG and start generating data:

  1. Choose Launch Stack:
    launch stack 1

You might need to change your Region when deploying. Make sure that the KDG is launched in the same Region as where you deployed the solution.

  1. For the parameters Username and Password, enter the values of your choice. Note these values to use later when you log in to the KDG.
  2. When the template has finished deploying, go to the Outputs tab of the stack and locate the KDG URL.
  3. Log in to the KDG, using the credentials you set when launching the CloudFormation template.
  4. Specify your Region and data stream name, and use the following template to generate test data:
    {
        "id": {{random.number(100)}},
        "data": "{{random.arrayElement(
            ["Spain","Portugal","Finland","France"]
        )}}",
        "date": "{{date.now("YYYY-MM-DD hh:mm:ss")}}"
    }

  5. Return to Systems Manager to continue working with the Spark application.
  6. To be able to apply transformations based on the fields of the events, you first need to define the schema for the events:
    from pyspark.sql.types import *
    
    pythonSchema = StructType() \
     .add("id", LongType()) \
     .add("data", StringType()) \
     .add("date", TimestampType())

  7. Run the following command to consume data from Kinesis Data Streams:
    from pyspark.sql.functions import *
    
    events= kinesis \
      .selectExpr("cast (data as STRING) jsonData") \
      .select(from_json("jsonData", pythonSchema).alias("events")) \
      .select("events.*")

  8. Use the following code for the Kinesis Spark connector sink:
    events \
        .selectExpr("CAST(id AS STRING) as partitionKey","data","date") \
        .writeStream \
        .format("aws-kinesis") \
        .option("kinesis.region", "<aws-region>") \
        .outputMode("append") \
        .option("kinesis.streamName", "kinesis-sink") \
        .option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com") \
        .option("checkpointLocation", "/kinesisCheckpoint") \
        .start() \
        .awaitTermination()

You can view the data in the Kinesis Data Streams console.

  1. On the Kinesis Data Streams console, navigate to kinesis-sink.
  2. On the Data viewer tab, choose a shard and a starting position (for this post, we use Latest) and choose Get records.

You can see the data sent, as shown in the following screenshot. Kinesis Data Streams uses base64 encoding by default, so you might see text with unreadable characters.
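If you retrieve records through an API or tool that returns the payload as a base64 string, you can decode it to readable text. The following is a minimal sketch that assumes the payload is UTF-8 text; the function name decode_payload is hypothetical.

```python
import base64


def decode_payload(encoded_data: str) -> str:
    """Decode a base64-encoded record payload into UTF-8 text."""
    return base64.b64decode(encoded_data).decode("utf-8")
```

If the decoded text is JSON, you can then pass it to json.loads to work with the individual fields.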

Clean up

Delete the following CloudFormation stacks created during this deployment to delete all the provisioned resources:

  • EmrSparkKinesisStack
  • Kinesis-Data-Generator-Cognito-User-SparkEFO-Blog

If you created any additional resources during this deployment, delete them manually.

Conclusion

In this post, we discussed the open source Kinesis Data Streams connector for Spark Structured Streaming. It supports the newer Data Sources API V2 and Spark Structured Streaming for building streaming applications. The connector also enables high-throughput consumption from Kinesis Data Streams with enhanced fan-out by providing dedicated throughput of up to 2 MB/s per shard per consumer. With this connector, you can build high-throughput streaming applications with Spark Structured Streaming.

The Kinesis Spark connector is open source under the Apache 2.0 license on GitHub. To get started, visit the GitHub repo.


About the Authors


Idan Maizlits is a Senior Product Manager on the Amazon Kinesis Data Streams team at Amazon Web Services. Idan loves engaging with customers to learn about their challenges with real-time data and to help them achieve their business goals. Outside of work, he enjoys spending time with his family exploring the outdoors and cooking.


Subham Rakshit is a Streaming Specialist Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build search and streaming data platforms that help them achieve their business objective. Outside of work, he enjoys spending time solving jigsaw puzzles with his daughter.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon MSK and AWS’s managed offering for Apache Flink.

Umesh Chaudhari is a Streaming Solutions Architect at AWS. He works with customers to design and build real-time data processing systems. He has extensive working experience in software engineering, including architecting, designing, and developing data analytics systems. Outside of work, he enjoys traveling, reading, and watching movies.

Uplevel your data architecture with real-time streaming using Amazon Data Firehose and Snowflake

Post Syndicated from Swapna Bandla original https://aws.amazon.com/blogs/big-data/uplevel-your-data-architecture-with-real-time-streaming-using-amazon-data-firehose-and-snowflake/

Today’s fast-paced world demands timely insights and decisions, which is driving the importance of streaming data. Streaming data refers to data that is continuously generated from a variety of sources. The sources of this data, such as clickstream events, change data capture (CDC), application and service logs, and Internet of Things (IoT) data streams are proliferating. Snowflake offers two options to bring streaming data into its platform: Snowpipe and Snowflake Snowpipe Streaming. Snowpipe is suitable for file ingestion (batching) use cases, such as loading large files from Amazon Simple Storage Service (Amazon S3) to Snowflake. Snowpipe Streaming, a newer feature released in March 2023, is suitable for rowset ingestion (streaming) use cases, such as loading a continuous stream of data from Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Before Snowpipe Streaming, AWS customers used Snowpipe for both use cases: file ingestion and rowset ingestion. First, you ingested streaming data to Kinesis Data Streams or Amazon MSK, then used Amazon Data Firehose to aggregate and write streams to Amazon S3, followed by using Snowpipe to load the data into Snowflake. However, this multi-step process can result in delays of up to an hour before data is available for analysis in Snowflake. Moreover, it’s expensive, especially when you have small files that Snowpipe has to upload to the Snowflake customer cluster.

To solve this issue, Amazon Data Firehose now integrates with Snowpipe Streaming, enabling you to capture, transform, and deliver data streams from Kinesis Data Streams, Amazon MSK, and Firehose Direct PUT to Snowflake in seconds at a low cost. With a few clicks on the Amazon Data Firehose console, you can set up a Firehose stream to deliver data to Snowflake. There are no commitments or upfront investments to use Amazon Data Firehose, and you only pay for the amount of data streamed.

Some key features of Amazon Data Firehose include:

  • Fully managed serverless service – You don’t need to manage resources, and Amazon Data Firehose automatically scales to match the throughput of your data source without ongoing administration.
  • Straightforward to use with no code – You don’t need to write applications.
  • Real-time data delivery – You can get data to your destinations quickly and efficiently in seconds.
  • Integration with over 20 AWS services – Seamless integration is available for many AWS services, such as Kinesis Data Streams, Amazon MSK, Amazon VPC Flow Logs, AWS WAF logs, Amazon CloudWatch Logs, Amazon EventBridge, AWS IoT Core, and more.
  • Pay-as-you-go model – You only pay for the data volume that Amazon Data Firehose processes.
  • Connectivity – Amazon Data Firehose can connect to public or private subnets in your VPC.

This post explains how you can bring streaming data from AWS into Snowflake within seconds to perform advanced analytics. We explore common architectures and illustrate how to set up a low-code, serverless, cost-effective solution for low-latency data streaming.

Overview of solution

The following are the steps to implement the solution to stream data from AWS to Snowflake:

  1. Create a Snowflake database, schema, and table.
  2. Create a Kinesis data stream.
  3. Create a Firehose delivery stream with Kinesis Data Streams as the source and Snowflake as its destination using a secure private link.
  4. To test the setup, generate sample stream data from the Amazon Kinesis Data Generator (KDG) with the Kinesis data stream as the destination.
  5. Query the Snowflake table to validate the data loaded into Snowflake.

The solution is depicted in the following architecture diagram.

Prerequisites

You should have the following prerequisites:

Create a Snowflake database, schema, and table

Complete the following steps to set up your data in Snowflake:

  • Log in to your Snowflake account and create the database:
    create database adf_snf;

  • Create a schema in the new database:
    create schema adf_snf.kds_blog;

  • Create a table in the new schema:
    create or replace table iot_sensors
    (sensorId number,
    sensorType varchar,
    internetIP varchar,
    connectionTime timestamp_ntz,
    currentTemperature number
    );

Create a Kinesis data stream

Complete the following steps to create your data stream:

  • On the Kinesis Data Streams console, choose Data streams in the navigation pane.
  • Choose Create data stream.
  • For Data stream name, enter a name (for example, KDS-Demo-Stream).
  • Leave the remaining settings as default.
  • Choose Create data stream.

Create a Firehose delivery stream

Complete the following steps to create a Firehose delivery stream with Kinesis Data Streams as the source and Snowflake as its destination:

  • On the Amazon Data Firehose console, choose Create Firehose stream.
  • For Source, choose Amazon Kinesis Data Streams.
  • For Destination, choose Snowflake.
  • For Kinesis data stream, browse to the data stream you created earlier.
  • For Firehose stream name, leave the default generated name or enter a name of your preference.
  • Under Connection settings, provide the following information to connect Amazon Data Firehose to Snowflake:
    • For Snowflake account URL, enter your Snowflake account URL.
    • For User, enter the user name generated in the prerequisites.
    • For Private key, enter the private key generated in the prerequisites. Make sure the private key is in PKCS8 format. Do not include the PEM header-BEGIN prefix and footer-END suffix as part of the private key. If the key is split across multiple lines, remove the line breaks.
    • For Role, select Use custom Snowflake role and enter the Snowflake role that has access to write to the database table.
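The private key formatting described above (no PEM header or footer, no line breaks) can be prepared with a few lines of Python. This is a minimal sketch with a hypothetical helper name. It only removes the PEM armor lines and joins the remaining lines. It does not convert a key to PKCS8 or validate it.

```python
def strip_pem_armor(pem_text: str) -> str:
    """Return the key body as one line, without BEGIN/END lines or breaks."""
    lines = (line.strip() for line in pem_text.splitlines())
    # Drop empty lines and the "-----BEGIN ...-----" / "-----END ...-----" lines.
    return "".join(line for line in lines if line and not line.startswith("-----"))
```

Handle the resulting string like any other secret and avoid writing it to logs or shell history.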

You can connect to Snowflake using public or private connectivity. If you don’t provide a VPC endpoint, the default connectivity mode is public. To add the Firehose IPs to the allow list in your Snowflake network policy, refer to Choose Snowflake for Your Destination. If you’re using a private link URL, retrieve the VPCE ID by running SYSTEM$GET_PRIVATELINK_CONFIG:

select SYSTEM$GET_PRIVATELINK_CONFIG();

This function returns a JSON representation of the Snowflake account information necessary to facilitate the self-service configuration of private connectivity to the Snowflake service, as shown in the following screenshot.
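If you capture that JSON output as text, you can pull out the VPCE ID programmatically. The following sketch assumes the output contains a key named privatelink-vpce-id. Verify the key name against the output of your own account, because the exact set of keys can differ. The helper name is hypothetical.

```python
import json

# Assumed key name in the SYSTEM$GET_PRIVATELINK_CONFIG output.
VPCE_ID_KEY = "privatelink-vpce-id"


def extract_vpce_id(config_json: str) -> str:
    """Return the VPCE ID from the private link configuration JSON."""
    config = json.loads(config_json)
    if VPCE_ID_KEY not in config:
        raise KeyError(f"{VPCE_ID_KEY} not found in private link configuration")
    return config[VPCE_ID_KEY]
```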

  • For this post, we’re using a private link, so for VPCE ID, enter the VPCE ID.
  • Under Database configuration settings, enter your Snowflake database, schema, and table names.
  • In the Backup settings section, for S3 backup bucket, enter the bucket you created as part of the prerequisites.
  • Choose Create Firehose stream.

Alternatively, you can use an AWS CloudFormation template to create the Firehose delivery stream with Snowflake as the destination rather than using the Amazon Data Firehose console.

To use the CloudFormation stack, choose Launch Stack.

Generate sample stream data

In the KDG, select the Kinesis data stream you created and use the following template to generate sample stream data:

{ 
"sensorId": {{random.number(999999999)}}, 
"sensorType": "{{random.arrayElement( ["Thermostat","SmartWaterHeater","HVACTemperatureSensor","WaterPurifier"] )}}", 
"internetIP": "{{internet.ip}}", 
"connectionTime": "{{date.now("YYYY-MM-DDTHH:mm:ss")}}",
"currentTemperature": {{random.number({"min":10,"max":150})}} 
}
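To check the table definition against the record shape without the KDG, you can generate comparable records locally. The following sketch is a stand-in for the template above, not part of the KDG: the function name is hypothetical, and the value ranges are taken from the template.

```python
import random
from datetime import datetime

SENSOR_TYPES = [
    "Thermostat",
    "SmartWaterHeater",
    "HVACTemperatureSensor",
    "WaterPurifier",
]


def sample_sensor_record(rng: random.Random) -> dict:
    """Build one record with the same fields as the KDG template."""
    return {
        "sensorId": rng.randint(0, 999999999),
        "sensorType": rng.choice(SENSOR_TYPES),
        # Random dotted-quad address, mirroring the template's IP field.
        "internetIP": ".".join(str(rng.randint(0, 255)) for _ in range(4)),
        "connectionTime": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
        "currentTemperature": rng.randint(10, 150),
    }
```

Passing a seeded random.Random instance, for example random.Random(42), makes the generated values repeatable apart from the timestamp.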

Query the Snowflake table

Query the Snowflake table:

select * from adf_snf.kds_blog.iot_sensors;

You can confirm that the data generated by the KDG that was sent to Kinesis Data Streams is loaded into the Snowflake table through Amazon Data Firehose.

Troubleshooting

If data is not loaded into Kinesis Data Streams after you start sending data from the KDG, refresh the page and make sure you are logged in to the KDG.

If you made any changes to the Snowflake destination table definition, recreate the Firehose delivery stream.

Clean up

To avoid incurring future charges, delete the resources you created as part of this exercise if you are not planning to use them further.

Conclusion

Amazon Data Firehose provides a straightforward way to deliver data to Snowpipe Streaming, enabling you to save costs and reduce latency to seconds. To try Amazon Data Firehose with Snowflake, refer to the Amazon Data Firehose with Snowflake as destination lab.


About the Authors

Swapna Bandla is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. Swapna has a passion for understanding customers’ data and analytics needs and empowering them to develop cloud-based well-architected solutions. Outside of work, she enjoys spending time with her family.

Mostafa Mansour is a Principal Product Manager – Tech at Amazon Web Services where he works on Amazon Kinesis Data Firehose. He specializes in developing intuitive product experiences that solve complex challenges for customers at scale. When he’s not hard at work on Amazon Kinesis Data Firehose, you’ll likely find Mostafa on the squash court, where he loves to take on challengers and perfect his dropshots.

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.