Tag Archives: Amazon Managed Streaming for Apache Kafka

AWS Glue mutual TLS authentication for Amazon MSK

Post Syndicated from Edward Ondari original https://aws.amazon.com/blogs/big-data/aws-glue-mutual-tls-authentication-for-amazon-msk/

In today’s landscape, data streams continuously from countless sources such as social media interactions to Internet of Things (IoT) device readings. This torrent of real-time information presents both a challenge and an opportunity for businesses. To harness the power of this data effectively, organizations need robust systems for ingesting, processing, and analyzing streaming data at scale. Enter Apache Kafka: a distributed streaming platform that has revolutionized how companies handle real-time data pipelines and build responsive, event-driven applications. AWS Glue is used to process and analyze large volumes of real-time data and perform complex transformations on the streaming data from Apache Kafka.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed Apache Kafka service. You can activate a combination of authentication modes on new or existing MSK clusters. The supported authentication modes are AWS Identity and Access Management (IAM) access control, mutual Transport Layer Security (TLS), and Simple Authentication and Security Layer/Salted Challenge Response Mechanism (SASL/SCRAM). For more information about using IAM authentication, refer to Securely process near-real-time data from Amazon MSK Serverless using an AWS Glue streaming ETL job with IAM authentication.

Mutual TLS authentication requires both the server and the client to present certificates to prove their identity. It’s ideal for hybrid applications that need a common authentication model. It’s also a commonly used authentication mechanism for business-to-business applications and is used in standards such as open banking, which enables secure open API integrations for financial institutions. For Amazon MSK, AWS Private Certificate Authority (AWS Private CA) is used to issue the X.509 certificates and for authenticating clients.

This post describes how to set up AWS Glue jobs to produce, consume, and process messages on an MSK cluster using mutual TLS authentication. AWS Glue will automatically infer the schema from the streaming data and store the metadata in the AWS Glue Data Catalog for analysis using analytics tools such as Amazon Athena.

Example use case

In our example use case, a hospital facility regularly monitors the body temperatures for patients admitted in the emergency ward using smart thermometers. Each device automatically records the patients’ temperature readings and posts the records to a central monitoring application API. Each posted record is a JSON formatted message that contains the deviceId that uniquely identifies the thermometer, a patientId to identify the patient, the patient’s temperature reading, and the eventTime when the temperature was recorded.

Record schema

The central monitoring application checks the hourly average temperature readings for each patient and notifies the hospital’s healthcare workers when a patient’s average temperature exceeds accepted thresholds (36.1–37.2°C). In our case, we use the Athena console to analyze the readings.

Overview of the solution

In this post, we use an AWS Glue Python shell job to simulate incoming data from the hospital thermometers. This job produces messages that are securely written to an MSK cluster using mutual TLS authentication.

To process the streaming data from the MSK cluster, we deploy an AWS Glue Streaming extract, transform, and load (ETL) job. This job automatically infers the schema from the incoming data, stores the schema metadata in the Data Catalog, and then stores the processed data as efficient Parquet files in Amazon Simple Storage Service (Amazon S3). We use Athena to query the output table in the Data Catalog and uncover insights.

The following diagram illustrates the architecture of the solution.

Solution architecture

The solution workflow consists of the following steps:

  1. Create a private certificate authority (CA) using AWS Certificate Manager (ACM).
  2. Set up an MSK cluster with mutual TLS authentication.
  3. Create a Java keystore (JKS) file and generate a client certificate and private key.
  4. Create a Kafka connection in AWS Glue.
  5. Create a Python shell job in AWS Glue to create a topic and push messages to Kafka.
  6. Create an AWS Glue Streaming job to consume and process the messages.
  7. Analyze the processed data in Athena.

Prerequisites

You should have the following prerequisites:

Cloud Formation stack set

This template creates two NAT gateways as shown in the following diagram. However, it’s possible to route the traffic to a single NAT gateway in one Availability Zone for test and development workloads. For redundancy in production workloads, it’s recommended that there is one NAT gateway available in each Availability Zone.

VPC setup

The stack also creates a security group with a self-referencing rule to allow communication between AWS Glue components.

Create a private CA using ACM

Complete the following steps to create a root CA. For more details, refer to Creating a private CA.

  1. On the AWS Private CA console, choose Create a private CA.
  2. For Mode options, select either General-purpose or Short-lived certificate for lower pricing.
  3. For CA type options, select Root.
  4. Provide certificate details by providing at least one distinguished name.

Create private CA

  1. Leave the remaining default options and select the acknowledge checkbox.
  2. Choose Create CA.
  3. On the Actions menu, choose Install CA certificate and choose Confirm and install.

Install certificate

Set up an MSK cluster with mutual TLS authentication

Before setting up the MSK cluster, make sure you have a VPC with at least two private subnets in different Availability Zones and a NAT gateway with a route to the internet. A CloudFormation template is provided in the prerequisites section.

Complete the following steps to set up your cluster:

  1. On the Amazon MSK console, choose Create cluster.
  2. For Creation method, Custom create.
  3. For Cluster type, select Provisioned.
  4. For Broker size, you can choose kafka.t3.small for the purpose of this post.
  5. For Number of zones, choose 2.
  6. Choose Next.
  7. In the Networking section, select the VPC, private subnets, and security group you created in the prerequisites section.
  8. In the Security settings section, under Access control methods, select TLS client authentication through AWS Certificate Manager (ACM).
  9. For AWS Private CAs, choose the AWS private CA you created earlier.

The MSK cluster creation can take up to 30 minutes to complete.

Create a JKS file and generate a client certificate and private key

Using the root CA, you generate client certificates to use for authentication. The following instructions are for CloudShell, but can also be adapted for a client machine with Java and the AWS CLI installed.

  1. Open a new CloudShell session and run the following commands to create the certs directory and install Java:
mkdir certs
cd certs
sudo yum -y install java-11-amazon-corretto-headless
  1. Run the following command to create a keystore file with a private key in JKS format. Replace Distinguished-NameExample-AliasYour-Store-Pass, and Your-Key-Pass with strings of your choice:

keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass Your-Store-Pass -keypass Your-Key-Pass -dname "CN=Distinguished-Name" -alias Example-Alias -storetype pkcs12

  1. Generate a certificate signing request (CSR) with the private key created in the preceding step:

keytool -keystore kafka.client.keystore.jks -certreq -file csr.pem -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass

  1. Run the following command to remove the word NEW (and the single space that follows it) from the beginning and end of the file:

sed -i -E '1,$ s/NEW //' csr.pem

The file should start with -----BEGIN CERTIFICATE REQUEST----- and end with -----END CERTIFICATE REQUEST-----

  1. Using the CSR file, create a client certificate using the following command. Replace Private-CA-ARN with the ARN of the private CA you created.

aws acm-pca issue-certificate --certificate-authority-arn Private-CA-ARN --csr fileb://csr.pem --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS"

The command should print out the ARN of the issued certificate. Save the CertificateArn value for use in the next step.

{
"CertificateArn": "arn:aws:acm-pca:region:account:certificate-authority/CA_ID/certificate/certificate_ID"
}
  1. Use the Private-CA-ARN together with the CertificateArn (arn:aws:acp-pca:<region>:...) generated in the preceding step to retrieve the signed client certificate. This will create a client-cert.pem file.

aws acm-pca get-certificate --certificate-authority-arn Private-CA-ARN --certificate-arn Certificate-ARN | jq -r '.Certificate + "\n" + .CertificateChain' >> client-cert.pem

  1. Add the certificate into the Java keystore so you can present it when you talk to the MSK brokers:

keytool -keystore kafka.client.keystore.jks -import -file client-cert.pem -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass -noprompt

  1. Extract the private key from the JKS file. Provide the same destkeypass and deststorepass and enter the keystore password when prompted.

keytool -importkeystore -srckeystore kafka.client.keystore.jks -destkeystore keystore.p12 -srcalias Example-Alias -deststorepass Your-Store-Pass -destkeypass Your-Key-Pass -deststoretype PKCS12

  1. Convert the private key to PEM format. Enter the keystore password you provided in the previous step when prompted.

openssl pkcs12 -in keystore.p12 -nodes -nocerts -out private-key.pem

  1. Remove the lines that begin with Bag Attributes.. from the top of the file:

sed -i -ne '/-BEGIN PRIVATE KEY-/,/-END PRIVATE KEY-/p' private-key.pem

  1. Upload the client-cert.pem, client.keystore.jks, and private-key.pem files to Amazon S3. You can either create a new S3 bucket or use an existing bucket to store the following objects. Replace <s3://aws-glue-assets-11111111222222-us-east-1/certs/> with your S3 location.

aws s3 sync ~/certs s3://aws-glue-assets-11111111222222-us-east-1/certs/ --exclude '*' --include 'client-cert.pem' --include 'private-key.pem' --include 'kafka.client.keystore.jks'

Create a Kafka connection in AWS Glue

Complete the following steps to create a Kafka connection:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. Select Apache Kafka and choose Next.
  4. For Amazon Managed Streaming for Apache Kafka Cluster, choose the MSK cluster you created earlier.

Create Glue Kafka connection

  1. Choose TLS client authentication for Authentication method.
  2. Enter the S3 path to the keystore you created earlier and provide the keystore and client key passwords you used for the -storepass and -keypass

Add authentication method to connection

  1. Under Networking options, choose your VPC, a private subnet, and a security group. The security group should contain a self-referencing rule.
  2. On the next page, provide a name for the connection (for example, Kafka-connection) and choose Create connection.

Create a Python shell job in AWS Glue to create a topic and push messages to Kafka

In this section, you create a Python shell job to create a new Kafka topic and push JSON messages to the topic. Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs.
  2. In the Script section, for Engine, choose Python shell.
  3. Choose Create script.

Create Python shell job

  1. Enter the following script in the editor:
import sys
from awsglue.utils import getResolvedOptions
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
from kafka.errors import TopicAlreadyExistsError
from urllib.parse import urlparse

import json
import uuid
import datetime
import boto3
import time
import random

# Fetch job parameters
args = getResolvedOptions(sys.argv, ['connection-names', 'client-cert', 'private-key'])

# Download client certificate and private key files from S3
TOPIC = 'example_topic'
client_cert = urlparse(args['client_cert'])
private_key = urlparse(args['private_key'])

s3 = boto3.client('s3')
s3.download_file(client_cert.netloc, client_cert.path.lstrip('/'),  client_cert.path.split('/')[-1])
s3.download_file(private_key.netloc, private_key.path.lstrip('/'),  private_key.path.split('/')[-1])

# Fetch bootstrap servers from connection
args = getResolvedOptions(sys.argv, ['connection-names'])
if ',' in args['connection_names']:
    raise ValueError("Choose only one connection name in the job details tab!")
glue_client = boto3.client('glue')
response = glue_client.get_connection(Name=args['connection_names'], HidePassword=True)
bootstrapServers = response['Connection']['ConnectionProperties']['KAFKA_BOOTSTRAP_SERVERS']

# Create topic and push messages 
admin_client = KafkaAdminClient(bootstrap_servers= bootstrapServers, security_protocol= 'SSL', ssl_certfile= client_cert.path.split('/')[-1], ssl_keyfile= private_key.path.split('/')[-1])
try:
    admin_client.create_topics(new_topics=[NewTopic(name=TOPIC, num_partitions=1, replication_factor=1)], validate_only=False)
except TopicAlreadyExistsError:
    # Topic already exists
    pass
admin_client.close()

# Generate JSON messages for the new topic
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'), bootstrap_servers=bootstrapServers, security_protocol='SSL', 
                         ssl_check_hostname=True, ssl_certfile= client_cert.path.split('/')[-1], ssl_keyfile= private_key.path.split('/')[-1])
                         
for i in range(1200):
    _event = {
        "deviceId": str(uuid.uuid4()),
        "patientId": "PI" + str(random.randint(1,15)).rjust(5, '0'),
        "temperature": round(random.uniform(32.1, 40.9), 1),
        "eventTime": str(datetime.datetime.now())
    }
    producer.send(TOPIC, _event)
    time.sleep(3)
    
