All posts by Masudur Rahaman Sayem

How Laravel Nightwatch handles billions of observability events in real time with Amazon MSK and ClickHouse Cloud

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/how-laravel-nightwatch-handles-billions-of-observability-events-in-real-time-with-amazon-msk-and-clickhouse-cloud/

Laravel, one of the world’s most popular web frameworks, launched its first-party observability platform, Laravel Nightwatch, to provide developers with real-time insights into application performance. Built entirely on AWS managed services and ClickHouse Cloud, the service already processes over one billion events per day while maintaining sub-second query latency, giving developers instant visibility into the health of their applications.

By combining Amazon Managed Streaming for Apache Kafka (Amazon MSK) with ClickHouse Cloud and AWS Lambda, Laravel Nightwatch delivers high-volume, low-latency monitoring at scale, while maintaining the simplicity and developer experience Laravel is known for.

The challenge: Delivering real-time monitoring for a global developer community

The Laravel framework powers millions of applications worldwide, serving billions of requests each month. Each request can generate potentially hundreds of observability events, such as database queries, queued jobs, cache lookups, emails, notifications, and exceptions. For Nightwatch’s launch, Laravel anticipated instant adoption from its global community, with tens of thousands of applications sending events around the clock from day one.

Laravel Nightwatch needed an architecture that could:

  • Ingest millions of JSON events per second from customer applications reliably.
  • Provide sub-second analytical queries for real-time dashboards.
  • Scale horizontally to handle unpredictable traffic spikes.
  • Deliver all of this in a cost-effective, low-maintenance manner.

The challenge was to process data on a global scale and provide deep insights into application health without compromising on a straightforward setup experience for developers.

The solution: A decoupled streaming and analytics pipeline

Laravel Nightwatch implemented a dual-database, streaming-first architecture, shown in the preceding figure, that separates transactional and analytical workloads.

  • Transactional workloads – user accounts, organization settings, billing, and similar workloads run on Amazon RDS for PostgreSQL.
  • Analytical workloads – telemetry events, metrics, query logs, and request traces are handled by ClickHouse Cloud.

Key components

The key components of the solution include the following:

  1. Ingestion layer
    • Amazon API Gateway receives telemetry from Laravel agents embedded in customer applications
    • Lambda validates and enriches events. Validated and enriched events are published to Amazon MSK, partitioned for scalability
  2. Streaming to analytics
    • ClickPipes in ClickHouse Cloud subscribe directly to MSK topics, reducing the need to build and manage extract, transform, and load (ETL) pipelines
    • Materialized views in ClickHouse pre-aggregate and transform raw JSON into query-ready formats
  3. Dashboards and delivery
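
The ingestion step above can be sketched in Python. This is a minimal illustration, not Nightwatch's actual code: the field names (`app_id`, `type`, `received_at`), the set of allowed event types, and the partition count are assumptions made for the example, and the hash-based partition choice stands in for whatever partitioning scheme the service really uses.

```python
import hashlib
import json
import time
from typing import Optional

# Hypothetical event types; the real schema is not described in the post.
ALLOWED_TYPES = {"request", "query", "job", "cache", "mail", "notification", "exception"}


def validate_and_enrich(raw: str, received_at: Optional[float] = None) -> dict:
    """Parse a JSON event, reject malformed input, and add a server-side timestamp."""
    event = json.loads(raw)
    if not isinstance(event, dict):
        raise ValueError("event must be a JSON object")
    if not event.get("app_id"):
        raise ValueError("missing app_id")
    if event.get("type") not in ALLOWED_TYPES:
        raise ValueError(f"unknown event type: {event.get('type')!r}")
    enriched = dict(event)
    enriched["received_at"] = time.time() if received_at is None else received_at
    return enriched


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition so events with the same key land on the same partition."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


sample = json.dumps({"app_id": "app-123", "type": "query", "duration_ms": 12})
event = validate_and_enrich(sample, received_at=1700000000.0)
partition = partition_for(event["app_id"], num_partitions=12)
print(event, partition)
```

Keying by application keeps one application's events on one partition, which preserves their order but can create hot partitions for very busy applications; a real deployment would weigh that trade-off.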

Why Amazon MSK and ClickHouse Cloud?

Nightwatch requires a durable, horizontally scalable, and low maintenance streaming backbone.

With Amazon MSK Express brokers, we have achieved over 1 million events per second during load testing, benefiting from low latency, elastic scaling, and simplified operations. MSK Express brokers require no storage sizing or provisioning, scale up to 20 times faster, and recover 90% quicker than standard Apache Kafka brokers, all while enforcing best-practice defaults and client quotas for reliable performance. The seamless integration of Amazon MSK with other AWS services, such as Lambda, Amazon Simple Storage Service (Amazon S3), and Amazon CloudWatch, made it straightforward to build a resilient, end-to-end streaming architecture.

To ingest and transform these events in real time, Nightwatch uses ClickHouse Cloud and its managed integration platform, ClickPipes. ClickHouse Cloud excels at analytical workloads by delivering up to 100 times faster query performance for analytics compared to traditional row-based databases. Its advanced compression algorithms provide up to 90% storage savings, significantly reducing infrastructure costs while maintaining high performance. With its columnar architecture and optimized execution engine, ClickHouse Cloud can query billions of rows in under 1 second, enabling Laravel Nightwatch to serve real-time dashboards and analytics at global scale.

By integrating Amazon MSK and ClickHouse using ClickPipes, Laravel also avoided building and managing its own ETL pipelines, which lowered both latency and complexity.

Overcoming challenges

Testing complexity

While synthetic benchmarking and test datasets yield useful results, a more realistic workload is required to rigorously test infrastructure and code before deployment to production. The team used Terraform to manage infrastructure alongside application code, creating multiple dev and test environments, and allowing them to test the platform internally with their own applications before each release.

Multi-region infrastructure

The need to cater to multiple data storage regions also brought challenges—with latency, complexity, and cost the foremost concerns. However, the AWS, ClickHouse Cloud, and Cloudflare stack made available a powerful set of networking tools and scaling options. While VPC peering, RDS replication, and global server load balancing did the heavy lifting on the networking side, the ability to scale and right-size each resource kept costs to a minimum.

Query performance at scale

Materialized views, intelligent time-series partitioning, and specialized ClickHouse codecs helped ensure that queries remained sub-second even as data volumes grew into the billions. Meanwhile, compute separation allowed distinct workloads to scale separately while accessing the same data, with clusters right-sized horizontally and vertically depending on the requirements of each load.
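
The idea behind pre-aggregation can be illustrated with a small conceptual sketch in Python. It is not ClickHouse SQL and not the Nightwatch schema: the field names (`ts`, `route`, `duration_ms`) and the one-minute bucket size are assumptions for the example. The point is that the aggregation work happens when data is inserted, so a dashboard query reads a handful of rollup rows instead of scanning raw events.

```python
from collections import defaultdict

# (minute bucket, route) -> [request count, total duration in ms]
rollup = defaultdict(lambda: [0, 0.0])


def ingest(batch):
    """Fold a batch of raw events into the per-minute rollup at insert time."""
    for e in batch:
        bucket = e["ts"] - (e["ts"] % 60)  # truncate epoch seconds to the minute
        state = rollup[(bucket, e["route"])]
        state[0] += 1
        state[1] += e["duration_ms"]


def average_duration(bucket, route):
    """Answer a dashboard query from the rollup without touching raw events."""
    count, total = rollup.get((bucket, route), (0, 0.0))
    return total / count if count else None


ingest([
    {"ts": 1700000005, "route": "/checkout", "duration_ms": 80.0},
    {"ts": 1700000030, "route": "/checkout", "duration_ms": 120.0},
])
ingest([{"ts": 1700000065, "route": "/checkout", "duration_ms": 50.0}])
print(average_duration(1699999980, "/checkout"))
```

Storing the count and the sum, rather than the average itself, is what lets later batches be merged into the same bucket correctly.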

Results

Laravel Nightwatch’s launch exceeded expectations:

  • 5,300 users registered in the first 24 hours
  • 500 million events processed on day one
  • 97 ms average dashboard request latency
  • 760,000 exceptions logged and analyzed in real time

By building on Amazon MSK and ClickHouse Cloud, we were able to scale from zero to billions of events without sacrificing performance or developer experience.

What’s next

Laravel plans to expand Nightwatch with:

  • More regions to cater to customers with data sovereignty requirements outside the US and EU
  • Broader data collection to provide even deeper insight into customers’ applications
  • SOC 2 certification to cater to customers with tighter compliance requirements
  • More advanced monitoring and analysis to identify issues before they affect users

The current architecture comfortably supports applications of all sizes, from hobby to enterprise (including a generous free tier), and is designed to handle over one trillion monthly events without performance degradation.
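
To put the one-trillion figure in perspective, the following back-of-the-envelope calculation converts the volumes quoted in this post into average events per second. It assumes a 30-day month and perfectly even traffic, which real workloads do not have, so treat the results as rough averages rather than capacity requirements.

```python
SECONDS_PER_DAY = 24 * 60 * 60
DAYS_PER_MONTH = 30  # assumption: a 30-day month

monthly_target_events = 1_000_000_000_000  # design target stated above
daily_current_events = 1_000_000_000       # "over one billion events per day"
load_test_events_per_sec = 1_000_000       # rate reached during load testing

avg_target_per_sec = monthly_target_events / (DAYS_PER_MONTH * SECONDS_PER_DAY)
avg_current_per_sec = daily_current_events / SECONDS_PER_DAY
headroom_factor = load_test_events_per_sec / avg_target_per_sec

print(f"Average at one trillion events/month: {avg_target_per_sec:,.0f} events/s")
print(f"Average at one billion events/day:    {avg_current_per_sec:,.0f} events/s")
print(f"Load-tested rate vs. target average:  {headroom_factor:.1f}x")
```

Under these assumptions, the load-tested rate sits a little above two and a half times the average rate implied by the one-trillion target, which leaves room for traffic peaks.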

Conclusion

Laravel Nightwatch demonstrates how Amazon MSK, ClickHouse Cloud, and AWS serverless technologies can be combined to build a cost-effective, real-time monitoring platform at global scale. By designing for scale from day one, Laravel delivered sub-second analytics across billions of events, while maintaining the developer-friendly experience their community expects.


About the authors

Jess Archer

Jess is an Engineering Manager and Head of Nightwatch at Laravel, focusing on application observability, performance monitoring, and developer experience. She leads the Nightwatch team while staying hands-on in the codebase. Prior to Laravel, Jess worked on clinical data collection platforms, software for law enforcement, and anti-phishing solutions in banking. She later contributed extensively to Laravel’s open-source ecosystem before moving into her current leadership role. Jess is deeply passionate about open source and creating tools that make developers more productive.

James Carpenter

James is a Senior Infrastructure Engineer who joined Laravel in 2024 as Infrastructure Lead for the Nightwatch team, bringing experience from 15 years in sport and healthcare. Specialising in DevOps and infrastructure, he is passionate about solving complex problems and creating exceptional experiences for both customers and developers.

Johnny Mirza

Johnny is a Solution Architect with ClickHouse, working with users across APAC. With over 20 years of background in solutions engineering, he’s experienced in architecting and enabling solutions for enterprise clients in the telecommunications, media, insurance, and financial services sectors. Johnny has a high level of expertise in integrating public cloud and on-premises infrastructure, with a focus on service assurance, monitoring platforms, and open-source technologies. Prior to ClickHouse, Johnny was part of the solution engineering teams at Confluent, Splunk, and Optus, to name a few.

Masudur Rahaman Sayem

Masudur is a Streaming Data Architect at AWS with over 25 years of experience in the IT industry. He collaborates with AWS customers worldwide to architect and implement sophisticated data streaming solutions that address complex business challenges. As an expert in distributed computing, Sayem specializes in designing large-scale distributed systems architecture for maximum performance and scalability. He has a keen interest and passion for distributed architecture, which he applies to designing enterprise-grade solutions at internet scale.

Express brokers for Amazon MSK: Turbo-charged Kafka scaling with up to 20 times faster performance

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/express-brokers-for-amazon-msk-turbo-charged-kafka-scaling-with-up-to-20-times-faster-performance/

Managing and scaling data streams efficiently is a cornerstone of success for many organizations. Apache Kafka has emerged as a leading platform for real-time data streaming, offering unmatched scalability and reliability. However, setting up and scaling Kafka clusters can be challenging, requiring significant time, expertise, and resources. This is where Amazon Managed Streaming for Apache Kafka (Amazon MSK) Express brokers come into play.

Express brokers are a new broker type in Amazon MSK that are designed to simplify Kafka deployment and scaling.

In this post, we walk you through the implementation of MSK Express brokers, highlighting their core features, benefits, and best practices for rapid Kafka scaling.

Key features of MSK Express brokers

MSK Express brokers revolutionize Kafka cluster management by delivering exceptional performance and operational simplicity. With up to three times more throughput per broker, Express brokers can sustain 500 MBps ingress and 1000 MBps egress on m7g.16xlarge instances, setting new standards for data streaming performance.

Their standout feature is their fast scaling capability—up to 20 times faster than standard Kafka brokers—allowing rapid cluster expansion within minutes. This is complemented by 90% faster recovery from failures and built-in three-way replication, providing robust reliability for mission-critical applications.

Express brokers eliminate traditional storage management responsibility by offering unlimited storage without pre-provisioning, while simplifying operations through preconfigured best practices and automated cluster management. With full compatibility with existing Kafka APIs and comprehensive monitoring through Amazon CloudWatch and Prometheus, MSK Express brokers provide an ideal solution for organizations seeking a highly-performant and low-maintenance data streaming infrastructure.

Comparison with traditional Kafka deployment

Although Kafka provides robust fault-tolerance mechanisms, its traditional architecture, where brokers store data locally on attached storage volumes, can lead to several issues impacting the availability and resiliency of the cluster. The following diagram compares the deployment architecture.

The traditional architecture comes with the following limitations:

  • Extended recovery times – When a broker fails, recovery requires copying data from surviving replicas to the newly assigned broker. This replication process can be time-consuming, particularly for high-throughput workloads or in cases where recovery requires a new volume, resulting in extended recovery periods and reduced system availability.
  • Suboptimal load distribution – Kafka achieves load balancing by redistributing partitions across brokers. However, this rebalancing operation can strain system resources and take considerable time due to the volume of data that must be transferred between nodes.
  • Complex scaling operations – Expanding a Kafka cluster requires adding brokers and redistributing existing partitions across the new nodes. For large clusters with substantial data volumes, this scaling operation can impact performance and require significant time to complete.

MSK Express brokers offer fully managed and highly available Regional Kafka storage. This significantly decouples compute and storage resources, addressing the aforementioned challenges and improving the availability and resiliency of Kafka clusters. The benefits include:

  • Faster and more reliable broker recovery – When Express brokers recover, they do so in up to 90% less time than standard brokers and place negligible strain on the clusters’ resources, which makes recovery faster and more reliable.
  • Efficient load balancing – Load balancing in MSK Express brokers is faster and less resource-intensive, enabling more frequent and seamless load balancing operations.
  • Faster scaling – MSK Express brokers enable efficient cluster scaling through rapid broker addition, minimizing data transfer overhead and partition rebalancing time. New brokers become operational quickly due to accelerated catch-up processes, resulting in faster throughput improvements and minimal disruption during scaling operations.

Scaling use case example

Consider a use case requiring 300 MBps data ingestion on a Kafka topic. We implemented this using an MSK cluster with three m7g.4xlarge Express brokers. The configuration included a topic with 3,000 partitions and 24-hour data retention, with each broker initially managing 1,000 partitions.

To prepare for anticipated midday peak traffic, we needed to double the cluster capacity. This scenario highlights one of Express brokers’ key advantages: rapid, safe scaling without disrupting application traffic or requiring extensive advance planning. During this scenario, the cluster was actively handling approximately 300 MBps of ingestion. The following graph shows the total ingress on this cluster and the number of partitions it is holding across three brokers.

The scaling process involved two main steps:

  • Adding three additional brokers to the cluster, which completed in approximately 18 minutes
  • Using Cruise Control to redistribute the 3,000 partitions evenly across all six brokers, which took about 10 minutes

As shown in the following graph, the scaling operation completed smoothly, with partition rebalancing occurring rapidly across all six brokers while maintaining uninterrupted producer traffic.

Notably, throughout the entire process, we observed no disruption to producer traffic. The entire operation to double the cluster’s capacity was completed in just 28 minutes, demonstrating MSK Express brokers’ ability to scale efficiently with minimal impact on ongoing operations.
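
The numbers in this use case can be checked with a few lines of arithmetic. The sketch below uses only the figures quoted above; the estimate of partitions moved assumes the rebalance relocates the minimum needed to even out the load, which may differ from what Cruise Control actually did.

```python
partitions = 3000
ingress_mbps = 300
brokers_before, brokers_after = 3, 6
add_brokers_minutes, rebalance_minutes = 18, 10

# Load per broker before and after doubling the cluster
partitions_per_broker_before = partitions // brokers_before
partitions_per_broker_after = partitions // brokers_after
ingress_per_broker_before = ingress_mbps / brokers_before
ingress_per_broker_after = ingress_mbps / brokers_after

# Assumption: each original broker hands off only its surplus partitions
partitions_moved = (partitions_per_broker_before - partitions_per_broker_after) * brokers_before

total_minutes = add_brokers_minutes + rebalance_minutes

print(f"Partitions per broker: {partitions_per_broker_before} -> {partitions_per_broker_after}")
print(f"Ingress per broker:    {ingress_per_broker_before:.0f} -> {ingress_per_broker_after:.0f} MBps")
print(f"Partitions moved (minimum): {partitions_moved}")
print(f"Total scaling time: {total_minutes} minutes")
```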

Best practices

Consider the following guidelines to adopt MSK Express brokers:

  • When implementing new streaming workloads on Kafka, select MSK Express brokers as your default option. If uncertain about your workload requirements, begin with express.m7g.large instances.
  • Use the Amazon MSK sizing tool to calculate optimal broker count and type for your workload. Although this provides a good baseline, always validate through load testing that simulates your real-world usage patterns.
  • Review and implement MSK Express broker best practices.
  • Choose larger instance types for high-throughput workloads. A smaller number of large instances is preferable to many smaller instances, because fewer total brokers can simplify cluster management operations and reduce operational overhead.
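
As a rough starting point before you reach for the sizing tool and load testing, broker count can be estimated from target throughput. The helper below is a hypothetical sketch, not the Amazon MSK sizing tool: the function name, the 20% headroom, and the three-broker minimum (mirroring the three-broker cluster in the use case above) are assumptions, and the per-broker ingress figure must come from the published limits for your chosen instance type.

```python
import math


def brokers_needed(target_ingress_mbps, per_broker_ingress_mbps,
                   headroom_pct=20, min_brokers=3):
    """Estimate broker count, keeping a share of each broker's capacity in reserve."""
    usable_mbps = per_broker_ingress_mbps * (100 - headroom_pct) / 100
    return max(min_brokers, math.ceil(target_ingress_mbps / usable_mbps))


# 500 MBps is the sustained ingress quoted earlier for the largest Express instance
print(brokers_needed(target_ingress_mbps=300, per_broker_ingress_mbps=500))
print(brokers_needed(target_ingress_mbps=2000, per_broker_ingress_mbps=500))
```

An estimate like this only accounts for ingress; egress, partition counts, and consumer fan-out can each become the limiting factor, which is why validation through load testing remains necessary.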

Conclusion

MSK Express brokers represent a significant advancement in Kafka deployment and management, offering a compelling solution for organizations seeking to modernize their data streaming infrastructure. Through its innovative architecture that decouples compute and storage, MSK Express brokers deliver simplified operations, superior performance, and rapid scaling capabilities.

The key advantages demonstrated throughout this post—including 3 times higher throughput, 20 times faster scaling, and 90% faster recovery times—make MSK Express brokers an attractive option for both new Kafka implementations and migrations from traditional deployments.

As organizations continue to face growing demands for real-time data processing, MSK Express brokers provide a future-proof solution that combines the reliability of Kafka with the operational simplicity of a fully managed service.

To get started, refer to Amazon MSK Express brokers.


About the Author

Masudur Rahaman Sayem is a Streaming Data Architect at AWS with over 25 years of experience in the IT industry. He collaborates with AWS customers worldwide to architect and implement sophisticated data streaming solutions that address complex business challenges. As an expert in distributed computing, Sayem specializes in designing large-scale distributed systems architecture for maximum performance and scalability. He has a keen interest and passion for distributed architecture, which he applies to designing enterprise-grade solutions at internet scale.

Build an end-to-end serverless streaming pipeline with Apache Kafka on Amazon MSK using Python

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-serverless-streaming-pipeline-with-apache-kafka-on-amazon-msk-using-python/

The volume of data generated globally continues to surge, from gaming, retail, and finance, to manufacturing, healthcare, and travel. Organizations are looking for more ways to quickly use the constant inflow of data to innovate for their businesses and customers. They have to reliably capture, process, analyze, and load the data into a myriad of data stores, all in real time.

Apache Kafka is a popular choice for these real-time streaming needs. However, it can be challenging to set up a Kafka cluster along with other data processing components that scale automatically depending on your application’s needs. You risk under-provisioning for peak traffic, which can lead to downtime, or over-provisioning for base load, leading to wastage. AWS offers multiple serverless services like Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Data Firehose, Amazon DynamoDB, and AWS Lambda that scale automatically depending on your needs.

In this post, we explain how you can use some of these services, including MSK Serverless, to build a serverless data platform to meet your real-time needs.

Solution overview

Let’s imagine a scenario. You’re responsible for managing thousands of modems for an internet service provider deployed across multiple geographies. You want to monitor the modem connectivity quality that has a significant impact on customer productivity and satisfaction. Your deployment includes different modems that need to be monitored and maintained to ensure minimal downtime. Each device transmits thousands of 1 KB records every second, such as CPU usage, memory usage, alarm, and connection status. You want real-time access to this data so you can monitor performance in real time, and detect and mitigate issues quickly. You also need longer-term access to this data for machine learning (ML) models to run predictive maintenance assessments, find optimization opportunities, and forecast demand.

Your clients that gather the data onsite are written in Python, and they can send all the data to Apache Kafka topics on Amazon MSK. For your application’s low-latency and real-time data access, you can use Lambda and DynamoDB. For longer-term data storage, you can use Amazon Data Firehose, a managed serverless connector service, to send data to your data lake.

The following diagram shows how you can build this end-to-end serverless application.

end-to-end serverless application

Let’s follow the steps in the following sections to implement this architecture.

Create a serverless Kafka cluster on Amazon MSK

We use Amazon MSK to ingest real-time telemetry data from modems. Creating a serverless Kafka cluster is straightforward on Amazon MSK. It only takes a few minutes using the AWS Management Console or AWS SDK. To use the console, refer to Getting started using MSK Serverless clusters. You create a serverless cluster, AWS Identity and Access Management (IAM) role, and client machine.

Create a Kafka topic using Python

When your cluster and client machine are ready, SSH to your client machine and install Kafka Python and the MSK IAM library for Python.

  • Run the following commands to install Kafka Python and the MSK IAM library:
pip install kafka-python

pip install aws-msk-iam-sasl-signer-python
  • Create a new file called createTopic.py.
  • Copy the following code into this file, replacing the bootstrap_servers and region information with the details for your cluster. For instructions on retrieving the bootstrap_servers information for your MSK cluster, see Getting the bootstrap brokers for an Amazon MSK cluster.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# Create the topic only if it does not already exist
topic_name = "mytopic"
topic_list = [NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if topic_name not in existing_topics:
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("Topic already exists. Existing topics: " + str(existing_topics))
  • Run the createTopic.py script to create a new Kafka topic called mytopic on your serverless cluster:
python createTopic.py

Produce records using Python

Let’s generate some sample modem telemetry data.

  • Create a new file called kafkaDataGen.py.
  • Copy the following code into this file, updating the BROKERS and region information with the details for your cluster:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        # Log the error and continue with the next record
        print(f"Failed to send record: {e!r}")
  • Run the kafkaDataGen.py script to continuously generate random data and publish it to the specified Kafka topic:
python kafkaDataGen.py
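
Before pointing the generator at a cluster, it can help to check locally what actually goes on the wire. The following sketch applies the same JSON-to-bytes transformation as the value_serializer above and verifies that a record survives the round trip. The helper names and the sample values are illustrative assumptions; only the field names come from generateData.

```python
import json

# Field names produced by generateData() in kafkaDataGen.py
REQUIRED_FIELDS = {"model", "deviceid", "interface", "interfacestatus",
                   "cpuusage", "memoryusage", "event_time"}


def serialize(record: dict) -> bytes:
    """Same transformation as the value_serializer passed to KafkaProducer."""
    return json.dumps(record).encode("utf-8")


def deserialize(payload: bytes) -> dict:
    """Inverse of serialize(), as a consumer of the topic would apply it."""
    return json.loads(payload.decode("utf-8"))


sample = {
    "model": "EVG2000",
    "deviceid": "dvc1234",
    "interface": "eth4.1",
    "interfacestatus": "connected",
    "cpuusage": "73",
    "memoryusage": "1210",
    "event_time": "2024-01-01 12:00:00",
}

payload = serialize(sample)
decoded = deserialize(payload)
missing = REQUIRED_FIELDS - decoded.keys()
print(f"{len(payload)} bytes on the wire, missing fields: {sorted(missing)}")
```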

Store events in Amazon S3

Now you store all the raw event data in an Amazon Simple Storage Service (Amazon S3) data lake for analytics. You can use the same data to train ML models. The integration with Amazon Data Firehose allows Amazon MSK to seamlessly load data from your Apache Kafka clusters into an S3 data lake. Complete the following steps to continuously stream data from Kafka to Amazon S3, eliminating the need to build or manage your own connector applications:

  • On the Amazon S3 console, create a new bucket. You can also use an existing bucket.
  • Create a new folder in your S3 bucket called streamingDataLake.
  • On the Amazon MSK console, choose your MSK Serverless cluster.
  • On the Actions menu, choose Edit cluster policy.

cluster policy

  • Select Include Firehose service principal and choose Save changes.

firehose service principal

  • On the S3 delivery tab, choose Create delivery stream.

delivery stream

  • For Source, choose Amazon MSK.
  • For Destination, choose Amazon S3.

source and destination

  • For Amazon MSK cluster connectivity, select Private bootstrap brokers.
  • For Topic, enter a topic name (for this post, mytopic).

source settings

  • For S3 bucket, choose Browse and choose your S3 bucket.
  • Enter streamingDataLake as your S3 bucket prefix.
  • Enter streamingDataLakeErr as your S3 bucket error output prefix.

destination settings

  • Choose Create delivery stream.

create delivery stream

You can verify that the data was written to your S3 bucket. You should see that the streamingDataLake directory was created and the files are stored in partitions.

amazon s3

Store events in DynamoDB

For the last step, you store the most recent modem data in DynamoDB. This allows the client application to access the modem status and interact with the modem remotely from anywhere, with low latency and high availability. Lambda seamlessly works with Amazon MSK. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.
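
The shape of that event payload can be explored locally without deploying anything. The sketch below builds a test event in the form Lambda uses for Amazon MSK triggers, where records are grouped under topic-partition keys and each record value is base64-encoded, and then decodes it. The helper names are hypothetical and only the fields used here are included; a real event carries additional metadata.

```python
import base64
import json


def make_msk_event(topic, partition, records):
    """Build a test event shaped like the payload delivered by an MSK trigger."""
    return {
        "eventSource": "aws:kafka",
        "records": {
            f"{topic}-{partition}": [
                {
                    "topic": topic,
                    "partition": partition,
                    "offset": offset,
                    "value": base64.b64encode(json.dumps(r).encode("utf-8")).decode("ascii"),
                }
                for offset, r in enumerate(records)
            ]
        },
    }


def decode_records(event):
    """Return the JSON-decoded values from every topic-partition in the event."""
    decoded = []
    for batch in event["records"].values():
        for record in batch:
            decoded.append(json.loads(base64.b64decode(record["value"]).decode("utf-8")))
    return decoded


readings = [
    {"deviceid": "dvc1001", "interfacestatus": "connected"},
    {"deviceid": "dvc1002", "interfacestatus": "down"},
]
event = make_msk_event("mytopic", 0, readings)
print(decode_records(event))
```

Passing an event like this to a handler is a quick way to test the function's logic before attaching the trigger.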

Let’s first create a table in DynamoDB. Refer to DynamoDB API permissions: Actions, resources, and conditions reference to verify that your client machine has the necessary permissions.

  • Create a new file called createTable.py.
  • Copy the following code into the file, updating the region information:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")
  • Run the createTable.py script to create a table called device_status in DynamoDB:
python createTable.py

Now let’s configure the Lambda function.

  • On the Lambda console, choose Functions in the navigation pane.
  • Choose Create function.
  • Select Author from scratch.
  • For Function name, enter a name (for example, my-notification-kafka).
  • For Runtime, choose Python 3.11.
  • For Permissions, select Use an existing role and choose a role with permissions to read from your cluster.
  • Create the function.

On the Lambda function configuration page, you can now configure sources, destinations, and your application code.

  • Choose Add trigger.
  • For Trigger configuration, enter MSK to configure Amazon MSK as a trigger for the Lambda source function.
  • For MSK cluster, enter myCluster.
  • Deselect Activate trigger, because you haven’t configured your Lambda function yet.
  • For Batch size, enter 100.
  • For Starting position, choose Latest.
  • For Topic name, enter a name (for example, mytopic).
  • Choose Add.
  • On the Lambda function details page, on the Code tab, enter the following code:
import base64
import json

import boto3

# Create the DynamoDB client once so it is reused across invocations
dynamodb = boto3.client('dynamodb')
table_name = 'device_status'

def lambda_handler(event, context):
    # event['records'] maps "<topic>-<partition>" keys to lists of records;
    # iterate over every key instead of assuming partition 0 only
    for base64records in event['records'].values():
        for x in base64records:
            # Kafka record values arrive base64-encoded
            record = json.loads(base64.b64decode(x['value']).decode('utf-8'))

            item = {
                'deviceid': {'S': record['deviceid']},
                'interface': {'S': record['interface']},
                'interfacestatus': {'S': record['interfacestatus']},
                'cpuusage': {'S': record['cpuusage']},
                'memoryusage': {'S': record['memoryusage']},
                'event_time': {'S': record['event_time']},
            }

            # Write the item to the DynamoDB table
            dynamodb.put_item(TableName=table_name, Item=item)

            print("Item written to DynamoDB")
  • Deploy the Lambda function.
  • On the Configuration tab, choose Edit to edit the trigger.

edit trigger

  • Select the trigger, then choose Save.
  • On the DynamoDB console, choose Explore items in the navigation pane.
  • Select the table device_status.

You will see Lambda is writing events generated in the Kafka topic to DynamoDB.
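
To check the decoding step locally before deploying, the following sketch may help. It builds an event in the shape Lambda passes for an Amazon MSK trigger (a `records` map keyed by topic and partition, where each record carries a base64-encoded `value`) and decodes it. The helper names `build_sample_event` and `decode_records` and the sample device values are hypothetical and used only for illustration.

```python
import base64
import json


def build_sample_event(topic, partition, payloads):
    """Build a minimal MSK-style Lambda event for local testing (illustrative only)."""
    key = f"{topic}-{partition}"
    records = [
        {
            "topic": topic,
            "partition": partition,
            "offset": offset,
            "value": base64.b64encode(json.dumps(p).encode("utf-8")).decode("ascii"),
        }
        for offset, p in enumerate(payloads)
    ]
    return {"eventSource": "aws:kafka", "records": {key: records}}


def decode_records(event):
    """Decode every record in every topic-partition batch into a Python dict."""
    decoded = []
    for batch in event["records"].values():
        for record in batch:
            decoded.append(json.loads(base64.b64decode(record["value"]).decode("utf-8")))
    return decoded


sample = build_sample_event(
    "mytopic", 0,
    [{"deviceid": "d-1", "interface": "eth0", "interfacestatus": "up",
      "cpuusage": "12", "memoryusage": "40", "event_time": "2024-01-01T00:00:00Z"}],
)
items = decode_records(sample)
print(items[0]["deviceid"])
```

Iterating over all keys of `records` rather than a single hard-coded key keeps the handler working when the topic has more than one partition.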

Summary

Streaming data pipelines are critical for building real-time applications. However, setting up and managing the infrastructure can be daunting. In this post, we walked through how to build a serverless streaming pipeline on AWS using Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose, and other services. The key benefits are no servers to manage, automatic scalability of the infrastructure, and a pay-as-you-go model using fully managed services.

Ready to build your own real-time pipeline? Get started today with a free AWS account. With the power of serverless, you can focus on your application logic while AWS handles the undifferentiated heavy lifting. Let’s build something awesome on AWS!


About the Authors

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Michael Oguike is a Product Manager for Amazon MSK. He is passionate about using data to uncover insights that drive action. He enjoys helping customers from a wide range of industries improve their businesses using data streaming. Michael also loves learning about behavioral science and psychology from books and podcasts.

Processing large records with Amazon Kinesis Data Streams

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/processing-large-records-with-amazon-kinesis-data-streams/

In today’s digital era, data is abundant and constantly flowing. Businesses across industries are seeking ways to harness this wealth of information to gain valuable insights and make real-time decisions. To meet this need, AWS offers Amazon Kinesis Data Streams, a powerful and scalable real-time data streaming service. With Kinesis Data Streams, you can effortlessly collect, process, and analyze streaming data in real time at any scale. This service seamlessly integrates into your data architecture, allowing you to tap into the full potential of your data for informed decision-making.

Data streaming technologies like Kinesis Data Streams are designed to efficiently process and manage continuous streams of data in real time at large scale. The individual pieces of data within these streams are often referred to as records. In scenarios like large file processing or performing image, audio, or video analytics, your record may exceed 1 MB. You may struggle to ingest such a large record with Kinesis Data Streams because, as of this writing, the service has a 1 MB upper limit for maximum data record size.

In this post, we show you some different options for handling large records within Kinesis Data Streams and the benefits and disadvantages of each approach. We provide some sample code for each option to help you get started with any of these approaches with your own workloads.

Understanding the default behavior of Kinesis Data Streams

You can send records to Kinesis Data Streams using the PutRecord or PutRecords API calls. These APIs include a mandatory field known as PartitionKey, where you must provide a specific value. This partition key is used by the service to map records with the same partition keys to the same shard to ensure ordering and locality for consumption. Locality means that you want the same consumer to process all records for a given partition key. This helps ensure that data with the same partition key stays together within the same shard, maintaining data order.
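
As a rough illustration of this mapping, the sketch below hashes a partition key with MD5 into a 128-bit integer and looks up which shard's hash key range contains it. It assumes a stream whose shards split the hash key space into equal, contiguous ranges, which is not guaranteed after resharding; `hash_key` and `shard_index` are hypothetical helper names, not part of any AWS SDK.

```python
import hashlib

HASH_KEY_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def hash_key(partition_key):
    """Return the 128-bit integer hash of a partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key, shard_count):
    """Pick a shard, assuming shard_count shards with equal, contiguous hash key ranges."""
    range_size = HASH_KEY_SPACE // shard_count
    return min(hash_key(partition_key) // range_size, shard_count - 1)


for key in ("device-1", "device-2", "device-1"):
    print(key, shard_index(key, shard_count=4))
```

Because the hash is deterministic, the two `device-1` records always map to the same shard, which is what preserves their relative order.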

Each shard, which holds your data, can handle writing up to 1 MB per second. Let’s consider a scenario where you define a partition key and attempt to send a data record that exceeds 1 MB in size. Based on the explanation so far, the service will reject this request because the record size is over 1 MB. To verify this, we tried to send a 1.5 MB record to a stream with the following code. The exception message that the service returned follows the code:

import boto3

client = boto3.client('kinesis', region_name='ap-southeast-2')

def lambda_handler(event, context):
    try:
        response = client.put_record(
            StreamName='test',
            Data=b'x' * 1536 * 1024,  # 1.5 MB payload, over the 1 MB record limit
            PartitionKey='string'
        )
        print(response)
    except Exception as e:
        print(e)

START RequestId: 84b3ab0c-3f30-4267-aec1-549c2d59dfdb Version: $LATEST
An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value at 'data' failed to satisfy constraint: Member must have length less than or equal to 1048576
END RequestId: 84b3ab0c-3f30-4267-aec1-549c2d59dfdb

Strategies for handling large records

Now that we understand the behavior of the PutRecord and PutRecords APIs, let’s discuss strategies you can use to overcome this situation. One thing to keep in mind is that there is no single best solution; in the following sections, we discuss some of the approaches that you can evaluate based on your use case:

  • Store large records in Amazon Simple Storage Service (Amazon S3) with a reference in Kinesis Data Streams
  • Split one large record into multiple records
  • Compress your large records

Let’s discuss these points one by one.

Store large records in Amazon S3 with a reference in Kinesis Data Streams

A useful approach for storing large records involves utilizing an alternative storage solution while employing a reference within Kinesis Data Streams. In this context, Amazon S3 stands out as an excellent choice due to its exceptional durability and cost-effectiveness. The procedure involves uploading the record as an object to an S3 bucket and subsequently writing a reference entry in Kinesis Data Streams. This entry incorporates an attribute that serves as a pointer, indicating the location of the object within Amazon S3.

With this approach, you can generate a pre-signed URL associated with the S3 object’s location. This link can be shared with the requester, offering them direct access to the object without the need for intermediary server-side data transfers.

The following diagram illustrates the architecture of this solution.

The following is the sample code to write data to Kinesis Data Streams using this approach:

import json
import boto3
import random

def lambda_handler(event, context):
    try:
        s3 = boto3.client('s3', region_name='ap-southeast-2')
        kds = boto3.client('kinesis', region_name='ap-southeast-2')
        expiration=3600
        pk=str(random.randint(100,100000000))
        bucket_name = 'MY_BUCKET'
        object_key = 'air/' + pk + '.txt'
        file_content = b'LARGE OBJECT'
        response = s3.put_object(Bucket=bucket_name, Key=object_key, Body=file_content)
        presigned_url = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': object_key},
            ExpiresIn=expiration
        )
        
        kdata = {'message': presigned_url}
        response = kds.put_record(
            StreamName='test',
            Data=json.dumps(kdata),
            PartitionKey=pk
        )
        print (response)
    except Exception as e:
        print (e)

If you are using an AWS Lambda consumer to process this data, you can now decode the record to get the S3 pre-signed URL to efficiently retrieve the object from Amazon S3. Then you can implement your business logic to effectively process the data. The following is sample code for reference:

import base64
import json

def lambda_handler(event, context):
    # Kinesis delivers each record's data base64-encoded
    decoded_record_data = [base64.b64decode(record['kinesis']['data']).decode('utf-8') for record in event['Records']]
    deserialized_data = [json.loads(decoded_record) for decoded_record in decoded_record_data]

    for item in deserialized_data:
        presigned_url = item['message']
        # Retrieve the object from presigned_url and apply your business logic

An inherent benefit of this technique is that Amazon S3 can store objects across a wide range of sizes. This method also helps you reduce the cost of using Kinesis Data Streams: because only the URL is stored in the stream, it uses less storage space and less read and write throughput. However, accessing the large object requires an additional call to Amazon S3, which adds latency for clients as they make the extra request.
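
The additional call to Amazon S3 could look like the following sketch, which extracts the pre-signed URLs from a Kinesis-triggered Lambda event and downloads each object over HTTPS with the Python standard library. The function names `extract_urls` and `fetch_object` and the 10-second timeout are assumptions for illustration; the `message` attribute matches the producer sample earlier in this section.

```python
import base64
import json
import urllib.request


def extract_urls(event):
    """Return the pre-signed URLs carried in a Kinesis-triggered Lambda event."""
    urls = []
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        urls.append(json.loads(payload)["message"])
    return urls


def fetch_object(url, timeout=10):
    """Download the object behind a pre-signed URL and return its bytes."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read()


def lambda_handler(event, context):
    for url in extract_urls(event):
        body = fetch_object(url)
        # Apply your business logic to body here
        print(f"Fetched {len(body)} bytes")
```

Note that the producer sample creates URLs that expire after 3,600 seconds, so a consumer that reads the record later than that would need a longer expiration or a different kind of reference.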

Split one large record into multiple records

Splitting large records into smaller ones in Kinesis Data Streams brings advantages like faster processing, improved throughput, efficient resource use, and more straightforward error handling. Let’s say you have a large record that you want to split into smaller chunks before sending them to a Kinesis data stream. First, you need to set up a Kinesis producer. Suppose you have a large record as a string. You can split it into smaller chunks of a predefined size. For this example, let’s say you’re splitting the record into chunks of 100 characters each. After you split that, loop through the record chunks and send each chunk as a separate message to a Kinesis data stream. The following is the sample code:

import boto3
kinesis = boto3.client('kinesis', region_name='ap-southeast-2')  

def split_record(record, chunk_size):
    chunks = [record[i:i + chunk_size] for i in range(0, len(record), chunk_size)]
    return chunks

def send_to_kinesis(stream_name, record):
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=record,
        PartitionKey= '100'
    )
    return response

def main():
    stream_name = 'test'  
    large_record = 'Your large record'  # Replace with your actual record
    chunk_size = 100  

    record_chunks = split_record(large_record, chunk_size)

    for chunk in record_chunks:
        response = send_to_kinesis(stream_name, chunk)
        print(f"Record sent: {response['SequenceNumber']}")

if __name__ == "__main__":
    main()

Ensure that all chunks of a given message use the same partition key so that they are written to the same shard, which preserves their order. Kinesis records don’t carry headers, so include metadata in the payload of the final chunk to mark the end of the message. This enables consumers to identify the last chunk and reconstruct the message. The drawback of this method is that it adds complexity on the client side, which must split the record and put the parts back together. Therefore, these functions need thorough testing to prevent any loss of data.
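
One way to carry that metadata is to wrap each chunk in a small JSON envelope, as in the following sketch. The envelope fields (`message_id`, `index`, `is_last`, `chunk`) and the function names are hypothetical; this is an illustration of the idea under those assumptions rather than a prescribed format.

```python
import base64
import json
import uuid


def split_record(data, chunk_size):
    """Split bytes into JSON envelopes that carry ordering and completion metadata."""
    message_id = str(uuid.uuid4())
    pieces = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] or [b""]
    return [
        json.dumps({
            "message_id": message_id,
            "index": index,
            "is_last": index == len(pieces) - 1,
            "chunk": base64.b64encode(piece).decode("ascii"),
        })
        for index, piece in enumerate(pieces)
    ]


def reassemble(envelopes):
    """Rebuild the original bytes from all envelopes of one message."""
    parts = sorted((json.loads(e) for e in envelopes), key=lambda p: p["index"])
    if not parts or not parts[-1]["is_last"] or len(parts) != parts[-1]["index"] + 1:
        raise ValueError("message is incomplete")
    return b"".join(base64.b64decode(p["chunk"]) for p in parts)


original = b"x" * 250
envelopes = split_record(original, chunk_size=100)
print(len(envelopes), reassemble(envelopes) == original)
```

Using `message_id` as the partition key would keep all chunks of one message on the same shard. Base64 encoding grows each chunk by about a third, so the chunk size has to leave headroom below the record size limit.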

Compress your large records

Compressing data before sending it to Kinesis Data Streams has several advantages. It reduces the size of the data, which speeds up transfer, makes more efficient use of network resources, and lowers storage costs. It also simplifies storage and data retention. By using compression algorithms such as GZIP, Snappy, or LZ4, you can substantially reduce the size of large records. Compression is simple to adopt because it doesn’t require the caller to change the item or use extra AWS services to support storage. However, compression adds CPU overhead and latency on the producer side, and the compression ratio you achieve varies with the data type and format. On the consumer side, smaller records can improve throughput at the expense of some decompression overhead.
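
The following sketch shows one possible shape for this approach with GZIP from the Python standard library. It compresses a JSON payload, checks the result against the 1,048,576-byte limit quoted in the exception earlier in this post, and decompresses it again as a consumer would. The function names and the sample payload are assumptions for illustration.

```python
import gzip
import json

MAX_RECORD_BYTES = 1048576  # limit quoted in the ValidationException earlier in this post


def compress_record(payload):
    """Serialize a dict to JSON and gzip it; fail if the result is still too large."""
    raw = json.dumps(payload).encode("utf-8")
    compressed = gzip.compress(raw)
    if len(compressed) > MAX_RECORD_BYTES:
        raise ValueError(f"compressed record is {len(compressed)} bytes, over the limit")
    return compressed


def decompress_record(data):
    """Reverse compress_record on the consumer side."""
    return json.loads(gzip.decompress(data).decode("utf-8"))


payload = {"readings": ["status=ok"] * 200000}  # highly repetitive JSON, well over 1 MB
compressed = compress_record(payload)
print(len(json.dumps(payload)), len(compressed))
```

The size check matters because compression doesn’t guarantee that a record ends up under the limit. Data that is already compressed, such as images or video, may barely shrink, in which case one of the other strategies is a better fit.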

Conclusion

For real-time data streaming use cases, it’s essential to carefully consider the handling of large records when using Kinesis Data Streams. In this post, we discussed the challenges associated with managing large records and explored strategies such as utilizing Amazon S3 references, record splitting, and compression. Each approach has its own set of benefits and drawbacks, so it’s crucial to evaluate the nature of your data and the tasks you need to perform. Select the most suitable approach based on your data’s characteristics and your processing task requirements.

We encourage you to try out the approaches discussed in this post and share your thoughts in the comments section.


About the author

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Connect to Amazon MSK Serverless from your on-premises network

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/connect-to-amazon-msk-serverless-from-your-on-premises-network/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed, highly available, and secure Apache Kafka service. Amazon MSK reduces the work needed to set up, scale, and manage Apache Kafka in production. With Amazon MSK, you can create a cluster in minutes and start sending data.

With Amazon MSK Serverless, you can run Apache Kafka without having to manage the underlying infrastructure. Amazon MSK will automatically provision, scale, and manage your Apache Kafka clusters, so you can focus on your applications without worrying about the operational overhead. Additionally, MSK Serverless offers fine-grained, pay-as-you-go pricing, making it a cost-effective option for organizations with unpredictable workloads.

Connecting to MSK Serverless is easy. You can set up a serverless cluster using the API or AWS Management Console in minutes. MSK Serverless provides bootstrap information as a private DNS endpoint, allowing clients to connect to the serverless Apache Kafka cluster. A common use case of using MSK Serverless is an on-premises client that needs to process real-time data streams. However, the private DNS endpoint is only accessible from virtual private clouds (VPCs) that have been configured to connect and isn’t directly resolvable from an on-premises network. This can pose a challenge for on-premises clients to discover and connect to the MSK Serverless cluster.
In this post, we guide you through a step-by-step process to connect your on-premises client to MSK Serverless, overcoming this challenge.

Solution overview

The following diagram illustrates the solution architecture.

The flow of the solution is as follows:

  1. The DNS query for your MSK endpoint is routed to a locally configured on-premises DNS server.
  2. The on-premises DNS as configured performs conditional forwarding for kafka-serverless.REPLACE-MSK-SERVERLESS-REGION.amazonaws.com to an Amazon Route 53 inbound resolver endpoint IP address.
  3. The inbound resolver endpoint performs DNS resolution by forwarding the query to the private hosted zone that was created along with the MSK Serverless cluster.
  4. The IP addresses returned by the DNS query are the private IP addresses of the interface VPC endpoint, which allow your on-premises host to establish private connectivity over AWS VPN or AWS Direct Connect.
  5. The interface endpoint is a collection of one or more elastic network interfaces with a private IP address in your account that serves as an entry point for traffic destined for the MSK Serverless service.

Note that at this time, this solution works only for MSK Serverless clusters with a single VPC.

Prerequisites

In this section, we discuss the prerequisite steps to complete in order to implement this solution.

Establish network connectivity between on premises and the AWS Cloud

To use MSK Serverless from your on-premises network, you need to establish a network connection between your on-premises environment and the VPC that you have set up for MSK Serverless. Various secure methods are available to connect your on-premises network to the AWS Cloud. Refer to Network-to-Amazon VPC connectivity options for more information.

Create a security group for allowing inbound TCP/UDP connections from your on-premises network

Create a security group with the following configurations on the same VPC that you configured for MSK Serverless:

Inbound rule:

  • Source: [On-premises CIDR range]
  • Protocol: TCP/UDP
  • Port Range: 53

Outbound rule: Leave it to default

For more information, refer to Work with security groups.

Update the MSK security group for inbound connections from your on-premises network

To ensure that your MSK Serverless cluster can be accessed from your on-premises network, you need to adjust the cluster’s security group settings to allow incoming traffic from your network on TCP port 9098. Complete the following steps:

  1. On the Amazon MSK console, choose Clusters in the navigation pane.
  2. Navigate to your serverless MSK cluster’s properties.

  3. Choose the security group associated with your MSK cluster.

Because MSK Serverless supports configuring multiple VPCs, make sure to choose the security group associated with the VPC that you configured for connecting from your on-premises network.

  4. To enable connections from your on-premises CIDR block to MSK Serverless, add an inbound rule that allows traffic on TCP port 9098 from your on-premises CIDR.

This ensures that your on-premises network can communicate with MSK Serverless on the specified port.

Configure a Route 53 inbound resolver endpoint

MSK Serverless provides a DNS endpoint that serves as the starting point for an Apache Kafka client to connect to the cluster. However, this endpoint isn’t publicly discoverable and can only be accessed from within the configured VPC. To resolve the serverless DNS endpoint outside of your VPC, you can set up a Route 53 resolver endpoint. This allows you to access the endpoint securely by creating a hybrid cloud setup over VPN or Direct Connect.

To configure the Route 53 resolver using the console, complete the following steps:

  1. On the Route 53 console, under Resolver in the navigation pane, choose Inbound endpoints.
  2. Choose Create inbound endpoint.

  3. For Endpoint name, enter the endpoint name.
  4. For VPC in the Region, choose the VPC where you configured MSK Serverless.
  5. For Security group for this endpoint, choose the security group that you created as a prerequisite for inbound TCP/UDP connections.

The security group of the inbound resolver endpoint should allow traffic from the on-premises DNS server IP address on TCP/UDP port 53.

In the next step, you add your IP addresses, ensuring that the number of IP addresses matches the number of subnets in your MSK cluster.

  6. Choose the Availability Zones and subnets that are the same as your MSK Serverless network configuration.
  7. Select Use an IP address that is selected automatically.
  8. Choose Create inbound endpoint.
  9. Copy the inbound endpoint IP addresses.

Configure the on-premises DNS server

In this example, we use a Microsoft DNS server. To configure a conditional forwarder, complete the following steps:

  1. Open DNS Manager.
  2. Run the following command in the Run command window:
dnsmgmt.msc
  3. Choose (right-click) Conditional Forwarders under the server of your choosing, then choose New Conditional Forwarder.

In the next step, you enter kafka-serverless.REPLACE-MSK-SERVERLESS-REGION.amazonaws.com as the DNS domain and the IP addresses of the Route 53 inbound resolver endpoint that you created earlier as the forwarding targets. You can find the MSK endpoint information by accessing the cluster’s client information. To learn more about getting client information, refer to Getting the bootstrap brokers for an Amazon MSK cluster.

  4. For DNS Domain, enter the domain suffix of your endpoint (for example, kafka-serverless.ap-southeast-2.amazonaws.com). Do not enter the entire endpoint name.
  5. Choose OK.

Test the DNS resolution

DNS (Domain Name System) uses TCP/UDP port 53. To test whether you can connect to any of the Route 53 inbound endpoints, run the following command from your on-premises client:

telnet Route53-INBOUND-ENDPOINT-IP 53

For example: telnet 10.1.0.133 53

The following is a sample output:

Trying 10.1.0.133...
Connected to 10.1.0.133.
Escape character is '^]'.
Connection closed by foreign host.

Run the following command to check whether you can resolve the MSK Serverless endpoint from your on-premises client. To get the MSK Serverless endpoint information, refer to Create an MSK Serverless cluster.

dig MSK-SERVERLESS-ENDPOINT-REMOVE-PORT-NUMBER +short

For example: dig boot-abcdc9.c3.kafka-serverless.ap-southeast-2.amazonaws.com +short

The following is a sample output:

vpce-0bcb06d53aab34111-vt8yzx2b.vpce-svc-05dc791a527abcd.ap-southeast-2.vpce.amazonaws.com.
10.1.1.185
10.1.0.191

If the DNS resolution fails, check your network connectivity from on premises. For more information about troubleshooting connectivity issues, refer to How do I troubleshoot VPN tunnel connectivity to an Amazon VPC or Troubleshooting AWS Direct Connect.

After you create a serverless MSK cluster, the service automatically creates an interface VPC endpoint for the cluster. You can use the dig command as shown above to retrieve the VPC endpoint ID and its associated IP address, which confirms that you are now able to connect to the MSK Serverless cluster from your on-premises environment.
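
If telnet or dig isn’t available on the client, a similar check can be sketched with the Python standard library. The following example resolves a hostname to its IPv4 addresses and attempts a TCP connection on a given port. The helper names `resolve`, `can_connect`, and `check_endpoint` are hypothetical, and port 9098 is the one used for the security group rule earlier in this post.

```python
import socket


def resolve(hostname):
    """Return the sorted, de-duplicated IPv4 addresses that hostname resolves to."""
    infos = socket.getaddrinfo(hostname, None, family=socket.AF_INET, type=socket.SOCK_STREAM)
    return sorted({info[4][0] for info in infos})


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_endpoint(endpoint, port=9098):
    """Print the DNS resolution and TCP reachability results for an endpoint."""
    try:
        print("Resolved to:", resolve(endpoint))
    except socket.gaierror as exc:
        print("DNS resolution failed:", exc)
        return
    print(f"Port {port} reachable:", can_connect(endpoint, port))
```

For example, calling `check_endpoint` with your cluster’s bootstrap hostname (without the port number) should print the private IP addresses of the interface VPC endpoint, followed by whether port 9098 accepts connections from the on-premises client.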

Test your Kafka client

Once you complete the configuration of the Route 53 inbound resolver endpoint and on-premises DNS server, you can test your Kafka client from an on-premises network. For instructions, refer to Create a client machine. This documentation guides you through the necessary steps to set up your client machine and verify that it can successfully connect to your MSK cluster from your on-premises network.

Conclusion

MSK Serverless makes it easy for you to manage your data. You don’t have to worry about setting up and running your own Kafka cluster, which saves time and effort. In this post, we explored the option of on-premises connectivity with MSK Serverless and how it can greatly benefit organizations. By establishing this connection, you can gain access to a wide range of real-time analytics use case possibilities and unlock the full potential of your data.

We encourage you to try on-premises connectivity with MSK Serverless.


About the Authors

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Akeef Khan is a Solutions Architect at Amazon Web Services. He helps SMB Greenfield customers adopt the cloud. Whilst being a generalist SA, Akeef is passionate about networking.

Retain more for less with tiered storage for Amazon MSK

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/retain-more-for-less-with-tiered-storage-for-amazon-msk/

Organizations are adopting Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK) to capture and analyze data in real time. Amazon MSK allows you to build and run production applications on Apache Kafka without needing Kafka infrastructure management expertise or having to deal with the complex overheads associated with running Apache Kafka on your own. With increasing maturity, customers seek to build sophisticated use cases that combine aspects of real-time and batch processing. For instance, you may want to train machine learning (ML) models based on historic data and then use these models for real-time inferencing. Or you may want to be able to recompute previous results when the application logic changes, for example, when a new KPI is added to a streaming analytics application or when a bug that caused incorrect output is fixed. These use cases often require storing data for several weeks, months, or even years.

Apache Kafka is well positioned to support these kinds of use cases. Data is retained in the Kafka cluster as long as required by configuring the retention policy. In this way, the most recent data can be processed in real time for low-latency use cases while historic data remains accessible in the cluster and can be processed in a batch fashion.

However, retaining data in a Kafka cluster can become expensive because storage and compute are tightly coupled in a cluster. To scale storage, you need to add more brokers. But adding brokers solely to increase storage wastes the compute resources that come with them, such as CPU and memory. Also, a large cluster with more nodes adds operational complexity, with a longer time to recover and rebalance when a broker fails. To avoid that operational complexity and higher cost, you can move your data to Amazon Simple Storage Service (Amazon S3) for long-term access and use the cost-effective storage classes in Amazon S3 to optimize your overall storage cost. This solves the cost challenge, but now you have to build and maintain the part of the architecture that moves data to a different data store. You also need to build different data processing logic using different APIs for consuming data (Kafka API for streaming, Amazon S3 API for historic reads).

Today, we’re announcing Amazon MSK tiered storage, which brings a virtually unlimited and low-cost storage tier for Amazon MSK, making it simpler and cost-effective for developers to build streaming data applications. Since the launch of Amazon MSK in 2019, we have enabled capabilities such as vertical scaling and automatic scaling of broker storage so you can operate your Kafka workloads in a cost-effective way. Earlier this year, we launched provisioned throughput which enables seamlessly scaling I/O without having to provision additional brokers. Tiered storage makes it even more cost-effective for you to run Kafka workloads. You can now store data in Apache Kafka without worrying about limits. You can effectively balance your performance and costs by using the performance-optimized primary storage for real-time data and the new low-cost tier for the historical data. With a few clicks, you can move streaming data into a lower-cost tier to store data and only pay for what you use.

Tiered storage frees you from making hard trade-offs between supporting the data retention needs of your application teams and the operational complexity that comes with it. This enables you to use the same code to process both real-time and historical data to minimize redundant workflows and simplify architectures. With Amazon MSK tiered storage, you can implement a Kappa architecture – a streaming-first software architecture deployment pattern – to use the same data processing pipeline for correctness and completeness of data over a much longer time horizon for business analysis.

How Amazon MSK tiered storage works

Let’s look at how tiered storage works for Amazon MSK. Apache Kafka stores data in files called log segments. As each segment completes, based on the segment size configured at cluster or topic level, it’s copied to the low-cost storage tier. Data is held in performance-optimized storage for a specified retention time, or up to a specified size, and then deleted. There is a separate time and size limit setting for the low-cost storage, which must be longer than the performance-optimized storage tier. If clients request data from segments stored in the low-cost tier, the broker reads the data from it and serves the data in the same way as if it were being served from the performance-optimized storage. The APIs and existing clients work with minimal changes. When your application starts reading data from the low-cost tier, you can expect an increase in read latency for the first few bytes. As you start reading the remaining data sequentially from the low-cost tier, you can expect latencies that are similar to the primary storage tier. With tiered storage, you pay for the amount of data you store and the amount of data you retrieve.

For a pricing example, let’s consider a workload where your ingestion rate is 15 MB/s, with a replication factor of 3, and you want to retain data in your Kafka cluster for 7 days. For such a workload, it requires 6x m5.large brokers, with 32.4 TB EBS storage, which costs $4,755. But if you use tiered storage for the same workload with local retention of 4 hours and overall data retention of 7 days, it requires 3x m5.large brokers, with 0.8 TB EBS storage and 9 TB of tiered storage, which costs $1,584. If you want to read all the historic data at once, it costs $13 ($0.0015 per GB retrieval cost). In this example with tiered storage, you save around 66% of your overall cost.
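
The storage figures in this example can be roughly reproduced with the following arithmetic sketch. It assumes decimal units (1 TB = 1,000,000 MB) and that the low-cost tier is billed for a single copy of the data, which is inferred from the 9 TB figure above rather than stated. The raw results are lower than the 32.4 TB and 0.8 TB of EBS storage quoted above, presumably because those figures include free-space headroom.

```python
INGEST_MB_PER_SEC = 15
REPLICATION_FACTOR = 3
SECONDS_PER_HOUR = 3600
MB_PER_TB = 1_000_000  # decimal units, an assumption


def retained_tb(hours, copies):
    """Data volume in TB held for the given retention window and number of copies."""
    return INGEST_MB_PER_SEC * SECONDS_PER_HOUR * hours * copies / MB_PER_TB


ebs_only_tb = retained_tb(hours=7 * 24, copies=REPLICATION_FACTOR)  # 7 days on EBS only
local_tier_tb = retained_tb(hours=4, copies=REPLICATION_FACTOR)     # 4 hours of local retention
tiered_tb = retained_tb(hours=7 * 24, copies=1)                     # 7 days in the low-cost tier

retrieval_cost = tiered_tb * 1000 * 0.0015  # GB read once at $0.0015 per GB
savings = (4755 - 1584) / 4755              # costs quoted in the example above

print(f"EBS only: {ebs_only_tb:.1f} TB")
print(f"Local tier: {local_tier_tb:.3f} TB, low-cost tier: {tiered_tb:.2f} TB")
print(f"Full read: ${retrieval_cost:.2f}, savings: {savings:.1%}")
```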

Get started using Amazon MSK tiered storage

To enable tiered storage on your existing cluster, upgrade your MSK cluster to Kafka version 2.8.2.tiered and then choose Tiered storage and EBS storage as your cluster storage mode on the Amazon MSK console.

After tiered storage is enabled on the cluster level, run the following command to enable tiered storage on an existing topic. In this example, you’re enabling tiered storage on a topic called msk-ts-topic with 7 days’ retention (local.retention.ms=604800000) for the local high-performance storage tier, setting 180 days’ retention (retention.ms=15552000000) to retain the data in the low-cost storage tier, and updating the log segment size to 48 MB (segment.bytes=50331648):

bin/kafka-configs.sh --bootstrap-server $bsrv --alter --entity-type topics --entity-name msk-ts-topic --add-config 'remote.storage.enable=true, local.retention.ms=604800000, retention.ms=15552000000, segment.bytes=50331648'
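
Because these settings are expressed in milliseconds and bytes, it can help to derive them from days and megabytes rather than typing them by hand. The following sketch builds the value for `--add-config`. The function name `tiered_topic_config` is hypothetical, and the check reflects the rule described earlier that the low-cost tier’s retention must be longer than the local retention.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000
BYTES_PER_MB = 1024 * 1024


def tiered_topic_config(local_days, total_days, segment_mb):
    """Build the --add-config value for a tiered storage topic."""
    if total_days <= local_days:
        raise ValueError("total retention must be longer than local retention")
    settings = {
        "remote.storage.enable": "true",
        "local.retention.ms": local_days * MS_PER_DAY,
        "retention.ms": total_days * MS_PER_DAY,
        "segment.bytes": segment_mb * BYTES_PER_MB,
    }
    return ",".join(f"{name}={value}" for name, value in settings.items())


print(tiered_topic_config(local_days=7, total_days=180, segment_mb=48))
```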

Availability and pricing

Amazon MSK tiered storage is available in all AWS Regions where Amazon MSK is available, excluding the AWS China and AWS GovCloud Regions. This low-cost storage tier scales to virtually unlimited storage and requires no upfront provisioning. You pay only for the volume of data retained and retrieved in the low-cost tier.

For more information about this feature and its pricing, see the Amazon MSK developer guide and Amazon MSK pricing page. For finding the right sizing for your cluster, see the best practices page.

Summary

With Amazon MSK tiered storage, you don’t need to provision storage for the low-cost tier or manage the infrastructure. Tiered storage enables you to scale to virtually unlimited storage. You can access data in the low-cost tier using the same clients you currently use to read data from the high-performance primary storage tier. Apache Kafka’s consumer API, streams API, and connectors consume data from both tiers without changes. You can modify the retention limits on the low-cost storage tier in the same way that you modify the retention limits on the high-performance storage.

Enable tiered storage on your MSK clusters today to retain data longer at a lower cost.


About the Author

Masudur Rahaman Sayem is a Streaming Architect at AWS. He works with AWS customers globally to design and build data streaming architecture to solve real-world business problems. He is passionate about distributed systems. He also likes to read, especially classic comic books.