producer.close()
  1. On the Job details tab, provide a name for your job, such as Kafka-msk-producer.
  2. Choose an IAM role. If you don’t have one, create one following the instructions in Configuring IAM permissions for AWS Glue.
  3. Under Advanced properties, for Connections, choose the Kafka-connection connection you created.
  4. Under Job parameters, add the following parameters and values:
    1. Key: --additional-python-modules, value: kafka-python.
    2. Key: --client-cert, value: s3://aws-glue-assets-11111111222222-us-east-1/certs/client-cert.pem. Replace with your client-cert.pem Amazon S3 location from earlier.
    3. Key: --private-key, value: s3://aws-glue-assets-11111111222222-us-east-1/certs/private-key.pem. Replace with your private-key.pem Amazon S3 location from earlier.
      AWS Glue Job parameters
  5. Save and run the job.

You can confirm that the job run status is Running on the Runs tab.

At this point, we have successfully created a Python shell job to simulate the thermometers sending temperature readings to the monitoring application. The job will run for approximately 1 hour and push 1,200 records to Amazon MSK.

Alternatively, you can replace the Python shell job with a Scala ETL job to act as a producer to send messages to the MSK cluster. In this case, use the JKS file for authentication using ssl.keystore.type=JKS. If you’re using PEM format keys, the current version of Kafka clients libraries (2.4.1) installed in AWS Glue version 4 don’t yet support authentication through certificates in PEM format (as of this writing).

Create an AWS Glue Streaming job to consume and process the messages

You can now create an AWS Glue ETL job to consume and process the messages in the Kafka topic. AWS Glue will automatically infer the schema from the files. Complete the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. Choose Visual ETL to author a new job.
  3. For Sources, choose Apache Kafka.
  4. For Connection name, choose the node and connection name you created earlier.
  5. For Topic name, enter the topic name (example_topic) you created earlier.
  6. Leave the rest of the options as default.

Kafka data source

  1. Add a new target node called Amazon S3 to store the output Parquet files generated from the streaming data.
  2. Choose Parquet as the data format and provide an S3 output location for the generated files.
  3. Select the option to allow AWS Glue to create a table in the Data Catalog and provide the database and table names.

S3 Output node

  1. On the job details tab, provide the following options:
    1. For the requested number of workers, enter 2.
    2. For IAM Role, choose an IAM role with permissions to read and write to the S3 output location.
    3. For Job timeout, enter 60 (for the job to stop after 60 minutes).
    4. Under Advanced properties, for Connections, choose the connection you created.
  2. Save and run the job.

You can confirm the S3 output location for new Parquet files created under the prefixes s3://<output-location>/ingest_year=XXXX/ingest_month=XX/ingest_day=XX/ingest_hour=XX/.

At this point, you have created a streaming job to process events from Amazon MSK and store the JSON formatted records as Parquet files in Amazon S3. AWS Glue streaming jobs are meant to be running continuously to process streaming data. We have set the timeout to stop the job after 60 minutes. You can also stop the job manually after the records have been processed to Amazon S3.

Analyze the data in Athena

Going back to our example use case, you can run the following query in Athena to monitor and track the hourly average temperature readings for patients that exceed the normal thresholds (36.1–37.2°C):

SELECT
date_format(parse_datetime(eventTime, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), '%h %p') hour,
patientId,
round(avg(temperature), 1) average_temperature,
count(temperature) readings
FROM "default"."devices_data"
GROUP BY 1, 2
HAVING avg(temperature) > 37.2 or avg(temperature) < 36.1
ORDER BY 2, 1 DESC

Amazon Athena Console

Run the query multiple times and observe how the average_temperature and the number of readings changes with new incoming data from the AWS Glue Streaming job. In our example scenario, healthcare workers can use this information to identify patients who are experiencing consistent high or low body temperatures and give the required attention.

At this point, we have successfully created and ingested streaming data to our MSK cluster using mutual TLS authentication. We only needed the certificates generated by AWS Private CA to authenticate our AWS Glue clients to the MSK cluster and process the streaming data with an AWS Glue Streaming job. Finally, we used Athena to visualize the data and observed how the data changes in near real time.

Clean up

To clean up the resources created in this post, complete the following steps:

  1. Delete the private CA you created.
  2. Delete the MSK cluster you created.
  3. Delete the AWS Glue connection you created.
  4. Stop the jobs if they are still running and delete the jobs you created.
  5. If you used the CloudFormation stack provided in the prerequisites, delete the CloudFormation stack to delete the VPC and other networking components.

Conclusion

This post demonstrated how you can use AWS Glue to consume, process, and store streaming data for Amazon MSK using mutual TLS authentication. AWS Glue Streaming automatically infers the schema and creates a table in the Data Catalog. You can then query the table using other data analysis tools like Athena, Amazon Redshift, and Amazon QuickSight to provide insights into the streaming data.

Try out the solution for yourself, and let us know your questions and feedback in the comments section.


About the Authors

Edward Okemwa OndariEdward Okemwa is a Big Data Cloud Support Engineer (ETL) at AWS Nairobi specializing in AWS Glue and Amazon Athena. He is dedicated to providing customers with technical guidance and resolving issues related to processing and analyzing large volumes of data. In his free time, he enjoys singing choral music and playing football.

Edward Okemwa OndariEmmanuel Mashandudze is a Senior Big Data Cloud Engineer specializing in AWS Glue. He collaborates with product teams to help customers efficiently transform data in the cloud. He helps customers design and implements robust data pipelines. Outside of work, Emmanuel is an avid marathon runner, sports enthusiast and enjoys creating memories with his family.

Improve Apache Kafka scalability and resiliency using Amazon MSK tiered storage

Post Syndicated from Sai Maddali original https://aws.amazon.com/blogs/big-data/improve-apache-kafka-scalability-and-resiliency-using-amazon-msk-tiered-storage/

Since the launch of tiered storage for Amazon Managed Streaming for Apache Kafka (Amazon MSK), customers have embraced this feature for its ability to optimize storage costs and improve performance. In previous posts, we explored the inner workings of Kafka, maximized the potential of Amazon MSK, and delved into the intricacies of Amazon MSK tiered storage. In this post, we deep dive into how tiered storage helps with faster broker recovery and quicker partition migrations, facilitating faster load balancing and broker scaling.

Apache Kafka availability

Apache Kafka is a distributed log service designed to provide high availability and fault tolerance. At its core, Kafka employs several mechanisms to provide reliable data delivery and resilience against failures:

  • Kafka replication – Kafka organizes data into topics, which are further divided into partitions. Each partition is replicated across multiple brokers, with one broker acting as the leader and the others as followers. If the leader broker fails, one of the follower brokers is automatically elected as the new leader, providing continuous data availability. The replication factor determines the number of replicas for each partition. Kafka maintains a list of in-sync replicas (ISRs) for each partition, which are the replicas that are up to date with the leader.
  • Producer acknowledgments – Kafka producers can specify the required acknowledgment level for write operations. This makes sure the data is durably persisted on the configured number of replicas before the producer receives an acknowledgment, reducing the risk of data loss.
  • Consumer group rebalancing – Kafka consumers are organized into consumer groups, where each consumer in the group is responsible for consuming a subset of the partitions. If a consumer fails, the partitions it was consuming are automatically reassigned to the remaining consumers in the group, providing continuous data consumption.
  • Zookeeper or KRaft for cluster coordination – Kafka relies on Apache ZooKeeper or KRaft for cluster coordination and metadata management. It maintains information about brokers, topics, partitions, and consumer offsets, enabling Kafka to recover from failures and maintain a consistent state across the cluster.

Kafka’s storage architecture and its impact on availability and resiliency

Although Kafka provides robust fault-tolerance mechanisms, in the traditional Kafka architecture, brokers store data locally on their attached storage volumes. This tight coupling of storage and compute resources can lead to several issues, impacting availability and resiliency of the cluster:

  • Slow broker recovery – When a broker fails, the recovery process involves transferring data from the remaining replicas to the new broker. This data transfer can be slow, especially for large data volumes, leading to prolonged periods of reduced availability and increased recovery times.
  • Inefficient load balancing – Load balancing in Kafka involves moving partitions between brokers to distribute the load evenly. However, this process can be resource-intensive and time-consuming, because it requires transferring large amounts of data between brokers.
  • Scaling limitations – Scaling a Kafka cluster traditionally involves adding new brokers and rebalancing partitions across the expanded set of brokers. This process can be disruptive and time-consuming, especially for large clusters with high data volumes.

How Amazon MSK tiered storage improves availability and resiliency

Amazon MSK offers tiered storage, a feature that allows configuring local and remote tiers. This greatly decouples compute and storage resources and thereby addresses the aforementioned challenges, improving availability and resiliency of Kafka clusters. You can benefit from the following:

  • Faster broker recovery – With tiered storage, data automatically moves from the faster Amazon Elastic Block Store (Amazon EBS) volumes to the more cost-effective storage tier over time. New messages are initially written to Amazon EBS for fast performance. Based on your local data retention policy, Amazon MSK transparently transitions that data to tiered storage. This frees up space on the EBS volumes for new messages. When broker fails and recovers either due to node or volume failure, the catch-up is faster because it only needs to catch up data stored on the local tier from the leader.
  • Efficient load balancing – Load balancing in Amazon MSK with tiered storage is more efficient because there is less data to move while reassigning partition. This process is faster and less resource-intensive, enabling more frequent and seamless load balancing operations.
  • Faster scaling – Scaling an MSK cluster with tiered storage is a seamless process. New brokers can be added to the cluster without the need for a large amount of data transfer and longer time for partition rebalancing. The new brokers can start serving traffic much faster, because the catch-up process takes less time, improving the overall cluster throughput and reducing downtime during scaling operations.

As shown in the following figure, MSK brokers and EBS volumes are tightly coupled. On a three-AZ deployed cluster, when you create a topic with replication factor three, Amazon MSK spreads those three replicas across all three Availability Zones and the EBS volumes attached with that broker store all the topic data spread across three Availability Zones. If you need to move a partition from one broker to another, Amazon MSK needs to move all the segments (both active and closed) from the existing broker to the new brokers, as illustrated in the following figure.

However, when you enable tiered storage for that topic, Amazon MSK transparently moves all closed segments for a topic from EBS volumes to tiered storage. That storage provides the built-in capability for durability and high availability with virtually unlimited storage capacity. With closed segments moved to tiered storage and only active segments on the local volume, your local storage footprint remains minimal regardless of topic size. If you need to move the partition to a new broker, the data movement is very minimal across the brokers. The following figure illustrates this updated configuration.

Amazon MSK tiered storage addresses the challenges posed by Kafka’s traditional storage architecture, enabling faster broker recovery, efficient load balancing, and seamless scaling, thereby improving availability and resiliency of your cluster. To learn more about the core components of Amazon MSK tiered storage, refer to Deep dive on Amazon MSK tiered storage.

A real-world test

We hope that you now understand how Amazon MSK tiered storage can improve your Kafka resiliency and availability. To test it, we created a three-node cluster with the new m7g instance type. We created a topic with a replication factor of three and without using tiered storage. Using the Kafka performance tool, we ingested 300 GB of data into the topic. Next, we added three new brokers to the cluster. Because Amazon MSK doesn’t automatically move partitions to these three new brokers, they will remain idle until we rebalance the partitions across all six brokers.

Let’s consider a scenario where we need to move all the partitions from the existing three brokers to the three new brokers. We used the kafka-reassign-partitions tool to move the partitions from the existing three brokers to the newly added three brokers. During this partition movement operation, we observed that the CPU usage was high, even though we weren’t performing any other operations on the cluster. This indicates that the high CPU usage was due to the data replication to the new brokers. As shown in the following metrics, the partition movement operation from broker 1 to broker 2 took approximately 75 minutes to complete.

Additionally, during this period, CPU utilization was elevated.

After completing the test, we enabled tiered storage on the topic with local.retention.ms=3600000 (1 hour) and retention.ms=31536000000. We continuously monitored the RemoteCopyBytesPerSec metrics to determine when the data migration to tiered storage was complete. After 6 hours, we observed zero activity on the RemoteCopyBytesPerSec metrics, indicating that all closed segments had been successfully moved to tiered storage. For instructions to enable tiered storage on an existing topic, refer to Enabling and disabling tiered storage on an existing topic.

We then performed the same test again, moving partitions to three empty brokers. This time, the partition movement operation was completed in just under 15 minutes, with no noticeable CPU usage, as shown in the following metrics. This is because, with tiered storage enabled, all the data has already been moved to the tiered storage, and we only have the active segment in the EBS volume. The partition movement operation is only moving the small active segment, which is why it takes less time and minimal CPU to complete the operation.

Conclusion

In this post, we explored how Amazon MSK tiered storage can significantly improve the scalability and resilience of Kafka. By automatically moving older data to the cost-effective tiered storage, Amazon MSK reduces the amount of data that needs to be managed on the local EBS volumes. This dramatically improves the speed and efficiency of critical Kafka operations like broker recovery, leader election, and partition reassignment. As demonstrated in the test scenario, enabling tiered storage reduced the time taken to move partitions between brokers from 75 minutes to just under 15 minutes, with minimal CPU impact. This enhanced the responsiveness and self-healing ability of the Kafka cluster, which is crucial for maintaining reliable, high-performance operations, even as data volumes continue to grow.

If you’re running Kafka and facing challenges with scalability or resilience, we highly recommend using Amazon MSK with the tiered storage feature. By taking advantage of this powerful capability, you can unlock the true scalability of Kafka and make sure your mission-critical applications can keep pace with ever-increasing data demands.

To get started, refer to Enabling and disabling tiered storage on an existing topic. Additionally, check out Automated deployment template of Cruise Control for Amazon MSK for effortlessly rebalancing your workload.


About the Authors

Sai Maddali is a Senior Manager Product Management at AWS who leads the product team for Amazon MSK. He is passionate about understanding customer needs, and using technology to deliver services that empowers customers to build innovative applications. Besides work, he enjoys traveling, cooking, and running.

Nagarjuna Koduru is a Principal Engineer in AWS, currently working for AWS Managed Streaming For Kafka (MSK). He led the teams that built MSK Serverless and MSK Tiered storage products. He previously led the team in Amazon JustWalkOut (JWO) that is responsible for real time tracking of shopper locations in the store. He played pivotal role in scaling the stateful stream processing infrastructure to support larger store formats and reducing the overall cost of the system. He has keen interest in stream processing, messaging and distributed storage infrastructure.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Building a scalable streaming data platform that enables real-time and batch analytics of electric vehicles on AWS

Post Syndicated from Ayush Agrawal original https://aws.amazon.com/blogs/big-data/building-a-scalable-streaming-data-platform-that-enables-real-time-and-batch-analytics-of-electric-vehicles-on-aws/

The automobile industry has undergone a remarkable transformation because of the increasing adoption of electric vehicles (EVs). EVs, known for their sustainability and eco-friendliness, are paving the way for a new era in transportation. As environmental concerns and the push for greener technologies have gained momentum, the adoption of EVs has surged, promising to reshape our mobility landscape.

The surge in EVs brings with it a profound need for data acquisition and analysis to optimize their performance, reliability, and efficiency. In the rapidly evolving EV industry, the ability to harness, process, and derive insights from the massive volume of data generated by EVs has become essential for manufacturers, service providers, and researchers alike.

As the EV market is expanding with many new and incumbent players trying to capture the market, the major differentiating factor will be the performance of the vehicles.

Modern EVs are equipped with an array of sensors and systems that continuously monitor various aspects of their operation including parameters such as voltage, temperature, vibration, speed, and so on. From battery management to motor performance, these data-rich machines provide a wealth of information that, when effectively captured and analyzed, can revolutionize vehicle design, enhance safety, and optimize energy consumption. The data can be used to do predictive maintenance, device anomaly detection, real-time customer alerts, remote device management, and monitoring.

However, managing this deluge of data isn’t without its challenges. As the adoption of EVs accelerates, the need for robust data pipelines capable of collecting, storing, and processing data from an exponentially growing number of vehicles becomes more pronounced. Moreover, the granularity of data generated by each vehicle has increased significantly, making it essential to efficiently handle the ever-increasing number of data points. The challenges include not only the technical intricacies of data management but also concerns related to data security, privacy, and compliance with evolving regulations.

In this blog post, we delve into the intricacies of building a reliable data analytics pipeline that can scale to accommodate millions of vehicles, each generating hundreds of metrics every second using Amazon OpenSearch Ingestion. We also provide guidelines and sample configurations to help you implement a solution.

Of the prerequisites that follow, the IOT topic rule and the Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster can be set up by following How to integrate AWS IoT Core with Amazon MSK. The steps to create an Amazon OpenSearch Service cluster are available in Creating and managing Amazon OpenSearch Service domains.

Prerequisites

Before you begin the implementing the solution, you need the following:

  • IOT topic rule
  • Amazon MSK Simple Authentication and Security Layer/Salted Challenge Response Mechanism (SASL/SCRAM) cluster
  • Amazon OpenSearch Service domain

Solution overview

The following architecture diagram provides a scalable and fully managed modern data streaming platform. The architecture uses Amazon OpenSearch Ingestion to stream data into OpenSearch Service and Amazon Simple Storage Service (Amazon S3) to store the data. The data in OpenSearch powers real-time dashboards. The data can also be used to notify customers of any failures occurring on the vehicle (see Configuring alerts in Amazon OpenSearch Service). The data in Amazon S3 is used for business intelligence and long-term storage.

Architecture diagram

In the following sections, we focus on the following three critical pieces of the architecture in depth:

1. Amazon MSK to OpenSearch ingestion pipeline

2. Amazon OpenSearch Ingestion pipeline to OpenSearch Service

3. Amazon OpenSearch Ingestion to Amazon S3

Solution Walkthrough

Step 1: MSK to Amazon OpenSearch Ingestion pipeline

Because each electric vehicle streams massive volumes of data to Amazon MSK clusters through AWS IoT Core, making sense of this data avalanche is critical. OpenSearch Ingestion provides a fully managed serverless integration to tap into these data streams.

The Amazon MSK source in OpenSearch Ingestion uses Kafka’s Consumer API to read records from one or more MSK topics. The MSK source in OpenSearch Ingestion seamlessly connects to MSK to ingest the streaming data into OpenSearch Ingestion’s processing pipeline.

The following snippet illustrates the pipeline configuration for an OpenSearch Ingestion pipeline used to ingest data from an MSK cluster.

While creating an OpenSearch Ingestion pipeline, add the following snippet in the Pipeline configuration section.

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true                  
      topics: 
         - name: "ev-device-topic " 
           group_id: "opensearch-consumer" 
           serde_format: json                 
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam:: ::<<account-id>>:role/opensearch-pipeline-Role"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:<<region>>:<<account-id>>:cluster/<<name>>/<<id>>" 

When configuring Amazon MSK and OpenSearch Ingestion, it’s essential to establish an optimal relationship between the number of partitions in your Kafka topics and the number of OpenSearch Compute Units (OCUs) allocated to your ingestion pipelines. This optimal configuration ensures efficient data processing and maximizes throughput. You can read more about it in Configure recommended compute units (OCUs) for the Amazon MSK pipeline.

Step 2: OpenSearch Ingestion pipeline to OpenSearch Service

OpenSearch Ingestion offers a direct method for streaming EV data into OpenSearch. The OpenSearch sink plugin channels data from multiple sources directly into the OpenSearch domain. Instead of manually provisioning the pipeline, you define the capacity for your pipeline using OCUs. Each OCU provides 6 GB of memory and two virtual CPUs. To use OpenSearch Ingestion auto-scaling optimally, it’s essential to configure the maximum number of OCUs for a pipeline based on the number of partitions in the topics being ingested. If a topic has a large number of partitions (for example, more than 96, which is the maximum OCUs per pipeline), it’s recommended to configure the pipeline with a maximum of 1–96 OCUs. This way, the pipeline can automatically scale up or down within this range as needed. However, if a topic has a low number of partitions (for example, fewer than 96), it’s advisable to set the maximum number of OCUs to be equal to the number of partitions. This approach ensures that each partition is processed by a dedicated OCU enabling parallel processing and optimal performance. In scenarios where a pipeline ingests data from multiple topics, the topic with the highest number of partitions should be used as a reference to configure the maximum OCUs. Additionally, if higher throughput is required, you can create another pipeline with a new set of OCUs for the same topic and consumer group, enabling near-linear scalability.

OpenSearch Ingestion provides several pre-defined configuration blueprints that can help you quickly build your ingestion pipeline on AWS

The following snippet illustrates pipeline configuration for an OpenSearch Ingestion pipeline using OpenSearch as a SINK with a dead letter queue (DLQ) to Amazon S3. When a pipeline encounters write errors, it creates DLQ objects in the configured S3 bucket. DLQ objects exist within a JSON file as an array of failed events.

sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<domain-name>>.<<region>>.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in Ohan S3 bucket 
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam:: <<account-id>>:role/<<role-name>>"

Step 3: OpenSearch Ingestion to Amazon S3

OpenSearch Ingestion offers a built-in sink for loading streaming data directly into S3. The service can compress, partition, and optimize the data for cost-effective storage and analytics in Amazon S3. Data loaded into S3 can be partitioned for easier query isolation and lifecycle management. Partitions can be based on vehicle ID, date, geographic region, or other dimensions as needed for your queries.

The following snippet illustrates how we’ve partitioned and stored EV data in Amazon S3.

- s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "evbucket"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

The pipeline can be created following the steps in Creating Amazon OpenSearch Ingestion pipelines.

The following is the complete pipeline configuration, combining the configuration of all three steps. Update the Amazon Resource Names (ARNs), AWS Region, Open Search Service domain endpoint, and S3 names as needed.

The entire OpenSearch Ingestion pipeline configuration can be directly copied into the ‘Pipeline configuration’ field in the AWS Management Console while creating the OpenSearch Ingestion pipeline

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true           # Default is false  
      topics: 
         - name: "<<msk-topic-name>>" 
           group_id: "opensearch-consumer" 
           serde_format: json        
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:us-east-1:<<account-id>>:cluster/<<cluster-name>>/<<cluster-id>>" 
  processor:
      - parse_json:
  sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<opensearch-service-domain-endpoint>>.us-east-1.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in Ohan S3 bucket 
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
      - s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "<<bucket-name>>"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

Real-time analytics

After the data is available in OpenSearch Service, you can build real-time monitoring and notifications. OpenSearch Service has robust support for multiple notification channels, allowing you to receive alerts through services like Slack, Chime, custom webhooks, Microsoft Teams, email, and Amazon Simple Notification Service (Amazon SNS).

The following screenshot illustrates supported notification channels in OpenSearch Service.

The notification feature in OpenSearch Service allows you to create monitors that will watch for certain conditions or changes in your data and launch alerts, such as monitoring vehicle telemetry data and launching alerts for issues like battery degradation or abnormal energy consumption. For example, you can create a monitor that analyzes battery capacity over time and notifies the on-call team using Slack if capacity drops below expected degradation curves in a significant number of vehicles. This could indicate a potential manufacturing defect requiring investigation.

In addition to notifications, OpenSearch Service makes it easy to build real-time dashboards to visually track metrics across your fleet of vehicles. You can ingest vehicle telemetry data like location, speed, fuel consumption, and so on, and visualize it on maps, charts, and gauges. Dashboards can provide real-time visibility into vehicle health and performance.

The following screenshot illustrates creating a sample dashboard on OpenSearch Service

Opensearch Dashboard

A key benefit of OpenSearch Service is its ability to handle high sustained ingestion and query rates with millisecond latencies. It distributes incoming vehicle data across data nodes in a cluster for parallel processing. This allows OpenSearch to scale out to handle very large fleets while still delivering the real-time performance needed for operational visibility and alerting.

Batch analytics

After the data is available in Amazon S3, you can build a secure data lake to power a variety of analytics use cases deriving powerful insights. As an immutable store, new data is continually stored in S3 while existing data remains unaltered. This serves as a single source of truth for downstream analytics.

For business intelligence and reporting, you can analyze trends, identify insights, and create rich visualizations powered by the data lake. You can use Amazon QuickSight to build and share dashboards without needing to set up servers or infrastructure. Here’s an example of a Quicksight dashboard for IoT device data. For example, you can use a dashboard to gain insights from historical data that can help with better vehicle and battery design.

The Amazon Quicksight public gallery shows examples of dashboards across different domains.

You should consider Amazon OpenSearch dashboards for your operational day-to-day use cases to identify issues and alert in near real time whereas Amazon Quicksight should be used to analyze big data stored in a lake house and generate actionable insights from them.

Clean up

Delete the OpenSearch pipeline and Amazon MSK cluster to stop incurring costs on these services.

Conclusion

In this post, you learned how Amazon MSK, OpenSearch Ingestion, OpenSearch Services, and Amazon S3 can be integrated to ingest, process, store, analyze, and act on endless streams of EV data efficiently.

With OpenSearch Ingestion as the integration layer between streams and storage, the entire pipeline scales up and down automatically based on demand. No more complex cluster management or lost data from bursts in streams.

See Amazon OpenSearch Ingestion to learn more.


About the authors

Ayush Agrawal is a Startups Solutions Architect from Gurugram, India with 11 years of experience in Cloud Computing. With a keen interest in AI, ML, and Cloud Security, Ayush is dedicated to helping startups navigate and solve complex architectural challenges. His passion for technology drives him to constantly explore new tools and innovations. When he’s not architecting solutions, you’ll find Ayush diving into the latest tech trends, always eager to push the boundaries of what’s possible.

Fraser SequeiraFraser Sequeira is a Solutions Architect with AWS based in Mumbai, India. In his role at AWS, Fraser works closely with startups to design and build cloud-native solutions on AWS, with a focus on analytics and streaming workloads. With over 10 years of experience in cloud computing, Fraser has deep expertise in big data, real-time analytics, and building event-driven architecture on AWS.

Best practices for running production workloads using Amazon MSK tiered storage

Post Syndicated from Nagarjuna Koduru original https://aws.amazon.com/blogs/big-data/best-practices-for-running-production-workloads-using-amazon-msk-tiered-storage/

In the second post of the series, we discussed some core concepts of the Amazon Managed Streaming for Apache Kafka (Amazon MSK) tiered storage feature and explained how read and write operations work in a tiered storage enabled cluster.

This post focuses on how to properly size your MSK tiered storage cluster, which metrics to monitor, and the best practices to consider when running a production workload.

Sizing a tiered storage cluster

Sizing and capacity planning are critical aspects of designing and operating a distributed system. It involves estimating the resources required to handle the expected workload and ensure the system can scale and perform efficiently. In the context of a distributed system like Kafka, sizing involves determining the number of brokers, the number of partitions, and the amount of storage and memory required for each broker. Capacity planning involves estimating the expected workload, including the number of producers, consumers, and the throughput requirements.

Let’s assume a scenario where the producers are evenly balancing the load between brokers, brokers host the same number of partitions, there are enough partitions to ingest the throughput, and consumers consume directly from the tip of the stream. The brokers are receiving the same load and doing the same work. We therefore just focus on Broker1 in the following diagram of a data flow within a cluster.

Theoretical sustained throughput with tiered storage

We derive the following formula for the theoretical sustained throughput limit tcluster given the infrastructure characteristics of a specific cluster with tiered storage enabled on all topics:

max(tcluster) <= min {

max(tstorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups),
max(tNetworkAttachedStorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups),
max(tEC2network) * #brokers/(#tip_consumer_groups + r + #remote_consumer_groups)

}

This formula contains the following values:

  • tCluster – Total ingress produce throughput sent to the cluster
  • tStorage – Storage volume throughput supported
  • tNetworkAttachedStorage – Network attached storage to the Amazon Elastic Compute Cloud (Amazon EC2) instance network throughput
  • tEC2network – EC2 instance network bandwidth
  • non_tip_local_consumer_groups – Number of consumer groups reading from network attached storage at ingress rate
  • tip_consumer_groups – Number of consumer groups reading from page cache at ingress rate
  • remote_consumer_groups – Number of consumer groups reading from remote tier at ingress rate
  • r – Replication factor of the Kafka topic

Note that in the first post , we didn’t differentiate between different types of consumer groups. With tiered storage, some consumer groups might be consuming from remote. These remote consumers might ultimately catch up and start reading from local storage and finally catch up to the tip. Therefore, we model these three different consumer groups in the equation to account for their impact on infrastructure usage. In the following sections, we provide derivations of this equation.

Derivation of throughput limit from network attached storage bottleneck

Because Amazon MSK uses network attached storage for local storage, both network attached storage throughput and bandwidth should be accounted for. Total throughput bandwidth requirement is a combination of ingress and egress from the network attached storage backend. The ingress throughput of the storage backend depends on the data that producers are sending directly to the broker plus the replication traffic the broker is receiving from its peers. With tiered storage, Amazon MSK also uses network attached storage to read and upload rolled segments to the remote tier. This doesn’t come from the page cache and needs to be accounted for at the rate of ingress. Any non-tip consumers at ingress rate also consume network attached storage throughput and are accounted for in the equation. Therefore, max throughput is bounded by network attached storage based on the following equation:

max(tcluster) <= min {

max(tstorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups),
max(tNetworkAttachedStorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups)

}

Derivation of throughput limit from EC2 network bottleneck

Unlike network attached storage, the network is full duplex, meaning that if the EC2 instance supports X MB/s network, it supports X MB/s in and X MB/s out. The network throughput requirement depends on the data that producers are sending directly to the broker plus the replication traffic the broker is receiving from its peers. It also includes the replication traffic out and consumers traffic out from this broker. With tiered storage, we need to reserve additional ingress rate for uploads to the remote tier and support reads from the remote tier for consumer groups reading from remote offset. Both of these add to the network out requirements, which is bounded by the following equation:

max(tcluster) <= min {

max(tEC2network) * #brokers/(#tip_consumer_groups + r + #remote_consumer_groups)

}

Combining the second and third equations provides the first formula, which determines the max throughput bound based on broker infrastructure limits.

How to apply this formula to size your cluster

With this formula, you can calculate the upper bound for throughput you can achieve for your workloads. In practice, the workloads may be bottlenecked by other broker resources like CPU, memory, and disk, so it’s important to do load tests. To simplify your sizing estimate, you can use the MSK Sizing and Pricing spreadsheet (for more information, refer to Best Practices).

Let’s consider a workload where your ingress and egress rates are 20MB/s, with a replication factor of 3, and you want to retain data in your Kafka cluster for 7 days. This workload requires 6x m5.large brokers, with 34.6 TB local storage, which will cost $6,034.00 monthly (estimated). But if you use tiered storage for the same workload with local retention of 4 hours and overall data retention of 7 days, it requires 3x m5.large brokers, with 0.8 TB local storage and 12 TB of tiered storage, which will cost $1,958.00 monthly(estimated). If you want to read all the historic data one time, it will cost $17.00 ($0.0015 per GB retrieval cost). In this example with tiered storage, you save around 67.6% of your overall cost.

We recommend planning for Availability Zone redundancy in production workloads considering the broker safety factor in the calculation, which is 1 in this example. We also recommend running performance tests to ensure CPU is less than 70% on your brokers at the target throughput derived based on this formula or Excel calculation. In addition, you should also use the per-broker partition limit in your calculation to account for other bottlenecks based on the partition count.

The following figure shows an example of Amazon MSK sizing.

Monitoring and continuous optimization for a tiered storage enabled cluster

In previous sections, we emphasized the importance of determining the correct initial cluster size. However, it’s essential to recognize that sizing efforts shouldn’t cease after the initial setup. Continual monitoring and evaluation of your workload are necessary to ensure that the broker size remains appropriate. Amazon MSK offers metric monitoring and alarm capabilities to provide visibility into cluster performance. In the post Best practices for right-sizing your Apache Kafka clusters to optimize performance and cost, we discussed key metrics to focus on. In this post, we delve deeper into additional metrics related to tiered storage and other optimization considerations for a tiered storage enabled cluster:

  • TotalTierBytesLag indicates the total number of bytes of the data that is eligible for tiering on the broker and hasn’t been transferred to the tiered storage yet. This metric shows the efficiency of upstream data transfer. As the lag increases, the amount of data that hasn’t yet persisted in the tiered storage increases. The impact is the network attached storage disk may fill up, which you should monitor. You should also monitor this metric and generate an alarm if the lag is continuously growing and you see increased network attached storage usage. If the tiering lag is too high, you can reduce ingress traffic to allow the tiered storage to catch up.
  • Although tiered storage provides on-demand, virtually unlimited storage capacity without provisioning any additional resources, you should still do proper capacity planning for your local storage, configure alerts for KafkaDataLogsDiskUsed metrics, and have a buffer on network attached storage capacity planning. Monitor this metric and generate an alarm if the metric reaches or exceeds 60%. For a tiered storage enabled topic, configure local retention accordingly to reduce network attached storage usage.
  • The theoretical max ingress we can achieve on an MSK cluster with tiered storage is 20–25% lower than a non-tiered storage enabled cluster due to additional network attached storage bandwidth required to transparently move data from the local to the remote tier. Plan for the capacity (brokers, storage, gp2 vs. gp3) using the formula we discussed to derive max ingress for your cluster based on the number of consumer groups and load test your workloads to identify the sustained throughput limit. Exercising excess ingress to the cluster or egress from the remote tier above the planned capacity can impact your tip produce or consume traffic.
  • The gp3 volume type offers SSD-performance at a 20% lower cost per GB than gp2 volumes. Furthermore, by decoupling storage performance from capacity, you can easily provision higher IOPS and throughput without the need to provision additional block storage capacity. Therefore, we recommend using gp3 for a tiered storage enabled cluster by specifying provisioned throughput for larger instance types.
  • If you specified a custom cluster configuration, check the num.replica.fetchers, num.io.threads, and num.network.threads configuration parameters on your cluster. We recommend leaving it as the default Amazon MSK configuration unless you have specific use case.

This is only the most relevant guidance related to tiered storage. For further guidance on monitoring and best practices of your cluster, refer to Best practices.

Conclusion

You should now have a solid understanding of how Amazon MSK tiered storage works and the best practices to consider for your production workload when utilizing this cost-effective storage tier. With tiered storage, we remove the compute and storage coupling, which can benefit workloads that need larger disk capacity and are underutilizing compute just to provision storage.

We are eager to learn about your current approach in building real-time data streaming applications. If you’re starting your journey with Amazon MSK tiered storage, we suggest following the comprehensive Getting Started guide available in Tiered storage. This guide provides detailed instructions and practical steps to help you gain hands-on experience and effectively take advantage of the benefits of tiered storage for your streaming applications.

If you have any questions or feedback, please leave them in the comments section.


About the authors

Nagarjuna Koduru is a Principal Engineer in AWS, currently working for AWS Managed Streaming For Kafka (MSK). He led the teams that built MSK Serverless and MSK Tiered storage products. He previously led the team in Amazon JustWalkOut (JWO) that is responsible for realtime tracking of shopper locations in the store. He played pivotal role in scaling the stateful stream processing infrastructure to support larger store formats and reducing the overall cost of the system. He has keen interest in stream processing, messaging and distributed storage infrastructure.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.