Tag Archives: Amazon Managed Streaming for Apache Kafka (Amazon MSK)

Deep dive on Amazon MSK tiered storage

Post Syndicated from Nagarjuna Koduru original https://aws.amazon.com/blogs/big-data/deep-dive-on-amazon-msk-tiered-storage/

In the first post of the series, we described some core concepts of Apache Kafka cluster sizing, the best practices for optimizing the performance, and the cost of your Kafka workload.

This post explains how the underlying infrastructure affects Kafka performance when you use Amazon Managed Streaming for Apache Kafka (Amazon MSK) tiered storage. We delve deep into the core components of Amazon MSK tiered storage and address questions such as: How do reads and writes work in a tiered storage-enabled cluster?

In the subsequent post, we’ll discuss the latency impact and the recommended metrics to monitor, and conclude with guidance on key considerations in a production tiered storage-enabled cluster.

How Amazon MSK tiered storage works

To understand the internal architecture of Amazon MSK tiered storage, let’s first discuss some fundamentals of Kafka topics, partitions, and how reads and writes work.

A logical stream of data in Kafka is referred to as a topic. A topic is broken down into partitions, which are physical entities used to distribute load across multiple server instances (brokers) that serve reads and writes.

A partition (also called a topic-partition because it’s relative to a given topic) can be replicated, which means there are several copies of the data across the group of brokers forming a cluster. Each copy is called a replica or a log. One of these replicas, called the leader, serves as the reference. It’s where the ingress traffic is accepted for the topic-partition.

A log is an append-only sequence of log segments. Log segments contain Kafka data records, which are added to the end of the log or the active segment.

Log segments are stored as regular files. On the file system, Kafka identifies the file of a log segment by putting the offset of the first data record it contains in its file name. The offset of a record is simply a monotonic index assigned to a record by Kafka when it’s appended to the log. The segment files of a log are stored in a directory dedicated to the associated topic-partition.

When Kafka reads data from an arbitrary offset, it first uses the segment file names to look up the segment that contains that offset, then finds the specific record location inside that file using an offset index. Offset indexes are materialized in a dedicated file stored with the segment files in the topic-partition directory. There is also a time index (timeindex) for seeking by timestamp.

For every partition, Kafka also stores a journal of leadership changes in a file called leader-epoch-checkpoint. This file contains a mapping from each leader epoch to the start offset (startOffset) of that epoch. Whenever the Kafka controller elects a new leader for a partition, this data is updated and propagated to all brokers. A leader epoch is a 32-bit, monotonically increasing number representing a continuous period of leadership of a single partition. It’s marked on all the Kafka records. The following listing shows the local storage layout of topic cars, partition 0, containing two segments (0, 35):

$ ls /kafka-cluster/broker-1/data/cars-0/

00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000000035.log
00000000000000000035.index
00000000000000000035.timeindex
leader-epoch-checkpoint
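
The segment lookup described earlier, which finds the segment holding a given offset from file names alone, can be sketched in a few lines of Python. This is a simplified illustration and not Kafka's actual implementation: the function names base_offset and find_segment are hypothetical, the file list is the cars-0 listing, and the sketch doesn't check whether the offset lies beyond the end of the log.

```python
import bisect

def base_offset(file_name):
    """Return the base offset encoded in a segment file name."""
    return int(file_name.split(".", 1)[0])

def find_segment(files, target_offset):
    """Return the .log file whose segment would contain target_offset."""
    # Only .log files hold records; index files and the checkpoint are skipped.
    segments = sorted((base_offset(f), f) for f in files if f.endswith(".log"))
    bases = [base for base, _ in segments]
    # The matching segment has the largest base offset <= target_offset.
    i = bisect.bisect_right(bases, target_offset) - 1
    if i < 0:
        raise ValueError(f"offset {target_offset} is before the first segment")
    return segments[i][1]

# File names taken from the cars-0 directory listing.
cars_0 = [
    "00000000000000000000.log",
    "00000000000000000000.index",
    "00000000000000000000.timeindex",
    "00000000000000000035.log",
    "00000000000000000035.index",
    "00000000000000000035.timeindex",
    "leader-epoch-checkpoint",
]

print(find_segment(cars_0, 34))  # segment with base offset 0
print(find_segment(cars_0, 35))  # segment with base offset 35
```

After the segment is chosen, the offset index would be consulted to find the byte position of the record inside that file; that step is left out here.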

Kafka manages the lifecycle of these segment files. It creates a new one when a new segment needs to be created, for instance, if the current segment reaches its configured maximum size. It deletes one when the target retention period of the data it contains is reached, or when the maximum total size of the log is reached. Data is deleted from the tail of the log, which corresponds to the oldest data of the append-only log of the topic-partition.

KIP-405 or tiered storage in Apache Kafka

The ability to tier data, in other words, to transfer data (log, index, timeindex, and leader-epoch-checkpoint) from a local file system to another storage system based on time and size-based retention policies, is a feature built into Apache Kafka as part of KIP-405.

KIP-405 isn’t in an official Kafka release yet. Amazon MSK implemented tiered storage functionality internally on top of official Kafka version 2.8.2 and exposes it through the AWS-specific Kafka version 2.8.2.tiered. With this feature, you can set separate retention settings for the local and remote tiers. Data in the local tier is retained until it has been copied to the remote tier, even after the local retention period expires. Data in the remote tier is retained until the remote retention expires. KIP-405 proposes a pluggable architecture that lets you plug in custom remote storage and metadata storage backends. The following diagram illustrates the broker’s three key components.

The components are as follows:

  • RemoteLogManager (RLM) – A new component corresponding to LogManager for the local tier. It delegates copy, fetch, and delete of completed and non-active partition segments to a pluggable RemoteStorageManager implementation and maintains the respective remote log segment metadata through a pluggable RemoteLogMetadataManager implementation.
  • RemoteStorageManager (RSM) – A pluggable interface that provides the lifecycle of remote log segments.
  • RemoteLogMetadataManager (RLMM) – A pluggable interface that provides the lifecycle of metadata about remote log segments.
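
To make the division of responsibilities concrete, the following Python sketch models the three components with in-memory stand-ins. The real components are Java interfaces defined by KIP-405; every class and method name below (RemoteStorage, InMemoryStorage, InMemoryMetadata, RemoteLogManagerSketch, copy_segment, and so on) is a simplified assumption for illustration and not the KIP-405 API.

```python
from abc import ABC, abstractmethod

class RemoteStorage(ABC):
    """Stand-in for the pluggable RSM role: lifecycle of remote segments."""
    @abstractmethod
    def copy_segment(self, segment_id, data): ...
    @abstractmethod
    def fetch_segment(self, segment_id): ...
    @abstractmethod
    def delete_segment(self, segment_id): ...

class InMemoryStorage(RemoteStorage):
    """Hypothetical backend that keeps segment bytes in a dict."""
    def __init__(self):
        self._blobs = {}
    def copy_segment(self, segment_id, data):
        self._blobs[segment_id] = data
    def fetch_segment(self, segment_id):
        return self._blobs[segment_id]
    def delete_segment(self, segment_id):
        del self._blobs[segment_id]

class InMemoryMetadata:
    """Stand-in for the RLMM role: tracks which segments exist remotely."""
    def __init__(self):
        self._base_offsets = {}
    def add(self, segment_id, base_offset):
        self._base_offsets[segment_id] = base_offset
    def remove(self, segment_id):
        del self._base_offsets[segment_id]
    def list_segments(self):
        # Segment IDs ordered by base offset.
        return sorted(self._base_offsets, key=self._base_offsets.get)

class RemoteLogManagerSketch:
    """Stand-in for the RLM role: delegates to storage and metadata."""
    def __init__(self, storage, metadata):
        self.storage = storage
        self.metadata = metadata
    def copy(self, segment_id, base_offset, data):
        self.storage.copy_segment(segment_id, data)
        self.metadata.add(segment_id, base_offset)
    def fetch(self, segment_id):
        return self.storage.fetch_segment(segment_id)
    def delete(self, segment_id):
        self.storage.delete_segment(segment_id)
        self.metadata.remove(segment_id)

rlm = RemoteLogManagerSketch(InMemoryStorage(), InMemoryMetadata())
rlm.copy("cars-0/35", 35, b"newer records")
rlm.copy("cars-0/0", 0, b"older records")
print(rlm.metadata.list_segments())
```

Because the manager only talks to the two stand-in roles, swapping the dict-backed classes for another backend wouldn't change RemoteLogManagerSketch, which is the point of the pluggable design.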

How data is moved to the remote tier for a tiered storage-enabled topic

In a tiered storage-enabled topic, each completed segment for a topic-partition triggers the leader of the partition to copy the data to the remote storage tier. The completed log segment is removed from the local disks when Amazon MSK finishes moving that log segment to the remote tier and after it meets the local retention policy. This frees up local storage space.

Let’s consider a hypothetical scenario: you have a topic with one partition. Prior to enabling tiered storage for this topic, there are three log segments. One of the segments is active and receiving data, and the other two segments are complete.

After you enable tiered storage for this topic with two days of local retention and five days of overall retention, Amazon MSK copies log segments 1 and 2 to tiered storage. Amazon MSK also retains the primary storage copy of segments 1 and 2. The active segment 3 isn’t yet eligible to be copied to tiered storage. At this point in the timeline, none of the retention settings have been applied yet to any of the messages in segments 1 and 2.

After 2 days, the primary retention settings take effect for segments 1 and 2, which Amazon MSK copied to tiered storage. Segments 1 and 2 now expire from local storage. Active segment 3 is eligible neither for expiration nor for copying to tiered storage yet because it’s an active segment.

After 5 days, overall retention settings take effect, and Amazon MSK clears log segments 1 and 2 from tiered storage. Segment 3 is neither eligible for expiration nor eligible to copy over to tiered storage yet because it’s active.

That’s how the data lifecycle works on a tiered storage-enabled cluster.
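
The timeline in this scenario can be expressed as a small Python sketch. It is a simplification under stated assumptions: closed segments are treated as copied to the remote tier immediately, data age is measured from the moment tiered storage is enabled (as in the scenario), and the names Segment and tiers_holding are hypothetical.

```python
from dataclasses import dataclass

LOCAL_RETENTION_DAYS = 2   # local (primary) retention from the scenario
TOTAL_RETENTION_DAYS = 5   # overall retention from the scenario

@dataclass
class Segment:
    name: str
    active: bool

def tiers_holding(segment, days_since_enabled):
    """Return the set of tiers that still hold the segment."""
    if segment.active:
        # The active segment is neither copied nor expired.
        return {"local"}
    tiers = set()
    if days_since_enabled < LOCAL_RETENTION_DAYS:
        tiers.add("local")
    if days_since_enabled < TOTAL_RETENTION_DAYS:
        tiers.add("remote")
    return tiers

segment_1 = Segment("segment-1", active=False)
segment_3 = Segment("segment-3", active=True)

for day in (0, 2, 5):
    print(day, sorted(tiers_holding(segment_1, day)), sorted(tiers_holding(segment_3, day)))
```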

Amazon MSK immediately starts moving data to tiered storage as soon as a segment is closed. The local disks are freed up when Amazon MSK finishes moving that log segment to remote tier and after it meets the local retention policy.
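
As a worked example of the retention arithmetic, the following Python sketch converts the scenario's two-day local retention and five-day overall retention into millisecond values. The topic-level setting names remote.storage.enable, local.retention.ms, and retention.ms are taken from the Amazon MSK tiered storage documentation as we understand it; treat them, and the helper name tiered_topic_configs, as assumptions to verify before use.

```python
from datetime import timedelta

def to_ms(delta):
    """Convert a timedelta to whole milliseconds."""
    return int(delta.total_seconds() * 1000)

def tiered_topic_configs(local, total):
    """Build topic-level settings for a tiered storage-enabled topic."""
    if local > total:
        raise ValueError("local retention cannot exceed overall retention")
    return {
        "remote.storage.enable": "true",
        "local.retention.ms": str(to_ms(local)),   # local tier
        "retention.ms": str(to_ms(total)),         # overall retention
    }

configs = tiered_topic_configs(timedelta(days=2), timedelta(days=5))
for key, value in configs.items():
    print(f"--config {key}={value}")
```

Two days is 2 × 24 × 60 × 60 × 1,000 = 172,800,000 ms, and five days is 432,000,000 ms.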

How read works in a tiered storage-enabled topic

For any read request, ReplicaManager first tries to serve the request from the local log by calling ReadFromLocalLog. If that call returns an offset-out-of-range exception, ReplicaManager delegates the read to RemoteLogManager, which reads from tiered storage. On the read path, RemoteStorageManager fetches the data in chunks from remote storage, so your consumer experiences higher latency for the first few bytes. As the system buffers the segment locally, latency becomes similar to reading from local storage. One advantage of this approach is that when multiple consumers read from the same segment, the data is served directly from the local buffer.
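
The fallback from the local log to tiered storage can be sketched as follows. This is a toy model and not broker code: logs are plain dicts keyed by offset, the buffer caches individual records instead of chunks, and the names OffsetOutOfRange, read_from_local_log, RemoteReaderSketch, and fetch are hypothetical.

```python
class OffsetOutOfRange(Exception):
    """Raised when the requested offset is no longer in the local log."""

def read_from_local_log(local_log, offset):
    if offset not in local_log:
        raise OffsetOutOfRange(offset)
    return local_log[offset]

class RemoteReaderSketch:
    """Reads from the remote tier and buffers what it has fetched."""
    def __init__(self, remote_store):
        self.remote_store = remote_store
        self.buffer = {}
        self.remote_fetches = 0

    def read(self, offset):
        if offset not in self.buffer:
            # First access pays the remote round trip.
            self.remote_fetches += 1
            self.buffer[offset] = self.remote_store[offset]
        # Later reads of the same data come from the local buffer.
        return self.buffer[offset]

def fetch(offset, local_log, remote_reader):
    """Try the local log first, then fall back to the remote tier."""
    try:
        return read_from_local_log(local_log, offset)
    except OffsetOutOfRange:
        return remote_reader.read(offset)

local_log = {35: "recent record"}
reader = RemoteReaderSketch({0: "tiered record"})

print(fetch(35, local_log, reader))  # served from the local log
print(fetch(0, local_log, reader))   # first remote read
print(fetch(0, local_log, reader))   # second consumer, served from the buffer
print(reader.remote_fetches)
```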

If your consumer is configured to read from the closest replica, a consumer from a different consumer group might read the same remote segment through a different broker. In that case, it experiences the same latency behavior described previously.

Conclusion

In this post, we discussed the core components of the Amazon MSK tiered storage feature and explained how the data lifecycle works in a cluster enabled with tiered storage. Stay tuned for our upcoming post, in which we delve into the best practices for sizing and running a tiered storage-enabled cluster in production.

We would love to hear how you’re building your real-time data streaming applications today. If you’re just getting started with Amazon MSK tiered storage, we recommend getting hands-on with the guidelines available in the tiered storage documentation.

If you have any questions or feedback, please leave them in the comments section.


About the authors

Nagarjuna Koduru is a Principal Engineer in AWS, currently working for AWS Managed Streaming For Kafka (MSK). He led the teams that built MSK Serverless and MSK Tiered storage products. He previously led the team in Amazon JustWalkOut (JWO) that is responsible for real-time tracking of shopper locations in the store. He played a pivotal role in scaling the stateful stream processing infrastructure to support larger store formats and reducing the overall cost of the system. He has a keen interest in stream processing, messaging, and distributed storage infrastructure.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Stream data with Amazon MSK Connect using an open-source JDBC connector

Post Syndicated from Manish Virwani original https://aws.amazon.com/blogs/big-data/stream-data-with-amazon-msk-connect-using-an-open-source-jdbc-connector/

Customers are adopting Amazon Managed Streaming for Apache Kafka (Amazon MSK) as a fast and reliable streaming platform to build their enterprise data hub. In addition to streaming capabilities, setting up Amazon MSK enables organizations to use a pub/sub model for data distribution with loosely coupled and independent components.

To publish and distribute the data between Apache Kafka clusters and other external systems including search indexes, databases, and file systems, you need to set up Apache Kafka Connect, the open-source component of the Apache Kafka framework that hosts and runs connectors for moving data between various systems. As the number of upstream and downstream applications grows, so does the complexity of managing, scaling, and administering the Apache Kafka Connect clusters. To address these scalability and manageability concerns, Amazon MSK Connect provides the functionality to deploy fully managed connectors built for Apache Kafka Connect, with the capability to automatically scale with workload changes, so you pay only for the resources consumed.

In this post, we walk through a solution to stream data from Amazon Relational Database Service (Amazon RDS) for MySQL to an MSK cluster in real time by configuring and deploying a connector using Amazon MSK Connect.

Solution overview

For our use case, an enterprise wants to build a centralized data repository that has multiple producer and consumer applications. To support streaming data from applications with different tools and technologies, Amazon MSK is selected as the streaming platform. One of the primary applications that currently writes data to Amazon RDS for MySQL would require major design changes to publish data to MSK topics and write to the database at the same time. Therefore, to minimize the design changes, this application would continue writing the data to Amazon RDS for MySQL with the additional requirement to synchronize this data with the centralized streaming platform Amazon MSK to enable real-time analytics for multiple downstream consumers.

To solve this use case, we propose the following architecture that uses Amazon MSK Connect, a feature of Amazon MSK, to set up a fully managed Apache Kafka Connect connector for moving data from Amazon RDS for MySQL to an MSK cluster using the open-source JDBC connector from Confluent.

Set up the AWS environment

To set up this solution, you need to create a few AWS resources. The AWS CloudFormation template provided in this post creates all the AWS resources required as prerequisites.

The following table lists the parameters you must provide for the template.

Parameter Name | Description | Keep Default Value
Stack name | Name of CloudFormation stack. | No
DBInstanceID | Name of RDS for MySQL instance. | No
DBName | Database name to store sample data for streaming. | Yes
DBInstanceClass | Instance type for RDS for MySQL instance. | No
DBAllocatedStorage | Allocated size for DB instance (GiB). | No
DBUsername | Database user for MySQL database access. | No
DBPassword | Password for MySQL database access. | No
JDBCConnectorPluginBukcetName | Bucket for storing MSK Connect connector JAR files and plugin. | No
ClientIPCIDR | IP address of client machine to connect to EC2 instance. | No
EC2KeyPair | Key pair to be used in your EC2 instance. This EC2 instance will be used as a proxy to connect from your local machine to the EC2 client instance. | No
EC2ClientImageId | Latest AMI ID of Amazon Linux 2. You can keep the default value for this post. | Yes
VpcCIDR | IP range (CIDR notation) for this VPC. | No
PrivateSubnetOneCIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone. | No
PrivateSubnetTwoCIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone. | No
PrivateSubnetThreeCIDR | IP range (CIDR notation) for the private subnet in the third Availability Zone. | No
PublicSubnetCIDR | IP range (CIDR notation) for the public subnet. | No

To launch the CloudFormation stack, choose Launch Stack:

After the CloudFormation stack creation is complete and the resources are created, the Outputs tab shows the resource details.

Validate sample data in the RDS for MySQL instance

To prepare the sample data for this use case, complete the following steps:

  1. SSH into the EC2 client instance MSKEC2Client using the following command from your local terminal:
    ssh -i <keypair> <user>@<hostname>

  2. Run the following commands to validate the data has been loaded successfully:
    $ mysql -h <rds_instance_endpoint_name> -u <user_name> -p
    
    MySQL [(none)]> use dms_sample;
    
    MySQL [dms_sample]> select mlb_id, mlb_name, mlb_pos, mlb_team_long, bats, throws from mlb_data limit 5;

Synchronize all tables’ data from Amazon RDS to Amazon MSK

To sync all tables from Amazon RDS to Amazon MSK, create an Amazon MSK Connect managed connector with the following steps:

  1. On the Amazon MSK console, choose Custom plugins in the navigation pane under MSK Connect.
  2. Choose Create custom plugin.
  3. For S3 URI – Custom plugin object, browse to the ZIP file named confluentinc-kafka-connect-jdbc-plugin.zip (created by the CloudFormation template) for the JDBC connector in the S3 bucket bkt-msk-connect-plugins-<aws_account_id>.
  4. For Custom plugin name, enter msk-confluent-jdbc-plugin-v1.
  5. Enter an optional description.
  6. Choose Create custom plugin.

After the custom plugin has been successfully created, it will be available in Active status.

  7. Choose Connectors in the navigation pane under MSK Connect.
  8. Choose Create connector.
  9. Select Use existing custom plugin and under Custom plugins, select the plugin msk-confluent-jdbc-plugin-v1 that you created earlier.
  10. Choose Next.
  11. For Connector name, enter msk-jdbc-connector-rds-to-msk.
  12. Enter an optional description.
  13. For Cluster type, select MSK cluster.
  14. For MSK clusters, select the cluster you created earlier.
  15. For Authentication, choose IAM.
  16. Under Connector configurations, enter the following settings:
    ### CONNECTOR SPECIFIC SETTINGS
    ### Provide the configuration properties to connect to source and destination endpoints including authentication
    ### mechanism, credentials and task details such as polling interval, source and destination object names, data
    ### transfer mode, parallelism
    ### Many of these properties are connector and end-point specific, so please review the connector documentation
    ### for more details
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    connection.user=admin
    connection.url=jdbc:mysql://<rds_instance_endpoint_name>:3306/dms_sample
    connection.password=XXXXX
    tasks.max=1
    poll.interval.ms=300000
    topic.prefix=rds-to-msk-
    mode=bulk
    connection.attempts=1

    ### CONVERTING KAFKA MESSAGE BYTES TO JSON
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter.schemas.enable=false
    key.converter.schemas.enable=false

    ### GENERIC AUTHENTICATION SETTINGS FOR KAFKA CONNECT
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    ssl.truststore.location=~/kafka.truststore.jks
    ssl.keystore.location=~/kafka.client.keystore.jks
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

The following table provides a brief summary of all the preceding configuration options.

Configuration Option | Description
connector.class | Java class for the connector
connection.user | User name to authenticate with the MySQL endpoint
connection.url | JDBC URL identifying the hostname and port number for the MySQL endpoint
connection.password | Password to authenticate with the MySQL endpoint
tasks.max | Maximum number of tasks to be launched for this connector
poll.interval.ms | Time interval in milliseconds between subsequent polls for each table to pull new data
topic.prefix | Custom prefix value to prepend to each table name when creating topics in the MSK cluster
mode | The operation mode for each poll, such as bulk, timestamp, incrementing, or timestamp+incrementing
connection.attempts | Maximum number of retries for JDBC connection
security.protocol | Sets up TLS for encryption and SASL for authentication
sasl.mechanism | Identifies the SASL mechanism to use
ssl.truststore.location | Location for storing trusted certificates
ssl.keystore.location | Location for storing private keys
sasl.client.callback.handler.class | Encapsulates constructing a SigV4 signature based on extracted credentials
sasl.jaas.config | Binds the SASL client implementation

  17. In the Connector capacity section, select Autoscaled for Capacity type and keep the default value of 1 for MCU count per worker.
  18. Set 4 for Maximum number of workers and keep all other default values for Workers and Autoscaling utilization thresholds.
  19. For Worker configuration, select Use the MSK default configuration.
  20. Under Access permissions, choose the custom IAM role msk-connect-rds-jdbc-MSKConnectServiceIAMRole-* created earlier.
  21. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  22. For Log group, choose the log group msk-jdbc-source-connector created earlier.
  23. Choose Next.
  24. Under Review and Create, validate all the settings and choose Create connector.

After the connector has transitioned to RUNNING status, the data should start flowing from the RDS instance to the MSK cluster.
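
As a rough illustration of how topic.prefix combines with table names, the following Python sketch parses a trimmed copy of the connector settings and derives the topic each table would be written to. The helper names parse_properties and topic_for_table are hypothetical, and the parser handles only simple key=value lines; the table names seat and mlb_data come from the sample database used in this post.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may contain "=".
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def topic_for_table(props, table):
    """Topic name is the configured prefix followed by the table name."""
    return props["topic.prefix"] + table

# Trimmed copy of the connector settings shown earlier.
CONNECTOR_CONFIG = """
### CONNECTOR SPECIFIC SETTINGS
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
poll.interval.ms=300000
topic.prefix=rds-to-msk-
mode=bulk
"""

props = parse_properties(CONNECTOR_CONFIG)
for table in ("seat", "mlb_data"):
    print(topic_for_table(props, table))
```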

Validate the data

To validate and compare the data, complete the following steps:

  1. SSH into the EC2 client instance MSKEC2Client using the following command from your local terminal:
    ssh -i <keypair> <user>@<hostname>

  2. To connect to the MSK cluster with IAM authentication, add the latest version of the aws-msk-iam-auth JAR file to the class path:
    $ export CLASSPATH=/home/ec2-user/aws-msk-iam-auth-1.1.0-all.jar

  3. On the Amazon MSK console, choose Clusters in the navigation pane and choose the cluster MSKConnect-msk-connect-rds-jdbc.
  4. On the Cluster summary page, choose View client information.
  5. In the View client information section, under Bootstrap servers, copy the private endpoint for Authentication type IAM.

  6. Set up additional environment variables for working with the latest version of the Apache Kafka installation and connecting to the Amazon MSK bootstrap servers, where <bootstrap servers> is the list of bootstrap servers that allow connecting to the MSK cluster with IAM authentication:
    $ export PATH=~/kafka/bin:$PATH
    $ cp ~/aws-msk-iam-auth-1.1.0-all.jar ~/kafka/libs/.
    $ export BOOTSTRAP_SERVERS=<bootstrap servers>

  7. Set up a config file named client.properties to be used for authentication:
    $ cd /home/ec2-user/kafka/config/
    $ vi client.properties

    # Sets up TLS for encryption and SASL for authN.
    security.protocol = SASL_SSL
    # Identifies the SASL mechanism to use.
    sasl.mechanism = AWS_MSK_IAM
    # Binds SASL client implementation.
    sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
    # Encapsulates constructing a SigV4 signature based on extracted credentials.
    # The SASL client bound by "sasl.jaas.config" invokes this class.
    sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

  8. Validate the list of topics created in the MSK cluster:
    $ cd /home/ec2-user/kafka/
    $ bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVERS --command-config /home/ec2-user/kafka/config/client.properties

  9. Validate that data has been loaded to the topics in the MSK cluster:
    $ bin/kafka-console-consumer.sh --topic rds-to-msk-seat --from-beginning --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties
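
Because the connector uses JsonConverter with schemas disabled, each message value printed by the console consumer should be a plain JSON object. The following Python sketch shows one way to summarize captured consumer output. The function name summarize is hypothetical, and the sample lines are invented placeholders, not actual rows from the dms_sample database.

```python
import json

def summarize(lines):
    """Count JSON object records and collect the field names seen."""
    count = 0
    fields = set()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            # Skip non-JSON lines such as the consumer's status messages.
            continue
        if isinstance(record, dict):
            count += 1
            fields.update(record)
    return count, sorted(fields)

# Invented placeholder output; real records depend on the table schema.
sample_output = [
    '{"id": 1, "name": "example"}',
    '{"id": 2, "name": "another"}',
    "Processed a total of 2 messages",
]

count, fields = summarize(sample_output)
print(count, fields)
```

You could feed this function the consumer's output captured to a file, then compare the record count with a row count from the source table.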

Synchronize data using a query to Amazon RDS and write to Amazon MSK

To synchronize the results of a query that flattens data by joining multiple tables in Amazon RDS for MySQL, create an Amazon MSK Connect managed connector with the following steps:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Choose Create connector.
  3. Select Use existing custom plugin and under Custom plugins, select the plugin msk-confluent-jdbc-plugin-v1.
  4. For Connector name, enter msk-jdbc-connector-rds-to-msk-query.
  5. Enter an optional description.
  6. For Cluster type, select MSK cluster.
  7. For MSK clusters, select the cluster you created earlier.
  8. For Authentication, choose IAM.
  9. Under Connector configurations, enter the following settings:
    ### CONNECTOR SPECIFIC SETTINGS
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    connection.user=admin
    connection.url=jdbc:mysql://<rds_instance_endpoint_name>:3306/dms_sample
    connection.password=XXXXX
    tasks.max=1
    poll.interval.ms=300000
    topic.prefix=rds-to-msk-query-topic
    mode=bulk
    connection.attempts=1
    query=select last_name, name as team_name, sport_type_name, sport_league_short_name, sport_division_short_name from dms_sample.sport_team join dms_sample.player on player.sport_team_id = sport_team.id;

    ### CONVERTING KAFKA MESSAGE BYTES TO JSON
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter.schemas.enable=false
    key.converter.schemas.enable=false

    ### GENERIC AUTHENTICATION SETTINGS FOR KAFKA CONNECT
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    ssl.truststore.location=~/kafka.truststore.jks
    ssl.keystore.location=~/kafka.client.keystore.jks
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

  10. In the Connector capacity section, select Autoscaled for Capacity type and keep the default value of 1 for MCU count per worker.
  11. Set 4 for Maximum number of workers and keep all other default values for Workers and Autoscaling utilization thresholds.
  12. For Worker configuration, select Use the MSK default configuration.
  13. Under Access permissions, choose the custom IAM role role_msk_connect_serivce_exec_custom.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  15. For Log group, choose the log group created earlier.
  16. Choose Next.
  17. Under Review and Create, validate all the settings and choose Create connector.

Once the connector has transitioned to RUNNING status, the data should start flowing from the RDS instance to the MSK cluster.

  1. For data validation, SSH into the EC2 client instance MSKEC2Client and run the following command to see the data in the topic:
    $ bin/kafka-console-consumer.sh --topic rds-to-msk-query-topic --from-beginning --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties
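
When sizing the topic, it helps to estimate how many messages this configuration produces. The sketch below assumes that bulk mode reloads the full query result on every poll, which is how the JDBC source connector documentation describes that mode. The row count of 1,000 is a made-up figure, and the function names are hypothetical.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000

def polls_per_day(poll_interval_ms):
    """Number of complete polls that fit in one day."""
    return MS_PER_DAY // poll_interval_ms

def messages_per_day(rows_per_poll, poll_interval_ms):
    """Messages written per day if every poll republishes all rows."""
    return rows_per_poll * polls_per_day(poll_interval_ms)

POLL_INTERVAL_MS = 300000  # poll.interval.ms from the connector settings

print(polls_per_day(POLL_INTERVAL_MS))           # polls per day
print(messages_per_day(1000, POLL_INTERVAL_MS))  # hypothetical 1,000-row result
```

With a 5-minute poll interval, the query runs 288 times a day, so a 1,000-row result would produce 288,000 messages daily, most of them repeats of earlier polls.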

Clean up

To clean up your resources and avoid ongoing charges, complete the following steps:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Select the connectors you created and choose Delete.
  3. On the Amazon S3 console, choose Buckets in the navigation pane.
  4. Search for the bucket with the naming convention bkt-msk-connect-plugins-<aws_account_id>.
  5. Delete all the folders and objects in this bucket.
  6. Delete the bucket after all contents have been removed.
  7. To delete all other resources created using the CloudFormation stack, delete the stack via the AWS CloudFormation console.

Conclusion

Amazon MSK Connect is a fully managed service that provisions the required resources, monitors the health and delivery state of connectors, maintains the underlying hardware, and auto scales connectors to balance the workloads. In this post, we saw how to set up the open-source JDBC connector from Confluent to stream data between an RDS for MySQL instance and an MSK cluster. We also explored different options to synchronize all the tables as well as use the query-based approach to stream denormalized data into the MSK topics.

For more information about Amazon MSK Connect, see Getting started using MSK Connect.


About the Authors

Manish Virwani is a Sr. Solutions Architect at AWS. He has more than a decade of experience designing and implementing large-scale big data and analytics solutions. He provides technical guidance, design advice, and thought leadership to some of the key AWS customers and partners.

Indira Balakrishnan is a Principal Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.

How Zoom implemented streaming log ingestion and efficient GDPR deletes using Apache Hudi on Amazon EMR

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/how-zoom-implemented-streaming-log-ingestion-and-efficient-gdpr-deletes-using-apache-hudi-on-amazon-emr/

In today’s digital age, logging is a critical aspect of application development and management, but efficiently managing logs while complying with data protection regulations can be a significant challenge. Zoom, in collaboration with the AWS Data Lab team, developed an innovative architecture to overcome these challenges and streamline their logging and record deletion processes. In this post, we explore the architecture and the benefits it provides for Zoom and its users.

Application log challenges: Data management and compliance

Application logs are an essential component of any application; they provide valuable information about the usage and performance of the system. These logs are used for a variety of purposes, such as debugging, auditing, performance monitoring, business intelligence, system maintenance, and security. However, although these application logs are necessary for maintaining and improving the application, they also pose an interesting challenge. These application logs may contain personally identifiable data, such as user names, email addresses, IP addresses, and browsing history, which creates a data privacy concern.

Laws such as the General Data Protection Regulation (GDPR) and the California Consumer Privacy Act (CCPA) require organizations to retain application logs for a specific period of time. The exact length of time required for data storage varies depending on the specific regulation and the type of data being stored. The reason for these data retention periods is to ensure that companies aren’t keeping personal data longer than necessary, which could increase the risk of data breaches and other security incidents. This also helps ensure that companies aren’t using personal data for purposes other than those for which it was collected, which could be a violation of privacy laws. These laws also give individuals the right to request the deletion of their personal data, also known as the “right to be forgotten.” Individuals have the right to have their personal data erased, without undue delay.

So, on one hand, organizations need to collect application log data to ensure the proper functioning of their services, and keep the data for a specific period of time. But on the other hand, they may receive requests from individuals to delete their personal data from the logs. This creates a balancing act for organizations because they must comply with both data retention and data deletion requirements.

This issue becomes increasingly challenging for larger organizations that operate in multiple countries and states, because each country and state may have their own rules and regulations regarding data retention and deletion. For example, the Personal Information Protection and Electronic Documents Act (PIPEDA) in Canada and the Australian Privacy Act in Australia are similar laws to GDPR, but they may have different retention periods or different exceptions. Therefore, organizations big or small must navigate this complex landscape of data retention and deletion requirements, while also ensuring that they are in compliance with all applicable laws and regulations.

Zoom’s initial architecture

During the COVID-19 pandemic, the use of Zoom skyrocketed as more and more people were asked to work and attend classes from home. The company had to rapidly scale its services to accommodate the surge and worked with AWS to deploy capacity across most Regions globally. With the sudden increase in the number of application endpoints, they had to rapidly evolve their log analytics architecture and worked with the AWS Data Lab team to quickly prototype and deploy an architecture for their compliance use case.

At Zoom, the data ingestion throughput and performance needs are very stringent. Data had to be ingested from several thousand application endpoints that produced over 30 million messages every minute, resulting in over 100 TB of log data per day. The existing ingestion pipeline consisted of writing the data to Apache Hadoop HDFS storage through Apache Kafka first and then running daily jobs to move the data to persistent storage. This took several hours while also slowing the ingestion and creating the potential for data loss. Scaling the architecture was also an issue because HDFS data would have to be moved around whenever nodes were added or removed. Furthermore, transactional semantics on billions of records were necessary to help meet compliance-related data delete requests, and the existing architecture of daily batch jobs was operationally inefficient.

It was at this time, through conversations with the AWS account team, that the AWS Data Lab team got involved to assist in building a solution for Zoom’s hyper-scale.

Solution overview

The AWS Data Lab offers accelerated, joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data, analytics, artificial intelligence (AI), machine learning (ML), serverless, and container modernization initiatives. The Data Lab has three offerings: the Build Lab, the Design Lab, and Resident Architect. During the Build and Design Labs, AWS Data Lab Solutions Architects and AWS experts supported Zoom specifically by providing prescriptive architectural guidance, sharing best practices, building a working prototype, and removing technical roadblocks to help meet their production needs.

Zoom and the AWS team (collectively referred to as “the team” going forward) identified two major workflows for data ingestion and deletion.

Data ingestion workflow

The following diagram illustrates the data ingestion workflow.

[Diagram: data ingestion workflow]

The team needed to quickly populate millions of Kafka messages in the dev/test environment to achieve this. To expedite the process, we (the team) opted to use Amazon Managed Streaming for Apache Kafka (Amazon MSK), which makes it simple to ingest and process streaming data in real time, and we were up and running in under a day.

To generate test data that resembled production data, the AWS Data Lab team created a custom Python script that evenly populated over 1.2 billion messages across several Kafka partitions. To match the production setup in the development account, we had to increase the cloud quota limit via a support ticket.

We used Amazon MSK and the Spark Structured Streaming capability in Amazon EMR to ingest and process the incoming Kafka messages with high throughput and low latency. Specifically, we inserted the data from the source into EMR clusters at a maximum incoming rate of 150 million Kafka messages every 5 minutes, with each Kafka message holding 7–25 log data records.
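As a rough back-of-the-envelope check on these figures, the following Go sketch converts the stated peak batch size into per-second rates. The function and constant names are illustrative only and are not part of Zoom's pipeline.

```go
package main

import "fmt"

// perSecond converts a count observed over a window (in seconds) into a per-second rate.
func perSecond(count, windowSeconds float64) float64 {
	return count / windowSeconds
}

func main() {
	const messagesPerBatch = 150_000_000.0   // peak Kafka messages per batch (from the text)
	const batchWindowSeconds = 5 * 60.0      // 5-minute batch window
	const minRecords, maxRecords = 7.0, 25.0 // log records per Kafka message (from the text)

	msgRate := perSecond(messagesPerBatch, batchWindowSeconds)
	fmt.Printf("messages/s: %.0f\n", msgRate)
	fmt.Printf("log records/s: %.0f to %.0f\n", msgRate*minRecords, msgRate*maxRecords)
}
```

This works out to about 500,000 messages per second, which matches the 30 million messages per minute cited earlier, and to roughly 3.5–12.5 million log records per second.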

To store the data, we chose to use Apache Hudi as the table format. We opted for Hudi because it’s an open-source data management framework that provides record-level insert, update, and delete capabilities on top of an immutable storage layer like Amazon Simple Storage Service (Amazon S3). Additionally, Hudi is optimized for handling large datasets and works well with Spark Structured Streaming, which was already being used at Zoom.

After 150 million messages were buffered, we processed the messages using Spark Structured Streaming on Amazon EMR and wrote the data into Amazon S3 in Apache Hudi-compatible format every 5 minutes. We first flattened the message array, creating a single record from the nested array of messages. Then we added a unique key, known as the Hudi record key, to each message. This key allows Hudi to perform record-level insert, update, and delete operations on the data. We also extracted the field values, including the Hudi partition keys, from incoming messages.
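The flattening and keying step can be illustrated with a small sketch. The team's job ran on Spark Structured Streaming; the Go code below only mirrors the transformation logic. The message shape, the field names, and the key scheme (partition, offset, and array index) are assumptions made for illustration, because the post does not describe how the Hudi record key was derived.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kafkaPayload is a hypothetical message shape: one message carrying several log records.
type kafkaPayload struct {
	Endpoint string      `json:"endpoint"`
	Logs     []logRecord `json:"logs"`
}

type logRecord struct {
	Date string `json:"date"` // used here as the (assumed) partition key
	Body string `json:"body"`
}

// flatRecord is one output row: a single log record plus its keys.
type flatRecord struct {
	RecordKey    string
	PartitionKey string
	Body         string
}

// flatten turns one nested message into one row per log record.
// The key scheme (partition-offset-index) is an assumption for illustration.
func flatten(partition int, offset int64, value []byte) ([]flatRecord, error) {
	var p kafkaPayload
	if err := json.Unmarshal(value, &p); err != nil {
		return nil, err
	}
	rows := make([]flatRecord, 0, len(p.Logs))
	for i, rec := range p.Logs {
		rows = append(rows, flatRecord{
			RecordKey:    fmt.Sprintf("%d-%d-%d", partition, offset, i),
			PartitionKey: rec.Date,
			Body:         rec.Body,
		})
	}
	return rows, nil
}

func main() {
	msg := []byte(`{"endpoint":"ep-1","logs":[{"date":"2023-01-01","body":"a"},{"date":"2023-01-01","body":"b"}]}`)
	rows, err := flatten(3, 42, msg)
	if err != nil {
		panic(err)
	}
	for _, r := range rows {
		fmt.Println(r.RecordKey, r.PartitionKey, r.Body)
	}
}
```

Whatever scheme is used, the record key needs to be unique and stable, because record-level updates and deletes locate rows by that key.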

This architecture allowed end-users to query the data stored in Amazon S3 using Amazon Athena with the AWS Glue Data Catalog or using Apache Hive and Presto.

Data deletion workflow

The following diagram illustrates the data deletion workflow.

[Diagram: data deletion workflow]

Our architecture allowed for efficient data deletions. To help comply with the customer-initiated data retention policy for GDPR deletes, scheduled jobs ran daily to identify the data to be deleted in batch mode.

We then spun up a transient EMR cluster to run the GDPR upsert job to delete the records. The data was stored in Amazon S3 in Hudi format, and Hudi’s built-in index allowed us to efficiently delete records using bloom filters and file ranges. Because only those files that contained the record keys needed to be read and rewritten, it only took about 1–2 minutes to delete 1,000 records out of the 1 billion records, which had previously taken hours to complete as entire partitions were read.
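To show why key ranges and bloom filters limit how many files a delete has to touch, here is a minimal, self-contained Go sketch of the idea. It is not Hudi's implementation: the types, filter size, and hash scheme are all assumptions chosen for brevity.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// bloom is a tiny Bloom filter: it may report false positives, never false negatives.
type bloom struct {
	bits []bool
	k    int
}

func newBloom(size, hashes int) *bloom {
	return &bloom{bits: make([]bool, size), k: hashes}
}

// positions derives k bit positions from the two halves of one 64-bit FNV hash.
func (b *bloom) positions(key string) []int {
	h := fnv.New64a()
	h.Write([]byte(key))
	sum := h.Sum64()
	h1, h2 := uint32(sum), uint32(sum>>32)|1
	out := make([]int, b.k)
	for i := range out {
		out[i] = int((h1 + uint32(i)*h2) % uint32(len(b.bits)))
	}
	return out
}

func (b *bloom) add(key string) {
	for _, p := range b.positions(key) {
		b.bits[p] = true
	}
}

func (b *bloom) mayContain(key string) bool {
	for _, p := range b.positions(key) {
		if !b.bits[p] {
			return false
		}
	}
	return true
}

// dataFile holds the per-file metadata consulted before any file is opened.
type dataFile struct {
	name           string
	minKey, maxKey string
	filter         *bloom
}

// newDataFile records the key range and filter for a file; it requires at least one key.
func newDataFile(name string, keys ...string) dataFile {
	f := dataFile{name: name, minKey: keys[0], maxKey: keys[0], filter: newBloom(1024, 3)}
	for _, k := range keys {
		if k < f.minKey {
			f.minKey = k
		}
		if k > f.maxKey {
			f.maxKey = k
		}
		f.filter.add(k)
	}
	return f
}

// candidates returns the files that might hold key: range check first, then the filter.
func candidates(files []dataFile, key string) []string {
	var out []string
	for _, f := range files {
		if key < f.minKey || key > f.maxKey {
			continue // key cannot be in this file
		}
		if f.filter.mayContain(key) {
			out = append(out, f.name)
		}
	}
	return out
}

func main() {
	files := []dataFile{
		newDataFile("file-1.parquet", "a001", "a050", "a100"),
		newDataFile("file-2.parquet", "b001", "b050", "b100"),
	}
	fmt.Println(candidates(files, "a050"))
}
```

Only the files returned by candidates need to be read and rewritten. Every other file is skipped without being opened, which is what keeps a small delete cheap on a very large table.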

Overall, our solution enabled efficient deletion of data, which provided an additional layer of data security that was critical for Zoom, in light of its GDPR requirements.

Architecting to optimize scale, performance, and cost

In this section, we share the following strategies Zoom took to optimize scale, performance, and cost:

  • Optimizing ingestion
  • Optimizing throughput and Amazon EMR utilization
  • Decoupling ingestion and GDPR deletion using EMRFS
  • Efficient deletes with Apache Hudi
  • Optimizing for low-latency reads with Apache Hudi
  • Monitoring

Optimizing ingestion

To keep the storage in Kafka lean and optimal, as well as to get a real-time view of data, we created a Spark job to read incoming Kafka messages in batches of 150 million messages and wrote to Amazon S3 in Hudi-compatible format every 5 minutes. Even during the initial stages of the iteration, when we hadn’t started scaling and tuning yet, we were able to successfully load all Kafka messages consistently under 2.5 minutes using the Amazon EMR runtime for Apache Spark.

Optimizing throughput and Amazon EMR utilization

We launched a cost-optimized EMR cluster and switched from uniform instance groups to using EMR instance fleets. We chose instance fleets because we needed the flexibility to use Spot Instances for task nodes and wanted to diversify the risk of running out of capacity for a specific instance type in our Availability Zone.

We started experimenting with test runs by first changing the number of Kafka partitions from 400 to 1,000, and then changing the number of task nodes and instance types. Based on the results of the run, the AWS team came up with the recommendation to use Amazon EMR with three core nodes (r5.16xlarge, 64 vCPUs each) and 18 task nodes using Spot fleet instances (a combination of r5.16xlarge with 64 vCPUs, r5.12xlarge with 48 vCPUs, and r5.8xlarge with 32 vCPUs). These recommendations helped Zoom to reduce their Amazon EMR costs by more than 80% while meeting their desired performance goals of ingesting 150 million Kafka messages under 5 minutes.

Decoupling ingestion and GDPR deletion using EMRFS

A well-known benefit of separation of storage and compute is that you can scale the two independently. But a not-so-obvious advantage is that you can decouple continuous workloads from sporadic workloads. Previously data was stored in HDFS. Resource-intensive GDPR delete jobs and data movement jobs would compete for resources with the stream ingestion, causing a backlog of more than 5 hours in upstream Kafka clusters, which was close to filling up the Kafka storage (which only had 6 hours of data retention) and potentially causing data loss. Offloading data from HDFS to Amazon S3 allowed us the freedom to launch independent transient EMR clusters on demand to perform data deletion, helping to ensure that the ongoing data ingestion from Kafka into Amazon EMR is not starved for resources. This enabled the system to ingest data every 5 minutes and complete each Spark Streaming read in 2–3 minutes. Another side effect of using EMRFS is a cost-optimized cluster, because we removed reliance on Amazon Elastic Block Store (Amazon EBS) volumes for over 300 TB storage that was used for three copies (including two replicas) of HDFS data. We now pay for only one copy of the data in Amazon S3, which provides 11 9s of durability and is relatively inexpensive storage.

Efficient deletes with Apache Hudi

What about the conflict between ingest writes and GDPR deletes when running concurrently? This is where the power of Apache Hudi stands out.

Apache Hudi provides a table format for data lakes with transactional semantics that enables the separation of ingestion workloads and updates when run concurrently. The system was able to consistently delete 1,000 records in less than a minute. There were some limitations in concurrent writes in Apache Hudi 0.7.0, but the Amazon EMR team quickly addressed this by back-porting Apache Hudi 0.8.0, which supports optimistic concurrency control, to the current (at the time of the AWS Data Lab collaboration) Amazon EMR 6.4 release. This allowed for a quick transition to the new version with minimal testing. Keeping the data in Hudi format on Amazon S3 also enabled us to query it directly and quickly using Athena, without having to spin up a cluster to run ad hoc queries, as well as to query it using Presto, Trino, and Hive. The decoupling of the storage and compute layers provided the flexibility to not only query data across different EMR clusters, but also delete data using a completely independent transient cluster.

Optimizing for low-latency reads with Apache Hudi

To optimize for low-latency reads with Apache Hudi, we needed to address the issue of too many small files being created within Amazon S3 due to the continuous streaming of data into the data lake.

We utilized Apache Hudi’s features to tune file sizes for optimal querying. Specifically, we reduced the degree of parallelism in Hudi from the default value of 1,500 to a lower number. Parallelism refers to the number of threads used to write data to Hudi; by reducing it, we were able to create larger files that were more optimal for querying.

Because we needed to optimize for high-volume streaming ingestion, we chose to implement the merge on read table type (instead of copy on write) for our workload. This table type allowed us to quickly ingest the incoming data into delta files in row format (Avro) and asynchronously compact the delta files into columnar Parquet files for fast reads. To do this, we ran the Hudi compaction job in the background. Compaction is the process of merging row-based delta files to produce new versions of columnar files. Because the compaction job would use additional compute resources, we adjusted the degree of parallelism for insertion to a lower value of 1,000 to account for the additional resource usage. This adjustment allowed us to create larger files without sacrificing performance throughput.

Overall, our approach to optimizing for low-latency reads with Apache Hudi allowed us to better manage file sizes and improve the overall performance of our data lake.

Monitoring

The team monitored MSK clusters with Prometheus (an open-source monitoring tool). Additionally, we showcased how to monitor Spark streaming jobs using Amazon CloudWatch metrics. For more information, refer to Monitor Spark streaming applications on Amazon EMR.

Outcomes

The collaboration between Zoom and the AWS Data Lab demonstrated significant improvements in data ingestion, processing, storage, and deletion using an architecture with Amazon EMR and Apache Hudi. One key benefit of the architecture was a reduction in infrastructure costs, which was achieved through the use of cloud-native technologies and the efficient management of data storage. Another benefit was an improvement in data management capabilities.

We showed that the costs of EMR clusters can be reduced by about 82% while bringing the storage costs down by about 90% compared to the prior HDFS-based architecture. All of this while making the data available in the data lake within 5 minutes of ingestion from the source. We also demonstrated that data deletions from a data lake containing multiple petabytes of data can be performed much more efficiently. With our optimized approach, we were able to delete approximately 1,000 records in just 1–2 minutes, as compared to the previously required 3 hours or more.

Conclusion

In conclusion, the log analytics process, which involves collecting, processing, storing, analyzing, and deleting log data from various sources such as servers, applications, and devices, is critical to aid organizations in working to meet their service resiliency, security, performance monitoring, troubleshooting, and compliance needs, such as GDPR.

This post shared what Zoom and the AWS Data Lab team have accomplished together to solve critical data pipeline challenges, and Zoom has extended the solution further to optimize extract, transform, and load (ETL) jobs and resource efficiency. However, you can also use the architecture patterns presented here to quickly build cost-effective and scalable solutions for other use cases. Please reach out to your AWS team for more information or contact Sales.


About the Authors

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions, modernizing their architecture, and generating insights from their data. In his spare time he likes to work on non-profit projects focused on underprivileged children’s education.

Chandra Dhandapani is a Senior Solutions Architect at AWS, where he specializes in creating solutions for customers in Analytics, AI/ML, and Databases. He has a lot of experience in building and scaling applications across different industries including Healthcare and Fintech. Outside of work, he is an avid traveler and enjoys sports, reading, and entertainment.

Amit Kumar Agrawal is a Senior Solutions Architect at AWS, based out of San Francisco Bay Area. He works with large strategic ISV customers to architect cloud solutions that address their business challenges. During his free time he enjoys exploring the outdoors with his family.

Viral Shah is an Analytics Sales Specialist who has worked with AWS for 5 years, helping customers to be successful in their data journey. He has over 20 years of experience working with enterprise customers and startups, primarily in the data and database space. He loves to travel and spend quality time with his family.

How SOCAR handles large IoT data with Amazon MSK and Amazon ElastiCache for Redis

Post Syndicated from Younggu Yun original https://aws.amazon.com/blogs/big-data/how-socar-handles-large-iot-data-with-amazon-msk-and-amazon-elasticache-for-redis/

This is a guest blog post co-written with SangSu Park and JaeHong Ahn from SOCAR. 

As companies continue to expand their digital footprint, the importance of real-time data processing and analysis cannot be overstated. The ability to quickly measure and draw insights from data is critical in today’s business landscape, where rapid decision-making is key. With this capability, businesses can stay ahead of the curve and develop new initiatives that drive success.

This post is a continuation of How SOCAR built a streaming data pipeline to process IoT data for real-time analytics and control. In this post, we provide a detailed overview of streaming messages with Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon ElastiCache for Redis, covering technical aspects and design considerations that are essential for achieving optimal results.

SOCAR is the leading Korean mobility company with strong competitiveness in car-sharing. SOCAR wanted to design and build a solution for a new Fleet Management System (FMS). This system involves the collection, processing, storage, and analysis of Internet of Things (IoT) streaming data from various vehicle devices, as well as historical operational data such as location, speed, fuel level, and component status.

This post demonstrates a solution for SOCAR’s production application that allows them to load streaming data from Amazon MSK into ElastiCache for Redis, optimizing the speed and efficiency of their data processing pipeline. We also discuss the key features, considerations, and design of the solution.

Background

SOCAR operates about 20,000 cars and is planning to include other large vehicle types such as commercial vehicles and courier trucks. SOCAR has deployed in-car devices that capture data using AWS IoT Core. This data was then stored in Amazon Relational Database Service (Amazon RDS). The challenges with this approach included inefficient performance and high resource usage. Therefore, SOCAR looked for purpose-built databases tailored to the needs of their application and usage patterns while meeting SOCAR’s future business and technical requirements. The key requirements for SOCAR included achieving maximum performance for real-time data analytics, which required storing data in an in-memory data store.

After careful consideration, ElastiCache for Redis was selected as the optimal solution due to its ability to handle complex data aggregation rules with ease. One of the challenges faced was loading data from Amazon MSK into the database, because there was no built-in Kafka connector and consumer available for this task. This post focuses on the development of a Kafka consumer application that was designed to tackle this challenge by enabling performant data loading from Amazon MSK to Redis.

Solution overview

Extracting valuable insights from streaming data can be a challenge for businesses with diverse use cases and workloads. That’s why SOCAR built a solution to seamlessly bring data from Amazon MSK into multiple purpose-built databases, while also empowering users to transform data as needed. With fully managed Apache Kafka, Amazon MSK provides a reliable and efficient platform for ingesting and processing real-time data.

The following figure shows an example of the data flow at SOCAR.

[Figure: solution overview]

This architecture consists of three components:

  • Streaming data – Amazon MSK serves as a scalable and reliable platform for streaming data, capable of receiving and storing messages from a variety of sources, including AWS IoT Core, with messages organized into multiple topics and partitions
  • Consumer application – With a consumer application, users can seamlessly bring data from Amazon MSK into a target database or data storage while also defining data transformation rules as needed
  • Target databases – With the consumer application, the SOCAR team was able to load data from Amazon MSK into two separate databases, each serving a specific workload

Although this post focuses on a specific use case with ElastiCache for Redis as the target database and a single topic called gps, the consumer application we describe can handle additional topics and messages, as well as different streaming sources and target databases such as Amazon DynamoDB. Our post covers the most important aspects of the consumer application, including its features and components, design considerations, and a detailed guide to the code implementation.

Components of the consumer application

The consumer application comprises three main parts that work together to consume, transform, and load messages from Amazon MSK into a target database. The following diagram shows an example of data transformations in the handler component.

[Diagram: consumer application]

The details of each component are as follows:

  • Consumer – This consumes messages from Amazon MSK and then forwards the messages to a downstream handler.
  • Loader – This is where users specify a target database. For example, SOCAR’s target databases include ElastiCache for Redis and DynamoDB.
  • Handler – This is where users can apply data transformation rules to the incoming messages before loading them into a target database.
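The relationship between the three components can be sketched with small Go interfaces. This is a simplified illustration, not SOCAR's code: the interface names, the in-memory stand-ins (sliceConsumer, mapLoader), and the toy transformation are all hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a simplified stand-in for a Kafka record.
type message struct{ Key, Value []byte }

// consumer yields messages from a stream; ok is false when the stream is exhausted.
type consumer interface {
	Consume() (msg message, ok bool)
}

// loader writes a transformed value to a target store.
type loader interface {
	Load(key, value string) error
}

// handler transforms a message and hands the result to a loader.
type handler func(l loader, m message) error

// sliceConsumer is an in-memory stand-in for a Kafka consumer.
type sliceConsumer struct{ msgs []message }

func (c *sliceConsumer) Consume() (message, bool) {
	if len(c.msgs) == 0 {
		return message{}, false
	}
	m := c.msgs[0]
	c.msgs = c.msgs[1:]
	return m, true
}

// mapLoader is an in-memory stand-in for a target database.
type mapLoader map[string]string

func (l mapLoader) Load(key, value string) error {
	l[key] = value
	return nil
}

// upperHandler is a toy transformation rule: upper-case the value before loading.
func upperHandler(l loader, m message) error {
	return l.Load(string(m.Key), strings.ToUpper(string(m.Value)))
}

// run wires the three parts together: consume, transform, load.
func run(c consumer, h handler, l loader) error {
	for {
		m, ok := c.Consume()
		if !ok {
			return nil
		}
		if err := h(l, m); err != nil {
			return err
		}
	}
}

func main() {
	store := mapLoader{}
	src := &sliceConsumer{msgs: []message{{Key: []byte("car-1"), Value: []byte("moving")}}}
	if err := run(src, upperHandler, store); err != nil {
		panic(err)
	}
	fmt.Println(store["car-1"])
}
```

Because the handler and loader sit behind narrow interfaces, a new target database or a new transformation rule can be swapped in without touching the consume loop.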

Features of the consumer application

The consumer application has three main features:

  • Scalability – This solution is designed to be scalable, ensuring that the consumer application can handle an increasing volume of data and accommodate additional applications in the future. For instance, SOCAR sought to develop a solution capable of handling not only the current data from approximately 20,000 vehicles but also a larger volume of messages as the business and data continue to grow rapidly.
  • Performance – With this consumer application, users can achieve consistent performance, even as the volume of source messages and target databases increases. The application supports multithreading, allowing for concurrent data processing, and can handle unexpected spikes in data volume by easily increasing compute resources.
  • Flexibility – This consumer application can be reused for any new topics without having to build the entire consumer application again. The consumer application can be used to ingest new messages with different configuration values in the handler. SOCAR deployed multiple handlers to ingest many different messages. Also, this consumer application allows users to add additional target locations. For example, SOCAR initially developed a solution for ElastiCache for Redis and then replicated the consumer application for DynamoDB.

Design considerations of the consumer application

Note the following design considerations for the consumer application:

  • Scale out – A key design principle of this solution is scalability. To achieve this, the consumer application runs with Amazon Elastic Kubernetes Service (Amazon EKS) because it can allow users to increase and replicate consumer applications easily.
  • Consumption patterns – To receive, store, and consume data efficiently, it’s important to design Kafka topics around the messages and their consumption patterns. Depending on how messages are ultimately consumed, they can be routed into multiple topics with different schemas. For example, SOCAR has many different topics that are consumed by different workloads.
  • Purpose-built database – The consumer application supports loading data into multiple target options based on the specific use case. For example, SOCAR stored real-time IoT data in ElastiCache for Redis to power real-time dashboard and web applications, while storing recent trip information in DynamoDB that didn’t require real-time processing.

Walkthrough overview

The producer of this solution is AWS IoT Core, which sends out messages into a topic called gps. The target database of this solution is ElastiCache for Redis. ElastiCache for Redis is a fast in-memory data store that provides sub-millisecond latency to power internet-scale, real-time applications. Built on open-source Redis and compatible with the Redis APIs, ElastiCache for Redis combines the speed, simplicity, and versatility of open-source Redis with the manageability, security, and scalability from Amazon to power the most demanding real-time applications.

The target location can be either another database or storage depending on the use case and workload. SOCAR uses Amazon EKS to operate the containerized solution to achieve scalability, performance, and flexibility. Amazon EKS is a managed Kubernetes service to run Kubernetes in the AWS Cloud. Amazon EKS automatically manages the availability and scalability of the Kubernetes control plane nodes responsible for scheduling containers, managing application availability, storing cluster data, and other key tasks.

For the programming language, the SOCAR team decided to use the Go programming language, utilizing both the AWS SDK for Go and goroutines. A goroutine is a lightweight thread managed by the Go runtime, which makes concurrent processing easy to manage. The AWS SDK for Go simplifies the use of AWS services by providing a set of libraries that are consistent and familiar for Go developers.

In the following sections, we walk through the steps to implement the solution:

  1. Create a consumer.
  2. Create a loader.
  3. Create a handler.
  4. Build a consumer application with the consumer, loader, and handler.
  5. Deploy the consumer application.

Prerequisites

For this walkthrough, you should have the following:

Create a consumer

In this example, we use a topic called gps, and the consumer includes a Kafka client that receives messages from the topic. SOCAR created a struct and built a consumer (called NewConsumer in the code) to make it extendable. With this approach, any additional parameters and rules can be added easily.

To authenticate with Amazon MSK, SOCAR uses IAM. Because SOCAR already uses IAM to authenticate other resources, such as Amazon EKS, it uses the same IAM role to authenticate clients for both Amazon MSK and Apache Kafka actions. In the code, the aws_msk_iam_v2 SASL mechanism applies these IAM credentials to the Kafka connection.

The following code creates the consumer:

type Consumer struct {
	logger      *zerolog.Logger
	kafkaReader *kafka.Reader
}

func NewConsumer(logger *zerolog.Logger, awsCfg aws.Config, brokers []string, consumerGroupID, topic string) *Consumer {
	return &Consumer{
		logger: logger,
		kafkaReader: kafka.NewReader(kafka.ReaderConfig{
			Dialer: &kafka.Dialer{
				TLS:           &tls.Config{MinVersion: tls.VersionTLS12},
				Timeout:       10 * time.Second,
				DualStack:     true,
				SASLMechanism: aws_msk_iam_v2.NewMechanism(awsCfg),
			},
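			Brokers:     brokers,          // MSK bootstrap broker addresses
			GroupID:     consumerGroupID,  // consumer group ID
			Topic:       topic,            // topic to consume
			StartOffset: kafka.LastOffset, // start at the newest offset when the group has no committed offset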
		}),
	}
}

func (consumer *Consumer) Close() error {
	var err error = nil
	if consumer.kafkaReader != nil {
		err = consumer.kafkaReader.Close()
		consumer.logger.Info().Msg("closed kafka reader")
	}
	return err
}

func (consumer *Consumer) Consume(ctx context.Context) (kafka.Message, error) {
	return consumer.kafkaReader.ReadMessage(ctx)
}

Create a loader

The loader, represented by the Loader struct, is responsible for loading messages to the target location, which in this case is ElastiCache for Redis. The NewLoader function initializes a new instance of the Loader struct with a logger and a Redis cluster client, which is used to communicate with the ElastiCache cluster. The redis.ClusterClient object is created by the NewRedisClient function, which uses IAM to authenticate the client for Redis actions. This ensures secure and authorized access to the ElastiCache cluster. The Loader struct also contains the Close method to close the Redis client and free up resources.

The following code creates a loader:

type Loader struct {
	logger      *zerolog.Logger
	redisClient *redis.ClusterClient
}

func NewLoader(logger *zerolog.Logger, redisClient *redis.ClusterClient) *Loader {
	return &Loader{
		logger:      logger,
		redisClient: redisClient,
	}
}

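// Close closes the Redis cluster client and frees its connections.
func (loader *Loader) Close() error {
	var err error = nil
	if loader.redisClient != nil {
		err = loader.redisClient.Close()
		loader.logger.Info().Msg("closed redis client")
	}
	return err
}

// RedisGeoAdd adds a member with its longitude and latitude to a Redis geospatial key.
// The handler calls this method; this minimal version assumes the go-redis GeoAdd command.
func (loader *Loader) RedisGeoAdd(ctx context.Context, key, name string, lng, lat float64) error {
	return loader.redisClient.GeoAdd(ctx, key, &redis.GeoLocation{
		Name:      name,
		Longitude: lng,
		Latitude:  lat,
	}).Err()
}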

func NewRedisClient(ctx context.Context, awsCfg aws.Config, addrs []string, replicationGroupID, username string) (*redis.ClusterClient, error) {
	redisClient := redis.NewClusterClient(&redis.ClusterOptions{
		NewClient: func(opt *redis.Options) *redis.Client {
			return redis.NewClient(&redis.Options{
				Addr: opt.Addr,
				CredentialsProvider: func() (username string, password string) {
					token, err := BuildRedisIAMAuthToken(ctx, awsCfg, replicationGroupID, opt.Username)
					if err != nil {
						panic(err)
					}
					return opt.Username, token
				},
				PoolSize:    opt.PoolSize,
				PoolTimeout: opt.PoolTimeout,
				TLSConfig:   &tls.Config{InsecureSkipVerify: true},
			})
		},
		Addrs:       addrs,
		Username:    username,
		PoolSize:    100,
		PoolTimeout: 1 * time.Minute,
	})
	pong, err := redisClient.Ping(ctx).Result()
	if err != nil {
		return nil, err
	}
	if pong != "PONG" {
		return nil, fmt.Errorf("failed to verify connection to redis server")
	}
	return redisClient, nil
}

Create a handler

A handler is used to include business rules and data transformation logic that prepares data before loading it into the target location. It acts as a bridge between a consumer and a loader. In this example, the topic name is cars.gps.json, and the message includes two keys, lng and lat, with data type Float64. The business logic can be defined in a function like handlerFuncGpsToRedis and then applied as follows:

type (
	handlerFunc    func(ctx context.Context, loader *Loader, key, value []byte) error
	handlerFuncMap map[string]handlerFunc
)

// HandlerRedis maps each topic name to the handler that loads its messages into Redis.
var HandlerRedis = handlerFuncMap{
	"cars.gps.json": handlerFuncGpsToRedis,
}

func GetHandlerFunc(funcMap handlerFuncMap, topic string) (handlerFunc, error) {
	handlerFunc, exist := funcMap[topic]
	if !exist {
		return nil, fmt.Errorf("failed to find handler func for '%s'", topic)
	}
	return handlerFunc, nil
}

func handlerFuncGpsToRedis(ctx context.Context, loader *Loader, key, value []byte) error {
	// unmarshal raw data to map
	data := map[string]interface{}{}
	err := json.Unmarshal(value, &data)
	if err != nil {
		return err
	}

	// prepare things to load on redis as geolocation
	name := string(key)
	lng, err := getFloat64ValueFromMap(data, "lng")
	if err != nil {
		return err
	}
	lat, err := getFloat64ValueFromMap(data, "lat")
	if err != nil {
		return err
	}

	// add geolocation to redis
	return loader.RedisGeoAdd(ctx, "cars#gps", name, lng, lat)
}
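The handler calls getFloat64ValueFromMap, which the listing doesn't show. The following is a hypothetical version of that helper, written only from how it is called above. It relies on the fact that encoding/json decodes every JSON number into float64 when the target is an interface{} value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// getFloat64ValueFromMap returns data[key] as a float64.
// It fails if the key is missing or the value is not a JSON number.
// This is an assumed implementation; the original helper is not shown in the post.
func getFloat64ValueFromMap(data map[string]interface{}, key string) (float64, error) {
	raw, ok := data[key]
	if !ok {
		return 0, fmt.Errorf("key '%s' not found", key)
	}
	value, ok := raw.(float64)
	if !ok {
		return 0, fmt.Errorf("key '%s' holds %T, not a number", key, raw)
	}
	return value, nil
}

func main() {
	data := map[string]interface{}{}
	if err := json.Unmarshal([]byte(`{"lng": 126.978, "lat": 37.5665, "id": "car-1"}`), &data); err != nil {
		panic(err)
	}
	lng, err := getFloat64ValueFromMap(data, "lng")
	fmt.Println(lng, err)

	_, err = getFloat64ValueFromMap(data, "id")
	fmt.Println(err)
}
```

Returning an error rather than a zero value matters here, because a longitude and latitude of 0 is a valid coordinate and would otherwise be loaded into Redis silently.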

Build a consumer application with the consumer, loader, and handler

Now you have created the consumer, loader, and handler. The next step is to build a consumer application using them. In a consumer application, you read messages from your stream with a consumer, transform them using a handler, and then load transformed messages into a target location with a loader. These three components are parameterized in a consumer application function such as the one shown in the following code:

type Connector struct {
	ctx    context.Context
	logger *zerolog.Logger

	consumer *Consumer
	handler  handlerFuncMap
	loader   *Loader
}

func NewConnector(ctx context.Context, logger *zerolog.Logger, consumer *Consumer, handler handlerFuncMap, loader *Loader) *Connector {
	return &Connector{
		ctx:    ctx,
		logger: logger,

		consumer: consumer,
		handler:  handler,
		loader:   loader,
	}
}

func (connector *Connector) Close() error {
	var err error = nil
	if connector.consumer != nil {
		err = connector.consumer.Close()
	}
	if connector.loader != nil {
		err = connector.loader.Close()
	}
	return err
}

func (connector *Connector) Run() error {
	wg := sync.WaitGroup{}
	defer wg.Wait()
	handlerFunc, err := GetHandlerFunc(connector.handler, connector.consumer.kafkaReader.Config().Topic)
	if err != nil {
		return err
	}
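	for {
		msg, err := connector.consumer.Consume(connector.ctx)
		if err != nil {
			// stop cleanly on shutdown; surface any other error instead of handling an empty message
			if errors.Is(err, context.Canceled) {
				break
			}
			return err
		}

		wg.Add(1)
		go func(key, value []byte) {
			defer wg.Done()
			// keep the error local to this goroutine so concurrent handlers do not share it
			if err := handlerFunc(connector.ctx, connector.loader, key, value); err != nil {
				connector.logger.Err(err).Msg("failed to handle message")
			}
		}(msg.Key, msg.Value)
	}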
	return nil
}
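The shutdown behavior of this kind of loop is easier to see in isolation. The following sketch uses only the standard library, with a channel-backed source type standing in for the Kafka reader. The names source, run, and processAll are hypothetical. It shows the two pieces that matter: checking errors.Is(err, context.Canceled) to recognize a normal shutdown, and waiting on the sync.WaitGroup so in-flight handlers finish before the loop returns.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// source is a stand-in for a Kafka reader: Read blocks until a message arrives
// or the context is canceled, in which case it returns the context's error.
type source struct{ ch chan string }

func (s *source) Read(ctx context.Context) (string, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case v := <-s.ch:
		return v, nil
	}
}

// run reads until the context is canceled, handling each message in its own goroutine.
func run(ctx context.Context, src *source, handle func(string)) error {
	var wg sync.WaitGroup
	defer wg.Wait() // let in-flight handlers finish before returning
	for {
		msg, err := src.Read(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				return nil // normal shutdown
			}
			return err
		}
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			handle(m)
		}(msg)
	}
}

// processAll feeds n messages through run, cancels, and reports how many were handled.
func processAll(n int) int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	src := &source{ch: make(chan string)} // unbuffered: a send returns once run has read it
	var handled int64
	done := make(chan error, 1)
	go func() {
		done <- run(ctx, src, func(string) { atomic.AddInt64(&handled, 1) })
	}()
	for i := 0; i < n; i++ {
		src.ch <- fmt.Sprintf("msg-%d", i)
	}
	cancel()
	if err := <-done; err != nil {
		panic(err)
	}
	return atomic.LoadInt64(&handled)
}

func main() {
	fmt.Println(processAll(5))
}
```

Starting one goroutine per message keeps the loop simple, but it places no upper bound on concurrency. If the target database becomes the bottleneck, a bounded worker pool is a common alternative.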

Deploy the consumer application

To achieve maximum parallelism, SOCAR containerizes the consumer application and deploys it into multiple pods on Amazon EKS. Each consumer application contains a unique consumer, loader, and handler. For example, if you need to receive messages from a single topic with five partitions, you can deploy five identical consumer applications, each running in its own pod. Similarly, if you have two topics with three partitions each, you should deploy two consumer applications with three pods each, resulting in a total of six pods. It’s a best practice to run one consumer application per topic, and the number of pods should match the number of partitions to enable concurrent message processing. The number of pods can be specified in the Kubernetes deployment configuration.

There are two stages in the Dockerfile. The first stage is the builder, which installs build tools and dependencies, and builds the application. The second stage is the runner, which uses a smaller base image (Alpine) and copies only the necessary files from the builder stage. It also sets the appropriate user permissions and runs the application. It’s also worth noting that the builder stage uses a specific version of the Golang image, while the runner stage uses a specific version of the Alpine image, both of which are considered to be lightweight and secure images.

The following code is an example of the Dockerfile:

# builder
FROM golang:1.18.2-alpine3.16 AS builder
RUN apk add build-base
WORKDIR /usr/src/app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o connector .

# runner
FROM alpine:3.16.0 AS runner
WORKDIR /usr/bin/app
RUN apk add --no-cache tzdata
RUN addgroup --system app && adduser --system --shell /bin/false --ingroup app app
COPY --from=builder /usr/src/app/connector .
RUN chown -R app:app /usr/bin/app
USER app
ENTRYPOINT ["/usr/bin/app/connector"]

Conclusion

In this post, we discussed SOCAR’s approach to building a consumer application that enables IoT real-time streaming from Amazon MSK to target locations such as ElastiCache for Redis. We hope you found this post informative and useful. Thank you for reading!


About the Authors

SangSu Park is the Head of Operation Group at SOCAR. His passion is to keep learning, embrace challenges, and strive for mutual growth through communication. He loves to travel in search of new cities and places.

JaeHong Ahn is a DevOps Engineer in SOCAR’s cloud infrastructure team. He is dedicated to promoting collaboration between developers and operators. He enjoys creating DevOps tools and is committed to using his coding abilities to help build a better world. He loves to cook delicious meals as a private chef for his wife.

Younggu Yun works at AWS Data Lab in Korea. His role involves helping customers across the APAC region meet their business objectives and overcome technical challenges by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together.

Connect Kafka client applications securely to your Amazon MSK cluster from different VPCs and AWS accounts

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/connect-kafka-client-applications-securely-to-your-amazon-msk-cluster-from-different-vpcs-and-aws-accounts/

You can now use Amazon Managed Streaming for Apache Kafka (Amazon MSK) multi-VPC private connectivity (powered by AWS PrivateLink) and cluster policy support for MSK clusters to simplify connectivity of your Kafka clients to your brokers. Amazon MSK is a fully managed service that makes it easy for you to build and run applications that use Kafka to process streaming data. When you create an MSK cluster, the cluster resources are available to clients within the same Amazon VPC. This allows you to launch the cluster within specific subnets of the VPC, associate it with security groups, and attach IP addresses from your VPC’s address space through elastic network interfaces (ENIs). Network traffic between clients and the cluster stays within the AWS network, with internet access to the cluster not possible by default.

If you have workloads segmented across several VPCs and AWS accounts, there may be scenarios in which you need to make your MSK brokers accessible to Kafka clients across VPCs. With the launch of Amazon MSK multi-VPC private connectivity, you can now privately access your MSK brokers from your client applications in another VPC within the same AWS account or another AWS account without enabling public access or creating and managing your own networking infrastructure for private connectivity. A cluster policy is an AWS Identity and Access Management (IAM) resource-based policy, which is defined for your MSK cluster to provide cross-account IAM principals permissions to set up private connectivity to the cluster.

This post introduces Amazon MSK multi-VPC connectivity and how you can privately access your MSK clusters from your clients in other VPCs. It also shows how to define a cluster policy for your MSK clusters. These two new capabilities simplify configuring cross-VPC network access and setting up the permissions that Kafka clients need to privately connect to MSK brokers in a different account.

Before Amazon MSK multi-VPC connectivity

Before Amazon MSK multi-VPC connectivity, the network admin needed to choose one of the following secure connectivity patterns. Admins had to repeat certain steps for each broker in the cluster.

  • Amazon VPC peering is the simplest networking construct that enables bidirectional connectivity between two VPCs. In this approach, the network admin had to update each VPC with the IP addresses of each broker in the routing tables of all subnets. You can’t use this connectivity pattern when there are overlapping IPv4 or IPv6 CIDR blocks in the VPCs.
  • AWS Transit Gateway provides a highly available and scalable design for connecting VPCs. In this approach, the network admin constantly had to update the routing tables attached to each transit gateway. Unlike VPC peering, which can go cross-Region, AWS Transit Gateway is a regional service, but you can use inter-Region peering between transit gateways to route traffic across Regions. AWS Transit Gateway has a maximum (burst) bandwidth of 50 Gbps per Availability Zone per VPC connection. This could become a challenge for some workloads.
  • AWS PrivateLink is an AWS networking service that provides private access to a specific service instead of all resources within a VPC and without traversing the public internet. It also eliminates the need to expose the entire VPC or subnet, and prevents issues like having to deal with overlapping CIDR blocks between the VPC that hosts the MSK cluster ENIs and the Kafka client VPC. AWS PrivateLink can scale to an unlimited number of VPCs and unlike the other options, traffic here is unidirectional. Because of these benefits, AWS PrivateLink is a popular choice to manage private connectivity. However, this connectivity pattern comes with additional complexity. It requires creating multiple Network Load Balancers (NLBs) per cluster and creating private service endpoints per NLB in the service account. Furthermore, admins had to create private endpoints per private service endpoint, and an Amazon Route 53 alias record per private endpoint in every client account.

The following diagram illustrates the architecture of customer-managed VPC endpoints between different VPCs in different AWS accounts with IAM authentication.

Before multi-vpc connectivity

After Amazon MSK multi-VPC connectivity and cluster policy

You can now enable multi-VPC and cross-account connectivity for your MSK clusters in a few simple steps and pay for what you use. This eliminates the overhead of creating and managing AWS PrivateLink infrastructure. When new brokers are added to a cluster, private connectivity is maintained without the need to make configuration changes, saving you from the overhead and complexity of managing the underlying network infrastructure.

The following diagram illustrates this updated architecture of using Amazon MSK multi-VPC connectivity to connect a client from a different AWS account.

after multi-vpc connectivity

Solution overview

Establishing multi-VPC private connectivity involves turning on this feature for the cluster and configuring the Kafka clients to connect privately to the cluster.

The following are the high-level steps to configure the cluster:

  1. Enable the multi-VPC private connectivity feature for a subset of authentication schemes that are enabled for your MSK cluster.
  2. If a Kafka client is in a different AWS account from the cluster, attach a resource-based policy to the MSK cluster to authorize IAM principals to create cross-account connectivity.
  3. Share the cluster ARN with the IAM principal associated with the Kafka client that needs to create the cross-account access to the MSK cluster.

The following are the high-level steps to configure the clients:

  1. Create a managed VPC endpoint for the client VPC that needs to connect privately to the MSK cluster.
  2. Update the VPC endpoint’s security group settings to enable outbound connectivity to the MSK cluster.
  3. Set up the client to use the cluster’s connection string to connect privately to the cluster.

Cluster setup

In this post, we only show the steps for enabling Amazon MSK multi-VPC connectivity for a provisioned cluster.

  1. To enable Amazon MSK multi-VPC connectivity on your existing cluster, choose Turn on multi-VPC connectivity on the Amazon MSK console.
    Note that multi-VPC connectivity cannot be turned on for a cluster that allows unauthenticated access. This is to prevent unauthenticated access from different VPCs.
  2. Select the authentication methods that you allow clients in other VPCs to use.
    The list of authentication methods is populated based on your cluster’s security configuration.
  3. Review the settings and choose Turn on selection.
    After multi-VPC connectivity is enabled on your cluster, Amazon MSK creates the NLB and VPC endpoint service infrastructure required for private connectivity. Amazon MSK also vends a new set of bootstrap broker strings for private connectivity, which you can access using the View client info option on the Amazon MSK console. The next step is to give the IAM principals associated with your clients permission to connect privately to your cluster. To do this, you attach a cluster policy to the cluster.
  4. Choose Edit cluster policy in the Security section of the cluster details page on the Amazon MSK console.
    You can define either a Basic or an Advanced cluster policy. With the Basic option, you simply enter the AWS account IDs of your clients’ VPCs. This policy allows principals in those AWS accounts to perform the CreateVPCConnection, GetBootstrapBrokers, DescribeCluster, and DescribeClusterV2 actions that are required for creating the cross-VPC connectivity to your cluster. In other cases, you may need a more complex policy that allows additional actions, or principals other than AWS accounts, such as IAM roles, role sessions, and IAM users. You can author a cluster policy according to IAM JSON policy guidance and provide it to the cluster in Advanced mode.
  5. Define your cluster policy and choose Save changes.
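To make the Basic option more concrete, the following Python sketch assembles a resource-based policy document of the kind described above. The ARN and account ID are placeholders, and the `kafka:` action identifiers and their casing are our assumption about how the four actions named in the text appear in IAM; verify them against the Amazon MSK documentation before using a policy like this in Advanced mode.

```python
import json


def basic_cluster_policy(cluster_arn, account_ids):
    """Build a resource-based policy that lets the given AWS accounts
    set up private connectivity to the cluster."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": list(account_ids)},
                # Assumed IAM identifiers for the four actions named in the text.
                "Action": [
                    "kafka:CreateVpcConnection",
                    "kafka:GetBootstrapBrokers",
                    "kafka:DescribeCluster",
                    "kafka:DescribeClusterV2",
                ],
                "Resource": cluster_arn,
            }
        ],
    }


# Placeholder ARN and account ID, for illustration only.
policy = basic_cluster_policy(
    "arn:aws:kafka:us-east-1:111122223333:cluster/example-cluster/EXAMPLE-UUID",
    ["444455556666"],
)
print(json.dumps(policy, indent=2))
```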

Client setup

On the client side, first you need to attach an identity policy to the IAM principal who wants to create a managed VPC connection. The identity policy must provide permission for creating a managed VPC connection. The necessary permissions are part of the AWS managed policy AmazonMSKFullAccess.

  1. In the other AWS account with the IAM principal you configured, use the new Managed VPC connection page on the Amazon MSK console to create Amazon MSK managed VPC connections.
    A managed VPC connection maps to an AWS PrivateLink endpoint under the hood, and Amazon MSK uses the managed VPC connection to orchestrate private connectivity to the cluster. You simply need to create the managed VPC connection and pay standard AWS PrivateLink charges for the underlying endpoint.
  2. Enter the Amazon Resource Name (ARN) of the cluster that you want to connect to.
  3. Choose Verify to verify the cluster information and its minimum requirements for cross-connectivity.
  4. Select an authentication method from the provided values.
  5. Choose the VPC ID where your Kafka clients are located, and choose their subnet IDs. You can add more subnets using the Add subnet option.
    The specified client subnets must have Availability Zone IDs that match the cluster’s Availability Zone IDs. This makes sure the clients are located in the same physical Availability Zone as the cluster brokers. Amazon MSK uses the port range 14001:14100 for all authentication methods. You need to select a security group that allows outbound traffic to this port range. The following screenshot shows an example.
  6. Review the settings and choose Create connection.
    The process will take a few minutes.
  7. When it’s complete, you can obtain the clients’ connection string from the details page of your connection.
  8. The next step is to update the outbound rules for the VPC endpoint security group to allow communication to the port range 14001:14100.
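The two constraints mentioned in these steps (matching Availability Zone IDs and the 14001:14100 port range) can be checked before you create the connection. The following Python sketch is only an illustration: the AZ IDs and the connection string are made up, and treating "match" as "every client AZ ID is also a cluster AZ ID" is our assumption.

```python
MSK_PORT_RANGE = range(14001, 14101)  # 14001..14100 inclusive, per the text


def az_ids_match(client_az_ids, cluster_az_ids):
    """True if every client subnet sits in an AZ ID the cluster also uses."""
    return set(client_az_ids) <= set(cluster_az_ids)


def ports_outside_range(connection_string):
    """Return the ports in a comma-separated host:port list that fall
    outside the multi-VPC port range."""
    ports = [int(entry.rsplit(":", 1)[1]) for entry in connection_string.split(",")]
    return [port for port in ports if port not in MSK_PORT_RANGE]


# Hypothetical values, for illustration only.
cluster_azs = ["use1-az1", "use1-az2", "use1-az4"]
client_azs = ["use1-az1", "use1-az2"]
connection = "b-1.example.internal:14001,b-2.example.internal:14002"

print(az_ids_match(client_azs, cluster_azs))
print(ports_outside_range(connection))
```

An empty list from `ports_outside_range` means a single outbound rule covering 14001:14100 is enough for every broker in the string.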

Use the Amazon MSK-managed VPC connection

After you create the managed VPC connection, connecting privately to the cluster is easy. Simply use the new connection string to connect to the cluster. For example, you may connect from an Amazon Elastic Compute Cloud (Amazon EC2) instance on your client VPC. Then run the following command to verify if you can connect and perform actions against the topics in the MSK cluster:

export MSK_VPC=<YOUR CLIENT CONNECTION STRING GOES HERE>
bin/kafka-topics.sh --bootstrap-server $MSK_VPC --command-config /home/ec2-user/kafka/config/client-config.properties --list


IAM authentication

Before the launch of Amazon MSK multi-VPC connectivity, Kafka clients in other AWS accounts that opted in to IAM authentication needed to assume another IAM role in the cluster’s account. To facilitate this, admins had to create multiple IAM roles and write a trust policy that allowed authenticated principals from the clients’ accounts to assume the corresponding roles through the sts:AssumeRole API call. This approach was challenging to scale as the number of VPCs or AWS accounts grew. With the launch of the cluster policy, cross-account access control is simplified because you can attach a cluster policy to your clusters to specify which cross-account clients have which permissions on resources within the cluster.

This capability allows you to manage all access to the cluster and topics in one place. For example, you can control which IAM principals have write access to certain topics, and which principals can only read from them. Users who are using IAM client authentication can also add permissions for required kafka-cluster actions in the cluster resource policy.
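As a sketch of what such topic-level control could look like, the following Python snippet builds policy statements that give one principal write access and another read-only access to the same topic. The role ARNs and topic ARN are hypothetical. The snippet covers only topic-level `kafka-cluster` actions; cluster-level and group-level actions that clients also need (for example, `kafka-cluster:Connect`) are intentionally left out, so treat it as a starting point rather than a complete policy.

```python
import json

# Topic-level actions for each access level (cluster- and group-level
# actions such as kafka-cluster:Connect are not included in this sketch).
TOPIC_ACTIONS = {
    "read": ["kafka-cluster:DescribeTopic", "kafka-cluster:ReadData"],
    "write": ["kafka-cluster:DescribeTopic", "kafka-cluster:WriteData"],
}


def topic_statement(principal_arn, topic_arn, access):
    """Build one policy statement granting read or write access to a topic."""
    if access not in TOPIC_ACTIONS:
        raise ValueError("access must be 'read' or 'write'")
    return {
        "Effect": "Allow",
        "Principal": {"AWS": principal_arn},
        "Action": TOPIC_ACTIONS[access],
        "Resource": topic_arn,
    }


# Hypothetical ARNs, for illustration only.
TOPIC_ARN = "arn:aws:kafka:us-east-1:111122223333:topic/example-cluster/EXAMPLE-UUID/orders"
statements = [
    topic_statement("arn:aws:iam::444455556666:role/producer-role", TOPIC_ARN, "write"),
    topic_statement("arn:aws:iam::444455556666:role/consumer-role", TOPIC_ARN, "read"),
]
print(json.dumps({"Version": "2012-10-17", "Statement": statements}, indent=2))
```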

Availability and pricing

You can now use Amazon MSK multi-VPC connectivity in all commercial Regions where Amazon MSK is offered, including China and GovCloud (US) Regions.

You pay $0.006 per GB data processed for private connectivity and $0.0225 per private connectivity hour per authentication scheme in US East (Ohio). Refer to our Pricing page for more details.
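To see how the two price components combine, here is a small Python sketch that estimates the charge from the US East (Ohio) rates quoted above. The workload numbers in the example are invented, and the function ignores any other charges that may apply.

```python
DATA_PRICE_PER_GB = 0.006          # USD per GB processed, from the text
HOURLY_PRICE_PER_SCHEME = 0.0225   # USD per hour per authentication scheme


def estimate_private_connectivity_cost(data_gb, hours, auth_schemes=1):
    """Estimate the multi-VPC private connectivity charge in USD."""
    data_cost = data_gb * DATA_PRICE_PER_GB
    hourly_cost = hours * HOURLY_PRICE_PER_SCHEME * auth_schemes
    return data_cost + hourly_cost


# Hypothetical month: 1,000 GB processed, 730 hours, two authentication schemes.
monthly = estimate_private_connectivity_cost(1000, 730, auth_schemes=2)
print(round(monthly, 2))
```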

Conclusion

With Amazon MSK multi-VPC private connectivity, you can now privately access your MSK brokers from your client applications in another VPC within the same AWS account or another AWS account, with minimal configuration. You no longer have to create, manage, and update multiple networking resources in multiple VPCs, or make Amazon MSK configuration changes to connect your Kafka clients across VPCs and accounts. Amazon MSK creates and manages the resources for you. With Cluster policy support, you can easily provide your cross-account client principals permissions to connect privately to your MSK cluster. Further, if you are using IAM client authentication, you can also leverage the cluster policy to centrally control clients’ permissions to perform operations on the cluster. Use the Amazon MSK multi-VPC connectivity and the cluster policy feature today to simplify your secure connectivity infrastructure.

For further reading on Amazon MSK, visit the official product page and our AWS Documentation.


About the authors

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Rajeev Chakrabarti is a Kinesis specialist solutions architect.

Connect to Amazon MSK Serverless from your on-premises network

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/connect-to-amazon-msk-serverless-from-your-on-premises-network/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed, highly available, and secure Apache Kafka service. Amazon MSK reduces the work needed to set up, scale, and manage Apache Kafka in production. With Amazon MSK, you can create a cluster in minutes and start sending data.

With Amazon MSK Serverless, you can run Apache Kafka without having to manage the underlying infrastructure. Amazon MSK will automatically provision, scale, and manage your Apache Kafka clusters, so you can focus on your applications without worrying about the operational overhead. Additionally, MSK Serverless offers fine-grained, pay-as-you-go pricing, making it a cost-effective option for organizations with unpredictable workloads.

Connecting to MSK Serverless is easy. You can set up a serverless cluster using the API or AWS Management Console in minutes. MSK Serverless provides bootstrap information as a private DNS endpoint, allowing clients to connect to the serverless Apache Kafka cluster. A common use case of using MSK Serverless is an on-premises client that needs to process real-time data streams. However, the private DNS endpoint is only accessible from virtual private clouds (VPCs) that have been configured to connect and isn’t directly resolvable from an on-premises network. This can pose a challenge for on-premises clients to discover and connect to the MSK Serverless cluster.
In this post, we guide you through a step-by-step process to connect your on-premises client to MSK Serverless, overcoming this challenge.

Solution overview

The following diagram illustrates the solution architecture.

The flow of the solution is as follows:

  1. The DNS query for your MSK endpoint is routed to a locally configured on-premises DNS server.
  2. The on-premises DNS as configured performs conditional forwarding for kafka-serverless.REPLACE-MSK-SERVERLESS-REGION.amazonaws.com to an Amazon Route 53 inbound resolver endpoint IP address.
  3. The inbound resolver endpoint performs DNS resolution by forwarding the query to the private hosted zone that was created along with the MSK Serverless cluster.
  4. The IP addresses returned by the DNS query are the private IP addresses of the interface VPC endpoint, which allow your on-premises host to establish private connectivity over AWS VPN or AWS Direct Connect.
  5. The interface endpoint is a collection of one or more elastic network interfaces with a private IP address in your account that serves as an entry point for traffic destined to the MSK Serverless service.

Note that at this time, this solution works only for MSK Serverless clusters with a single VPC.

Prerequisites

In this section, we discuss the prerequisite steps to complete in order to implement this solution.

Establish network connectivity between on premises and the AWS Cloud

To use MSK Serverless from your on-premises network, you need to establish a network connection between your on-premises environment and the VPC that you have set up for MSK Serverless. Various secure methods are available to connect your on-premises network to the AWS Cloud. Refer to Network-to-Amazon VPC connectivity options for more information.

Create a security group for allowing inbound TCP/UDP connections from your on-premises network

Create a security group with the following configurations on the same VPC that you configured for MSK Serverless:

Inbound rule:

  • Source: [On-premises CIDR range]
  • Protocol: TCP/UDP
  • Port Range: 53

Outbound rule: Leave it to default

For more information, refer to Work with security groups.

Update the MSK security group for inbound connections from your on-premises network

To ensure that your MSK Serverless cluster can be accessed from your on-premises network, you need to adjust the cluster’s security group settings to allow incoming traffic from your network on TCP port 9098. Complete the following steps:

  1. On the Amazon MSK console, choose Clusters in the navigation pane.
  2. Navigate to your serverless MSK cluster’s properties.
  3. Choose the security group associated with your MSK cluster.

Because MSK Serverless supports configuring multiple VPCs, make sure to choose the security group associated with the VPC that you configured for connecting from your on-premises network.

  4. To enable connections from your on-premises CIDR block to MSK Serverless, add an inbound rule that allows traffic on TCP port 9098 from your on-premises CIDR.

This ensures that your on-premises network can communicate with MSK Serverless on the specified port.
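The inbound rules described in the prerequisites (TCP/UDP port 53 for the resolver endpoint and TCP port 9098 for MSK Serverless) can be written down as data before you apply them. The following Python sketch builds rule dictionaries in the shape used by the Amazon EC2 security group APIs; the on-premises CIDR range is a made-up example, and the sketch does not call AWS.

```python
import ipaddress


def inbound_rule(protocol, port, cidr):
    """Build one inbound rule for a single port and source CIDR."""
    ipaddress.ip_network(cidr)  # raises ValueError for a malformed CIDR
    return {
        "IpProtocol": protocol,
        "FromPort": port,
        "ToPort": port,
        "IpRanges": [{"CidrIp": cidr}],
    }


ON_PREM_CIDR = "192.168.0.0/16"  # hypothetical on-premises range

# Security group for the Route 53 inbound resolver endpoint.
dns_rules = [inbound_rule("tcp", 53, ON_PREM_CIDR), inbound_rule("udp", 53, ON_PREM_CIDR)]
# Additional rule for the MSK Serverless security group.
kafka_rule = inbound_rule("tcp", 9098, ON_PREM_CIDR)

print(dns_rules)
print(kafka_rule)
```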

Configure a Route 53 inbound resolver endpoint

MSK Serverless provides a DNS endpoint that serves as the starting point for an Apache Kafka client to connect to the cluster. However, this endpoint isn’t publicly discoverable and can only be accessed from within the configured VPC. To resolve the serverless DNS endpoint outside of your VPC, you can set up a Route 53 resolver endpoint. This allows you to access the endpoint securely by creating a hybrid cloud setup over VPN or Direct Connect.

To configure the Route 53 resolver using the console, complete the following steps:

  1. On the Route 53 console, under Resolver in the navigation pane, choose Inbound endpoints.
  2. Choose Create inbound endpoint.
  3. For Endpoint name, enter the endpoint name.
  4. For VPC in the Region, choose the VPC where you configured MSK Serverless.
  5. For Security group for this endpoint, choose the security group that you created as a prerequisite for inbound TCP/UDP connections.

The security group of the inbound resolver endpoint should allow traffic from the on-premises DNS server IP address on TCP/UDP port 53.

In the next step, you add your IP addresses, ensuring that the number of IP addresses matches the number of subnets in your MSK cluster.

  6. Choose the Availability Zones and subnets that are the same as your MSK Serverless network configuration.
  7. Select Use an IP address that is selected automatically.
  8. Choose Create inbound endpoint.
  9. Copy the inbound endpoint IP addresses.

Configure the on-premises DNS server

In this example, we use a Microsoft DNS server. To configure a conditional forwarder, complete the following steps:

  1. Open DNS Manager by running the following command in the Run command window:
dnsmgmt.msc
  2. Choose (right-click) Conditional Forwarders under the server of your choosing, then choose New Conditional Forwarder.


In the next step, you enter kafka-serverless.REPLACE-MSK-SERVERLESS-REGION.amazonaws.com as the DNS domain and use the IP addresses of the Route 53 inbound resolver endpoints that you created earlier. You can find the MSK endpoint information by accessing the cluster’s client information. To learn more about getting client information, refer to Getting the bootstrap brokers for an Amazon MSK cluster.

  3. For DNS Domain, enter the domain portion of your endpoint name, for example, kafka-serverless.ap-southeast-2.amazonaws.com. Do not enter the entire endpoint name.
  4. Choose OK.
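Because the conditional forwarder takes only the domain portion of the endpoint, it can help to derive that value from the bootstrap string instead of typing it by hand. The following Python sketch does this for the example endpoint used in this post; the function name is our own.

```python
def forwarder_domain(bootstrap_endpoint):
    """Return the kafka-serverless.<region>.amazonaws.com domain for a
    bootstrap endpoint, dropping the port and the cluster-specific labels."""
    host = bootstrap_endpoint.split(":", 1)[0]
    labels = host.split(".")
    if "kafka-serverless" not in labels:
        raise ValueError("not an MSK Serverless endpoint: " + bootstrap_endpoint)
    return ".".join(labels[labels.index("kafka-serverless"):])


endpoint = "boot-abcdc9.c3.kafka-serverless.ap-southeast-2.amazonaws.com:9098"
print(forwarder_domain(endpoint))
```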

Test the DNS resolution

DNS (Domain Name System) uses TCP/UDP port 53. To test whether you can connect to any of the Route 53 inbound endpoints, run the following command from your on-premises client:

telnet Route53-INBOUND-ENDPOINT-IP 53

For example: telnet 10.1.0.133 53

The following is a sample output:

Trying 10.1.0.133...
Connected to 10.1.0.133.
Escape character is '^]'.
Connection closed by foreign host.
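If telnet isn’t installed on the client, a short script can perform the same TCP reachability check. The following Python sketch uses only the standard library; the function name and default timeout are our own choices, and it tests TCP only, not UDP.

```python
import socket


def can_connect(host, port=53, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

For example, calling can_connect("10.1.0.133") from the on-premises client should return True when the inbound endpoint is reachable on TCP port 53.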

Run the following command to check whether you can connect with the MSK Serverless endpoint from your on-premises client. To get the MSK Serverless endpoint information, refer to Create an MSK Serverless cluster.

dig MSK-SERVERLESS-ENDPOINT-REMOVE-PORT-NUMBER +short

For example: dig boot-abcdc9.c3.kafka-serverless.ap-southeast-2.amazonaws.com +short

The following is a sample output:

vpce-0bcb06d53aab34111-vt8yzx2b.vpce-svc-05dc791a527abcd.ap-southeast-2.vpce.amazonaws.com.
10.1.1.185
10.1.0.191

If the DNS resolution fails, check your network connectivity from on premises. For more information about troubleshooting connectivity issues, refer to How do I troubleshoot VPN tunnel connectivity to an Amazon VPC or Troubleshooting AWS Direct Connect.

After you create a serverless MSK cluster, the service automatically creates an interface VPC endpoint for the cluster. You can use the dig command as shown above to retrieve the VPC endpoint ID and its associated IP address, which confirms that you are now able to connect to the MSK Serverless cluster from your on-premises environment.
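The same check can be scripted. The following Python sketch resolves a hostname with the standard library and verifies that every returned address is private, which is what you would expect when the name resolves to the interface VPC endpoint. The function names are ours, and the sample addresses are the ones from the output above.

```python
import ipaddress
import socket


def resolve_ipv4(hostname):
    """Resolve a hostname to a sorted list of unique IPv4 addresses."""
    infos = socket.getaddrinfo(hostname, None, family=socket.AF_INET, type=socket.SOCK_STREAM)
    return sorted({info[4][0] for info in infos})


def all_private(addresses):
    """True if the list is non-empty and every address is a private IP."""
    return bool(addresses) and all(ipaddress.ip_address(a).is_private for a in addresses)


# Addresses taken from the sample dig output in this post.
print(all_private(["10.1.1.185", "10.1.0.191"]))
```

On the on-premises client, you would pass the result of resolve_ipv4 for your MSK Serverless endpoint (without the port number) to all_private.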

Test your Kafka client

Once you complete the configuration of the Route 53 inbound resolver endpoint and on-premises DNS server, you can test your Kafka client from an on-premises network. For instructions, refer to Create a client machine. This documentation guides you through the necessary steps to set up your client machine and verify that it can successfully connect to your MSK cluster from your on-premises network.

Conclusion

MSK Serverless makes it easy for you to manage your data. You don’t have to worry about setting up and running your own Kafka cluster, which saves time and effort. In this post, we explored the option of on-premises connectivity with MSK Serverless and how it can greatly benefit organizations. By establishing this connection, you can gain access to a wide range of real-time analytics use case possibilities and unlock the full potential of your data.

We encourage you to try on-premises connectivity with MSK serverless.


About the Authors

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Akeef Khan is a Solutions Architect at Amazon Web Services. He helps SMB Greenfield customers adopt the cloud. Whilst being a generalist SA, Akeef is passionate about networking.

Accelerating revenue growth with real-time analytics: Poshmark’s journey

Post Syndicated from Mahesh Pasupuleti original https://aws.amazon.com/blogs/big-data/accelerating-revenue-growth-with-real-time-analytics-poshmarks-journey/

This post was co-written by Mahesh Pasupuleti and Gaurav Shah from Poshmark.

Poshmark is a leading social marketplace for new and secondhand styles for women, men, kids, pets, home, and more. By combining the human connection of physical shopping with the scale, ease, and selection benefits of Ecommerce, Poshmark makes buying and selling simple, social, and sustainable. Its community of more than 80 million registered users across the US, Canada, Australia, and India is driving a more sustainable future for the fashion industry.

An important goal to achieve for any organization is to grow the top line revenue. Top line revenue refers to the total value of sales of an organization’s services or products. The two main approaches organizations employ to increase revenue are to expand geographically to enter new markets and to increase market share within a market by improving customer experience (CX).

Improving CX is a well-known guideline to attract and retain customers and thereby increase market share. In this post, we share how Poshmark improved CX and accelerated revenue growth by using a real-time analytics solution. We discuss how to create such a solution using Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), and Amazon Kinesis Data Analytics for Apache Flink; the design decisions that went into the architecture; and the business benefits observed by Poshmark.

High-level challenge: The need for real-time analytics

Previous efforts at Poshmark for improving CX through analytics were based on batch processing of analytics data and using it on a daily basis to improve CX. Although these batch analytics-based efforts were successful to some extent, they saw opportunities to improve the customer experience with real-time personalization and security guidance during the customer’s interaction with the Poshmark app. The customer insights gathered from the batch analytics couldn’t be paired with the current customer activities in real time due to the latencies involved in enriching the current activities with the knowledge gained through batch processes. Therefore, the opportunity to provide tailored offers or showcase products based on customers’ preference and behaviors in near-real time, which contributes to a much better customer experience, was missing. Similarly, the opportunity to catch fraud within a second, before checkout, was also missing.

To improve the customer experience, Poshmark decided to invest in building a real-time analytics platform to enable real-time capabilities, as explained further in this post. Poshmark engineers worked closely with AWS architects through the AWS Data Lab program. The AWS Data Lab offers accelerated, joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics modernization initiatives. The Design Lab is a one-half to two-day engagement with the customer team that offers prescriptive guidance to arrive at the optimal solution architecture design before you embark on building the platform.

Designing the solution architecture through the AWS Data Lab process

The business and technical stakeholders from Poshmark and the AWS Data Lab architects discussed near-to-long-term business requirements along with the functional and non-functional capabilities required to decide on the architecture approach. They reviewed the current state architecture and constraints to understand data flow and technical integration points. The joint team discussed the pros and cons of various AWS services that already exist in Poshmark’s current architecture, as well as other AWS services that can meet the requirements.

Poshmark wanted to address the following business use cases via the real-time analytics platform:

  • Sessionization – Poshmark captures both server-side application events and client-side tracking events. They wanted to use these events to identify and analyze user sessions to track behavior.
  • Illegitimate sign-up and sign-in prevention – Poshmark wanted to detect and ban illegitimate sign-up or sign-in events from bots or non-human traffic in real time on the Poshmark application.
  • IP translation – The IP addresses present in events will be translated to city, state, and zip, and enriched with other information to implement near-real-time, location-aware services encompassing security-related functions as well as personalization functions.
  • Anonymization – Poshmark wanted to anonymize events and make the data available for internal users for querying in near-real time.
  • Personalized recommendations – User behavior based on clickstream events can be captured up to the last second before enriching it for personalization and sending it to the model to predict the recommendations.
  • Other use cases – Additional use cases relating to aggregations and machine learning (ML) inference use cases such as authorization to operate, listing spam detection, and avoiding account takeovers (ATOs), among others.

One common pattern identified for these use cases was the need for a central data enrichment pipeline to enrich incoming raw events before event data can be utilized for actual business processing. In the Design Lab, we focused on design for data enrichment pipelines aimed at enriching events with data from static files, dynamic data stores such as databases, APIs, or within the same event stream for the aforementioned streaming use cases. Later in this post, we cover the salient points discussed during the lab around design and architecture.

Batch analytics solution architecture

The following diagram shows the previous architecture at Poshmark. For brevity, only the flow pertaining to the real-time analytics platform is explained.

User interactions on Poshmark web and mobile applications generate server-side and client-side events. These events include add to cart, orders, transactions, and more on application servers, and page views, clicks, and more on tracking servers. Fluentd with an Amazon Kinesis plugin is set up on both the application and tracking servers to send these events to Amazon Kinesis Data Streams. The Fluentd Kinesis plugin aggregates events before sending them to Kinesis Data Streams. A single Kinesis data stream is currently set up to capture these events. A random partition key is configured in Fluentd so that events are distributed evenly across shards. The event data format is nested JSON. Poshmark maintains the same schema grammar at the first level of JSON for both server-side and client-side events. The attributes at the nested level can differ between server-side and client-side events.

Poshmark receives around 1 billion events per day (100 million per hour during peak hours, 10 million per hour during non-peak hours). The average size of the event record is 1.2 KB.
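
As a rough sizing check, these volumes can be converted into per-second rates. The sketch below assumes 1 KB = 1,000 bytes and that events are spread evenly within each hour; both are simplifications for illustration, not figures from Poshmark.

```python
# Back-of-the-envelope conversion of the stated event volumes into per-second rates.
AVG_EVENT_BYTES = 1.2 * 1000  # 1.2 KB per event, assuming 1 KB = 1,000 bytes


def per_second_load(events_per_hour: float) -> tuple[float, float]:
    """Return (events per second, megabytes per second) for an hourly event count."""
    events_per_sec = events_per_hour / 3600
    mb_per_sec = events_per_sec * AVG_EVENT_BYTES / 1_000_000
    return events_per_sec, mb_per_sec


peak_eps, peak_mbps = per_second_load(100_000_000)
off_eps, off_mbps = per_second_load(10_000_000)
print(f"peak: {peak_eps:,.0f} events/s, {peak_mbps:.1f} MB/s")
print(f"off-peak: {off_eps:,.0f} events/s, {off_mbps:.1f} MB/s")
```

Under these assumptions, the peak works out to roughly 28,000 events and 33 MB per second, and the off-peak load to about a tenth of that.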

The data from the Kinesis data stream is consumed by two applications:

  • A Spark streaming application on Amazon EMR is used to write data from the Kinesis data stream to a data lake hosted on Amazon Simple Storage Service (Amazon S3) in a partitioned way. The data from the S3 data lake is used for batch processing and analytics through Amazon EMR and Amazon Redshift.
  • Druid hosted on Amazon Elastic Compute Cloud (Amazon EC2) integrates with the Kinesis data stream for streaming ingestion and allows users to run slice-and-dice OLAP queries. Operational dashboards are hosted on Grafana integrated with Druid.

Desired enhancements to the initial solution

The use cases discussed during the architecture sessions fall into one or more combinations of the following stream processing requirements:

  • Stateless event processing – For example, near-real-time anonymization.
  • External lookup – Looking up a value from external stores. For example, IP address, city, zip, state, or ID.
  • Stateful data processing – Accessing past events or aggregations or ML inferences.

To meet these requirements, the streaming platform is divided into two layers:

  • Central data enrichment – This layer runs enrichments commonly required by downstream streaming applications. This will help avoid replication of the same enrichment logic in each application and enable better operational maintenance. The enrichment should strive for per-record processing in most cases.
  • Specific streaming applications – This layer will house specific streaming applications with respect to use cases and utilize enriched data from the central data enrichment pipeline.

For central data enrichment, we made the following enhancements to the platform:

  • The total latency, including ingestion and data enrichment, was critical and had to stay in the double-digit millisecond range, based on Poshmark’s overall latency budget for real-time ML responses to events. Kafka achieved the lowest ingestion latency, and the team decided to go with the managed version of Kafka, Amazon MSK.
  • Similarly, low-latency processing of data is also required, so an appropriate processing framework had to be chosen.
  • Exactly-once delivery guarantees were required to avoid data duplication resulting in wrong calculations.
  • The enrichment source could be any source such as static files, databases, and APIs and latencies can vary between them. A number of server-side and client-side events are generated when a user interacts with a Poshmark application. As a result, the same information from the enrichment source is required to enrich each event. This frequently accessed information cached in a centralized cache will optimize fetch time.

Design decisions for the new solution

Poshmark made the following design decisions for central data enrichment:

  • Kafka can support double-digit millisecond latency from producer to consumer with appropriate performance tuning. Kafka can provide exactly-once semantics both at producers and consumer applications. AWS provides Kafka as part of its Amazon MSK offering, eliminating the operational overhead of maintaining and running Kafka cluster infrastructure on AWS, thereby allowing you to focus on developing and running Kafka-based applications. Poshmark decided to use Amazon MSK for their streaming ingestion and storage requirements.
  • We also decided to use Flink for streaming data enrichment applications for the following reasons:
    • Flink can provide low-latency processing even at higher throughput with exactly-once guarantees. Spark Structured Streaming, on the other hand, can provide low latency only at low throughput because of its microbatch-based processing. Spark Structured Streaming continuous processing is an experimental feature and provides at-least-once guarantees.
    • An enrichment request to an external store, if modeled in a map function (Spark’s map API or Flink’s MapFunction API), makes a synchronous call to that store. The call waits for a response before the next event is processed, adding delay and reducing overall throughput. Asynchronous interaction allows requests to be sent and responses to be received concurrently, which reduces wait time and improves overall throughput. Flink supports async I/O operators natively, allowing users to use asynchronous request clients with data streams. The API handles the integration with data streams, as well as handling order, event time, fault tolerance, and more. Spark Structured Streaming doesn’t provide any such support natively and leaves it to users for custom implementation.
    • Poshmark selected Kinesis Data Analytics for Apache Flink to run the data enrichment application. Kinesis Data Analytics for Apache Flink provides the underlying infrastructure for your Apache Flink applications. It handles core capabilities like provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots).
  • An enrichment microservice accompanying Amazon ElastiCache for Redis was set up to abstract access from data enrichment applications. The AsyncFunction in the Flink async I/O operator isn’t multi-threaded and won’t work in a truly asynchronous way if the call is blocked or waiting for a response. The enrichment microservice handles requests and responses asynchronously coming from Flink async I/O operators. The data is also cached in ElastiCache for Redis to improve the latency of the microservice.
  • The Poshmark ML applications are the consumers of this enriched data. The team has built and deployed different ML models over time. These models include a learning to rank algorithm, fraud detection, personalization and recommendations, and online spam filtering. Previously, for deploying each model into production, the Poshmark team had to go through a series of infrastructure setup steps that involved data extraction from real-time sources, building real-time aggregate features from streaming data, storing these features in a low-latency database (Redis) for sub-millisecond inferences, and finally performing inferences via Amazon SageMaker hosted endpoints.
  • We also designed an ML feature storage pipeline that consumes data from the enriched streaming sources (Kinesis or Kafka), generates single-level and aggregated-level features, and ingests these generated features into a feature store repository with a very low latency of less than 80 milliseconds.
  • The ML models are now able to extract the needed features with latency less than 10 milliseconds from the feature repository and perform real-time model inferencing.
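
The throughput argument for asynchronous lookups, made in the Flink decision above, can be illustrated outside Flink. The following is a minimal Python asyncio sketch, not Flink’s async I/O API: `lookup` is a hypothetical stand-in for a call to the external store, and the 50 ms delay is an assumed latency.

```python
import asyncio
import time


async def lookup(event_id: int) -> dict:
    """Hypothetical external-store call; the sleep simulates network latency."""
    await asyncio.sleep(0.05)
    return {"event_id": event_id, "city": "unknown"}


async def enrich_sequentially(event_ids):
    # Each lookup waits for the previous one, like a blocking call in a map function.
    return [await lookup(e) for e in event_ids]


async def enrich_concurrently(event_ids):
    # All lookups are in flight at once; gather returns results in input order.
    return await asyncio.gather(*(lookup(e) for e in event_ids))


def timed(coro):
    """Run a coroutine to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


events = list(range(10))
seq_result, seq_secs = timed(enrich_sequentially(events))
con_result, con_secs = timed(enrich_concurrently(events))
print(f"sequential: {seq_secs:.2f}s, concurrent: {con_secs:.2f}s")
```

Both variants return the same enriched records in the same order; only the waiting overlaps in the concurrent one, which is the effect the async I/O operator is chosen for.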

Real-time analytics solution architecture

The following diagram illustrates the solution architecture for real-time analytics with Amazon MSK and Kinesis Data Analytics for Apache Flink.

The workflow is as follows:

  1. Users interact on Poshmark’s web or mobile application.
  2. Server-side events are captured on application servers and client-side events are captured on tracking servers. These events are written to the downstream MSK cluster.
  3. The raw events will be ingested into the MSK cluster using the Fluentd plugin to produce data for Kafka.
  4. The enrichment microservice consists of reactive (asynchronous) enrichment lookup APIs fetching data from persistent data stores. ElastiCache for Redis caches frequently accessed data, reducing fetch time for enrichment lookup APIs.
  5. The Flink application running on Kinesis Data Analytics for Apache Flink consumes raw events from Amazon MSK and runs data enrichment on a per-record basis. The Flink data enrichment application uses Flink’s async I/O to read external data from the enrichment lookup store for enriching stream events.
  6. Enriched events are written to the MSK cluster under different enriched events topics.
  7. The existing Spark streaming application consumes from the enriched events topic (or raw events topic) in Amazon MSK and writes the data into an S3 data lake.
  8. Druid streaming ingestion now reads from the enriched events topic or raw events topic in Amazon MSK depending on the requirements.

Enrichment of the captured event data

In this section, we discuss the different steps to enrich the captured event data.

Enrichment processing

Kinesis Data Analytics for Apache Flink provides the underlying infrastructure for the Apache Flink applications. It handles core capabilities like provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots). You can use the high-level Flink programming features (such as operators, functions, sources, and sinks) in the same way that you use them when hosting the Flink infrastructure yourself.

Flink on Amazon EMR gives the flexibility to choose your Flink version, installation, configuration, instances, and storage. However, you also have to take care of cluster management and operational requirements such as scaling, application backup, and provisioning.

Enrichment lookup store

The AsyncFunction in the Flink async I/O operator isn’t multi-threaded and won’t work in a truly asynchronous way if the call is blocked or waiting for a response. The enrichment lookup API should handle requests and responses asynchronously coming from Flink async I/O operators. The enrichment lookup API can be hosted on Amazon EC2 or containers such as Amazon Elastic Container Service (Amazon ECS) or Amazon Elastic Kubernetes Service (Amazon EKS).

A number of server-side and client-side events are generated when a user interacts with a Poshmark application. As a result, the same information is required to enrich each event. This frequently accessed information cached in a centralized cache can optimize fetch time. The latency to the centralized cache can be further reduced by hosting the client (enrichment lookup API) and cache server in the same Availability Zone.
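
The caching idea can be sketched as a cache-aside lookup. This is a minimal illustration under stated assumptions: `TTLCache` is an in-process stand-in for a shared cache such as Redis, and `fetch_from_store` is a hypothetical call to the persistent enrichment store.

```python
import time


class TTLCache:
    """Tiny in-memory cache with per-entry expiry; a stand-in for a shared cache."""

    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self._entries = {}  # key -> (expires_at, value)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None or entry[0] < time.monotonic():
            return None  # missing or expired
        return entry[1]

    def put(self, key, value):
        self._entries[key] = (time.monotonic() + self.ttl, value)


store_calls = 0


def fetch_from_store(user_id: str) -> dict:
    """Hypothetical slow lookup against the persistent enrichment store."""
    global store_calls
    store_calls += 1
    return {"user_id": user_id, "segment": "example"}


cache = TTLCache(ttl_seconds=60)


def enrich(event: dict) -> dict:
    profile = cache.get(event["user_id"])
    if profile is None:  # cache miss: query the store once, then remember the answer
        profile = fetch_from_store(event["user_id"])
        cache.put(event["user_id"], profile)
    return {**event, **profile}


events = [{"user_id": "u1", "type": t} for t in ("page_view", "click", "add_to_cart")]
enriched = [enrich(e) for e in events]
print(store_calls)  # a single store call serves all three events from the same user
```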

Reconciliation in case of pipeline errors

The event enrichment can fail in data enrichment applications for various reasons, such as the external store timing out or missing information in the store. The enriched fields may or may not be critical for downstream streaming applications. You should build your downstream streaming applications considering that these failures can occur and implement a fallback mechanism, for example retrying on-demand enrichment from the application. The failure handling will also be governed by latency tolerance of the application.
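
One way to handle a slow or failing lookup is to bound it with a timeout and pass the event through with a flag, so downstream applications can decide whether to retry. The sketch below assumes a hypothetical `lookup_geo` call and an arbitrary 100 ms budget; it is not the implementation used at Poshmark.

```python
import asyncio


async def lookup_geo(ip: str) -> dict:
    """Hypothetical enrichment call; one address is made artificially slow."""
    await asyncio.sleep(1.0 if ip == "10.0.0.9" else 0.01)
    return {"city": "Example City"}


async def enrich_with_fallback(event: dict, timeout_s: float = 0.1) -> dict:
    try:
        geo = await asyncio.wait_for(lookup_geo(event["ip"]), timeout=timeout_s)
        return {**event, **geo, "enriched": True}
    except asyncio.TimeoutError:
        # Pass the event through unenriched and flag it so consumers can retry or skip.
        return {**event, "enriched": False}


async def main():
    events = [{"ip": "10.0.0.1"}, {"ip": "10.0.0.9"}]
    return await asyncio.gather(*(enrich_with_fallback(e) for e in events))


results = asyncio.run(main())
print([r["enriched"] for r in results])
```

The timeout value is where the latency tolerance of the application enters: a stricter budget produces more unenriched events for the fallback path to handle.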

The processing of data is based on event time. In some situations, data can arrive late in the platform. Both Flink and Spark allow lateness and watermarks for users to handle late-arriving data by defining thresholds. Late-arriving data beyond the threshold is discarded from processing. It’s possible to get this discarded too-late data in Flink using a side output. There is no such provision in Spark Structured Streaming.
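
The lateness threshold can be illustrated with a simplified model. In this sketch, which is an assumption for illustration and not Flink’s or Spark’s watermark implementation, the watermark is the largest event time seen so far minus the allowed lateness, and events behind it are routed to a separate list in the spirit of a side output.

```python
from dataclasses import dataclass


@dataclass
class Event:
    key: str
    event_time: int  # seconds since an arbitrary epoch


def split_late_events(events, allowed_lateness: int):
    """Route events that fall behind the watermark to a separate 'late' list."""
    on_time, late = [], []
    max_seen = float("-inf")
    for ev in events:
        max_seen = max(max_seen, ev.event_time)
        watermark = max_seen - allowed_lateness
        (late if ev.event_time < watermark else on_time).append(ev)
    return on_time, late


stream = [Event("a", 100), Event("b", 105), Event("c", 90),
          Event("d", 103), Event("e", 120), Event("f", 105)]
on_time, late = split_late_events(stream, allowed_lateness=10)
print([e.key for e in on_time], [e.key for e in late])
```

Keeping the late events instead of dropping them is what makes a later batch reconciliation possible.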

A few streaming applications require their batch counterpart to reconcile data hourly or daily to handle data mismatch or data discrepancy due to late-arriving data or missing data.

Improved customer experience

The new real-time architecture offered the following benefits for an improved customer experience:

  • Anonymization – Poshmark is now able to provide and utilize real-time anonymized data for multiple functions both internally and externally because anonymization happens in real time.
  • Fraud mitigation – Poshmark was previously able to detect and prevent 45% of ATOs with the batch-based solution. With the real-time system, Poshmark is able to prevent 80% of ATOs.
  • Personalization – By providing personalized search results, Poshmark achieved an 8% improvement on clickthrough rates for search. This is a significant increase in the top of the funnel, increasing overall search conversions.

Improvement in these three factors helped end-customers gain confidence in the Poshmark app and website, which in turn enabled customers to increase their interaction with the app and helped accelerate customer engagement and growth.

Conclusion

In this post, we discussed the ingestion of real-time clickstream and log event data into Amazon MSK. We showed how enrichment of the captured data can be performed through Kinesis Data Analytics for Apache Flink. We broke up the enrichment processing into multiple components, such as Kinesis Data Analytics for Apache Flink, the enrichment microservices and the enrichment lookup store, and an enrichment cache. We discussed the downstream applications that used this enriched customer information to perform real-time security checks and offer personalized recommendations to end-users. We also discussed some of the areas that may need attention in case there are failures in the pipeline. Lastly, we showed how Poshmark improved their customer experience and gained market share by implementing this real-time analytics pipeline.


About the authors

Mahesh Pasupuleti is a VP of Data & Machine Learning Engineering at Poshmark. He has helped several startups succeed in different domains, including media streaming, healthcare, the financial sector, and marketplaces. He loves software engineering, building high performance teams, and strategy, and enjoys gardening and playing badminton in his free time.

Gaurav Shah is Director of Data Engineering and ML at Poshmark. He and his team help build data-driven solutions to drive growth at Poshmark.

Raghu Mannam is a Sr. Solutions Architect at AWS in San Francisco. He works closely with late-stage startups, many of which have had recent IPOs. His focus is end-to-end solutioning including security, DevOps automation, resilience, analytics, machine learning, and workload optimization in the cloud.

Deepesh Malviya is Solutions Architect Manager on the AWS Data Lab team. He and his team help customers architect and build data, analytics, and machine learning solutions to accelerate their key initiatives as part of the AWS Data Lab.

How to choose the right Amazon MSK cluster type for you

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/how-to-choose-the-right-amazon-msk-cluster-type-for-you/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is an AWS streaming data service that manages Apache Kafka infrastructure and operations, making it easy for developers and DevOps managers to run Apache Kafka applications and Kafka Connect connectors on AWS, without the need to become experts in operating Apache Kafka. Amazon MSK operates, maintains, and scales Apache Kafka clusters, provides enterprise-grade security features out of the box, and has built-in AWS integrations that accelerate development of streaming data applications. You can easily get started by creating an MSK cluster using the AWS Management Console with a few clicks.

When creating a cluster, you must choose a cluster type from two options: provisioned or serverless. Choosing the best cluster type for each workload depends on the type of workload and your DevOps preferences. Amazon MSK provisioned clusters offer more flexibility in how you scale, configure, and optimize your cluster. Amazon MSK Serverless, on the other hand, makes scaling, load management, and operation of the cluster easier for you. With MSK Serverless, you can run your applications without having to configure, manage the infrastructure, or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring as well as ensuring an even balance of partition distribution across brokers in the cluster (auto-balancing).

In this post, I examine a use case with the fictitious company AnyCompany, who plans to use Amazon MSK for two applications. They must decide between provisioned or serverless cluster types. I describe a process by which they work backward from the applications’ requirements to find the best MSK cluster type for their workloads, including how the organizational structure and application requirements are relevant in finding the best offering. Lastly, I examine the requirements and their relationship to Amazon MSK features.

Use case

AnyCompany is an enterprise organization that is ready to move two of their Kafka applications to Amazon MSK.

The first is a large ecommerce platform, which is a legacy application that currently uses a self-managed Apache Kafka cluster run in their data centers. AnyCompany wants to migrate this application to the AWS Cloud and use Amazon MSK to reduce maintenance and operations overhead. AnyCompany has a DataOps team that has been operating self-managed Kafka clusters in their data centers for years. AnyCompany wants to continue using the DataOps team to manage the MSK cluster on behalf of the development team. There is very little flexibility for code changes. For example, a few modules of the application require plaintext communication and access to the Apache ZooKeeper cluster that comes with an MSK cluster. The ingress throughput for this application doesn’t fluctuate often. The ecommerce platform only experiences a surge in user activity during special sales events. The DataOps team has a good understanding of this application’s traffic pattern, and are confident that they can optimize an MSK cluster by setting some custom broker-level configurations.

The second application is a new cloud-native gaming application currently in development. AnyCompany hopes to launch this gaming application soon followed by a marketing campaign. Throughput needs for this application are unknown. The application is expected to receive high traffic initially, then user activity should decline gradually. Because the application is going to launch first in the US, traffic during the day is expected to be higher than at night. This application offers a lot of flexibility in terms of Kafka client version, encryption in transit, and authentication. Because this is a cloud-native application, AnyCompany hopes they can delegate full ownership of its infrastructure to the development team.

Solution overview

Let’s examine a process that helps AnyCompany decide between the two Amazon MSK offerings. The following diagram shows this process at a high level.

In the following sections, I explain each step in detail and the relevant information that AnyCompany needs to collect before they make a decision.

Competency in Apache Kafka

AWS recommends a list of best practices to follow when using the Amazon MSK provisioned offering. Amazon MSK provisioned offers more flexibility, so you can make scaling decisions based on what’s best for your workloads. For example, you can save on cost by consolidating a group of workloads into a single cluster. You can decide which metrics are important to monitor and optimize your cluster by applying custom configurations to your brokers. You can choose your Apache Kafka version, among different supported versions, and decide when to upgrade to a new version. Amazon MSK takes care of applying your configuration and upgrading each broker in a rolling fashion.

With more flexibility, you have more responsibilities. You need to make sure your cluster is right-sized at any time. You can achieve this by monitoring a set of cluster-level, broker-level, and topic-level metrics to ensure you have enough resources for your throughput. You also need to make sure the number of partitions assigned to each broker doesn’t exceed the numbers suggested by Amazon MSK. If partitions are unbalanced, you need to redistribute them evenly across all brokers. If you have more partitions than recommended, you need to either upgrade brokers to a larger size or increase the number of brokers in your cluster. There are also best practices for the number of TCP connections when using AWS Identity and Access Management (IAM) authentication.

An MSK Serverless cluster takes away the complexity of right-sizing clusters and balancing partitions across brokers. This makes it easy for developers to focus on writing application code.

AnyCompany has an experienced DataOps team who are familiar with scaling operations and best practices for the MSK provisioned cluster type. AnyCompany can use their DataOps team’s Kafka expertise for building automations and easy-to-follow standard procedures on behalf of the ecommerce application team. The gaming development team is an exception, because they are expected to take the full ownership of the infrastructure.

In the following sections, I discuss other steps in the process before deciding which cluster type is right for each application.

Custom configuration

In certain use cases, you need to configure your MSK cluster differently from its default settings. This could be due to your application requirements. For example, AnyCompany’s ecommerce platform requires setting up brokers such that the default retention period for all topics is set to 72 hours. Also, topics should be auto-created when they are requested and don’t exist.

The Amazon MSK provisioned offering provides a default configuration for brokers, topics, and Apache ZooKeeper nodes. It also allows you to create custom configurations and use them to create new MSK clusters or update existing clusters. An MSK cluster configuration consists of a set of properties and their corresponding values.
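
As an illustration, the two ecommerce requirements map onto standard Apache Kafka broker properties. The sketch below renders them as key=value lines; the property names are Kafka’s, while the helper `to_properties` is hypothetical, and you should confirm in the Amazon MSK documentation which properties a custom configuration currently accepts.

```python
# Broker-level settings matching the ecommerce requirements described above.
broker_config = {
    "log.retention.hours": 72,            # default retention for all topics
    "auto.create.topics.enable": "true",  # create a topic on first use if it is missing
}


def to_properties(config: dict) -> str:
    """Render a mapping as the key=value lines used by Kafka properties files."""
    return "\n".join(f"{key}={value}" for key, value in config.items())


print(to_properties(broker_config))
```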

MSK Serverless doesn’t allow applying broker-level configuration. This is because AWS takes care of configuring and managing the backend nodes. It takes away the heavy lifting of configuring the broker nodes. You only need to manage your applications’ topics. To learn more, refer to the list of topic-level configurations that MSK Serverless allows you to change.

Unlike the ecommerce platform, AnyCompany’s gaming application doesn’t need broker-level custom configuration. The developers only want to set retention.ms and max.message.bytes for each topic.
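
Both topic-level properties take raw numbers, so the values have to be converted from hours and megabytes. The sketch below uses the gaming application’s figures from the requirements table later in this post (168 hours of retention, 1 MB maximum message size) and assumes 1 MB = 1,024 × 1,024 bytes; whether a given value is within the limits of your cluster type still needs to be checked.

```python
def hours_to_ms(hours: int) -> int:
    """Convert a retention period in hours to the milliseconds retention.ms expects."""
    return hours * 60 * 60 * 1000


topic_config = {
    "retention.ms": hours_to_ms(168),      # 168 hours = 7 days
    "max.message.bytes": 1 * 1024 * 1024,  # 1 MB, assuming binary megabytes
}
print(topic_config)
```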

Application requirements

Apache Kafka applications differ in terms of their security; the way they connect, write, or read data; data retention period; and scaling patterns. For example, some applications can only scale vertically, whereas other applications can scale only horizontally. Although a flexible application can work with encryption in transit, a legacy application may only be able to communicate in plaintext format.

Cluster-level quotas

Amazon MSK enforces some quotas to ensure the performance, reliability, and availability of the service for all customers. These quotas are subject to change at any time. To access the latest values for each dimension, refer to Amazon MSK quota. Note that some of the quotas are soft limits and can be increased using a support ticket.

When choosing a cluster type in Amazon MSK, it’s important to understand your application requirements and compare those against quotas in relation with each offering. This makes sure you choose the best cluster type that meets your goals and application’s needs. Let’s examine how you can calculate the throughput you need and other important dimensions you need to compare with Amazon MSK quotas:

  • Number of clusters per account – Amazon MSK may have quotas for how many clusters you can create in a single AWS account. If this is limiting your ability to create more clusters, you can consider creating those in multiple AWS accounts and using secure connectivity patterns to provide access to your applications.
  • Message size – You need to make sure the maximum message size that your producer writes for a single message is lower than the configured size in the MSK cluster. MSK provisioned clusters allow you to change the default value in a custom configuration. If you choose MSK Serverless, check this value in Amazon MSK quota. The average message size is helpful when calculating the total ingress or egress throughput of the cluster, which I demonstrate later in this post.
  • Message rate per second – This directly influences total ingress and egress throughput of the cluster. Total ingress throughput equals the message rate per second multiplied by message size. You need to make sure your producer is configured for optimal throughput by adjusting batch.size and linger.ms properties. If you’re choosing MSK Serverless, you need to make sure your producer sends optimally sized batches at a rate that is lower than its request rate quota.
  • Number of consumer groups – This directly influences the total egress throughput of the cluster. Total egress throughput equals the ingress throughput multiplied by the number of consumer groups. If you’re choosing MSK Serverless, you need to make sure your application can work with these quotas.
  • Maximum number of partitions – Amazon MSK provisioned recommends not exceeding certain limits per broker (depending on the broker size). If the number of partitions per broker exceeds the recommended maximum, you can’t perform certain upgrade or update operations. MSK Serverless also has a quota for the maximum number of partitions per cluster. You can request to increase the quota by creating a support case.
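
The two throughput formulas in the list above can be written down directly. The workload numbers in this sketch are made up for illustration and assume 1 KB = 1,000 bytes.

```python
def ingress_bytes_per_sec(messages_per_sec: float, avg_message_bytes: float) -> float:
    """Total ingress = message rate per second x average message size."""
    return messages_per_sec * avg_message_bytes


def egress_bytes_per_sec(ingress: float, consumer_groups: int) -> float:
    """Total egress = ingress x number of consumer groups reading the data."""
    return ingress * consumer_groups


# Illustrative workload: 2,000 messages/s of 5 KB each, read by 3 consumer groups.
ingress = ingress_bytes_per_sec(2_000, 5_000)
egress = egress_bytes_per_sec(ingress, 3)
print(f"ingress {ingress / 1e6:.0f} MB/s, egress {egress / 1e6:.0f} MB/s")
```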

Partition-level quotas

Apache Kafka organizes data in structures called topics. Each topic consists of one or more partitions. Partitions are the degree of parallelism in Apache Kafka. The data is distributed across brokers using data partitioning. Let’s examine a few important Amazon MSK requirements, and how you can determine which cluster type works better for your application:

  • Maximum throughput per partition – MSK Serverless automatically balances the partitions of your topic between the backend nodes. It instantly scales when your ingress throughput increases. However, each partition has a quota of how much data it accepts. This is to ensure the data is distributed evenly across all partitions and backend nodes. In an MSK Serverless cluster, you need to create your topic with enough partitions that their aggregated throughput is at least equal to the maximum throughput your application requires. You also need to make sure your consumers read data at a rate that is below the maximum egress throughput per partition quota. If you’re using Amazon MSK provisioned, there is no partition-level quota for write and read operations. However, AWS recommends you monitor and detect hot partitions and control how partitions should balance among the broker nodes.
  • Data storage – The amount of time each message is kept in a particular topic directly influences the total amount of storage needed for your cluster. Amazon MSK allows you to manage the retention period at the topic level. MSK provisioned clusters allow broker-level configuration to set the default data retention period. MSK Serverless clusters allow unlimited data retention, but there is a separate quota for the maximum data that can be stored in each partition.
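
The partition count needed to stay under per-partition throughput quotas can be estimated as follows. The quota values passed in are hypothetical placeholders; look up the current numbers in Amazon MSK quota before relying on the result.

```python
import math


def min_partitions(ingress_mbps: float, egress_mbps: float,
                   max_ingress_per_partition_mbps: float,
                   max_egress_per_partition_mbps: float) -> int:
    """Smallest partition count that keeps both directions under the per-partition quota."""
    for_ingress = math.ceil(ingress_mbps / max_ingress_per_partition_mbps)
    for_egress = math.ceil(egress_mbps / max_egress_per_partition_mbps)
    return max(for_ingress, for_egress, 1)


# Hypothetical quotas of 5 MBps in and 10 MBps out per partition.
needed = min_partitions(ingress_mbps=42, egress_mbps=126,
                        max_ingress_per_partition_mbps=5,
                        max_egress_per_partition_mbps=10)
print(needed)
```

In this example the egress side is the binding constraint, which is common when several consumer groups read the same topic. The estimate also assumes traffic is spread evenly across partitions; a skewed key distribution needs extra headroom.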

Security

Amazon MSK recommends that you secure your data in the following ways. Availability of the security features varies depending on the cluster type. Before making a decision about your cluster type, check if your preferred security options are supported by your choice of cluster type.

  • Encryption at rest – Amazon MSK integrates with AWS Key Management Service (AWS KMS) to offer transparent server-side encryption. Amazon MSK always encrypts your data at rest. When you create an MSK cluster, you can specify the KMS key that you want Amazon MSK to use to encrypt your data at rest.
  • Encryption in transit – Amazon MSK uses TLS 1.2. By default, it encrypts data in transit between the brokers of your MSK cluster. You can override this default when you create the cluster. For communication between clients and brokers, you must specify one of the following settings:
    • Only allow TLS encrypted data. This is the default setting.
    • Allow both plaintext and TLS encrypted data.
    • Only allow plaintext data.
  • Authentication and authorization – Use IAM to authenticate clients and allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients, and Apache Kafka ACLs to allow or deny actions.

Cost of ownership

Amazon MSK helps you avoid spending countless hours and significant resources just managing your Apache Kafka cluster, adding little or no value to your business. With a few clicks on the Amazon MSK console, you can create highly available Apache Kafka clusters with settings and configuration based on Apache Kafka’s deployment best practices. Amazon MSK automatically provisions and runs Apache Kafka clusters. Amazon MSK continuously monitors cluster health and automatically replaces unhealthy nodes with no application downtime. In addition, Amazon MSK secures Apache Kafka clusters by encrypting data at rest and in transit. These capabilities can significantly reduce your Total Cost of Ownership (TCO).

With MSK provisioned clusters, you can specify and then scale cluster capacity to meet your needs. With MSK Serverless clusters, you don’t need to specify or scale cluster capacity. MSK Serverless automatically scales the cluster capacity based on the throughput, and you only pay per GB of data that your producers write to and your consumers read from the topics in your cluster. Additionally, you pay an hourly rate for your serverless clusters and an hourly rate for each partition that you create. The MSK Serverless cluster type generally offers a lower cost of ownership by taking away the cost of engineering resources needed for monitoring, capacity planning, and scaling MSK clusters. However, if your organization has a DataOps team with Kafka competency, you can use this competency to operate optimized MSK provisioned clusters. This allows you to save on Amazon MSK costs by consolidating several Kafka applications into a single cluster. There are a few critical considerations to decide when and how to split your workloads between multiple MSK clusters.
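
The MSK Serverless pricing dimensions named above (cluster hours, partition hours, and data written and read) can be combined into a simple estimate. Every rate in this sketch is a placeholder rather than an AWS price, the 730-hour month is an assumption, and any charge for retained storage is left out.

```python
def serverless_monthly_cost(gb_in: float, gb_out: float, partitions: int,
                            cluster_hour_rate: float, partition_hour_rate: float,
                            gb_in_rate: float, gb_out_rate: float,
                            hours: int = 730) -> float:
    """Sum the cluster-hour, partition-hour, and data-transfer charges for one month."""
    cluster_cost = cluster_hour_rate * hours
    partition_cost = partition_hour_rate * partitions * hours
    data_cost = gb_in * gb_in_rate + gb_out * gb_out_rate
    return cluster_cost + partition_cost + data_cost


# All rates below are placeholders, not AWS prices.
cost = serverless_monthly_cost(gb_in=1_000, gb_out=1_000, partitions=50,
                               cluster_hour_rate=1.0, partition_hour_rate=0.01,
                               gb_in_rate=0.1, gb_out_rate=0.05)
print(f"{cost:.2f}")
```

Plugging in current prices and comparing the result with the cost of a right-sized provisioned cluster, plus the engineering time needed to operate it, gives a first approximation of the trade-off described here.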

Apache ZooKeeper

Apache ZooKeeper is a service included in Amazon MSK when you create a cluster. It manages the Apache Kafka metadata and acts as a quorum controller for leader elections. Although interacting with ZooKeeper is not a recommended pattern, some Kafka applications have a dependency to connect directly to ZooKeeper. During the migration to Amazon MSK, you may find a few of these applications in your organization. This could be because they use an older version of the Kafka client library or other reasons. For example, applications that help with Apache Kafka admin operations or visibility such as Cruise Control usually need this kind of access.

Before you choose your cluster type, you first need to check which offering provides direct access to the ZooKeeper cluster. As of writing this post, only Amazon MSK provisioned provides direct access to ZooKeeper.

How AnyCompany chooses their cluster types

AnyCompany first needs to collect some important requirements about each of their applications. The following table shows these requirements. The rows marked with an asterisk (*) are calculated based on the values in previous rows.

| Dimension | Ecommerce Platform | Gaming Application |
|---|---|---|
| Message rate per second | 150,000 | 1,000 |
| Maximum message size | 15 MB | 1 MB |
| Average message size | 30 KB | 15 KB |
| * Ingress throughput (average message size * message rate per second) | 4.5 GBps | 15 MBps |
| Number of consumer groups | 2 | 1 |
| * Egress throughput (ingress throughput * number of consumer groups) | 9 GBps | 15 MBps |
| Number of topics | 100 | 10 |
| Average partitions per topic | 100 | 5 |
| * Total number of partitions (number of topics * average partitions per topic) | 10,000 | 50 |
| * Ingress per partition (ingress throughput / total number of partitions) | 450 KBps | 300 KBps |
| * Egress per partition (egress throughput / total number of partitions) | 900 KBps | 300 KBps |
| Data retention | 72 hours | 168 hours |
| * Total storage needed (ingress throughput * retention period in seconds) | 1,139.06 TB | 1.3 TB |
| Authentication | Plaintext and SASL/SCRAM | IAM |
| Need ZooKeeper access | Yes | No |
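The calculated rows can be reproduced with a few lines of arithmetic. The following is a minimal sketch, not part of the original post: the `Workload` class and `derive` function are hypothetical names, and the unit convention (decimal prefixes for throughput, division by 1,024 when converting GB to TB) is an assumption inferred from the ecommerce figures in the table.

```python
from dataclasses import dataclass


@dataclass
class Workload:
    name: str
    messages_per_second: int
    avg_message_kb: int
    consumer_groups: int
    topics: int
    partitions_per_topic: int
    retention_hours: int


def derive(w: Workload) -> dict:
    """Compute the rows marked with an asterisk; throughput is in KBps."""
    ingress_kbps = w.messages_per_second * w.avg_message_kb
    egress_kbps = ingress_kbps * w.consumer_groups
    partitions = w.topics * w.partitions_per_topic
    retention_seconds = w.retention_hours * 3600
    # Assumption inferred from the table: 1 GB = 1,000,000 KB for throughput,
    # while storage in GB is divided by 1,024 to express it in TB.
    storage_gb = ingress_kbps * retention_seconds / 1_000_000
    return {
        "ingress_kbps": ingress_kbps,
        "egress_kbps": egress_kbps,
        "partitions": partitions,
        "ingress_per_partition_kbps": ingress_kbps / partitions,
        "egress_per_partition_kbps": egress_kbps / partitions,
        "storage_tb": storage_gb / 1024,
    }


ecommerce = Workload("Ecommerce Platform", 150_000, 30, 2, 100, 100, 72)
gaming = Workload("Gaming Application", 1_000, 15, 1, 10, 5, 168)

for workload in (ecommerce, gaming):
    print(workload.name, derive(workload))
```

For the ecommerce platform this reproduces the table values (4.5 GBps ingress, 450 KBps per partition, about 1,139.06 TB of storage). For the gaming application, the same formula with 168 hours of retention gives roughly 8.86 TB. The 1.3 TB in the table corresponds to about 24 hours of retention, so it is worth rechecking that figure against your own retention setting.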

For the gaming application, AnyCompany doesn’t want to use their in-house Kafka competency to support an MSK provisioned cluster. Also, the gaming application doesn’t need custom configuration, and its throughput needs are below the quotas set by the MSK Serverless cluster type. In this scenario, an MSK Serverless cluster makes more sense.

For the ecommerce platform, AnyCompany wants to use their Kafka competency. Moreover, their throughput needs exceed the MSK Serverless quotas, and the application requires some broker-level custom configuration as well as direct ZooKeeper access, which only MSK provisioned offers. The ecommerce platform also can’t be split across multiple clusters. For these reasons, AnyCompany chooses the MSK provisioned cluster type in this scenario. Additionally, AnyCompany can save more on cost with the Amazon MSK provisioned pricing model. Their throughput is consistent most of the time, and AnyCompany wants to use their DataOps team to optimize a provisioned MSK cluster and make scaling decisions based on their own expertise.
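The hard constraints in this decision can be written down as a small rule set. This is a sketch under stated assumptions rather than an official decision tool: the `Requirements` class and `choose_cluster_type` function are hypothetical, and the default quota values (200 MBps ingress, 400 MBps egress per serverless cluster) are assumptions that you should verify against the current MSK Serverless quotas.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Requirements:
    ingress_mbps: float
    egress_mbps: float
    needs_zookeeper_access: bool
    needs_custom_broker_config: bool


def choose_cluster_type(
    req: Requirements,
    serverless_max_ingress_mbps: float = 200.0,  # assumed quota, verify
    serverless_max_egress_mbps: float = 400.0,   # assumed quota, verify
) -> Tuple[str, List[str]]:
    """Return the cluster type and the reasons that rule out serverless."""
    reasons = []
    if req.needs_zookeeper_access:
        reasons.append("needs direct ZooKeeper access")
    if req.needs_custom_broker_config:
        reasons.append("needs broker-level custom configuration")
    if req.ingress_mbps > serverless_max_ingress_mbps:
        reasons.append("ingress exceeds the serverless quota")
    if req.egress_mbps > serverless_max_egress_mbps:
        reasons.append("egress exceeds the serverless quota")
    return ("provisioned" if reasons else "serverless"), reasons


gaming_req = Requirements(15, 15, False, False)
ecommerce_req = Requirements(4500, 9000, True, True)

print(choose_cluster_type(gaming_req))
print(choose_cluster_type(ecommerce_req))
```

Softer factors from the text, such as whether a team with Kafka competency is available or whether the workload can be split across clusters, are left out here because they are judgment calls rather than hard limits.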

Conclusion

Choosing the best cluster type for your applications may seem complicated at first. In this post, I showed a process that helps you work backward from your application’s requirements and the resources available to you. MSK provisioned clusters offer more flexibility in how you scale, configure, and optimize your cluster. MSK Serverless, on the other hand, is a cluster type that makes it easier for you to run Apache Kafka clusters without having to manage compute and storage capacity. I generally recommend you begin with MSK Serverless if your application doesn’t require broker-level custom configurations, and your application throughput needs don’t exceed the quotas for the MSK Serverless cluster type. Sometimes it’s best to split your workloads between multiple MSK Serverless clusters, but if that isn’t possible, you may need to consider an MSK provisioned cluster. To operate an optimized MSK provisioned cluster, you need to have Kafka competency within your organization.

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Build an end-to-end change data capture with Amazon MSK Connect and AWS Glue Schema Registry

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/

The value of data is time sensitive. Real-time processing makes data-driven decisions accurate and actionable in seconds or minutes instead of hours or days. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. Capturing every change from transactions in a source database and moving them to the target in real time keeps the systems synchronized, and helps with real-time analytics use cases and zero-downtime database migrations. The following are a few benefits of CDC:

  • It eliminates the need for bulk load updating and inconvenient batch windows by enabling incremental loading or real-time streaming of data changes into your target repository.
  • It ensures that data in multiple systems stays in sync. This is especially important if you’re making time-sensitive decisions in a high-velocity data environment.

Kafka Connect is an open-source component of Apache Kafka that works as a centralized data hub for simple data integration between databases, key-value stores, search indexes, and file systems. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve data stream schemas. Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect provides a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro, Protobuf, or JSON Schema. AvroConverter, ProtobufConverter, and JsonSchemaConverter automatically register schemas generated by Kafka connectors (source) that produce data to Kafka. Connectors (sink) that consume data from Kafka receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide capabilities like maintaining a database table schema in a data catalog.

This post demonstrates how to build an end-to-end CDC pipeline using Amazon MSK Connect, an AWS managed service to deploy and run Kafka Connect applications, and the AWS Glue Schema Registry, which allows you to centrally discover, control, and evolve data stream schemas.

Solution overview

On the producer side, for this example we choose a MySQL-compatible Amazon Aurora database as the data source, and we have a Debezium MySQL connector to perform CDC. The Debezium connector continuously monitors the databases and pushes row-level changes to a Kafka topic. The connector fetches the schema from the database to serialize the records into a binary form. If the schema doesn’t already exist in the registry, the schema will be registered. If the schema exists but the serializer is using a new version, the schema registry checks the compatibility mode of the schema before updating the schema. In this solution, we use backward compatibility mode. The schema registry returns an error if a new version of the schema is not backward compatible, and we can configure Kafka Connect to send incompatible messages to the dead-letter queue.

On the consumer side, we use an Amazon Simple Storage Service (Amazon S3) sink connector to deserialize the record and store changes to Amazon S3. We build and deploy the Debezium connector and the Amazon S3 sink using MSK Connect.

Example schema

For this post, we use the following schema as the first version of the table:

{
    "Database Name": "sampledatabase",
    "Table Name": "movies",
    "Fields": [
         {
            "name": "movie_id",
            "type": "INTEGER"
         },
         {
            "name": "title",
            "type": "STRING"
         },
         {
            "name": "release_year",
            "type": "INTEGER"
         }
     ]
}

Prerequisites

Before configuring the MSK producer and consumer connectors, we need to first set up a data source, MSK cluster, and new schema registry. We provide an AWS CloudFormation template to generate the supporting resources needed for the solution:

  • A MySQL-compatible Aurora database as the data source. To perform CDC, we turn on binary logging in the DB cluster parameter group.
  • An MSK cluster. To simplify the network connection, we use the same VPC for the Aurora database and the MSK cluster.
  • Two schema registries to handle schemas for message key and message value.
  • One S3 bucket as the data sink.
  • MSK Connect plugins and worker configuration needed for this demo.
  • One Amazon Elastic Compute Cloud (Amazon EC2) instance to run database commands.

To set up resources in your AWS account, complete the following steps in an AWS Region that supports Amazon MSK, MSK Connect, and the AWS Glue Schema Registry:

  1. Choose Launch Stack.
  2. Choose Next.
  3. For Stack name, enter a suitable name.
  4. For Database Password, enter the password you want for the database user.
  5. Keep other values as default.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Custom plugin for the source and destination connector

A custom plugin is a set of JAR files that contain the implementation of one or more connectors, transforms, or converters. Amazon MSK installs the plugin on the workers of the MSK Connect cluster where the connector is running. As part of this demo, for the source connector we use the open-source Debezium MySQL connector JARs, and for the destination connector we use the Confluent community licensed Amazon S3 sink connector JARs. Both plugins also include the Avro serializer and deserializer libraries of the AWS Glue Schema Registry. These custom plugins are already created as part of the CloudFormation template deployed in the previous step.

Use the AWS Glue Schema Registry with the Debezium connector on MSK Connect as the MSK producer

We first deploy the source connector using the Debezium MySQL plugin to stream data from an Amazon Aurora MySQL-Compatible Edition database to Amazon MSK. Complete the following steps:

  1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
  2. Choose Create connector.
  3. Choose Use existing custom plugin and then pick the custom plugin with name starting msk-blog-debezium-source-plugin.
  4. Choose Next.
  5. Enter a suitable name like debezium-mysql-connector and an optional description.
  6. For Apache Kafka cluster, choose MSK cluster and choose the cluster created by the CloudFormation template.
  7. In Connector configuration, delete the default values and use the following configuration key-value pairs with the appropriate values:
    • name – The name used for the connector.
    • database.hostname – The CloudFormation output for Database Endpoint.
    • database.user and database.password – The parameters passed in the CloudFormation template.
    • database.history.kafka.bootstrap.servers – The CloudFormation output for Kafka Bootstrap.
    • key.converter.region and value.converter.region – Your Region.
name=<Connector-name>
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=<DBHOST>
database.port=3306
database.user=<DBUSER>
database.password=<DBPASSWORD>
database.server.id=42
database.server.name=db1
table.whitelist=sampledatabase.movies
database.history.kafka.bootstrap.servers=<MSK-BOOTSTRAP>
database.history.kafka.topic=dbhistory.demo1
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=<REGION>
value.converter.region=<REGION>
key.converter.registry.name=msk-connect-blog-keys
value.converter.registry.name=msk-connect-blog-values
key.converter.compatibility=FORWARD
value.converter.compatibility=FORWARD
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=op,source.ts_ms
tasks.max=1

Some of these settings are generic and should be specified for any connector. For example:

  • connector.class is the Java class of the connector
  • tasks.max is the maximum number of tasks that should be created for this connector

Some settings (database.*, transforms.*) are specific to the Debezium MySQL connector. Refer to Debezium MySQL Source Connector Configuration Properties for more information.
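To make the transforms.* settings more concrete, the following sketch imitates in plain Python what the ExtractNewRecordState transform does with the options used above (delete.handling.mode=rewrite, add.fields=op,source.ts_ms, drop.tombstones=false). It is an illustration of the behavior described in the Debezium documentation, not Debezium code; the `unwrap` function and the sample event values are hypothetical.

```python
from typing import Optional


def unwrap(envelope: Optional[dict]) -> Optional[dict]:
    """Flatten a Debezium change event envelope into a single row."""
    # drop.tombstones=false: tombstones (null values) pass through unchanged.
    if envelope is None:
        return None
    is_delete = envelope["op"] == "d"
    # delete.handling.mode=rewrite: for deletes, keep the last known row
    # (the "before" state) instead of emitting an empty value.
    row = envelope["before"] if is_delete else envelope["after"]
    flat = dict(row)
    # add.fields=op,source.ts_ms: metadata is added with a "__" prefix.
    flat["__op"] = envelope["op"]
    flat["__source_ts_ms"] = envelope["source"]["ts_ms"]
    # rewrite mode marks every record with a string flag.
    flat["__deleted"] = "true" if is_delete else "false"
    return flat


# Hypothetical change events for the movies table.
insert_event = {
    "before": None,
    "after": {"movie_id": 1, "title": "Example Movie", "release_year": 2020},
    "source": {"ts_ms": 1700000000000},
    "op": "c",
}
delete_event = {
    "before": {"movie_id": 1, "title": "Example Movie", "release_year": 2020},
    "after": None,
    "source": {"ts_ms": 1700000005000},
    "op": "d",
}

print(unwrap(insert_event))
print(unwrap(delete_event))
```

The three extra keys (__op, __source_ts_ms, and __deleted) are the same nullable fields that appear in the schema the connector registers for the movies table.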

Some settings (key.converter.* and value.converter.*) are specific to the Schema Registry. We use the AWSKafkaAvroConverter from the AWS Glue Schema Registry Library as the format converter. To configure AWSKafkaAvroConverter, we use the value of the string constant properties in the AWSSchemaRegistryConstants class:

  • key.converter and value.converter control the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. We use AWSKafkaAvroConverter for Avro format.
  • key.converter.registry.name and value.converter.registry.name define which schema registry to use.
  • key.converter.compatibility and value.converter.compatibility define the compatibility mode.

Refer to Using Kafka Connect with AWS Glue Schema Registry for more information.

  8. Next, we configure Connector capacity. We can choose Provisioned and leave other properties as default.
  9. For Worker configuration, choose the custom worker configuration with name starting msk-gsr-blog created as part of the CloudFormation template.
  10. For Access permissions, use the AWS Identity and Access Management (IAM) role generated by the CloudFormation template MSKConnectRole.
  11. Choose Next.
  12. For Security, choose the defaults.
  13. Choose Next.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template (msk-connector-logs).
  15. Choose Next.
  16. Review the settings and choose Create connector.

After a few minutes, the connector changes to running status.
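If the connector fails to start, a common cause is a placeholder such as <DBHOST> that was left in the configuration. The following is a small, hypothetical helper (not part of MSK Connect) that parses key=value configuration text and reports which keys still contain an unreplaced placeholder. The sample text is abbreviated and illustrative.

```python
import re
from typing import Dict, List

# Matches tokens such as <DBHOST> or <MSK-BOOTSTRAP>.
PLACEHOLDER = re.compile(r"<[A-Za-z][A-Za-z0-9_-]*>")


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def unresolved(props: Dict[str, str]) -> List[str]:
    """Return the keys whose values still contain a <PLACEHOLDER>."""
    return sorted(k for k, v in props.items() if PLACEHOLDER.search(v))


sample = """
name=debezium-mysql-connector
database.hostname=<DBHOST>
database.port=3306
key.converter.region=<REGION>
"""

print(unresolved(parse_properties(sample)))
```

Running a check like this before pasting the configuration into the console is optional; the connector logs in CloudWatch Logs remain the authoritative place to diagnose startup errors.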

Use the AWS Glue Schema Registry with the Confluent S3 sink connector running on MSK Connect as the MSK consumer

We deploy the sink connector using the Confluent S3 sink plugin to stream data from Amazon MSK to Amazon S3. Complete the following steps:

    1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
    2. Choose Create connector.
    3. Choose Use existing custom plugin and choose the custom plugin with name starting msk-blog-S3sink-plugin.
    4. Choose Next.
    5. Enter a suitable name like s3-sink-connector and an optional description.
    6. For Apache Kafka cluster, choose MSK cluster and select the cluster created by the CloudFormation template.
    7. In Connector configuration, delete the default values provided and use the following configuration key-value pairs with appropriate values:
        • name – The name used for the connector.
        • s3.bucket.name – The CloudFormation output for Bucket Name.
        • s3.region, key.converter.region, and value.converter.region – Your Region.
name=<CONNECTOR-NAME>
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.bucket.name=<BUCKET-NAME>
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
s3.region=<REGION>
storage.class=io.confluent.connect.s3.storage.S3Storage
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=10
tasks.max=1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.region=<REGION>
value.converter.region=<REGION>
value.converter.avroRecordType=GENERIC_RECORD
key.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
key.converter.compatibility=NONE
store.kafka.keys=false
schema.compatibility=NONE
topics=db1.sampledatabase.movies
value.converter.registry.name=msk-connect-blog-values
key.converter.registry.name=msk-connect-blog-keys
store.kafka.headers=false
  8. Next, we configure Connector capacity. We can choose Provisioned and leave other properties as default.
  9. For Worker configuration, choose the custom worker configuration with name starting msk-gsr-blog created as part of the CloudFormation template.
  10. For Access permissions, use the IAM role generated by the CloudFormation template MSKConnectRole.
  11. Choose Next.
  12. For Security, choose the defaults.
  13. Choose Next.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template (msk-connector-logs).
  15. Choose Next.
  16. Review the settings and choose Create connector.

After a few minutes, the connector is running.

Test the end-to-end CDC log stream

Now that both the Debezium and S3 sink connectors are up and running, complete the following steps to test the end-to-end CDC:

  1. On the Amazon EC2 console, navigate to the Security groups page.
  2. Select the security group ClientInstanceSecurityGroup and choose Edit inbound rules.
  3. Add an inbound rule allowing SSH connection from your local network.
  4. On the Instances page, select the instance ClientInstance and choose Connect.
  5. On the EC2 Instance Connect tab, choose Connect.
  6. Ensure your current working directory is /home/ec2-user and it has the files create_table.sql, alter_table.sql, initial_insert.sql, and insert_data_with_new_column.sql.
  7. Create a table in your MySQL database by running the following command (provide the database host name from the CloudFormation template outputs):
mysql -h <DATABASE-HOST> -u master -p < create_table.sql
  8. When prompted for a password, enter the password from the CloudFormation template parameters.
  9. Insert some sample data into the table with the following command:
mysql -h <DATABASE-HOST> -u master -p < initial_insert.sql
  10. When prompted for a password, enter the password from the CloudFormation template parameters.
  11. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  12. Navigate to db1.sampledatabase.movies version 1 to check the new schema created for the movies table:
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {
      "name": "movie_id",
      "type": "int"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "db1.sampledatabase.movies.Value"
}

A separate S3 folder is created for each partition of the Kafka topic, and data for the topic is written in that folder.

  13. On the Amazon S3 console, check for data written in Parquet format in the folder for your Kafka topic.
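To know what to look for in the bucket, it helps to sketch the object keys the sink is expected to produce. The following assumes the naming scheme documented for the Confluent S3 sink connector with the DefaultPartitioner (topics/<topic>/partition=<n>/<topic>+<n>+<start offset>), the default topics directory, and a .snappy.parquet extension. All three are assumptions to verify against your bucket, and the `expected_object_keys` function is hypothetical.

```python
from typing import List


def expected_object_keys(
    topic: str,
    partition: int,
    first_offset: int,
    record_count: int,
    flush_size: int = 10,                 # matches flush.size=10 above
    topics_dir: str = "topics",           # assumed connector default
    extension: str = ".snappy.parquet",   # assumed Parquet file suffix
) -> List[str]:
    """List keys for the complete batches written for one topic partition."""
    keys = []
    # Only full batches are listed: with flush.size alone, records that do
    # not yet fill a batch stay buffered in the connector.
    for batch in range(record_count // flush_size):
        start = first_offset + batch * flush_size
        keys.append(
            f"{topics_dir}/{topic}/partition={partition}/"
            f"{topic}+{partition}+{start:010d}{extension}"
        )
    return keys


for key in expected_object_keys("db1.sampledatabase.movies", 0, 0, 25):
    print(key)
```

With 25 records in partition 0, this sketch lists two objects, starting at offsets 0 and 10, while the last five records wait for the next flush. Schema changes can also cause the connector to start a new file early, which this simplified model does not cover.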

Schema evolution

After the initial schema is defined, applications may need to evolve it over time. When this happens, it’s critical for the downstream consumers to be able to handle data encoded with both the old and the new schema seamlessly. Compatibility modes allow you to control how schemas can or can’t evolve over time. These modes form the contract between applications producing and consuming data. For detailed information about different compatibility modes available in the AWS Glue Schema Registry, refer to AWS Glue Schema Registry. In our example, we use backward compatibility to ensure consumers can read both the current and previous schema versions. Complete the following steps:

  1. Add a new column to the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < alter_table.sql
  2. Insert new data into the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < insert_data_with_new_column.sql
  3. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  4. Navigate to the schema db1.sampledatabase.movies version 2 to check the new version of the schema created for the movies table, including the country column that you added:
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {
      "name": "movie_id",
      "type": "int"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "COUNTRY",
      "type": "string"
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "db1.sampledatabase.movies.Value"
}
  5. On the Amazon S3 console, check for data written in Parquet format in the folder for the Kafka topic.
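To see how the new COUNTRY field interacts with the compatibility mode, here is a minimal sketch of Avro's field-resolution rule, reduced to field names and defaults. Type promotion and aliases are ignored, so this illustrates the idea and is not the registry's actual check. The function names are hypothetical. The rule: a reader schema can decode data written with another schema only if every reader field either appears in the writer schema or declares a default.

```python
from typing import Dict, List

Field = Dict[str, object]


def can_read(reader: List[Field], writer: List[Field]) -> bool:
    """True if every reader field exists in the writer or has a default."""
    writer_names = {f["name"] for f in writer}
    return all(f["name"] in writer_names or "default" in f for f in reader)


def backward_compatible(old: List[Field], new: List[Field]) -> bool:
    # BACKWARD: consumers on the new schema can read data written with the old.
    return can_read(reader=new, writer=old)


def forward_compatible(old: List[Field], new: List[Field]) -> bool:
    # FORWARD: consumers on the old schema can read data written with the new.
    return can_read(reader=old, writer=new)


metadata = [
    {"name": "__op", "type": ["null", "string"], "default": None},
    {"name": "__source_ts_ms", "type": ["null", "long"], "default": None},
    {"name": "__deleted", "type": ["null", "string"], "default": None},
]
columns = [
    {"name": "movie_id", "type": "int"},
    {"name": "title", "type": "string"},
    {"name": "release_year", "type": "int"},
]
v1 = columns + metadata
v2 = columns + [{"name": "COUNTRY", "type": "string"}] + metadata

print("forward:", forward_compatible(v1, v2))
print("backward:", backward_compatible(v1, v2))
```

Under this simplified rule, adding COUNTRY without a default passes a FORWARD check, which is the mode set on the source connector through key.converter.compatibility and value.converter.compatibility, because consumers on version 1 can ignore the extra field. A BACKWARD check would additionally require COUNTRY to declare a default.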

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post:

  1. On the Amazon S3 console, navigate to the S3 bucket created by the CloudFormation template.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. On the AWS CloudFormation console, delete the stack you created.
  5. Wait for the stack status to change to DELETE_COMPLETE.

Conclusion

This post demonstrated how to use Amazon MSK, MSK Connect, and the AWS Glue Schema Registry to build a CDC log stream and evolve schemas for data streams as business needs change. You can apply this architecture pattern to other data sources with different Kafka connectors. For more information, refer to the MSK Connect examples.


About the Author

Kalyan Janaki is a Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Enhance operational insights for Amazon MSK using Amazon Managed Service for Prometheus and Amazon Managed Grafana

Post Syndicated from Anand Mandilwar original https://aws.amazon.com/blogs/big-data/enhance-operational-insights-for-amazon-msk-using-amazon-managed-service-for-prometheus-and-amazon-managed-grafana/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is an event streaming platform that you can use to build asynchronous applications by decoupling producers and consumers. Monitoring Amazon MSK metrics is critical for the efficient operation of production workloads. Amazon MSK gathers Apache Kafka metrics and sends them to Amazon CloudWatch, where you can view them. You can also monitor Amazon MSK with Prometheus, an open-source monitoring application. Many of our customers use open-source monitoring tools such as Prometheus and Grafana, but running them in a self-managed environment comes with its own challenges regarding manageability, availability, and security.

In this post, we show how you can build an AWS Cloud native monitoring platform for Amazon MSK using the fully managed, highly available, scalable, and secure services Amazon Managed Service for Prometheus and Amazon Managed Grafana for better operational insights.

Why is Kafka monitoring critical?

Amazon MSK clusters are a critical component of the IT infrastructure, so it is necessary to track their operations and efficiency. Amazon MSK metrics help you monitor critical tasks while operating applications. You can not only troubleshoot problems that have already occurred, but also discover anomalous behavior patterns and prevent problems from occurring in the first place.

Some customers currently use various third-party monitoring solutions like lenses.io, AppDynamics, Splunk, and others to monitor Amazon MSK operational metrics. In the context of cloud computing, customers are looking for an AWS Cloud native service that offers equivalent or better capabilities but with the added advantage of being highly scalable, available, secure, and fully managed.

Amazon MSK clusters emit a very large number of metrics via JMX, many of which can be useful for tuning the performance of your cluster, producers, and consumers. However, that large volume brings complexity with monitoring. By default, Amazon MSK clusters come with CloudWatch monitoring of your essential metrics. You can extend your monitoring capabilities by using open-source monitoring with Prometheus. This feature enables you to scrape a Prometheus friendly API to gather all the JMX metrics and work with the data in Prometheus.

This solution provides a simple observability platform for Amazon MSK, along with much-needed insights into critical operational metrics, which yields the following benefits for your IT operations or application teams:

  • You can quickly drill down to various Amazon MSK components (broker level, topic level, or cluster level) and identify issues that need investigation
  • You can investigate Amazon MSK issues after the event using the historical data in Amazon Managed Service for Prometheus
  • You can shorten or eliminate long calls that waste time questioning business users on Amazon MSK issues

In this post, we set up Amazon Managed Service for Prometheus, Amazon Managed Grafana, and a Prometheus server running as a container on Amazon Elastic Compute Cloud (Amazon EC2) to provide a fully managed monitoring solution for Amazon MSK.

The solution provides an easy-to-configure dashboard in Amazon Managed Grafana for various critical operation metrics, as demonstrated in the following video.

Solution overview

Amazon Managed Service for Prometheus reduces the heavy lifting required to get started with monitoring applications across Amazon MSK, Amazon Elastic Kubernetes Service (Amazon EKS), Amazon Elastic Container Service (Amazon ECS), and AWS Fargate, as well as self-managed Kubernetes clusters. The service also seamlessly integrates with Amazon Managed Grafana to simplify data visualization, team management authentication, and authorization.

Grafana empowers you to create dashboards and alerts from multiple sources such as an Amazon Managed Service for Prometheus workspace, CloudWatch, AWS X-Ray, Amazon OpenSearch Service, Amazon Redshift, and Amazon Timestream.

The following diagram demonstrates the solution architecture. This solution deploys a Prometheus server running as a container within Amazon EC2, which constantly scrapes metrics from the MSK brokers and remote writes them to an Amazon Managed Service for Prometheus workspace. As of this writing, Amazon Managed Service for Prometheus is not able to scrape the metrics directly, so a Prometheus server is necessary to do so. We use Amazon Managed Grafana to query and visualize the operational metrics for the Amazon MSK platform.

Solution Architecture

The following are the high-level steps to deploy the solution:

  1. Create an EC2 key pair.
  2. Configure your Amazon MSK cluster and associated resources. We demonstrate how to configure an existing Amazon MSK cluster or create a new one.
    1. Option A: Modify an existing Amazon MSK cluster
    2. Option B: Create a new Amazon MSK cluster
  3. Enable AWS IAM Identity Center (successor to AWS Single Sign-On), if not enabled.
  4. Configure Amazon Managed Grafana and Amazon Managed Service for Prometheus.
  5. Configure Prometheus and start the service.
  6. Configure the data sources in Amazon Managed Grafana.
  7. Import the Grafana dashboard.

Prerequisites

  • Download the solution files by cloning the GitHub repository:
git clone https://github.com/aws-samples/amazonmsk-managed-observability
  • The download includes three CloudFormation template files along with the Prometheus configuration file (prometheus.yml), the targets.json file (you need this to update the MSK broker DNS later on), and three JSON files for creating a dashboard within Amazon Managed Grafana.
  • Make sure the Prometheus server has internet access so that it can download the Prometheus Docker image.

1. Create an EC2 key pair

To create your EC2 key pair, complete the following steps:

  1. On the Amazon EC2 console, under Network & Security in the navigation pane, choose Key Pairs.
  2. Choose Create key pair.
  3. For Name, enter DemoMSKKeyPair.
  4. For Key pair type, select RSA.
  5. For Private key file format, choose the format in which to save the private key:
    1. To save the private key in a format that can be used with OpenSSH, select .pem.
    2. To save the private key in a format that can be used with PuTTY, select .ppk.

Create EC2 Key Pair - DemoMSKKeyPair

The private key file is automatically downloaded by your browser. The base file name is the name that you specified as the name of your key pair, and the file name extension is determined by the file format that you chose.

  6. Save the private key file in a safe place.

2. Configure your Amazon MSK cluster and associated resources.

Use the following options to configure an existing Amazon MSK cluster or create a new one.

2.a Modify an existing Amazon MSK cluster

If you want to create a new Amazon MSK cluster for this solution, skip to section 2.b, Create a new Amazon MSK cluster. Otherwise, complete the steps in this section to modify an existing cluster.

Validate cluster monitoring settings

We must enable enhanced partition-level monitoring (available at an additional cost) and open monitoring with Prometheus. Note that open monitoring with Prometheus is only available for provisioned mode clusters.

  1. Sign in to the account that contains the Amazon MSK cluster you want to monitor.
  2. Open your Amazon MSK cluster.
  3. On the Properties tab, navigate to Monitoring metrics.
  4. Check the monitoring level for Amazon CloudWatch metrics for this cluster, and choose Edit to edit the cluster.
  5. Select Enhanced partition-level monitoring.

Editing monitoring attributes of Amazon MSK Cluster

  6. Check the monitoring level for Open monitoring with Prometheus, and choose Edit to edit the cluster.
  7. Select Enable open monitoring for Prometheus.
  8. Under Prometheus exporters, select JMX Exporter and Node Exporter.

enable Open monitoring with Prometheus

  9. Under Broker log delivery, select Deliver to Amazon CloudWatch Logs.
  10. For Log group, enter your log group for Amazon MSK.
  11. Choose Save changes.

Deploy CloudFormation stack

Now we deploy the CloudFormation stack Prometheus_Cloudformation.yml that we downloaded earlier.

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Choose Create stack.
  3. For Prepare template, select Template is ready.
  4. For Template source, select Upload a template.
  5. Upload the Prometheus_Cloudformation.yml file, then choose Next.

CloudFormation - Create Stack for Prometheus_Cloudformation.yml

  6. For Stack name, enter Prometheus.
  7. VPCID – Provide the ID of the VPC where your Amazon MSK cluster is deployed (mandatory).
  8. VPCCIdr – Provide the CIDR of the VPC where your Amazon MSK cluster is deployed (mandatory).
  9. SubnetID – Provide the ID of any one of the subnets where your existing Amazon MSK cluster is deployed (mandatory).
  10. MSKClusterName – Provide the name of your existing Amazon MSK cluster.
  11. Leave Cloud9InstanceType, KeyName, and LatestAmiId as default.
  12. Choose Next.

CloudFormation - Specify stack details for Prometheus Stack

  1. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources.
  2. Choose Create stack.

You’re redirected to the AWS CloudFormation console, and can see the status as CREATE_IN_PROGRESS. Wait until the status changes to CREATE_COMPLETE.

CloudFormation stack creation status for Prometheus Stack

  1. On the stack’s Outputs tab, note the values for the following keys (if you don’t see anything under Outputs tab, click on refresh icon):
    1. PrometheusInstancePrivateIP
    2. PrometheusSecurityGroupId

getting private IP and Security Group ID for Prometheus Stack

Update the Amazon MSK cluster security group

Complete the following steps to update the security group of the existing Amazon MSK cluster to allow communication from the Kafka client and Prometheus server:

  1. On the Amazon MSK console, navigate to your Amazon MSK cluster.
  2. On the Properties tab, under Network settings, open the security group.
  3. Choose Edit inbound rules.
  4. Choose Add rule and create your rule with the following parameters:
    1. Type – Custom TCP
    2. Port range – 11001–11002
    3. Source – The Prometheus server security group ID
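If you prefer the AWS CLI to the console, the same inbound rule can be sketched as follows. This is a minimal sketch, not part of the original walkthrough: both security group IDs are hypothetical placeholders, and the script only prints the command so you can review it before running it.

```shell
#!/usr/bin/env bash
# Print (do not run) the AWS CLI call that mirrors the console rule above.
# Both security group IDs below are hypothetical placeholders.
MSK_SG_ID="${MSK_SG_ID:-sg-0123456789abcdef0}"                # security group of the MSK cluster
PROMETHEUS_SG_ID="${PROMETHEUS_SG_ID:-sg-0fedcba9876543210}"  # PrometheusSecurityGroupId stack output

# sg_rule_cmd MSK_SG PROMETHEUS_SG -> prints the CLI command for ports 11001-11002
sg_rule_cmd() {
  printf 'aws ec2 authorize-security-group-ingress --group-id %s --protocol tcp --port 11001-11002 --source-group %s\n' "$1" "$2"
}

sg_rule_cmd "$MSK_SG_ID" "$PROMETHEUS_SG_ID"
```

After checking the printed command, you can paste it into a terminal that has credentials for the account that owns the cluster.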

Set up your AWS Cloud9 environment

To configure your AWS Cloud9 environment, complete the following steps:

  1. On the AWS Cloud9 console, choose Environments in the navigation pane.
  2. Select Cloud9EC2Bastion and choose Open in Cloud9.

AWS Cloud9 home page

  1. Close the Welcome tab and open a new terminal tab
  2. Create an SSH key file with the contents from the private key file DemoMSKKeyPair using the following command:
    touch /home/ec2-user/environment/EC2KeyMSKDemo

  3. Run the following command to list the newly created key file
    ls -ltr

  4. Open the file, enter the contents of the private key file DemoMSKKeyPair, then save the file.

Updating the key file with the private file content

  1. Change the permissions of the file using the following command:
    chmod 600 /home/ec2-user/environment/EC2KeyMSKDemo

  2. Log in to the Prometheus server using this key file and the private IP noted earlier:
    ssh -i /home/ec2-user/environment/EC2KeyMSKDemo ec2-user@<Private-IP-of-Prometheus-instance>

  3. Once you’re logged in, check if the Docker service is up and running using the following command:
    systemctl status docker

checking docker service status

  1. To exit the server, enter exit and press Enter.

2.b Create a new Amazon MSK cluster

If you don’t have an Amazon MSK cluster running in your environment, or you don’t want to use an existing cluster for this solution, complete the steps in this section.

As part of these steps, your cluster will have the following properties:

Deploy CloudFormation stack

Complete the following steps to deploy the CloudFormation stack MSKResource_Cloudformation.yml:

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Choose Create stack.
  3. For Prepare template, select Template is ready.
  4. For Template source, select Upload a template.
  5. Upload the MSKResource_Cloudformation.yml file, then choose Next.
  6. For Stack name, enter MSKDemo.
  7. Network Configuration – Generic (mandatory)
    1. Stack to be deployed in NEW VPC? (true/false) – if false, you MUST provide VPCCidr and other details under Existing VPC section (Default is true)
    2. VPCCidr – Default is 10.0.0.0/16 for a new VPC. You can have any valid values as per your environment. If deploying in an existing VPC, provide the CIDR for the same
  8. Network Configuration – For New VPC
    1. PrivateSubnetMSKOneCidr (Default is 10.0.1.0/24)
    2. PrivateSubnetMSKTwoCidr (Default is 10.0.2.0/24)
    3. PrivateSubnetMSKThreeCidr (Default is 10.0.3.0/24)
    4. PublicOneCidr (Default is 10.0.0.0/24)
  9. Network Configuration – For Existing VPC (You need at least 4 subnets)
    1. VpcId – Provide the value if you are using any existing VPC to deploy the resources else leave it blank(default)
    2. SubnetID1 – Any one of the existing subnets from the given VPCID
    3. SubnetID2 – Any one of the existing subnets from the given VPCID
    4. SubnetID3 – Any one of the existing subnets from the given VPCID
    5. PublicSubnetID – Any one of the existing subnets from the given VPCID
  10. Leave the remaining parameters as default and choose Next.

Specifying the parameter details for MSKDemo stack

  1. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources.
  2. Choose Create stack.

You’re redirected to the AWS CloudFormation console, and can see the status as CREATE_IN_PROGRESS. Wait until the status changes to CREATE_COMPLETE.

CloudFormation stack creation status

  1. On the stack’s Outputs tab, note the values for the following (if you don’t see anything under Outputs tab, click on refresh icon):
    1. KafkaClientPrivateIP
    2. PrometheusInstancePrivateIP

Set up your AWS Cloud9 environment

Follow the steps as outlined in the previous section to configure your AWS Cloud9 environment.

Retrieve the cluster broker list

To get your MSK cluster broker list, complete the following steps:

  1. On the Amazon MSK console, navigate to your cluster.
  2. In the Cluster summary section, choose View client information.
  3. In the Bootstrap servers section, copy the private endpoint.

You need this value to perform some operations later, such as creating an MSK topic, producing sample messages, and consuming those sample messages.

  1. Choose Done.
  2. On the Properties tab, in the Brokers details section, note the endpoints listed.

These need to be updated in the targets.json file (used for Prometheus configuration in a later step).

3. Enable IAM Identity Center

Before you deploy the CloudFormation stack for Amazon Managed Service for Prometheus and Amazon Managed Grafana, make sure to enable IAM Identity Center.

If you don’t use IAM Identity Center, alternatively, you can set up user authentication via SAML. For more information, refer to Using SAML with your Amazon Managed Grafana workspace.

If IAM Identity Center is already enabled in another Region, you don’t need to enable it in your current Region.

Complete the following steps to enable IAM Identity Center:

  1. On the IAM Identity Center console, under Enable IAM Identity Center, choose Enable.

Enabling IAM Identity Center

  1. Choose Create AWS organization.

4. Configure Amazon Managed Grafana and Amazon Managed Service for Prometheus

Complete the steps in this section to set up Amazon Managed Service for Prometheus and Amazon Managed Grafana.

Deploy CloudFormation template

Complete the following steps to deploy the CloudFormation stack AMG_AMP_Cloudformation:

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Choose Create stack.
  3. For Prepare template, select Template is ready.
  4. For Template source, select Upload a template.
  5. Upload the AMG_AMP_Cloudformation.yml file, then choose Next.
  6. For Stack name, enter ManagedPrometheusAndGrafanaStack, then choose Next.
  7. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create stack.

You’re redirected to the AWS CloudFormation console, and can see the status as CREATE_IN_PROGRESS. Wait until the status changes to CREATE_COMPLETE.

  1. On the stack’s Outputs tab, note the values for the following (if you don’t see anything under Outputs tab, click on refresh icon):
    1. GrafanaWorkspaceURL – This is Amazon Managed Grafana URL
    2. PrometheusEndpointWriteURL – This is the Amazon Managed Service for Prometheus write endpoint URL

Outputs information for ManagedPrometheusAndGrafana Stack

Create a user for Amazon Managed Grafana

Complete the following steps to create a user for Amazon Managed Grafana:

  1. On the IAM Identity Center console, choose Users in the navigation pane.
  2. Choose Add user.
  3. For Username, enter grafana-admin.
  4. Enter and confirm your email address to receive a confirmation email.

IAM Identity Center - Specifying user information

  1. Skip the optional steps, then choose Add user.

A success message appears at the top of the console.

  1. In the confirmation email, choose Accept invitation and set your user password.

Screenshot showing Invitation link for the new user

  1. On the Amazon Managed Grafana console, choose Workspaces in the navigation pane.
  2. Open the workspace Amazon-Managed-Grafana.
  3. Make a note of the Grafana workspace URL.

You use this URL to log in to view your Grafana dashboards.

  1. On the Authentication tab, choose Assign new user or group.

Amazon Managed Grafana - Workspace information - Assigning user

  1. Select the user you created earlier and choose Assign users and groups.

Amazon Managed Grafana - Assigning user to workspace

  1. On the Action menu, choose what kind of user to make it: admin, editor, or viewer.

Note that your Grafana workspace needs at least one admin user.

  1. Navigate to the Grafana URL you copied earlier in your browser.
  2. Choose Sign in with AWS IAM Identity Center.

Amazon Managed Grafana URL landing page

  1. Log in with your IAM Identity Center credentials.

5. Configure Prometheus and start the service

When you cloned the GitHub repo, you downloaded two configuration files: prometheus.yml and targets.json. In this section, we configure these two files.

  1. Use any text editor (for example, Visual Studio Code or Notepad++) to open prometheus.yml.
  2. In the remote_write section, update the remote write URL and Region.

prometheus.yml file
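For orientation, a prometheus.yml with a remote_write section might look like the file written by the sketch below. The actual file in the GitHub repo may differ. The job name msk-brokers, the workspace ID ws-EXAMPLE, the output file name prometheus.example.yml, and the use of a sigv4 block for authentication are all assumptions made for illustration.

```shell
#!/usr/bin/env bash
# Write a minimal example config. All values below are placeholders (assumptions):
#   WORKSPACE_URL - stands in for the PrometheusEndpointWriteURL stack output
#   AWS_REGION    - the Region of your Amazon Managed Service for Prometheus workspace
AWS_REGION="${AWS_REGION:-us-east-1}"
WORKSPACE_URL="${WORKSPACE_URL:-https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-EXAMPLE/api/v1/remote_write}"
OUT_FILE="${OUT_FILE:-prometheus.example.yml}"   # separate name so the real file is not overwritten

cat > "$OUT_FILE" <<EOF
global:
  scrape_interval: 60s

scrape_configs:
  - job_name: msk-brokers
    file_sd_configs:
      - files:
          - /etc/prometheus/targets.json

remote_write:
  - url: ${WORKSPACE_URL}
    sigv4:
      region: ${AWS_REGION}
EOF

echo "wrote ${OUT_FILE}"
```

The targets path matches the location where the docker run command later in this section mounts targets.json inside the container.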

  1. Use any IDE to open targets.json.
  2. Update the targets with the broker endpoints you obtained earlier.

targets.json file
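If you have many brokers, you can generate the targets instead of editing them by hand. The sketch below assumes the standard Prometheus file-based service discovery layout, with one jmx job on port 11001 and one node job on port 11002 (the ports opened in the security group earlier). The broker hostnames are hypothetical, and the repo's targets.json may use different labels.

```shell
#!/usr/bin/env bash
# Build a Prometheus file_sd document for MSK open monitoring.
# Port 11001 = JMX Exporter, port 11002 = Node Exporter.

# join_targets PORT HOST... -> "host1:PORT", "host2:PORT"
join_targets() {
  local port="$1"; shift
  local out="" host
  for host in "$@"; do
    out+="${out:+, }\"${host}:${port}\""
  done
  printf '%s' "$out"
}

# make_targets_json HOST... -> prints the JSON document to stdout
make_targets_json() {
  printf '[\n'
  printf '  {"labels": {"job": "jmx"}, "targets": [%s]},\n' "$(join_targets 11001 "$@")"
  printf '  {"labels": {"job": "node"}, "targets": [%s]}\n' "$(join_targets 11002 "$@")"
  printf ']\n'
}

# Hypothetical broker endpoints; replace them with the ones you noted earlier.
make_targets_json \
  "b-1.example.kafka.us-east-1.amazonaws.com" \
  "b-2.example.kafka.us-east-1.amazonaws.com"
```

Redirect the output to targets.json (for example, by appending `> targets.json` to the last command) before uploading the file.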

  1. In your AWS Cloud9 environment, choose File, then Upload Local Files.
  2. Choose Select Files and upload targets.json and prometheus.yml from your local machine.

Upload file dialog page within AWS Cloud9 Environment

  1. In the AWS Cloud9 environment, run the following command using the key file you created earlier:
    ssh -i /home/ec2-user/environment/EC2KeyMSKDemo ec2-user@<Private-IP-of-Prometheus-instance> mkdir -p /home/ec2-user/prometheus

  2. Copy targets.json to the Prometheus server:
    scp -i /home/ec2-user/environment/EC2KeyMSKDemo targets.json ec2-user@<Private-IP-of-Prometheus-instance>:/home/ec2-user/prometheus/

  3. Copy prometheus.yml to the Prometheus server:
    scp -i /home/ec2-user/environment/EC2KeyMSKDemo prometheus.yml ec2-user@<Private-IP-of-Prometheus-instance>:/home/ec2-user/prometheus/

  4. SSH into the Prometheus server:
    ssh -i /home/ec2-user/environment/EC2KeyMSKDemo ec2-user@<Private-IP-of-Prometheus-instance>

  5. Start the Prometheus container:
    sudo docker run -d -p 9090:9090 --name=prometheus -v /home/ec2-user/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml -v /home/ec2-user/prometheus/targets.json:/etc/prometheus/targets.json prom/prometheus --config.file=/etc/prometheus/prometheus.yml

  6. Check if the Prometheus container is running:
    sudo docker ps

Start the prometheus container inside Prometheus Server

6. Configure data sources in Amazon Managed Grafana

To configure your data sources, complete the following steps:

  1. Log in to the Amazon Managed Grafana URL.
  2. Choose AWS Data Services in the navigation pane, then choose Data Sources.

Amazon Managed Grafana - Configuring data source

  1. For Service, choose Amazon Managed Service for Prometheus.
  2. For Region, choose your Region.

The correct resource ID is populated automatically.

  1. Select your resource ID and choose Add 1 data source.

Configuring data source for Amazon managed Prometheus

  1. Choose Go to settings.

Configuring data source for Amazon managed Prometheus

  1. For Name, enter Amazon Managed Prometheus and enable Default.

The URL is automatically populated.

  1. Leave everything else as default.

Configuring data source for Amazon managed Prometheus

  1. Choose Save & Test.

If everything is correct, the message Data source is working appears.

Now we configure CloudWatch as a data source.

  1. Choose AWS Data Services, then choose Data Sources.
  2. For Service, choose CloudWatch.
  3. For Region, choose your correct Region.
  4. Choose Add data source.
  5. Select the CloudWatch data source and choose Go to settings.

Configuring data source with Service as CloudWatch

  1. For Name, enter AmazonMSK-CloudWatch.
  2. Choose Save & Test.

7. Import the Grafana dashboard

You can use the following preconfigured dashboards, which are available to download from the GitHub repo:

  • Kafka Metrics
  • MSK Cluster Overview
  • AWS MSK – Kafka Cluster-CloudWatch

To import your dashboard, complete the following steps:

  1. In Amazon Managed Grafana, choose the plus sign in the navigation pane.
  2. Choose Import.

Importing preconfigured dashboard

  1. Choose Upload JSON file.
  2. Choose the dashboard you downloaded.
  3. Choose Load.

Configuring dashboard by importing the json file

The following screenshot shows your loaded dashboard.

Dashboard showing cluster level metrics

Generate sample data in Amazon MSK (Optional – when you create a new Amazon MSK Cluster)

To generate sample data in Amazon MSK, complete the following steps:

  1. In your AWS Cloud9 environment, log in to the Kafka client.
    ssh -i /home/ec2-user/environment/EC2KeyMSKDemo ec2-user@<Private-IP-of-Kafka Client>

  2. Set the broker endpoint variable
    export MYBROKERS="<Bootstrap Servers Endpoint – captured earlier>"

  3. Run the following command to create a topic called TLSTestTopic60:
    /home/ec2-user/kafka/bin/kafka-topics.sh --command-config /home/ec2-user/kafka/config/client.properties --bootstrap-server $MYBROKERS --create --topic TLSTestTopic60 --partitions 5 --replication-factor 2

  4. Still logged in to the Kafka client, run the following command to start the producer service:
    /home/ec2-user/kafka/bin/kafka-console-producer.sh --producer.config /home/ec2-user/kafka/config/client.properties --broker-list $MYBROKERS --topic TLSTestTopic60

Creating topic and producing messages in Kafka Producer service

  1. Open a new terminal from within your AWS Cloud9 environment and log in to the Kafka client instance
    ssh -i /home/ec2-user/environment/EC2KeyMSKDemo ec2-user@<Private-IP-of-Kafka Client>

  2. Set the broker endpoint variable
    export MYBROKERS="<Bootstrap Servers Endpoint – captured earlier>"

  3. Now you can start the consumer service and see the incoming messages
    /home/ec2-user/kafka/bin/kafka-console-consumer.sh --consumer.config /home/ec2-user/kafka/config/client.properties --bootstrap-server $MYBROKERS --topic TLSTestTopic60 --from-beginning

Starting Kafka consumer service

  1. Press CTRL+C to stop the producer/consumer service.
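Typing messages into the console producer by hand is fine for a quick check. To put a steadier load on the dashboards, you could pipe generated lines into the same producer. The helper below is a hypothetical sketch: the function name gen_messages and the message format are made up for illustration.

```shell
#!/usr/bin/env bash
# gen_messages COUNT -> prints COUNT numbered sample messages, one per line
gen_messages() {
  local count="${1:-10}" i
  for (( i = 1; i <= count; i++ )); do
    printf 'sample-message-%d\n' "$i"
  done
}

# Preview three messages locally.
gen_messages 3

# On the Kafka client, pipe the output into the producer used above, for example:
# gen_messages 1000 | /home/ec2-user/kafka/bin/kafka-console-producer.sh \
#   --producer.config /home/ec2-user/kafka/config/client.properties \
#   --broker-list "$MYBROKERS" --topic TLSTestTopic60
```

The console producer reads one message per line from standard input, so each generated line becomes one record in the topic.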

Kafka metrics dashboards on Amazon Managed Grafana

You can now view your Kafka metrics dashboards on Amazon Managed Grafana:

  • Cluster overall health – Configured using Amazon Managed Service for Prometheus as the data source:
    • Critical metrics
  • Amazon MSK cluster overview – Configured using Amazon Managed Service for Prometheus as the data source:
    • Critical metrics
    • Cluster throughput (broker-level metrics)
    • Cluster metrics (JVM)
  • Kafka cluster operation metrics – Configured using CloudWatch as the data source:
    • General overall stats
    • CPU and Memory metrics

Clean up

You will continue to incur costs until you delete the infrastructure that you created for this post. Delete the CloudFormation stack you used to create the respective resources.

If you used an existing cluster, make sure to remove the inbound rules you updated in the security group (otherwise the stack deletion will fail).

  1. On the Amazon MSK console, navigate to your existing cluster.
  2. On the Properties tab, in the Networking settings section, open the security group you applied.
  3. Choose Edit inbound rules.

  1. Choose Delete to remove the rules you added.
  2. Choose Save rules.

Now you can delete your CloudFormation stacks.

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Select ManagedPrometheusAndGrafanaStack and choose Delete.
  3. If you used an existing Amazon MSK cluster, delete the stack Prometheus.
  4. If you created a new Amazon MSK cluster, delete the stack MSKDemo.

Conclusion

This post showed how you can deploy a fully managed, highly available, scalable, and secure monitoring system for Amazon MSK using Amazon Managed Service for Prometheus and Amazon Managed Grafana, and use Grafana dashboards to gain deep insights into various operational metrics. Although this post only discussed using Amazon Managed Service for Prometheus and CloudWatch as the data sources in Amazon Managed Grafana, you can enable various other data sources, such as AWS IoT SiteWise, AWS X-Ray, Amazon Redshift, and Amazon Athena, and build a dashboard on top of those metrics. You can use these managed services for monitoring any number of Amazon MSK platforms. Metrics are available to query in Amazon Managed Grafana or Amazon Managed Service for Prometheus in near-real time.

You can use this post as prescriptive guidance and deploy an observability solution for a new or an existing Amazon MSK cluster, identify the metrics that are important for your applications and then create a dashboard using Amazon Managed Grafana and Prometheus.


About the Authors

Anand Mandilwar is an Enterprise Solutions Architect at AWS. He works with enterprise customers, helping them innovate and transform their business on AWS. He is passionate about automation around cloud operations, infrastructure provisioning, and cloud optimization. He also likes Python programming. In his spare time, he enjoys honing his photography skills, especially in portrait and landscape photography.

Ajit Puthiyavettle is a Solutions Architect working with enterprise clients, architecting solutions to achieve business outcomes. He is passionate about solving customer challenges with innovative solutions. His experience is in leading DevOps and security teams for enterprise and SaaS (Software as a Service) companies. Recently, he has focused on helping customers with security, ML, and HCLS workloads.

Create more partitions and retain data for longer in your MSK Serverless clusters

Post Syndicated from Usama Naseem original https://aws.amazon.com/blogs/big-data/create-more-partitions-and-retain-data-for-longer-in-your-msk-serverless-clusters/

In April 2022, Amazon Managed Streaming for Apache Kafka (Amazon MSK) launched an exciting new capability, Amazon MSK Serverless. Amazon MSK is a fully managed service for Apache Kafka that makes it easier for developers to build and run highly available, secure, and scalable applications based on Apache Kafka. With MSK Serverless, developers can run their applications without having to provision, configure, or optimize their Apache Kafka clusters. MSK Serverless automatically provisions and scales compute and storage resources, so developers have access to on-demand streaming capacity and storage.

Over the remainder of 2022, the team collected customer feedback and worked backward from customer requirements to add new capabilities that made MSK Serverless even better. In this post, we discuss a few of these enhancements in detail and provide an example use case.

Higher default quota for partitions in a cluster

Data in Apache Kafka is written to topics, which can be partitioned into multiple log files called partitions. When a producer application writes data to a topic, it is appended to one of these partitions. MSK Serverless launched with a maximum quota of 120 partitions per cluster. However, our customers told us that they needed more partitions per cluster for a variety of use cases, ranging from change data capture (CDC) to faster real-time data processing.

In December 2022, we increased the default quota for partitions for MSK Serverless clusters. With the increased quota, you can create up to 2,400 partitions per cluster. The 20-fold increase in the number of partitions you can have per cluster lets you create more topics per cluster and have more applications consume data in parallel. You can also implement better isolation of data with fine-grained access control. More partitions are particularly useful for CDC use cases where each table in the database has hundreds of unique keys, which are each mapped to a unique partition. With more partitions, you can use MSK Serverless for capturing changes in larger databases with lots of tables and hundreds of keys. Note that the 2,400 limit only applies to leader partitions. MSK Serverless creates two replicas of each partition by default at no additional cost, and these replicas don’t count towards this limit.

Unlimited data retention duration

The data you produce to your topics can be retained in Apache Kafka for a configurable duration, depending on how long you need to access data using Apache Kafka consumer APIs. Typically, customers retain data for short periods of time, ranging from a few hours to a few days. Previously, MSK Serverless limited data retention to a maximum of 24 hours (1 day), which is sufficient for most popular Apache Kafka use cases. However, some use cases require customers to retain data for longer, such as retaining data for audit purposes or maintaining application recovery SLAs.

Now, with the increase in the data retention duration quota, you can retain data for as long as you need in your MSK Serverless clusters. Longer data retention is particularly useful for use cases where your consumer applications need quick access to older data. For instance, in the case of a failure, the application may need to access data from the start of the topic to reconstruct its state. Because you can now retain data in your topics for longer durations, you can restore your application’s state by accessing older data using Kafka’s consumer API, making it easier to recover from such failures. After the application recovers, you can configure your application to start consuming the data from the earliest timestamp you need to reestablish your application’s state. Note that you can only retain up to 250 GB of data per partition. As long as your partition doesn’t reach 250 GB in size, you may retain it for as long as you wish. You may create more partitions if you need more storage for a given topic.

These new quotas are available in all Regions where MSK Serverless is available. For more information, navigate to the MSK Serverless tab on the Amazon MSK pricing page and choose the Region drop-down menu.

You can also request an increase to the maximum number of partitions quota by contacting AWS Support if you need more than 2,400 partitions in a cluster. The quotas for more partitions and longer retention are applied to both existing and new clusters.

Getting started: Create a topic with 1,000 partitions and 7-day retention

In this section, we demonstrate how to create a topic in MSK Serverless, specify the number of partitions, and set its retention duration.

As a prerequisite, you must have an MSK Serverless cluster and an Apache Kafka client. Refer to Getting started using MSK Serverless clusters for step-by-step instructions.

  1. On your client machine, access kafka_2.12-2.8.1/bin and run the following export command (replace the ‘my-endpoint’ with the bootstrap server string of your MSK Serverless cluster):
    export BS=my-endpoint

  2. Run the following command to create a topic called msk-sample-topic with 1,000 partitions and 7-day data retention (604,800,000 milliseconds):
    <path-to-your-kafka-installation>/bin/kafka-topics.sh \
    --bootstrap-server $BS \
    --command-config client.properties \
    --create \
    --topic msk-sample-topic \
    --partitions 1000 \
    --config retention.ms=604800000

  3. (Optional) Run the following command to view the details of the topic you created in step 2:
    <path-to-your-kafka-installation>/bin/kafka-topics.sh \
    --bootstrap-server $BS \
    --command-config client.properties \
    --topic msk-sample-topic \
    --describe | head -n1

    You will see the following result:

    Topic: msk-sample-topic TopicId: Ze76LY9EQuiH0xOIenx_HA PartitionCount: 1000 ReplicationFactor: 3    Configs: min.insync.replicas=2,segment.bytes=134217728,retention.ms=604800000,message.format.version=2.8-IV2,unclean.leader.election.enable=false,retention.bytes=268435456000
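The two retention values in the output above can be reproduced with shell arithmetic, which is handy when you want a different retention window. This is a small sketch: the variable names are illustrative, and it reads the 250 GB per-partition cap as 250 × 1024³ bytes because that matches the retention.bytes value shown.

```shell
#!/usr/bin/env bash
# Derive the two retention-related values seen in the topic description.

days=7                                           # retention window used in this example
retention_ms=$(( days * 24 * 60 * 60 * 1000 ))   # days -> milliseconds

gib=250                                          # per-partition storage cap
retention_bytes=$(( gib * 1024 * 1024 * 1024 ))  # 250 x 1024^3 bytes

echo "retention.ms=${retention_ms}"        # prints retention.ms=604800000
echo "retention.bytes=${retention_bytes}"  # prints retention.bytes=268435456000
```

To retain data for 30 days instead, set days=30 and pass the resulting value to --config retention.ms when you create the topic.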

Clean up

To avoid incurring charges on the AWS resources created in this post, delete the MSK Serverless cluster and the Amazon Elastic Compute Cloud (Amazon EC2) instance for your client machine.

  1. On the Amazon MSK console, select the MSK Serverless cluster you used for this solution.
  2. Choose Actions, then choose Delete.
  3. On the Amazon EC2 console, select the instance that you created for your Apache Kafka client machine.
  4. Choose Instance state, then choose Terminate instance.

Conclusion

This post demonstrated how to create an MSK Serverless cluster topic with 1,000 partitions and 7-day retention. With the new quota increases, you can create up to 2,400 partitions per cluster and retain data for as long as you need. If you have comments or feedback, please feel free to leave them in the comments.


About the author

Usama Naseem is a Senior Product Manager for Amazon MSK and focuses on MSK Serverless. Previously, he held product management roles for AWS Lambda and Amazon Fresh. He is passionate about giving customers the tools to build real-time applications in the cloud. Outside of work, he continues to be under the delusion that he will be the best squash player in the world one day.

Build a serverless streaming pipeline with Amazon MSK Serverless, Amazon MSK Connect, and MongoDB Atlas

Post Syndicated from Igor Alekseev original https://aws.amazon.com/blogs/big-data/build-a-serverless-streaming-pipeline-with-amazon-msk-serverless-amazon-msk-connect-and-mongodb-atlas/

This post was cowritten with Babu Srinivasan and Robert Walters from MongoDB.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed, highly available Apache Kafka service. Amazon MSK makes it easy to ingest and process streaming data in real time and use that data easily within the AWS ecosystem. With Amazon MSK Serverless, you can automatically provision and manage required resources to provide on-demand streaming capacity and storage for your applications.

Amazon MSK also supports integration of data sources such as MongoDB Atlas via Amazon MSK Connect. MSK Connect allows serverless integration of MongoDB data with Amazon MSK using the MongoDB Connector for Apache Kafka.

MongoDB Atlas Serverless provides database services that dynamically scale up and down with data size and throughput—and the cost scales accordingly. It’s best suited for applications with variable demands to be managed with minimal configuration. It provides high performance and reliability with automated upgrade, encryption, security, metrics, and backup features built in with the MongoDB Atlas infrastructure.

MSK Serverless is a type of cluster for Amazon MSK. Just like MongoDB Atlas Serverless, MSK Serverless automatically provisions and scales compute and storage resources. You can now create end-to-end serverless workflows. You can build a serverless streaming pipeline with serverless ingestion using MSK Serverless and serverless storage using MongoDB Atlas. In addition, MSK Connect now supports private DNS hostnames. This allows Serverless MSK instances to connect to Serverless MongoDB clusters via AWS PrivateLink, providing you with secure connectivity between platforms.

If you’re interested in using a non-serverless cluster, refer to Integrating MongoDB with Amazon Managed Streaming for Apache Kafka (MSK).

This post demonstrates how to implement a serverless streaming pipeline with MSK Serverless, MSK Connect, and MongoDB Atlas.

Solution overview

The following diagram illustrates our solution architecture.

Data flow between AWS MSK and MongoDB Atlas

The data flow starts with an Amazon Elastic Compute Cloud (Amazon EC2) client instance that writes records to an MSK topic. As data arrives, an instance of the MongoDB Connector for Apache Kafka writes the data to a collection in the MongoDB Atlas Serverless cluster. For secure connectivity between the two platforms, an AWS PrivateLink connection is created between the MongoDB Atlas cluster and the VPC containing the MSK instance.

This post walks you through the following steps:

  1. Create the serverless MSK cluster.
  2. Create the MongoDB Atlas Serverless cluster.
  3. Configure the MSK plugin.
  4. Create the EC2 client.
  5. Configure an MSK topic.
  6. Configure the MongoDB Connector for Apache Kafka as a sink.

Configure the serverless MSK cluster

To create a serverless MSK cluster, complete the following steps:

  1. On the Amazon MSK console, choose Clusters in the navigation pane.
  2. Choose Create cluster.
  3. For Creation method, select Custom create.
  4. For Cluster name, enter MongoDBMSKCluster.
  5. For Cluster type, select Serverless.
  6. Choose Next.
    Serverless MSK Cluster creation UI
  7. On the Networking page, specify your VPC, Availability Zones, and corresponding subnets.
  8. Note the Availability Zones and subnets to use later.
    Cluster settings showing VPC and Subnets
  9. Choose Next.
  10. Choose Create cluster.

When the cluster is available, its status becomes Active.

Cluster Available for Use

Create the MongoDB Atlas Serverless cluster

To create a MongoDB Atlas cluster, follow the Getting Started with Atlas tutorial. Note that for the purposes of this post, you need to create a serverless instance.

Create new cluster dialog

After the cluster is created, configure an AWS private endpoint with the following steps:

  1. On the Security menu, choose Network Access.
    Network Access location in the Security menu
  2. On the Private Endpoint tab, choose Serverless Instance.
    Serverless Instance network access
  3. Choose Create new endpoint.
  4. For Serverless Instance, choose the instance you just created.
  5. Choose Confirm.
    Create Private Endpoint UI
  6. Provide your VPC endpoint configuration and choose Next.
    VPC Endpoint Configuration UI
  7. When creating the AWS PrivateLink resource, make sure you specify the exact same VPC and subnets that you used earlier when creating the networking configuration for the serverless MSK instance.
  8. Choose Next.
    VPC Endpoint Subnet Configuration UI
  9. Follow the instructions on the Finalize page, then choose Confirm after your VPC endpoint is created.

Upon success, the new private endpoint will show up in the list, as shown in the following screenshot.

Network Access Confirmation Page

Configure the MSK Plugin

Next, we create a custom plugin in Amazon MSK using the MongoDB Connector for Apache Kafka. The connector needs to be uploaded to an Amazon Simple Storage Service (Amazon S3) bucket before you can create the plugin. To download the MongoDB Connector for Apache Kafka, refer to Download a Connector JAR File.

  1. On the Amazon MSK console, choose Custom plugins in the navigation pane.
  2. Choose Create custom plugin.
  3. For S3 URI, enter the S3 location of the downloaded connector.
  4. Choose Create custom plugin.

MSK plugin details

Configure an EC2 client

Next, let’s configure an EC2 instance. We use this instance to create the topic and insert data into the topic. For instructions, refer to the section Configure an EC2 client in the post Integrating MongoDB with Amazon Managed Streaming for Apache Kafka (MSK).

Create a topic on the MSK cluster

To create a Kafka topic, we need to install the Kafka CLI first.

  1. On the client EC2 instance, first install Java:

sudo yum install java-1.8.0

  1. Next, run the following command to download Apache Kafka:

wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz

  1. Unpack the tar file using the following command:

tar -xzf kafka_2.12-2.6.2.tgz

The distribution of Kafka includes a bin folder with tools that can be used to manage topics.

  4. Go to the kafka_2.12-2.6.2 directory and issue the following command to create a Kafka topic on the serverless MSK cluster:

bin/kafka-topics.sh --create --topic sandbox_sync2 --bootstrap-server <BOOTSTRAP SERVER> --command-config=bin/client.properties --partitions 2

You can copy the bootstrap server endpoint from the View Client Information page for your serverless MSK cluster.

Bootstrap Server Connection Page

You can configure IAM authentication by following these instructions.
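As a rough illustration of what the bin/client.properties file referenced in the commands above typically contains, the following sketch renders the client settings used by the aws-msk-iam-auth library for IAM access control. It assumes that library's JAR is already on the Kafka CLI classpath; the helper names and the default file path are only illustrative.

```python
from pathlib import Path

# Client settings for IAM access control, as used by the aws-msk-iam-auth library.
IAM_CLIENT_PROPERTIES = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}


def render_properties(props):
    """Render a dict as Java .properties text, one key=value pair per line."""
    return "".join(f"{key}={value}\n" for key, value in props.items())


def write_client_properties(path="bin/client.properties"):
    """Write the IAM client settings to the given path (assumed location)."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(render_properties(IAM_CLIENT_PROPERTIES))
    return target


if __name__ == "__main__":
    # Print the file content instead of writing it, so nothing is overwritten.
    print(render_properties(IAM_CLIENT_PROPERTIES), end="")
```

Calling write_client_properties() from the Kafka directory would create the file at the path used by --command-config and --producer.config in this post.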

Configure the sink connector

Now, let’s configure a sink connector to send the data to the MongoDB Atlas Serverless instance.

  1. On the Amazon MSK console, choose Connectors in the navigation pane.
  2. Choose Create connector.
  3. Select the plugin you created earlier.
  4. Choose Next.
  5. Select the serverless MSK instance that you created earlier.
  6. Enter your connector configuration as follows:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
key.converter.schemas.enable=false
value.converter.schemas.enable=false
database=MongoDBMSKDemo
collection=Sink
tasks.max=1
topics=sandbox_sync2
connection.uri=(MongoDB Atlas Connection String Goes Here)
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

Make sure that the connection to the MongoDB Atlas Serverless instance is through AWS PrivateLink. For more information, refer to Connecting Applications Securely to a MongoDB Atlas Data Plane with AWS PrivateLink.

  7. In the Access Permissions section, create an AWS Identity and Access Management (IAM) role with the required trust policy.
  8. Choose Next.
  9. Specify Amazon CloudWatch Logs as your log delivery option.
  10. Complete your connector.

When the connector status changes to Active, the pipeline is ready.

Connector Confirmation Page
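Before pasting the connector configuration from step 6 into the console, it can help to sanity-check it locally. The following is a minimal sketch, not part of the MongoDB connector: it parses key=value text and reports missing settings or a topics value that does not include the topic the producer writes to. The list of required keys and the helper names are assumptions made for this example.

```python
# Settings this sketch treats as required for the sink connector (an assumption).
REQUIRED_KEYS = ("connector.class", "connection.uri", "database", "collection", "topics")


def parse_properties(text):
    """Parse key=value lines, skipping blank lines and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, because connection URIs may contain "=".
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def check_sink_config(config, produced_topic):
    """Return a list of problems found; an empty list means none were found."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not config.get(key)]
    topics = [t.strip() for t in config.get("topics", "").split(",") if t.strip()]
    if topics and produced_topic not in topics:
        problems.append(f"topics does not include {produced_topic}")
    return problems


if __name__ == "__main__":
    sample = """
    connector.class=com.mongodb.kafka.connect.MongoSinkConnector
    database=MongoDBMSKDemo
    collection=Sink
    topics=sandbox_sync2
    connection.uri=<your Atlas connection string>
    """
    print(check_sink_config(parse_properties(sample), "sandbox_sync2"))
```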

Insert data into the MSK topic

On your EC2 client, insert data into the MSK topic using the kafka-console-producer as follows:

bin/kafka-console-producer.sh --topic sandbox_sync2 --bootstrap-server <BOOTSTRAP SERVER> --producer.config=bin/client.properties
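The console producer reads one message per line from standard input, so test data can be typed by hand or generated. As a small sketch, the following script prints a few single-line JSON documents; the field names are invented for illustration, and it assumes the sink connector can turn each JSON string into a MongoDB document.

```python
import json


def sample_messages(count=5):
    """Yield small JSON documents as single-line strings (hypothetical fields)."""
    for i in range(count):
        yield json.dumps({"id": i, "message": f"test message {i}"})


if __name__ == "__main__":
    for line in sample_messages():
        print(line)
```

If the script is saved under a name such as make_messages.py (a hypothetical file name), its output can be piped into the kafka-console-producer command shown above.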

To verify that data successfully flows from the Kafka topic to the serverless MongoDB cluster, we use the MongoDB Atlas UI.

MongoDB Atlas Browse Collections UI

If you run into any issues, be sure to check the log files. In this example, we used CloudWatch to read the events that were generated from Amazon MSK and the MongoDB Connector for Apache Kafka.

CloudWatch Logs UI

Clean up

To avoid incurring future charges, clean up the resources you created. First, delete the MSK cluster, connector, and EC2 instance:

  1. On the Amazon MSK console, choose Clusters in the navigation pane.
  2. Select your cluster and on the Actions menu, choose Delete.
  3. Choose Connectors in the navigation pane.
  4. Select your connector and choose Delete.
  5. Choose Custom plugins in the navigation pane.
  6. Select your plugin and choose Delete.
  7. On the Amazon EC2 console, choose Instances in the navigation pane.
  8. Choose the instance you created.
  9. Choose Instance state, then choose Terminate instance.
  10. On the Amazon VPC console, choose Endpoints in the navigation pane.
  11. Select the endpoint you created and on the Actions menu, choose Delete VPC endpoints.

Now you can delete the Atlas cluster and AWS PrivateLink:

  1. Log in to the Atlas cluster console.
  2. Navigate to the serverless cluster to be deleted.
  3. On the options drop-down menu, choose Terminate.
  4. Navigate to the Network Access section.
  5. Choose the private endpoint.
  6. Select the serverless instance.
  7. On the options drop-down menu, choose Terminate.

Summary

In this post, we showed you how to build a serverless streaming ingestion pipeline using MSK Serverless and MongoDB Atlas Serverless. With MSK Serverless, you can automatically provision and manage required resources on an as-needed basis. We used a MongoDB connector deployed on MSK Connect to seamlessly integrate the two services, and used an EC2 client to send sample data to the MSK topic. MSK Connect now supports Private DNS hostnames, enabling you to use private domain names between the services. In this post, the connector used the default DNS servers of the VPC to resolve the Availability Zone-specific private DNS name. This AWS PrivateLink configuration allowed secure and private connectivity between the MSK Serverless instance and the MongoDB Atlas Serverless instance.

To continue your learning, check out the following resources:


About the Authors

Igor Alekseev is a Senior Partner Solution Architect at AWS in the Data and Analytics domain. In his role, Igor works with strategic partners, helping them build complex, AWS-optimized architectures. Prior to joining AWS, as a Data/Solution Architect he implemented many projects in the big data domain, including several data lakes in the Hadoop ecosystem. As a Data Engineer, he was involved in applying AI/ML to fraud detection and office automation.

Kiran Matty is a Principal Product Manager with Amazon Web Services (AWS) and works with the Amazon Managed Streaming for Apache Kafka (Amazon MSK) team based out of Palo Alto, California. He is passionate about building performant streaming and analytical services that help enterprises realize their critical use cases.

Babu Srinivasan is a Senior Partner Solutions Architect at MongoDB. In his current role, he works with AWS to build the technical integrations and reference architectures for AWS and MongoDB solutions. He has more than two decades of experience in database and cloud technologies. He is passionate about providing technical solutions to customers working with multiple Global System Integrators (GSIs) across multiple geographies.

Robert Walters is a Senior Product Manager at MongoDB. Before joining MongoDB, Rob spent 17 years at Microsoft working in various roles, including program management on the SQL Server team, consulting, and technical pre-sales. Rob has co-authored three patents for technologies used within SQL Server and was the lead author of several technical books on SQL Server. Rob is currently an active blogger on MongoDB Blogs.

Near-real-time fraud detection using Amazon Redshift Streaming Ingestion with Amazon Kinesis Data Streams and Amazon Redshift ML

Post Syndicated from Praveen Kadipikonda original https://aws.amazon.com/blogs/big-data/near-real-time-fraud-detection-using-amazon-redshift-streaming-ingestion-with-amazon-kinesis-data-streams-and-amazon-redshift-ml/

The importance of data warehouses and analytics performed on data warehouse platforms has been increasing steadily over the years, with many businesses coming to rely on these systems as mission-critical for both short-term operational decision-making and long-term strategic planning. Traditionally, data warehouses are refreshed in batch cycles, for example, monthly, weekly, or daily, so that businesses can derive various insights from them.

Many organizations are realizing that near-real-time data ingestion along with advanced analytics opens up new opportunities. For example, a financial institution can predict whether a credit card transaction is fraudulent by running an anomaly detection program in near-real-time mode rather than in batch mode.

In this post, we show how Amazon Redshift can deliver streaming ingestion and machine learning (ML) predictions all in one platform.

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL.

Amazon Redshift ML makes it easy for data analysts and database developers to create, train, and apply ML models using familiar SQL commands in Amazon Redshift data warehouses.

We’re excited to launch Amazon Redshift Streaming Ingestion for Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), which enables you to ingest data directly from a Kinesis data stream or Kafka topic without having to stage the data in Amazon Simple Storage Service (Amazon S3). Amazon Redshift streaming ingestion allows you to achieve low latency in the order of seconds while ingesting hundreds of megabytes of data into your data warehouse.

This post demonstrates how Amazon Redshift, the cloud data warehouse, allows you to build near-real-time ML predictions by using Amazon Redshift streaming ingestion and Redshift ML features with familiar SQL.

Solution overview

By following the steps outlined in this post, you’ll be able to set up a producer streamer application on an Amazon Elastic Compute Cloud (Amazon EC2) instance that simulates credit card transactions and pushes data to Kinesis Data Streams in real time. You set up an Amazon Redshift Streaming Ingestion materialized view on Amazon Redshift, where streaming data is received. You train and build a Redshift ML model to generate real-time inferences against the streaming data.

The following diagram illustrates the architecture and process flow.

The step-by-step process is as follows:

  1. The EC2 instance simulates a credit card transaction application, which inserts credit card transactions into the Kinesis data stream.
  2. The data stream stores the incoming credit card transaction data.
  3. An Amazon Redshift Streaming Ingestion materialized view is created on top of the data stream, which automatically ingests streaming data into Amazon Redshift.
  4. You build, train, and deploy an ML model using Redshift ML. The Redshift ML model is trained using historical transactional data.
  5. You transform the streaming data and generate ML predictions.
  6. You can alert customers or update the application to mitigate risk.

This walkthrough uses credit card transaction streaming data. The credit card transaction data is fictitious and is based on a simulator. The customer dataset is also fictitious and is generated with some random data functions.

Prerequisites

  1. Create an Amazon Redshift cluster.
  2. Configure the cluster to use Redshift ML.
  3. Create an AWS Identity and Access Management (IAM) user.
  4. Update the IAM role attached to the Redshift cluster to include permissions to access the Kinesis data stream. For more information about the required policy, refer to Getting started with streaming ingestion.
  5. Create an m5.4xlarge EC2 instance. We tested the producer application on an m5.4xlarge instance, but you can use another instance type. When creating the instance, use the amzn2-ami-kernel-5.10-hvm-2.0.20220426.0-x86_64-gp2 AMI.
  6. To make sure that Python 3 is installed on the EC2 instance, run the following command to verify your Python version (note that the data extraction script only works on Python 3):
python3 --version
  7. Install the following dependent packages to run the simulator program:
sudo yum install python3-pip
pip3 install numpy
pip3 install pandas
pip3 install matplotlib
pip3 install seaborn
pip3 install boto3
  8. Configure the AWS CLI on the EC2 instance with the credentials generated for the IAM user you created in step 3. The following screenshot shows an example using aws configure.

Set up Kinesis Data Streams

Amazon Kinesis Data Streams is a massively scalable and durable real-time data streaming service. It can continuously capture gigabytes of data per second from hundreds of thousands of sources, such as website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events. The data collected is available in milliseconds to enable real-time analytics use cases such as real-time dashboards, real-time anomaly detection, dynamic pricing, and more. We use Kinesis Data Streams because it’s a serverless solution that can scale based on usage.

Create a Kinesis data stream

First, you need to create a Kinesis data stream to receive the streaming data:

  1. On the Amazon Kinesis console, choose Data streams in the navigation pane.
  2. Choose Create data stream.
  3. For Data stream name, enter cust-payment-txn-stream.
  4. For Capacity mode, select On-demand.
  5. For the rest of the options, choose the default options and follow through the prompts to complete the setup.
  6. Capture the ARN for the created data stream to use in the next section when defining your IAM policy.

Streaming ARN Highlight
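The same stream can also be created programmatically. The following is a minimal sketch using the AWS SDK for Python (Boto3), which the prerequisites already install; it assumes valid AWS credentials are configured and uses the us-west-2 Region that appears in the policy example in this post. The function names are only illustrative.

```python
def create_stream_request(stream_name="cust-payment-txn-stream"):
    """Build the keyword arguments for the Kinesis CreateStream call (on-demand mode)."""
    return {
        "StreamName": stream_name,
        "StreamModeDetails": {"StreamMode": "ON_DEMAND"},
    }


def create_stream_and_get_arn(region_name="us-west-2"):
    """Create the stream and return its ARN. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the request builder works without boto3

    kinesis = boto3.client("kinesis", region_name=region_name)
    request = create_stream_request()
    kinesis.create_stream(**request)
    # Wait until the stream exists before reading its description.
    kinesis.get_waiter("stream_exists").wait(StreamName=request["StreamName"])
    summary = kinesis.describe_stream_summary(StreamName=request["StreamName"])
    return summary["StreamDescriptionSummary"]["StreamARN"]


if __name__ == "__main__":
    # Only show the request; call create_stream_and_get_arn() to actually create it.
    print(create_stream_request())
```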

Set up permissions

For a streaming application to write to Kinesis Data Streams, the application needs to have access to Kinesis. You can use the following policy statement to grant the simulator process that you set up in the next section access to the data stream. Use the ARN of the data stream that you saved in the previous step.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt123",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards",
        "kinesis:DescribeStreamSummary"
      ],
      "Resource": [
        "arn:aws:kinesis:us-west-2:xxxxxxxxxxxx:stream/cust-payment-txn-stream"
      ]
    }
  ]
}
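If you prefer to generate the policy document rather than edit the ARN by hand, a sketch along these lines produces the same statement for any stream ARN. The function name and the simple ARN check are assumptions for illustration; the actions are the ones listed in the policy above.

```python
import json

# Actions copied from the policy statement in this post.
PRODUCER_ACTIONS = [
    "kinesis:DescribeStream",
    "kinesis:PutRecord",
    "kinesis:PutRecords",
    "kinesis:GetShardIterator",
    "kinesis:GetRecords",
    "kinesis:ListShards",
    "kinesis:DescribeStreamSummary",
]


def build_stream_policy(stream_arn):
    """Return the IAM policy document as a dict for the given stream ARN."""
    if not stream_arn.startswith("arn:aws:kinesis:") or ":stream/" not in stream_arn:
        raise ValueError(f"not a Kinesis stream ARN: {stream_arn}")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Stmt123",
                "Effect": "Allow",
                "Action": PRODUCER_ACTIONS,
                "Resource": [stream_arn],
            }
        ],
    }


if __name__ == "__main__":
    arn = "arn:aws:kinesis:us-west-2:xxxxxxxxxxxx:stream/cust-payment-txn-stream"
    print(json.dumps(build_stream_policy(arn), indent=2))
```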

Configure the stream producer

Before we can consume streaming data in Amazon Redshift, we need a streaming data source that writes data to the Kinesis data stream. This post uses a custom-built data generator and the AWS SDK for Python (Boto3) to publish the data to the data stream. For setup instructions, refer to Producer Simulator. This simulator process publishes streaming data to the data stream created in the previous step (cust-payment-txn-stream).
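To give a sense of what such a producer does, here is a simplified sketch; it is not the simulator referenced above. It generates a fictitious transaction with the fields that the materialized view later extracts and builds the arguments for the Boto3 put_record call. The value ranges, the start date, the choice of partition key, and the function names are all assumptions made for this example.

```python
import json
import random
from datetime import datetime, timedelta

START = datetime(2022, 4, 1)  # assumed start of the simulated period


def make_transaction(tx_id, rng):
    """Create one fictitious transaction with the fields used in this post."""
    seconds = rng.randrange(0, 180 * 86400)  # offset within roughly six months
    tx_time = START + timedelta(seconds=seconds)
    return {
        "TRANSACTION_ID": tx_id,
        "TX_DATETIME": tx_time.strftime("%Y-%m-%d %H:%M:%S"),
        "CUSTOMER_ID": rng.randrange(0, 5000),    # about 5,000 customers
        "TERMINAL_ID": rng.randrange(0, 10000),   # assumed number of terminals
        "TX_AMOUNT": round(rng.uniform(1, 500), 2),
        "TX_TIME_SECONDS": seconds,
        "TX_TIME_DAYS": seconds // 86400,
    }


def to_put_record_args(tx, stream_name="cust-payment-txn-stream"):
    """Build keyword arguments for kinesis.put_record from a transaction dict."""
    return {
        "StreamName": stream_name,
        "Data": json.dumps(tx).encode("utf-8"),
        "PartitionKey": str(tx["CUSTOMER_ID"]),
    }


def publish(transactions, region_name="us-west-2"):
    """Send transactions to the stream. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the helpers above work without boto3

    kinesis = boto3.client("kinesis", region_name=region_name)
    for tx in transactions:
        kinesis.put_record(**to_put_record_args(tx))


if __name__ == "__main__":
    rng = random.Random(42)
    print(to_put_record_args(make_transaction(1, rng)))
```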

Configure the stream consumer

This section talks about configuring the stream consumer (the Amazon Redshift streaming ingestion view).

Amazon Redshift Streaming Ingestion provides low-latency, high-speed ingestion of streaming data from Kinesis Data Streams into an Amazon Redshift materialized view. You can configure your Amazon Redshift cluster to enable streaming ingestion and create a materialized view with auto refresh, using SQL statements, as described in Creating materialized views in Amazon Redshift. The automatic materialized view refresh process will ingest streaming data at hundreds of megabytes of data per second from Kinesis Data Streams into Amazon Redshift. This results in fast access to external data that is quickly refreshed.

After creating the materialized view, you can access your data from the data stream using SQL and simplify your data pipelines by creating materialized views directly on top of the stream.

Complete the following steps to configure an Amazon Redshift streaming materialized view:

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. Create a new IAM policy called KinesisStreamPolicy. For the streaming policy definition, see Getting started with streaming ingestion.
  4. In the navigation pane, choose Roles.
  5. Choose Create role.
  6. Select AWS service, then choose Redshift and the Redshift - Customizable use case.
  7. Create a new role called redshift-streaming-role and attach the policy KinesisStreamPolicy.
  8. Create an external schema to map to Kinesis Data Streams:
CREATE EXTERNAL SCHEMA custpaytxn
FROM KINESIS IAM_ROLE 'arn:aws:iam::386xxxxxxxxx:role/redshift-streaming-role';

Now you can create a materialized view to consume the stream data. You can use the SUPER data type to store the payload as is, in JSON format, or use Amazon Redshift JSON functions to parse the JSON data into individual columns. For this post, we use the second method because the schema is well defined.

  9. Create the streaming ingestion materialized view cust_payment_tx_stream. By specifying AUTO REFRESH YES in the following code, you can enable automatic refresh of the streaming ingestion view, which saves time by avoiding building data pipelines:
CREATE MATERIALIZED VIEW cust_payment_tx_stream
AUTO REFRESH YES
AS
SELECT approximate_arrival_timestamp ,
partition_key,
shard_id,
sequence_number,
json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'TRANSACTION_ID')::bigint as TRANSACTION_ID,
json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'TX_DATETIME')::character(50) as TX_DATETIME,
json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'CUSTOMER_ID')::int as CUSTOMER_ID,
json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'TERMINAL_ID')::int as TERMINAL_ID,
json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'TX_AMOUNT')::decimal(18,2) as TX_AMOUNT,
json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'TX_TIME_SECONDS')::int as TX_TIME_SECONDS,
json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'TX_TIME_DAYS')::int as TX_TIME_DAYS
FROM custpaytxn."cust-payment-txn-stream"
Where is_utf8(kinesis_data) AND can_json_parse(kinesis_data);

Note that json_extract_path_text has a length limitation of 64 KB. Also, from_varbyte filters out records larger than 65 KB.

  10. Refresh the data.

The Amazon Redshift streaming materialized view is auto refreshed by Amazon Redshift for you. This way, you don’t need to worry about data staleness. With materialized view auto refresh, data is automatically loaded into Amazon Redshift as it becomes available in the stream. If you choose to manually perform this operation, use the following command:

REFRESH MATERIALIZED VIEW cust_payment_tx_stream ;
  11. Now let’s query the streaming materialized view to see sample data:
Select * from cust_payment_tx_stream limit 10;

  12. Let’s check how many records are in the streaming view now:
Select count(*) as stream_rec_count from cust_payment_tx_stream;

Now you have finished setting up the Amazon Redshift streaming ingestion view, which is continuously updated with incoming credit card transaction data. In my setup, I see that around 67,000 records have been pulled into the streaming view at the time when I ran my select count query. This number could be different for you.
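Because the materialized view keeps only records that are valid UTF-8 and parse as JSON, and because of the size limits noted earlier, it can be useful to apply a similar check on the producer side before publishing. The following sketch approximates that filter in Python; it is not the Redshift implementation, and the 64 KB threshold and function name are assumptions for this example.

```python
import json

MAX_PAYLOAD_BYTES = 64 * 1024  # assumed threshold, based on the 64 KB note above


def is_ingestable(payload: bytes) -> bool:
    """Approximate the view's filter: small enough, valid UTF-8, parseable JSON."""
    if len(payload) > MAX_PAYLOAD_BYTES:
        return False
    try:
        json.loads(payload.decode("utf-8"))
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError subclasses.
        return False
    return True


if __name__ == "__main__":
    for candidate in (b'{"TX_AMOUNT": 12.5}', b"\xff\xfe", b"{not json"):
        print(candidate, is_ingestable(candidate))
```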

Redshift ML

With Redshift ML, you can bring a pre-trained ML model or build one natively. For more information, refer to Using machine learning in Amazon Redshift.

In this post, we train and build an ML model using a historical dataset. The data contains a tx_fraud field that flags a historical transaction as fraudulent or not. We build a supervised ML model using Redshift Auto ML, which learns from this dataset and classifies incoming transactions as fraudulent or not when they are passed to the prediction function.

In the following sections, we show how to set up the historical dataset and customer data.

Load the historical dataset

The historical table has more fields than the streaming data source. These fields capture each customer’s recent spending and each terminal’s risk score (for example, the number of fraudulent transactions), and are computed by transforming the streaming data. There are also categorical variables, such as flags for weekend or nighttime transactions.

To load the historical data, run the commands using the Amazon Redshift query editor.

Create the transaction history table with the following code. The DDL can also be found on GitHub.

CREATE TABLE cust_payment_tx_history
(
TRANSACTION_ID integer,
TX_DATETIME timestamp,
CUSTOMER_ID integer,
TERMINAL_ID integer,
TX_AMOUNT decimal(9,2),
TX_TIME_SECONDS integer,
TX_TIME_DAYS integer,
TX_FRAUD integer,
TX_FRAUD_SCENARIO integer,
TX_DURING_WEEKEND integer,
TX_DURING_NIGHT integer,
CUSTOMER_ID_NB_TX_1DAY_WINDOW decimal(9,2),
CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW decimal(9,2),
CUSTOMER_ID_NB_TX_7DAY_WINDOW decimal(9,2),
CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW decimal(9,2),
CUSTOMER_ID_NB_TX_30DAY_WINDOW decimal(9,2),
CUSTOMER_ID_AVG_AMOUNT_30DAY_WINDOW decimal(9,2),
TERMINAL_ID_NB_TX_1DAY_WINDOW decimal(9,2),
TERMINAL_ID_RISK_1DAY_WINDOW decimal(9,2),
TERMINAL_ID_NB_TX_7DAY_WINDOW decimal(9,2),
TERMINAL_ID_RISK_7DAY_WINDOW decimal(9,2),
TERMINAL_ID_NB_TX_30DAY_WINDOW decimal(9,2),
TERMINAL_ID_RISK_30DAY_WINDOW decimal(9,2)
);
Copy cust_payment_tx_history
FROM 's3://redshift-demos/redshiftml-reinvent/2022/ant312/credit-card-transactions/credit_card_transactions_transformed_balanced.csv'
iam_role default
ignoreheader 1
csv ;

Let’s check how many transactions are loaded:

select count(1) from cust_payment_tx_history;

Check the monthly fraud and non-fraud transactions trend:

SELECT to_char(tx_datetime, 'YYYYMM') as YearMonth,
sum(case when tx_fraud=1 then 1 else 0 end) as fraud_tx,
sum(case when tx_fraud=0 then 1 else 0 end) as non_fraud_tx,
count(*) as total_tx
FROM cust_payment_tx_history
GROUP BY YearMonth;

Create and load customer data

Now we create the customer table and load data, which contains the email and phone number of the customer. The following code creates the table, loads the data, and samples the table. The table DDL is available on GitHub.

CREATE TABLE public."customer_info"(customer_id bigint NOT NULL encode az64,
job_title character varying(500) encode lzo,
email_address character varying(100) encode lzo,
full_name character varying(200) encode lzo,
phone_number character varying(20) encode lzo,
city varchar(50),
state varchar(50)
);
COPY customer_info
FROM 's3://redshift-demos/redshiftml-reinvent/2022/ant312/customer-data/Customer_Data.csv'
IGNOREHEADER 1
IAM_ROLE default CSV;
Select count(1) from customer_info;

Our test data has about 5,000 customers. The following screenshot shows sample customer data.

Build an ML model

Our historical card transaction table has 6 months of data, which we now use to train and test the ML model.

The model takes the following fields as input:

TX_DURING_WEEKEND ,
TX_AMOUNT,
TX_DURING_NIGHT ,
CUSTOMER_ID_NB_TX_1DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW ,
CUSTOMER_ID_NB_TX_7DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW ,
CUSTOMER_ID_NB_TX_30DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_30DAY_WINDOW ,
TERMINAL_ID_NB_TX_1DAY_WINDOW ,
TERMINAL_ID_RISK_1DAY_WINDOW ,
TERMINAL_ID_NB_TX_7DAY_WINDOW ,
TERMINAL_ID_RISK_7DAY_WINDOW ,
TERMINAL_ID_NB_TX_30DAY_WINDOW ,
TERMINAL_ID_RISK_30DAY_WINDOW

We get tx_fraud as output.

We split this data into training and test datasets. Transactions from 2022-04-01 to 2022-07-31 are for the training set. Transactions from 2022-08-01 to 2022-09-30 are used for the test set.
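This is a time-based (out-of-time) split: the model is trained on earlier months and evaluated on later ones, which mirrors how it is used on new transactions. As a small sketch of the rule described above, with a hypothetical function name:

```python
from datetime import date

# Date ranges as described in this post (both ends inclusive).
TRAIN_RANGE = (date(2022, 4, 1), date(2022, 7, 31))
TEST_RANGE = (date(2022, 8, 1), date(2022, 9, 30))


def assign_split(tx_date):
    """Return 'train', 'test', or None for a transaction date."""
    if TRAIN_RANGE[0] <= tx_date <= TRAIN_RANGE[1]:
        return "train"
    if TEST_RANGE[0] <= tx_date <= TEST_RANGE[1]:
        return "test"
    return None


if __name__ == "__main__":
    for d in (date(2022, 5, 15), date(2022, 8, 1), date(2022, 10, 1)):
        print(d, assign_split(d))
```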

Let’s create the ML model using the familiar SQL CREATE MODEL statement. We use a basic form of the Redshift ML command. The following method uses Amazon SageMaker Autopilot, which performs data preparation, feature engineering, model selection, and training automatically for you. Provide the name of an S3 bucket that Redshift ML can use to stage training data and artifacts.

CREATE MODEL cust_cc_txn_fd
FROM (
SELECT TX_AMOUNT ,
TX_FRAUD ,
TX_DURING_WEEKEND ,
TX_DURING_NIGHT ,
CUSTOMER_ID_NB_TX_1DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW ,
CUSTOMER_ID_NB_TX_7DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW ,
CUSTOMER_ID_NB_TX_30DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_30DAY_WINDOW ,
TERMINAL_ID_NB_TX_1DAY_WINDOW ,
TERMINAL_ID_RISK_1DAY_WINDOW ,
TERMINAL_ID_NB_TX_7DAY_WINDOW ,
TERMINAL_ID_RISK_7DAY_WINDOW ,
TERMINAL_ID_NB_TX_30DAY_WINDOW ,
TERMINAL_ID_RISK_30DAY_WINDOW
FROM cust_payment_tx_history
WHERE cast(tx_datetime as date) between '2022-04-01' and '2022-07-31'
) TARGET tx_fraud
FUNCTION fn_customer_cc_fd
IAM_ROLE default
SETTINGS (
S3_BUCKET '<replace this with your s3 bucket name>',
s3_garbage_collect off,
max_runtime 3600
);

I name the ML model cust_cc_txn_fd and the prediction function fn_customer_cc_fd. The FROM clause lists the input columns from the historical table public.cust_payment_tx_history. The TARGET parameter is set to tx_fraud, the variable that we’re trying to predict. IAM_ROLE is set to default because the cluster is configured with a default role; otherwise, you have to provide the ARN of your Amazon Redshift cluster’s IAM role. I set max_runtime to 3,600 seconds, which is the time we give SageMaker to complete the process. Redshift ML deploys the best model that is identified in this time frame.

Depending on the complexity of the model and the amount of data, it can take some time for the model to be available. If model selection doesn’t complete in time, increase the value of max_runtime. The maximum value is 9999.

The CREATE MODEL command is run asynchronously, which means it runs in the background. You can use the SHOW MODEL command to see the status of the model. When the status shows as Ready, it means the model is trained and deployed.

show model cust_cc_txn_fd;

The following screenshots show our output.

From the output, I see that the model has been correctly recognized as BinaryClassification, and F1 has been selected as the objective. The F1 score is a metric that considers both precision and recall. It returns a value between 1 (perfect precision and recall) and 0 (lowest possible score). In my case, it’s 0.91. The higher the value, the better the model performance.

Let’s test this model with the test dataset. Run the following command, which retrieves sample predictions:

SELECT
tx_fraud ,
fn_customer_cc_fd(
TX_AMOUNT ,
TX_DURING_WEEKEND ,
TX_DURING_NIGHT ,
CUSTOMER_ID_NB_TX_1DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW ,
CUSTOMER_ID_NB_TX_7DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW ,
CUSTOMER_ID_NB_TX_30DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_30DAY_WINDOW ,
TERMINAL_ID_NB_TX_1DAY_WINDOW ,
TERMINAL_ID_RISK_1DAY_WINDOW ,
TERMINAL_ID_NB_TX_7DAY_WINDOW ,
TERMINAL_ID_RISK_7DAY_WINDOW ,
TERMINAL_ID_NB_TX_30DAY_WINDOW ,
TERMINAL_ID_RISK_30DAY_WINDOW )
FROM cust_payment_tx_history
WHERE cast(tx_datetime as date) >= '2022-08-01'
limit 10 ;

We see that some values are matching and some are not. Let’s compare predictions to the ground truth:

SELECT
tx_fraud ,
fn_customer_cc_fd(
TX_AMOUNT ,
TX_DURING_WEEKEND ,
TX_DURING_NIGHT ,
CUSTOMER_ID_NB_TX_1DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW ,
CUSTOMER_ID_NB_TX_7DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW ,
CUSTOMER_ID_NB_TX_30DAY_WINDOW ,
CUSTOMER_ID_AVG_AMOUNT_30DAY_WINDOW ,
TERMINAL_ID_NB_TX_1DAY_WINDOW ,
TERMINAL_ID_RISK_1DAY_WINDOW ,
TERMINAL_ID_NB_TX_7DAY_WINDOW ,
TERMINAL_ID_RISK_7DAY_WINDOW ,
TERMINAL_ID_NB_TX_30DAY_WINDOW ,
TERMINAL_ID_RISK_30DAY_WINDOW
) as prediction, count(*) as values
FROM public.cust_payment_tx_history
WHERE cast(tx_datetime as date) >= '2022-08-01'
Group by 1,2 ;

We validated that the model is working and the F1 score is good. Let’s move on to generating predictions on streaming data.
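The grouped counts returned by the comparison query form a confusion matrix, from which precision, recall, and the F1 score can be recomputed by hand. The following sketch shows the arithmetic; the function name is hypothetical, and the sample rows are made-up numbers for illustration, not results from this post.

```python
def classification_metrics(rows):
    """Compute precision, recall, and F1 from (tx_fraud, prediction, count) rows."""
    counts = {(actual, predicted): n for actual, predicted, n in rows}
    tp = counts.get((1, 1), 0)  # fraud correctly flagged
    fp = counts.get((0, 1), 0)  # legitimate transaction flagged as fraud
    fn = counts.get((1, 0), 0)  # fraud that was missed
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    if precision + recall == 0:
        f1 = 0.0
    else:
        f1 = 2 * precision * recall / (precision + recall)
    return {"precision": precision, "recall": recall, "f1": f1}


if __name__ == "__main__":
    # Illustrative counts only, in the shape returned by the GROUP BY query.
    sample_rows = [(0, 0, 900), (0, 1, 10), (1, 0, 10), (1, 1, 90)]
    print(classification_metrics(sample_rows))
```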

Predict fraudulent transactions

Because the Redshift ML model is ready to use, we can use it to run the predictions against streaming data ingestion. The historical dataset has more fields than what we have in the streaming data source, but they’re just recency and frequency metrics around the customer and terminal risk for a fraudulent transaction.
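To make the recency and frequency metrics concrete, the following sketch computes the number of transactions and the average amount over a trailing window for a single customer. It illustrates the idea behind columns such as CUSTOMER_ID_NB_TX_7DAY_WINDOW and is not the SQL used in the views; it assumes the window covers the preceding N days up to and including the current transaction, and the function name is hypothetical.

```python
from datetime import datetime, timedelta


def window_features(transactions, window_days):
    """Return (tx_time, nb_tx, avg_amount) per transaction for one customer.

    transactions: list of (tx_time, amount) tuples in any order.
    The window is (tx_time - window_days, tx_time], an assumption of this sketch.
    """
    ordered = sorted(transactions)
    window = timedelta(days=window_days)
    features = []
    start = 0            # index of the oldest transaction still inside the window
    running_total = 0.0  # sum of amounts inside the window
    for end, (tx_time, amount) in enumerate(ordered):
        running_total += amount
        # Drop transactions that have fallen out of the trailing window.
        while ordered[start][0] <= tx_time - window:
            running_total -= ordered[start][1]
            start += 1
        count = end - start + 1
        features.append((tx_time, count, running_total / count))
    return features


if __name__ == "__main__":
    history = [
        (datetime(2022, 8, 1, 0, 0), 10.0),
        (datetime(2022, 8, 1, 12, 0), 30.0),
        (datetime(2022, 8, 3, 0, 0), 50.0),
    ]
    for row in window_features(history, window_days=1):
        print(row)
```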

We can apply the transformations on top of the streaming data very easily by embedding the SQL inside the views. Create the first view, which aggregates streaming data at the customer level. Then create the second view, which aggregates streaming data at terminal level, and the third view, which combines incoming transactional data with customer and terminal aggregated data and calls the prediction function all in one place. The code for the third view is as follows:

CREATE VIEW public.cust_payment_tx_fraud_predictions
as
select a.approximate_arrival_timestamp,
d.full_name , d.email_address, d.phone_number,
a.TRANSACTION_ID, a.TX_DATETIME, a.CUSTOMER_ID, a.TERMINAL_ID,
a.TX_AMOUNT ,
a.TX_TIME_SECONDS ,
a.TX_TIME_DAYS ,
public.fn_customer_cc_fd(a.TX_AMOUNT ,
a.TX_DURING_WEEKEND,
a.TX_DURING_NIGHT,
c.CUSTOMER_ID_NB_TX_1DAY_WINDOW ,
c.CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW ,
c.CUSTOMER_ID_NB_TX_7DAY_WINDOW ,
c.CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW ,
c.CUSTOMER_ID_NB_TX_30DAY_WINDOW ,
c.CUSTOMER_ID_AVG_AMOUNT_30DAY_WINDOW ,
t.TERMINAL_ID_NB_TX_1DAY_WINDOW ,
t.TERMINAL_ID_RISK_1DAY_WINDOW ,
t.TERMINAL_ID_NB_TX_7DAY_WINDOW ,
t.TERMINAL_ID_RISK_7DAY_WINDOW ,
t.TERMINAL_ID_NB_TX_30DAY_WINDOW ,
t.TERMINAL_ID_RISK_30DAY_WINDOW ) Fraud_prediction
From
(select
Approximate_arrival_timestamp,
TRANSACTION_ID, TX_DATETIME, CUSTOMER_ID, TERMINAL_ID,
TX_AMOUNT ,
TX_TIME_SECONDS ,
TX_TIME_DAYS ,
case when extract(dow from cast(TX_DATETIME as timestamp)) in (0,6) then 1 else 0 end as TX_DURING_WEEKEND,
case when extract(hour from cast(TX_DATETIME as timestamp)) between 00 and 06 then 1 else 0 end as TX_DURING_NIGHT
FROM cust_payment_tx_stream) a
join terminal_transformations t
on a.terminal_id = t.terminal_id
join customer_transformations c
on a.customer_id = c.customer_id
join customer_info d
on a.customer_id = d.customer_id
;
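The two derived flags in the inner query can be checked outside the database. The sketch below reproduces them in Python, treating Saturday and Sunday as the weekend and hours 0 through 6 as night; it assumes TX_DATETIME arrives as text in the form YYYY-MM-DD HH:MM:SS, and the function name is hypothetical.

```python
from datetime import datetime


def tx_flags(tx_datetime_text):
    """Return (TX_DURING_WEEKEND, TX_DURING_NIGHT) for a timestamp string."""
    # strip() removes padding, because the view stores the value as character(50).
    tx_time = datetime.strptime(tx_datetime_text.strip(), "%Y-%m-%d %H:%M:%S")
    # datetime.weekday() counts Monday as 0, so 5 and 6 are Saturday and Sunday.
    during_weekend = 1 if tx_time.weekday() >= 5 else 0
    during_night = 1 if 0 <= tx_time.hour <= 6 else 0
    return during_weekend, during_night


if __name__ == "__main__":
    for text in ("2022-08-06 03:15:00", "2022-08-08 12:00:00"):
        print(text, tx_flags(text))
```

Note that day-of-week numbering differs between systems, so it is worth confirming which numbers your SQL engine assigns to Saturday and Sunday before comparing results.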

Run a SELECT statement on the view:

select * from
cust_payment_tx_fraud_predictions
where Fraud_prediction = 1;

As you run the SELECT statement repeatedly, the latest credit card transactions go through transformations and ML predictions in near-real time.

This demonstrates the power of Amazon Redshift—with easy-to-use SQL commands, you can transform streaming data by applying complex window functions and apply an ML model to predict fraudulent transactions all in one step, without building complex data pipelines or building and managing additional infrastructure.

Expand the solution

Because the data streams in and ML predictions are made in near-real time, you can build business processes for alerting your customer using Amazon Simple Notification Service (Amazon SNS), or you can lock the customer’s credit card account in an operational system.

This post doesn’t go into the details of these operations, but if you’re interested in learning more about building event-driven solutions using Amazon Redshift, refer to the following GitHub repository.
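As a rough idea of what the alerting step could look like, the following sketch turns one row from the predictions view into an Amazon SNS message. It is only an illustration under several assumptions: the row is a dict keyed by the view's column names, the topic ARN is supplied by you, and the function names are hypothetical.

```python
def build_alert(row):
    """Build the Subject and Message for a suspected-fraud notification."""
    subject = f"Possible fraud on transaction {row['transaction_id']}"
    message = (
        f"Customer: {row['full_name']} ({row['email_address']})\n"
        f"Amount: {row['tx_amount']}\n"
        f"Time: {row['tx_datetime']}"
    )
    return {"Subject": subject, "Message": message}


def send_alert(row, topic_arn):
    """Publish the alert to an SNS topic. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily so build_alert works without boto3

    sns = boto3.client("sns")
    return sns.publish(TopicArn=topic_arn, **build_alert(row))


if __name__ == "__main__":
    example_row = {
        "transaction_id": 12345,
        "full_name": "Jane Doe",
        "email_address": "jane@example.com",
        "tx_amount": 250.75,
        "tx_datetime": "2022-08-06 03:15:00",
    }
    print(build_alert(example_row))
```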

Clean up

To avoid incurring future charges, delete the resources that were created as part of this post.

Conclusion

In this post, we demonstrated how to set up a Kinesis data stream, configure a producer and publish data to streams, and then create an Amazon Redshift Streaming Ingestion view and query the data in Amazon Redshift. After the data was in the Amazon Redshift cluster, we demonstrated how to train an ML model, build a prediction function, and apply it against the streaming data to generate predictions in near-real time.

If you have any feedback or questions, please leave them in the comments.


About the Authors

Bhanu Pittampally is an Analytics Specialist Solutions Architect based out of Dallas. He specializes in building analytic solutions. His background is in data warehouses—architecture, development, and administration. He has been in the data and analytics field for over 15 years.

Praveen Kadipikonda is a Senior Analytics Specialist Solutions Architect at AWS based out of Dallas. He helps customers build efficient, performant, and scalable analytic solutions. He has been building databases and data warehouse solutions for over 15 years.

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Analyze real-time streaming data in Amazon MSK with Amazon Athena

Post Syndicated from Scott Rigney original https://aws.amazon.com/blogs/big-data/analyze-real-time-streaming-data-in-amazon-msk-with-amazon-athena/

Recent advances in ease of use and scalability have made streaming data easier to generate and use for real-time decision-making. Coupled with market forces that have forced businesses to react more quickly to industry changes, more and more organizations today are turning to streaming data to fuel innovation and agility.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that makes it easy to build and run applications that use Apache Kafka, an open-source distributed event streaming platform designed for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. With Amazon MSK, you can capture real-time data from a wide range of sources such as database change events or web application user clickstreams. Because Kafka is highly optimized for writing and reading fresh data, it’s a great fit for operational reporting. However, gaining insight from this data often requires a specialized stream processing layer to write streaming records to a storage medium like Amazon S3, where they can be accessed by analysts, data scientists, and data engineers for historical analysis and visualization using tools like Amazon QuickSight.

When you want to analyze data where it lives and without developing separate pipelines and jobs, a popular choice is Amazon Athena. With Athena, you can use your existing SQL knowledge to extract insights from a wide range of data sources without learning a new language, developing scripts to extract (and duplicate) data, or managing infrastructure. Athena supports over 25 connectors to popular data sources including Amazon DynamoDB and Amazon Redshift which give data analysts, data engineers, and data scientists the flexibility to run SQL queries on data stored in databases running on-premises or in the cloud alongside data stored in Amazon S3. With Athena, there’s no data movement and you pay only for the queries you run.

What’s new

Starting today, you can use Athena to query streaming data in MSK and self-managed Apache Kafka. This enables you to run analytical queries on real-time data held in Kafka topics and join that data with other Kafka topics as well as other data in your Amazon S3 data lake – all without the need for separate processes to first store the data on Amazon S3.

Solution overview

In this post, we show you how to get started with real-time SQL analytics using Athena and its connector for MSK. The process involves:

  1. Registering the schema of your streaming data with AWS Glue Schema Registry. Schema Registry is a feature of AWS Glue that allows you to validate and reliably evolve streaming data against JSON schemas. It can also serialize data into a compressed format, which helps you save on data transfer and storage costs.
  2. Creating a new instance of the Amazon Athena MSK Connector. Athena connectors are pre-built applications that run as serverless AWS Lambda applications, so there’s no need for standalone data export processes.
  3. Using the Athena console to run interactive SQL queries on your Kafka topics.

Get started with Athena’s connector for Amazon MSK

In this section, we’ll cover the steps necessary to set up your MSK cluster to work with Athena to run SQL queries on your Kafka topics.

Prerequisites

This post assumes you have a serverless or provisioned MSK cluster set up to receive streaming messages from a producing application. For information, see Setting up Amazon MSK and Getting started using Amazon MSK in the Amazon Managed Streaming for Apache Kafka Developer Guide.

You’ll also need to set up a VPC and a security group before you use the Athena connector for MSK. For more information, see Creating a VPC for a data source connector. Note that with MSK Serverless, VPCs and security groups are created automatically, so you can get started quickly.

Define the schema of your Kafka topics with AWS Glue Schema Registry

To run SQL queries on your Kafka topics, you’ll first need to define the schema of your topics as Athena uses this metadata for query planning. AWS Glue makes it easy to do this with its Schema Registry feature for streaming data sources.

Schema Registry allows you to centrally discover, control, and evolve streaming data schemas for use in analytics applications such as Athena. With AWS Glue Schema Registry, you can manage and enforce schemas on your data streaming applications using convenient integrations with Apache Kafka. To learn more, see AWS Glue Schema Registry and Getting started with Schema Registry.

If configured to do so, the producer of data can auto-register its schema, and any changes to it, with AWS Glue. This is especially useful when the contents of the data are likely to change over time. However, you can also specify the schema manually, in which case it will resemble the following JSON structure.

{
  "tableName": "orders",
  "schemaName": "customer_schema",
  "topicName": "orders",
  "message": {
    "dataFormat": "json",
    "fields": [
      {
        "name": "customer_id",
        "mapping": "customer_id",
        "type": "VARCHAR"
      },
      {
        "name": "item_id",
        "mapping": "item_id",
        "type": "INTEGER"
      }
    ]
  }
}

When setting up your Schema Registry, be sure to give it an easy-to-remember name, such as customer_schema, because you’ll reference it within SQL queries as you’ll see later on. For additional information on schema set up, see Schema examples for the AWS Glue Schema Registry.
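
If you have many topics, it can be convenient to generate these definitions rather than write them by hand. The following is a minimal sketch, not part of the connector itself: the helper name build_topic_schema is hypothetical, and it assumes every field maps to a message attribute with the same name, as in the example above.

```python
import json


def build_topic_schema(topic, schema_name, fields, data_format="json"):
    """Return a schema definition shaped like the JSON structure above.

    `fields` is a list of (name, type) pairs. Each field is mapped to the
    message attribute with the same name (an assumption of this sketch).
    """
    if not fields:
        raise ValueError("at least one field is required")
    return {
        "tableName": topic,
        "schemaName": schema_name,
        "topicName": topic,
        "message": {
            "dataFormat": data_format,
            "fields": [
                {"name": name, "mapping": name, "type": sql_type}
                for name, sql_type in fields
            ],
        },
    }


definition = build_topic_schema(
    "orders",
    "customer_schema",
    [("customer_id", "VARCHAR"), ("item_id", "INTEGER")],
)
print(json.dumps(definition, indent=2))
```

The printed document matches the structure shown earlier, so it can be pasted into the schema definition field as is.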

Configure the Athena connector for MSK

With your schema registered with Glue, the next step is to set up the Athena connector for MSK. We recommend using the Athena console for this step. For more background on the steps involved, see Deploying a connector and connecting to a data source.

In Athena, federated data source connectors are applications that run on AWS Lambda and handle communication between your target data source and Athena. When a query runs on a federated source, Athena calls the Lambda function and tasks it with running the parts of your query that are specific to that source. To learn more about the query execution workflow, see Using Amazon Athena Federated Query in the Amazon Athena User Guide.

Start by accessing the Athena console and selecting Data sources on the left navigation, then choose Create data source:

Next, search for and select Amazon MSK from the available connectors and select Next.

In Data source details, give your connector a name, like msk, that’s easy to remember and reference in your future SQL queries. In the Connection details section, select Create Lambda function. This will bring you to the AWS Lambda console where you’ll provide additional configuration properties.

In the Lambda application configuration screen (not shown), you’ll provide the Application settings for your connector. To do this, you’ll need a few properties from your MSK cluster and schema registered in Glue.

On another browser tab, use the MSK console to navigate to your MSK cluster and then select the Properties tab. Here you’ll see the VPC subnets and security group IDs from your MSK cluster which you’ll provide in the SubnetIds and SecurityGroupIds fields in the Athena connector’s Application settings form. You can find the value for KafkaEndpoint by clicking View client information.
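
If you prefer to collect these values programmatically, the sketch below reads them with the boto3 Kafka client (describe_cluster and get_bootstrap_brokers). It assumes a provisioned cluster, and it assumes the form accepts comma-separated IDs; the function name connector_settings is hypothetical. Which bootstrap string you need depends on your cluster's authentication settings, so treat the TLS-first choice here as an assumption.

```python
def connector_settings(kafka_client, cluster_arn):
    """Collect the MSK values the connector's Application settings ask for.

    Assumes a provisioned cluster; MSK Serverless is not handled here.
    """
    info = kafka_client.describe_cluster(ClusterArn=cluster_arn)["ClusterInfo"]
    group = info["BrokerNodeGroupInfo"]
    brokers = kafka_client.get_bootstrap_brokers(ClusterArn=cluster_arn)
    # Assumption: prefer the TLS endpoint and fall back to plaintext.
    endpoint = brokers.get("BootstrapBrokerStringTls") or brokers.get(
        "BootstrapBrokerString"
    )
    return {
        "SubnetIds": ",".join(group["ClientSubnets"]),
        "SecurityGroupIds": ",".join(group["SecurityGroups"]),
        "KafkaEndpoint": endpoint,
    }
```

You would call it as connector_settings(boto3.client("kafka"), cluster_arn) with your own cluster ARN.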

In the AWS Glue console, navigate to your Schema Registry to find the GlueRegistryArn for the schema you wish to use with this connector.

After providing these and the other required values, click Deploy.

Return to the Athena console and enter the name of the Lambda function you just created in the Connection details box, then click Create data source.

Run queries on streaming data using Athena

With your MSK data connector set up, you can now run SQL queries on the data. Let’s explore a few use cases in more detail.

Use case: interactive analysis

If you want to run queries that aggregate, group, or filter your MSK data, you can run interactive queries using Athena. These queries will run against the current state of your Kafka topics at the time the query was submitted.

Before running any queries, it may be helpful to validate the schema and data types available within your Kafka topics. To do this, run the DESCRIBE command on your Kafka topic, which appears in Athena as a table, as shown below. In this query, the orders table corresponds to the topic you specified in the Schema Registry.

DESCRIBE msk.customer_schema.orders

Now that you know the contents of your topic, you can begin to develop analytical queries. A sample query for a hypothetical Kafka topic containing e-commerce order data is shown below:

SELECT customer_id, SUM(order_total)
FROM msk.customer_schema.orders
GROUP BY customer_id

Because the orders table (and underlying Kafka topic) can contain an unbounded stream of data, the query above is likely to return a different value for SUM(order_total) with each execution of the query.
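
The same query can be submitted outside the console. The following is a minimal sketch using the boto3 Athena client (start_query_execution, get_query_execution, get_query_results); the function name run_athena_query and the S3 output location are placeholders you would replace, and only the first page of results is read.

```python
import time


def run_athena_query(athena, sql, output_location, poll_seconds=1.0):
    """Submit `sql`, wait for a terminal state, and return the result rows."""
    query_id = athena.start_query_execution(
        QueryString=sql,
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    # Poll until Athena reports a terminal state for the query.
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)[
            "QueryExecution"
        ]["Status"]
        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)

    if status["State"] != "SUCCEEDED":
        raise RuntimeError(f"query {query_id} ended in state {status['State']}")

    # First page only; for SELECT queries the first row holds column headers.
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    return [[cell.get("VarCharValue") for cell in row["Data"]] for row in rows]
```

Because each call re-reads the topic through the connector, two consecutive calls with the same SQL may return different totals, as described above.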

If you have data in one topic that you need to join with another topic, you can do that too:

SELECT t1.order_id, t2.item_id
FROM msk.customer_schema.orders as t1
JOIN msk.customer_schema.items as t2
ON t1.id = t2.id

Use case: ingesting streaming data to a table on Amazon S3

Federated queries run against the underlying data source which ensures interactive queries, like the ones above, are evaluated against the current state of your data. One consideration is that repeatedly running federated queries can put additional load on the underlying source. If you plan to perform multiple queries on the same source data, you can use Athena’s CREATE TABLE AS SELECT, also known as CTAS, to store the results of a SELECT query in a table on Amazon S3. You can then run queries on your newly created table without going back to the underlying source each time.

CREATE TABLE my_kafka_data
WITH (format = 'Parquet', 
      write_compression = 'SNAPPY')
AS
SELECT order_id, item_id, timestamp
FROM msk.customer_schema.orders

If you plan to do additional downstream analysis on this data, for example within dashboards on Amazon QuickSight, you can enhance the solution above by periodically adding new data to your table. To learn more, see Using CTAS and INSERT INTO for ETL and data analysis. Another benefit of this approach is that you can secure these tables with row-, column-, and table-level data governance policies powered by AWS Lake Formation to ensure only authorized users can access your table.
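
One way to sketch the periodic load is to generate an INSERT INTO statement that appends only rows past a high-water mark. This is an illustration under assumptions, not the method from the referenced guide: it assumes order_id increases over time and can serve as the watermark, and the helper name incremental_insert_sql is hypothetical. The identifiers are interpolated directly, so they should come from trusted code, never from user input.

```python
def incremental_insert_sql(table, source, columns, watermark_column, last_value):
    """Build an INSERT INTO that appends rows newer than `last_value`.

    Assumes `watermark_column` holds an increasing integer (hypothetical).
    """
    cols = ", ".join(columns)
    return (
        f"INSERT INTO {table}\n"
        f"SELECT {cols} FROM {source}\n"
        f"WHERE {watermark_column} > {int(last_value)}"
    )


print(
    incremental_insert_sql(
        "my_kafka_data",
        "msk.customer_schema.orders",
        ["order_id", "item_id"],
        "order_id",
        1000,
    )
)
```

A scheduler could run the generated statement at a fixed interval, recording the largest order_id loaded so far as the next watermark.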

What else can you do?

With Athena, you can use your existing SQL knowledge to run federated queries that generate insights from a wide range of data sources without learning a new language, developing scripts to extract (and duplicate) data, or managing infrastructure. Athena provides additional integrations with other AWS services and popular analytics tools and SQL IDEs that allow you to do much more with your data. For example, you can:

  • Visualize the data in business intelligence applications like Amazon QuickSight
  • Design event-driven data processing workflows with Athena’s integration with AWS Step Functions
  • Unify multiple data sources to produce rich input features for machine learning in Amazon SageMaker

Conclusion

In this post, we learned about the newly released Athena connector for Amazon MSK. With it, you can run interactive queries on data held in Kafka topics running in MSK or self-managed Apache Kafka. This helps you bring real-time insights to dashboards or enable point-in-time analysis of streaming data to answer time-sensitive business questions. We also covered how to periodically ingest new streaming data into Amazon S3 without the need for a separate sink process. This simplifies recurring analysis of your data without incurring round-trip queries to your underlying Kafka clusters and makes it possible to secure the data with access rules powered by Lake Formation.

We encourage you to evaluate Athena and federated queries on your next analytics project. For help getting started, we recommend the following resources:


About the authors

Scott Rigney is a Senior Technical Product Manager with Amazon Web Services (AWS) and works with the Amazon Athena team based out of Arlington, Virginia. He is passionate about building analytics products that enable enterprises to make data-driven decisions.

Kiran Matty is a Principal Product Manager with Amazon Web Services (AWS) and works with the Amazon Managed Streaming for Apache Kafka (Amazon MSK) team based out of Palo Alto, California. He is passionate about building performant streaming and analytical services that help enterprises realize their critical use cases.

Gain visibility into your Amazon MSK cluster by deploying the Conduktor Platform

Post Syndicated from Stéphane Maarek original https://aws.amazon.com/blogs/big-data/gain-visibility-into-your-amazon-msk-cluster-by-deploying-the-conduktor-platform/

This is a guest post by AWS Data Hero and co-founder of Conduktor, Stephane Maarek.

Deploying Apache Kafka on AWS is now easier, thanks to Amazon Managed Streaming for Apache Kafka (Amazon MSK). In a few clicks, it provides you with a production-ready Kafka cluster on which you can run your applications and create data streams.

Apache Kafka is an open-source project, and no official user interfaces are available. The lack of visibility into Apache Kafka is a factor in the slow development of applications.

The recent announcement of the Conduktor Platform makes Amazon MSK operations simple, and you can solve Kafka issues end to end with solutions for testing, monitoring, data quality, governance, and security.

You can use the Conduktor Platform to monitor both types of MSK clusters, provisioned and serverless. In this post, we demonstrate how to use AWS Identity and Access Management (IAM) based security to administer our MSK cluster.

Solution overview

We look at how we can deploy the Conduktor Platform on Amazon MSK in a production-ready deployment so you can try it out today.

The solution is fully serverless and customizable. Everything is deployed using AWS CloudFormation templates.

The source code and CloudFormation templates used in this post are available in the GitHub repo.

To implement this solution, we complete the following high-level steps:

  1. Deploy a CloudFormation template to create our customized Docker image for the Conduktor Platform using AWS CodeBuild.
  2. Optionally, deploy an MSK cluster in provisioned or serverless mode using a CloudFormation template.
  3. Deploy the Conduktor Platform as an AWS Fargate container against our MSK cluster using a CloudFormation template.

Create a customized configuration for the Conduktor Platform

The Conduktor Platform uses a YAML configuration file to define the cluster connection endpoints. Therefore, we must create a customized Docker image of the Conduktor Platform that is able to connect to a cluster on Amazon MSK with a customized YAML file. For this, we use CodeBuild, and we store our configuration files in Amazon Simple Storage Service (Amazon S3). The final image is stored in Amazon Elastic Container Registry (Amazon ECR). The following diagram illustrates this workflow.

  1. Deploy the first CloudFormation template to create the following resources:
    • An S3 bucket to store our configuration files.
    • An ECR repository to store our final Docker image.
    • A CodeBuild project to build that Docker image.
    • An IAM role and policy to allow CodeBuild to perform the build.

Now we need to upload our files into Amazon S3.

  1. Upload the following files:
    • The file buildspec.yml, which is used by CodeBuild to build our primary Docker image.
    • The Dockerfile, which contains instructions on how to build our final Docker image.
    • The folder conduktor-platform-config (as is), which contains the configuration files to connect to Amazon MSK.

  1. At this stage, you can customize the conduktor-platform.yaml file, allowing you to connect to one MSK cluster:

Alternatively, you can connect to multiple MSK clusters or external ones by specifying multiple Kafka bootstrap servers, as shown in the following code. You can also use the same configuration file to specify the schema registry URL, Kafka Connect connection details, and SSO.

A single-Region Conduktor Platform deployment can work for multi-Region MSK clusters, although natural latency is expected. For latency-sensitive usage, you can deploy this solution in every Region in which you’re using Amazon MSK.

After uploading the files and configurations in your S3 bucket, let’s run CodeBuild to generate a new image.

  1. On the CodeBuild console, navigate to the project and choose Start build.

The build should complete in about 3 minutes.
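
If you want to script this step instead of using the console, a minimal sketch with the boto3 CodeBuild client (start_build and batch_get_builds) could look like the following. The function name run_build is hypothetical, and the project name is whatever the first CloudFormation template created in your account.

```python
import time


def run_build(codebuild, project_name, poll_seconds=10.0):
    """Start a build and block until it leaves the IN_PROGRESS state."""
    build_id = codebuild.start_build(projectName=project_name)["build"]["id"]
    while True:
        build = codebuild.batch_get_builds(ids=[build_id])["builds"][0]
        if build["buildStatus"] != "IN_PROGRESS":
            # Terminal statuses include SUCCEEDED, FAILED, and STOPPED.
            return build["buildStatus"]
        time.sleep(poll_seconds)
```

A return value other than SUCCEEDED means the image was not pushed, and the build logs are the place to look.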

The final image is pushed to Amazon ECR by the buildspec.yml script that CodeBuild runs. We’re now done with our first step. Your Conduktor Platform setup can now fully connect to your MSK cluster.

Start the MSK cluster

If you already have an MSK cluster set up with IAM access control, you can skip this step. If not, you can create one using the provided CloudFormation template.

From the MSK cluster (the new one or existing one), retrieve two essential pieces of information:

We use IAM access control so that we only need to use IAM policies to connect to our cluster.

If you’re using another security mechanism (such as SASL/SCRAM), you need to modify the Conduktor configuration files with the right properties, upload them back into Amazon S3, and rebuild the Conduktor image using CodeBuild.

Conduktor supports every single Kafka authentication method, including the ones supported by Amazon MSK: IAM access control, mutual TLS authentication, and user name/password using SASL/SCRAM.

Deploy the Conduktor Platform on Amazon ECS with Fargate

The last step is to deploy the Conduktor Platform. For this, we prefer running serverless solutions using Amazon Elastic Container Service (Amazon ECS) with Fargate. This allows you to right-size your containers in the future in case your usage of Conduktor grows over time.

Conduktor keeps its persistent data in the /var/conduktor folder: configuration, cached computation results, logs, and an internal database (used, for example, when you start creating data masking rules). For the persistence layer, we use Amazon Elastic File System (Amazon EFS), an elastic network file system that can be mounted on Fargate.

Finally, we expose our Fargate container through an Application Load Balancer, giving us a public static DNS endpoint to expose the Conduktor Platform and giving us complete control over the network security to access the Conduktor Platform. The following diagram illustrates our architecture.

We deploy our last CloudFormation file and specify some important parameters:

  • MSKBookstrapServersURL – This parameter is necessary to tell Conduktor which MSK cluster to connect to
  • MSKSecurityGroupID – The MSK security group is necessary to allow the template to add a security group ingress rule to it, thereby allowing our ECS task to connect to the MSK cluster
  • PublicSubnetIDs – The public subnet IDs are for your Application Load Balancer
  • SubnetIDs – The subnet IDs are for your ECS task and can be the same subnets or private subnets (as long as they have access to the MSK cluster and the other public subnets)
  • VpcID – This is the VPC you’re deploying to

After deploying the template, on the Output tab of the stack, you can find the Application Load Balancer URL.
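
The stack outputs can also be read programmatically. The sketch below uses the boto3 CloudFormation client's describe_stacks call; the function name stack_outputs is hypothetical, and because the name of the output key that holds the load balancer URL depends on the template, the sketch simply returns every output.

```python
def stack_outputs(cloudformation, stack_name):
    """Return the stack's outputs as a {key: value} dictionary."""
    stack = cloudformation.describe_stacks(StackName=stack_name)["Stacks"][0]
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}
```

You would call it as stack_outputs(boto3.client("cloudformation"), stack_name) with the name you gave the stack.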

We use this URL and log in to the Conduktor Platform with the user name [email protected] and password password. These login credentials can be changed using the YAML configuration file, and you can even enable SSO and LDAP.

On the Conduktor console, you can start creating topics, producing data, consuming data, and much more! AWS Glue Schema Registry support is coming soon, and Confluent Schema Registry compatibility is already available.

Clean up

To clean up your AWS account, perform the following steps in order:

  1. Delete the third CloudFormation template (3 – create ECS Service.yaml).
  2. Delete the second CloudFormation template (2 – create MSK cluster.yaml).
  3. Empty the contents of your S3 bucket.
  4. Delete all your images in your ECR repository.
  5. Delete the first CloudFormation template (1 – base conduktor.yaml).

Conclusion

You can use the Conduktor Platform against as many MSK clusters as desired by editing the file conduktor-platform.yaml. You can even connect to your clusters running elsewhere, for example on Amazon Elastic Compute Cloud (Amazon EC2).

On our roadmap, we’re working on a complete integration with Amazon MSK, including AWS Glue Schema Registry support, Amazon MSK Connect support, and complete monitoring capabilities.

The Conduktor Platform offers a limited free tier with no time limit. Head to Conduktor’s Get Started page and create an account to start using the Platform alongside MSK clusters today.


About the Author

Stéphane Maarek is the co-founder of Conduktor. He is also the lead instructor on Udemy for learning Apache Kafka and AWS Certifications, having taught these technologies to over 1.5 million learners. Through Conduktor, he wants to democratize access to Apache Kafka and make its usage seamless and enterprise-ready.

New for Amazon Redshift – General Availability of Streaming Ingestion for Kinesis Data Streams and Managed Streaming for Apache Kafka

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-for-amazon-redshift-general-availability-of-streaming-ingestion-for-kinesis-data-streams-and-managed-streaming-for-apache-kafka/

Ten years ago, just a few months after I joined AWS, Amazon Redshift was launched. Over the years, many features have been added to improve performance and make it easier to use. Amazon Redshift now allows you to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes. More recently, Amazon Redshift Serverless became generally available to make it easier to run and scale analytics without having to manage your data warehouse infrastructure.

To process data as quickly as possible from real-time applications, customers are adopting streaming engines like Amazon Kinesis and Amazon Managed Streaming for Apache Kafka. Previously, to load streaming data into your Amazon Redshift database, you’d have to configure a process to stage data in Amazon Simple Storage Service (Amazon S3) before loading. Doing so would introduce a latency of one minute or more, depending on the volume of data.

Today, I am happy to share the general availability of Amazon Redshift Streaming Ingestion. With this new capability, Amazon Redshift can natively ingest hundreds of megabytes of data per second from Amazon Kinesis Data Streams and Amazon MSK into an Amazon Redshift materialized view and query it in seconds.

Architecture diagram.

Streaming ingestion benefits from the ability to optimize query performance with materialized views and allows the use of Amazon Redshift more efficiently for operational analytics and as the data source for real-time dashboards. Another interesting use case for streaming ingestion is analyzing real-time data from gamers to optimize their gaming experience. This new integration also makes it easier to implement analytics for IoT devices, clickstream analysis, application monitoring, fraud detection, and live leaderboards.

Let’s see how this works in practice.

Configuring Amazon Redshift Streaming Ingestion
Apart from managing permissions, Amazon Redshift streaming ingestion can be configured entirely with SQL within Amazon Redshift. This is especially useful for business users who lack access to the AWS Management Console or the expertise to configure integrations between AWS services.

You can set up streaming ingestion in three steps:

  1. Create or update an AWS Identity and Access Management (IAM) role to allow access to the streaming platform you use (Kinesis Data Streams or Amazon MSK). Note that the IAM role should have a trust policy that allows Amazon Redshift to assume the role.
  2. Create an external schema to connect to the streaming service.
  3. Create a materialized view that references the streaming object (Kinesis data stream or Kafka topic) in the external schemas.

After that, you can query the materialized view to use the data from the stream in your analytics workloads. Streaming ingestion works with Amazon Redshift provisioned clusters and with the new serverless option. To maximize simplicity, I am going to use Amazon Redshift Serverless in this walkthrough.

To prepare my environment, I need a Kinesis data stream. In the Kinesis console, I choose Data streams in the navigation pane and then Create data stream. For the Data stream name, I use my-input-stream and then leave all other options set to their default value. After a few seconds, the Kinesis data stream is ready. Note that by default I am using on-demand capacity mode. In a development or test environment, you can choose provisioned capacity mode with one shard to optimize costs.

Now, I create an IAM role to give Amazon Redshift access to the my-input-stream Kinesis data stream. In the IAM console, I create a role with this policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:123412341234:stream/my-input-stream"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}

To allow Amazon Redshift to assume the role, I use the following trust policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "redshift.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
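
The same role can be created with the boto3 IAM client instead of the console. This is a minimal sketch: create_role and put_role_policy are the real API calls, while the function name create_streaming_role and the inline policy name are hypothetical. The permissions_policy argument is the first policy document above, passed as a dictionary.

```python
import json

# Trust policy from above: lets Amazon Redshift assume the role.
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "redshift.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}


def create_streaming_role(iam, role_name, permissions_policy):
    """Create the role, attach the inline policy, and return the role ARN."""
    role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(TRUST_POLICY),
    )
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=f"{role_name}-kinesis-read",  # hypothetical policy name
        PolicyDocument=json.dumps(permissions_policy),
    )
    return role["Role"]["Arn"]
```

The returned ARN is the value used in the IAM_ROLE clause when creating the external schema.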

In the Amazon Redshift console, I choose Redshift serverless from the navigation pane and create a new workgroup and namespace, similar to what I did in this blog post. When I create the namespace, in the Permissions section, I choose Associate IAM roles from the dropdown menu. Then, I select the role I just created. Note that the role is visible in this selection only if the trust policy allows Amazon Redshift to assume it. After that, I complete the creation of the namespace using the default options. After a few minutes, the serverless database is ready for use.

In the Amazon Redshift console, I choose Query editor v2 in the navigation pane. I connect to the new serverless database by choosing it from the list of resources. Now, I can use SQL to configure streaming ingestion. First, I create an external schema that maps to the streaming service. Because I am going to use simulated IoT data as an example, I call the external schema sensors.

CREATE EXTERNAL SCHEMA sensors
FROM KINESIS
IAM_ROLE 'arn:aws:iam::123412341234:role/redshift-streaming-ingestion';

To access the data in the stream, I create a materialized view that selects data from the stream. In general, materialized views contain a precomputed result set based on the result of a query. In this case, the query is reading from the stream, and Amazon Redshift is the consumer of the stream.

Because streaming data is going to be ingested as JSON data, I have two options:

  1. Leave all the JSON data in a single column and use Amazon Redshift capabilities to query semi-structured data.
  2. Extract JSON properties into their own separate columns.

Let’s see the pros and cons of both options.

The approximate_arrival_timestamp, partition_key, shard_id, and sequence_number columns in the SELECT statement are provided by Kinesis Data Streams. The record from the stream is in the kinesis_data column. The refresh_time column is provided by Amazon Redshift.

To leave the JSON data in a single column of the sensor_data materialized view, I use the JSON_PARSE function:

CREATE MATERIALIZED VIEW sensor_data AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_PARSE(kinesis_data) as payload
      FROM sensors."my-input-stream";

Because I used the AUTO REFRESH YES parameter, the content of the materialized view is automatically refreshed when there is new data in the stream.

To extract the JSON properties into separate columns of the sensor_data_extract materialized view, I use the JSON_EXTRACT_PATH_TEXT function:

CREATE MATERIALIZED VIEW sensor_data_extract AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'sensor_id')::VARCHAR(8) as sensor_id,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'current_temperature')::DECIMAL(10,2) as current_temperature,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'status')::VARCHAR(8) as status,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'event_time')::CHARACTER(26) as event_time
      FROM sensors."my-input-stream";
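
To make the extraction concrete, here is a local Python approximation of what the view does to one record. It is only an illustration: the function name extract_columns is hypothetical, and the length limits of the SQL types (for example VARCHAR(8)) are not enforced.

```python
import json
from decimal import Decimal


def extract_columns(raw: bytes) -> dict:
    """Approximate the view's per-record extraction for one stream record."""
    # Mirrors FROM_VARBYTE(kinesis_data, 'utf-8'): bytes to text.
    doc = json.loads(raw.decode("utf-8"))
    return {
        # Extracted as text, so the numeric sensor_id becomes a string.
        "sensor_id": str(doc["sensor_id"]),
        # Cast to a decimal with two fractional digits.
        "current_temperature": Decimal(str(doc["current_temperature"])).quantize(
            Decimal("0.01")
        ),
        "status": doc["status"],
        "event_time": doc["event_time"],
    }


record = (
    b'{"sensor_id": 66, "current_temperature": 69.67, '
    b'"status": "OK", "event_time": "2022-11-20T18:31:30.693395"}'
)
print(extract_columns(record))
```

An ISO timestamp with microseconds, such as the one in this record, is 26 characters long, which is consistent with the CHARACTER(26) cast in the view.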

Loading Data into the Kinesis Data Stream
To put data in the my-input-stream Kinesis Data Stream, I use the following random_data_generator.py Python script simulating data from IoT sensors:

import datetime
import json
import random
import boto3

STREAM_NAME = "my-input-stream"


def get_random_data():
    current_temperature = round(10 + random.random() * 170, 2)
    if current_temperature > 160:
        status = "ERROR"
    elif current_temperature > 140 or random.randrange(1, 100) > 80:
        status = random.choice(["WARNING","ERROR"])
    else:
        status = "OK"
    return {
        'sensor_id': random.randrange(1, 100),
        'current_temperature': current_temperature,
        'status': status,
        'event_time': datetime.datetime.now().isoformat()
    }


def send_data(stream_name, kinesis_client):
    while True:
        data = get_random_data()
        partition_key = str(data["sensor_id"])
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)


if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis')
    send_data(STREAM_NAME, kinesis_client)

I start the script and see the records that are being put in the stream. They use a JSON syntax and contain random data.

$ python3 random_data_generator.py
{'sensor_id': 66, 'current_temperature': 69.67, 'status': 'OK', 'event_time': '2022-11-20T18:31:30.693395'}
{'sensor_id': 45, 'current_temperature': 122.57, 'status': 'OK', 'event_time': '2022-11-20T18:31:31.486649'}
{'sensor_id': 15, 'current_temperature': 101.64, 'status': 'OK', 'event_time': '2022-11-20T18:31:31.671593'}
...
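
The script above sends one record per request. For higher volumes, a variation is to batch records with the Kinesis PutRecords API, which accepts up to 500 records per call. The following sketch is an assumption-labeled alternative rather than part of the original script; the function name send_batch is hypothetical, and it only counts failed records instead of retrying them.

```python
import json


def send_batch(kinesis_client, stream_name, records, max_batch=500):
    """Send dict records with PutRecords in chunks; return the failure count."""
    failed = 0
    for start in range(0, len(records), max_batch):
        chunk = records[start:start + max_batch]
        response = kinesis_client.put_records(
            StreamName=stream_name,
            Records=[
                {
                    "Data": json.dumps(r).encode("utf-8"),
                    "PartitionKey": str(r["sensor_id"]),
                }
                for r in chunk
            ],
        )
        # PutRecords can partially fail; callers should retry these records.
        failed += response.get("FailedRecordCount", 0)
    return failed
```

It can be combined with get_random_data() from the script above by collecting a list of generated records before each call.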

Querying Streaming Data from Amazon Redshift
To compare the two materialized views, I select the first ten rows from each of them:

  • In the sensor_data materialized view, the JSON data in the stream is in the payload column. I can use Amazon Redshift JSON functions to access data stored in JSON format.
  • In the sensor_data_extract materialized view, the JSON data in the stream has been extracted into different columns: sensor_id, current_temperature, status, and event_time.
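
The same comparison can be run outside the query editor. Here is a minimal sketch using the Amazon Redshift Data API through boto3 (execute_statement, describe_statement, get_statement_result); the function name query_view is hypothetical, the workgroup and database names are placeholders for your own, and only the first page of results is read.

```python
import time


def query_view(redshift_data, workgroup, database, sql, poll_seconds=1.0):
    """Run `sql` on a serverless workgroup and return rows as plain lists."""
    statement_id = redshift_data.execute_statement(
        WorkgroupName=workgroup, Database=database, Sql=sql
    )["Id"]

    # The Data API is asynchronous, so poll until the statement finishes.
    while True:
        desc = redshift_data.describe_statement(Id=statement_id)
        if desc["Status"] in ("FINISHED", "FAILED", "ABORTED"):
            break
        time.sleep(poll_seconds)

    if desc["Status"] != "FINISHED":
        raise RuntimeError(desc.get("Error", desc["Status"]))

    result = redshift_data.get_statement_result(Id=statement_id)
    # Each field is a one-key dict such as {"stringValue": "OK"} or {"isNull": True}.
    return [
        [None if f.get("isNull") else next(iter(f.values())) for f in row]
        for row in result["Records"]
    ]
```

For example, passing "SELECT * FROM sensor_data_extract LIMIT 10" as the sql argument returns the first ten rows of the second view.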

Now I can use the data in these views in my analytics workloads together with the data in my data warehouse, my operational databases, and my data lake. I can use the data in these views together with Redshift ML to train a machine learning model or use predictive analytics. Because materialized views support incremental updates, the data in these views can be efficiently used as a data source for dashboards, for example, using Amazon Redshift as a data source for Amazon Managed Grafana.

Availability and Pricing
Amazon Redshift streaming ingestion for Kinesis Data Streams and Managed Streaming for Apache Kafka is generally available today in all commercial AWS Regions.

There are no additional costs for using Amazon Redshift streaming ingestion. For more information, see Amazon Redshift pricing.

It’s never been easier to use low-latency streaming data in your data warehouse and in your data lake. Let us know what you build with this new capability!

Danilo

How SOCAR built a streaming data pipeline to process IoT data for real-time analytics and control

Post Syndicated from DoYeun Kim original https://aws.amazon.com/blogs/big-data/how-socar-built-a-streaming-data-pipeline-to-process-iot-data-for-real-time-analytics-and-control/

SOCAR is the leading Korean mobility company with strong competitiveness in car-sharing. SOCAR has become a comprehensive mobility platform in collaboration with Nine2One, an e-bike sharing service, and Modu Company, an online parking platform. Backed by advanced technology and data, SOCAR solves mobility-related social problems, such as parking difficulties and traffic congestion, and changes the car ownership-oriented mobility habits in Korea.

SOCAR is building a new fleet management system to manage the many actions and processes that must occur in order for fleet vehicles to run on time, within budget, and at maximum efficiency. To achieve this, SOCAR is looking to build a highly scalable data platform using AWS services to collect, process, store, and analyze internet of things (IoT) streaming data from various vehicle devices and historical operational data.

This in-car device data, combined with operational data such as car details and reservation details, will provide a foundation for analytics use cases. For example, SOCAR will be able to notify customers if they have forgotten to turn their headlights off or to schedule a service if a battery is running low. Unfortunately, the previous architecture didn’t enable the enrichment of IoT data with operational data and couldn’t support streaming analytics use cases.

AWS Data Lab offers accelerated, joint-engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics modernization initiatives. The Build Lab is a 2–5-day intensive build with a technical customer team.

In this post, we share how SOCAR engaged the Data Lab program to assist them in building a prototype solution to overcome these challenges, and to build the basis for accelerating their data project.

Use case 1: Streaming data analytics and real-time control

SOCAR wanted to utilize IoT data for a new business initiative. A fleet management system, where data comes from IoT devices in the vehicles, is a key input to drive business decisions and derive insights. This data is captured by AWS IoT and sent to Amazon Managed Streaming for Apache Kafka (Amazon MSK). By joining the IoT data to other operational datasets, including reservations, car information, device information, and others, the solution can support a number of functions across SOCAR’s business.

An example of real-time monitoring is when a customer turns off the car engine and closes the car door, but the headlights are still on. By using IoT data related to the car light, door, and engine, a notification is sent to the customer to inform them that the car headlights should be turned off.
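
The headlight rule can be written as a simple predicate over one IoT event. This is a minimal sketch, not SOCAR's implementation; the field names (engine_on, doors_closed, headlights_on, car_id) are hypothetical:

```python
def headlights_left_on(event: dict) -> bool:
    """Return True when the engine is off, the doors are closed, and the headlights are on.

    The field names are assumptions; the real IoT payload may differ.
    """
    return (
        not event["engine_on"]
        and event["doors_closed"]
        and event["headlights_on"]
    )


if __name__ == "__main__":
    parked = {"car_id": "car-1", "engine_on": False, "doors_closed": True, "headlights_on": True}
    driving = {"car_id": "car-2", "engine_on": True, "doors_closed": True, "headlights_on": True}
    for event in (parked, driving):
        if headlights_left_on(event):
            print(f"notify customer of {event['car_id']}: headlights are still on")
```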

Although this real-time control is important, they also want to collect historical data—both raw and curated data—in Amazon Simple Storage Service (Amazon S3) to support historical analytics and visualizations by using Amazon QuickSight.

Use case 2: Detect table schema change

The first challenge SOCAR faced was existing batch ingestion pipelines that were prone to breaking when schema changes occurred in the source systems. Additionally, these pipelines didn’t deliver data in a way that was easy for business analysts to consume. In order to meet the future data volumes and business requirements, they needed a pattern for the automated monitoring of batch pipelines with notification of schema changes and the ability to continue processing.

The second challenge was related to the complexity of the JSON files being ingested. The existing batch pipelines weren’t flattening the five-level nested structure, which made it difficult for business users and analysts to gain business insights without additional effort on their end.

Overview of solution

In this solution, we followed the serverless data architecture to establish a data platform for SOCAR. This serverless architecture allowed SOCAR to run data pipelines continuously and scale automatically with no setup cost and without managing servers.

AWS Glue is used for both the streaming and batch data pipelines. Amazon Kinesis Data Analytics is used to deliver streaming data with subsecond latencies. In terms of storage, data is stored in Amazon S3 for historical data analysis, auditing, and backup. However, when frequent reading of the latest snapshot data is required by multiple users and applications concurrently, the data is stored and read from Amazon DynamoDB tables. DynamoDB is a key-value and document database that can support tables of virtually any size with horizontal scaling.

Let’s discuss the components of the solution in detail before walking through the steps of the entire data flow.

Component 1: Processing IoT streaming data with business data

The first data pipeline (see the following diagram) processes IoT streaming data with business data from an Amazon Aurora MySQL-Compatible Edition database.

Whenever a transaction occurs in two tables in the Aurora MySQL database, this transaction is captured as data and then loaded into two MSK topics via AWS Database Migration Service (AWS DMS) tasks. One topic conveys the car information table, and the other topic is for the device information table. This data is loaded into a single DynamoDB table that contains all the attributes (or columns) that exist in the two tables in the Aurora MySQL database, along with a primary key. This single DynamoDB table contains the latest snapshot data from the two DB tables, and is important because it contains the latest information of all the cars and devices for the lookup against the streaming IoT data. If the lookup were done on the database directly with the streaming data, it would impact the production database performance.

When the snapshot is available in DynamoDB, an AWS Glue streaming job runs continuously to collect the IoT data and join it with the latest snapshot data in the DynamoDB table to produce the up-to-date output, which is written into another DynamoDB table.

The up-to-date data in DynamoDB is used for real-time monitoring and control that SOCAR’s Data Analytics team performs for safety maintenance and fleet management. This data is ultimately consumed by a number of apps to perform various business activities, including route optimization, real-time monitoring of oil consumption and temperature, identification of a driver’s driving pattern, tire wear and defect detection, and real-time car crash notifications.

Component 2: Processing IoT data and visualizing the data in dashboards

The second data pipeline (see the following diagram) batch processes the IoT data and visualizes it in QuickSight dashboards.

There are two data sources. The first is the Aurora MySQL database. The two database tables are exported into Amazon S3 from the Aurora MySQL cluster and registered in the AWS Glue Data Catalog as tables. The second data source is Amazon MSK, which receives streaming data from AWS IoT Core. This requires you to create a secure AWS Glue connection for an Apache Kafka data stream. SOCAR’s MSK cluster requires SASL_SSL as a security protocol (for more information, refer to Authentication and authorization for Apache Kafka APIs). To create an MSK connection in AWS Glue and set up connectivity, we use the following CLI command:

# To use a custom certificate, add "KAFKA_CUSTOM_CERT": "s3://bucket/prefix/cert.pem" to ConnectionProperties.
aws glue create-connection --connection-input \
'{"Name":"kafka-connection","Description":"kafka connection example",
"ConnectionType":"KAFKA",
"ConnectionProperties":{
"KAFKA_BOOTSTRAP_SERVERS":"<server-ip-addresses>",
"KAFKA_SSL_ENABLED":"true",
"KAFKA_SECURITY_PROTOCOL":"SASL_SSL",
"KAFKA_SKIP_CUSTOM_CERT_VALIDATION":"false",
"KAFKA_SASL_MECHANISM":"SCRAM-SHA-512",
"KAFKA_SASL_SCRAM_USERNAME":"<username>",
"KAFKA_SASL_SCRAM_PASSWORD":"<password>"
},
"PhysicalConnectionRequirements":
{"SubnetId":"subnet-xxx","SecurityGroupIdList":["sg-xxx"],"AvailabilityZone":"us-east-1a"}}'
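
Hand-written JSON inside a shell string is easy to get wrong (a missing quote or a stray comment is enough to make the command fail). As one possible alternative, the following sketch builds the same payload as a Python dictionary and serializes it with the standard json module. The function name and its parameters are hypothetical, and the values shown are the same placeholders used in the command:

```python
import json


def build_kafka_connection_input(bootstrap_servers, username, password,
                                 subnet_id, security_group_ids, availability_zone):
    """Return the connection-input payload for a SASL_SSL (SCRAM) Kafka connection."""
    return {
        "Name": "kafka-connection",
        "Description": "kafka connection example",
        "ConnectionType": "KAFKA",
        "ConnectionProperties": {
            "KAFKA_BOOTSTRAP_SERVERS": bootstrap_servers,
            "KAFKA_SSL_ENABLED": "true",
            "KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
            "KAFKA_SKIP_CUSTOM_CERT_VALIDATION": "false",
            "KAFKA_SASL_MECHANISM": "SCRAM-SHA-512",
            "KAFKA_SASL_SCRAM_USERNAME": username,
            "KAFKA_SASL_SCRAM_PASSWORD": password,
        },
        "PhysicalConnectionRequirements": {
            "SubnetId": subnet_id,
            "SecurityGroupIdList": list(security_group_ids),
            "AvailabilityZone": availability_zone,
        },
    }


if __name__ == "__main__":
    payload = build_kafka_connection_input(
        "<server-ip-addresses>", "<username>", "<password>",
        "subnet-xxx", ["sg-xxx"], "us-east-1a",
    )
    # json.dumps guarantees balanced quotes and valid JSON.
    print(json.dumps(payload))
```

The printed string can be passed as the value of --connection-input.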

Component 3: Real-time control

The third data pipeline processes the streaming IoT data from Amazon MSK with millisecond latency to produce the output in DynamoDB, and sends a notification in real time if any records are identified as an outlier based on business rules.

AWS IoT Core provides integrations with Amazon MSK to set up real-time streaming data pipelines. To do so, complete the following steps:

  1. On the AWS IoT Core console, choose Act in the navigation pane.
  2. Choose Rules, and create a new rule.
  3. For Actions, choose Add action and choose Kafka.
  4. Choose the VPC destination if required.
  5. Specify the Kafka topic.
  6. Specify the TLS bootstrap servers of your Amazon MSK cluster.

You can view the bootstrap server URLs in the client information of your MSK cluster details. The AWS IoT rule was created with the Kafka topic as an action to provide data from AWS IoT Core to Kafka topics.

SOCAR used Amazon Kinesis Data Analytics Studio to analyze streaming data in real time and build stream-processing applications using standard SQL and Python. We created one table from the Kafka topic using the following code:

CREATE TABLE table_name (
column_name1 VARCHAR,
column_name2 VARCHAR(100),
column_name3 VARCHAR,
column_name4 as TO_TIMESTAMP (`time_column`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR column_name4 AS column_name4 - INTERVAL '5' SECOND
)
PARTITIONED BY (column_name5)
WITH (
'connector'= 'kafka',
'topic' = 'topic_name',
'properties.bootstrap.servers' = '<bootstrap servers shown in the MSK client info dialog>',
'format' = 'json',
'properties.group.id' = 'testGroup1',
'scan.startup.mode'= 'earliest-offset'
);
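
The WATERMARK clause tells Flink how much out-of-order arrival to tolerate, here 5 seconds behind the largest event time seen so far. The following is a simplified model of that idea in plain Python, not Flink code; the class and method names are hypothetical, and Flink's periodic watermark emission and exact boundary handling are not modeled:

```python
from datetime import datetime, timedelta

OUT_OF_ORDERNESS = timedelta(seconds=5)  # mirrors INTERVAL '5' SECOND


class WatermarkTracker:
    """Simplified bounded out-of-orderness watermark."""

    def __init__(self):
        self.max_event_time = None

    def current_watermark(self):
        """Largest event time seen so far minus the allowed out-of-orderness."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - OUT_OF_ORDERNESS

    def observe(self, event_time: datetime) -> bool:
        """Record an event time and return True if the event arrived late."""
        watermark = self.current_watermark()
        late = watermark is not None and event_time < watermark
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return late


if __name__ == "__main__":
    tracker = WatermarkTracker()
    base = datetime(2022, 11, 20, 18, 0, 0)
    for offset in (0, 10, 6, 2):
        ts = base + timedelta(seconds=offset)
        print(offset, "late" if tracker.observe(ts) else "on time")
```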

Then we applied a query with business logic to identify a particular set of records that need to be alerted. When this data is loaded back into another Kafka topic, AWS Lambda functions trigger the downstream action: either load the data into a DynamoDB table or send an email notification.
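
When Amazon MSK is the event source, Lambda delivers Kafka records grouped by topic-partition, with each record value base64-encoded. The following sketch shows how such a function might decode the records and route them. The topic name outlier and the store and notify callables are hypothetical stand-ins for the DynamoDB write and the email notification:

```python
import base64
import json


def decode_records(event):
    """Yield (topic, payload) for every Kafka record in a Lambda MSK event."""
    for records in event.get("records", {}).values():
        for record in records:
            payload = json.loads(base64.b64decode(record["value"]))
            yield record["topic"], payload


def handler(event, context, store=print, notify=print):
    """Route each record by topic.

    store and notify are placeholders; a real function would write to
    DynamoDB or publish a notification instead of printing.
    """
    for topic, payload in decode_records(event):
        if topic == "outlier":  # hypothetical topic name
            notify(payload)
        else:
            store(payload)
```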

Component 4: Flattening the nested structure JSON and monitoring schema changes

The final data pipeline (see the following diagram) processes complex, semi-structured, and nested JSON files.

This step uses an AWS Glue DynamicFrame to flatten the nested structure and then land the output in Amazon S3. After the data is loaded, it’s scanned by an AWS Glue crawler to update the Data Catalog table and detect any changes in the schema.
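
To illustrate what flattening means here, the following plain-Python sketch turns a nested record into a single-level one with dotted column names. It is not the AWS Glue API; the function name and the sample record are hypothetical:

```python
def flatten(record: dict, parent_key: str = "", sep: str = ".") -> dict:
    """Flatten nested dictionaries into one level with dotted keys.

    Lists are kept as values; pivoting arrays out is a separate step.
    """
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


if __name__ == "__main__":
    nested = {"car": {"id": 1, "device": {"battery": {"level": 12}}}}
    print(flatten(nested))
    # {'car.id': 1, 'car.device.battery.level': 12}
```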

Data flow: Putting it all together

The following diagram illustrates our complete data flow with each component.

Let’s walk through the steps of each pipeline.

The first data pipeline (in red) processes the IoT streaming data with the Aurora MySQL business data:

  1. AWS DMS is used for ongoing replication to continuously apply source changes to the target with minimal latency. The source consists of two tables in the Aurora MySQL database (carinfo and deviceinfo), and each table is linked to its own MSK topic via an AWS DMS task.
  2. Amazon MSK triggers a Lambda function, so whenever a topic receives data, the Lambda function runs to load the data into a DynamoDB table.
  3. There is a single DynamoDB table with columns that exist from the carinfo table and the deviceinfo table of the Aurora MySQL database. This table consists of all the data from two tables and stores the latest data by performing an upsert operation.
  4. An AWS Glue job continuously receives the IoT data and joins it with data in the DynamoDB table to produce the output into another DynamoDB target table.
  5. This target table contains the final data, which includes all the device and car status information from the IoT devices as well as metadata from the Aurora MySQL table.
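
Steps 2 through 5 amount to an upsert into a snapshot table followed by a lookup join. The following sketch models that with an in-memory dictionary standing in for the DynamoDB tables; the key name car_id and the attribute names are hypothetical:

```python
def upsert(table: dict, key: str, attributes: dict) -> None:
    """Insert the item if the key is new, otherwise merge the new attributes into it."""
    table.setdefault(key, {}).update(attributes)


def enrich(iot_event: dict, table: dict) -> dict:
    """Join an IoT event with the latest snapshot for its car."""
    return {**table.get(iot_event["car_id"], {}), **iot_event}


if __name__ == "__main__":
    snapshot = {}  # stand-in for the single snapshot table
    upsert(snapshot, "car-1", {"model": "sedan"})           # change from carinfo
    upsert(snapshot, "car-1", {"device_firmware": "1.2"})   # change from deviceinfo
    upsert(snapshot, "car-1", {"device_firmware": "1.3"})   # later update wins
    print(enrich({"car_id": "car-1", "battery_level": 12}, snapshot))
```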

The second data pipeline (in green) batch processes IoT data to use in dashboards and for visualization:

  1. The car and reservation data (in two DB tables) is exported via a SQL command from the Aurora MySQL database with the output data available in an S3 bucket. The folders that contain data are registered as an S3 location for the AWS Glue crawler and become available via the AWS Glue Data Catalog.
  2. The MSK input topic continuously receives data from AWS IoT. Each car has a number of IoT devices, and each device captures data and sends it to an MSK input topic. The Amazon MSK S3 sink connector is configured to export data from Kafka topics to Amazon S3 in JSON formats. In addition, the S3 connector exports data by guaranteeing exactly-once delivery semantics to consumers of the S3 objects it produces.
  3. The AWS Glue job runs in a daily batch to load the historical IoT data into Amazon S3 and into two tables (refer to step 1) to produce the output data in an Enriched folder in Amazon S3.
  4. Amazon Athena is used to query data from Amazon S3 and make it available as a dataset in QuickSight for visualizing historical data.

The third data pipeline (in blue) processes streaming IoT data from Amazon MSK with millisecond latency to produce the output in DynamoDB and send a notification:

  1. An Amazon Kinesis Data Analytics Studio notebook powered by Apache Zeppelin and Apache Flink is used to build and deploy its output as a Kinesis Data Analytics application. This application loads data from Amazon MSK in real time, and users can apply business logic to select particular events coming from the IoT real-time data, for example, the car engine is off and the doors are closed, but the headlights are still on. The particular event that users want to capture can be sent to another MSK topic (Outlier) via the Kinesis Data Analytics application.
  2. Amazon MSK triggers a Lambda function, so whenever the topic receives data, the Lambda function runs and publishes an Amazon Simple Notification Service (Amazon SNS) notification, which sends an email to the users subscribed to the SNS topic.
  3. The Kinesis Data Analytics application loads data from AWS IoT, applies business logic, and then loads it into another MSK topic (output). Amazon MSK triggers a Lambda function when data is received, which loads data into a DynamoDB Append table.
  4. Amazon Kinesis Data Analytics Studio is used to run SQL commands for ad hoc interactive analysis on streaming data.

The final data pipeline (in yellow) processes complex, semi-structured, and nested JSON files, and sends a notification when a schema evolves.

  1. An AWS Glue job runs and reads the JSON data from Amazon S3 (as a source), applies logic to flatten the nested schema using a DynamicFrame, and pivots out array columns from the flattened frame.
  2. The output is stored in Amazon S3 and is automatically registered to the AWS Glue Data Catalog table.
  3. Whenever there is a new attribute or change in the JSON input data at any level in the nested structure, the new attribute and change are captured in Amazon EventBridge as an event from the AWS Glue Data Catalog. An email notification is published using Amazon SNS.
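
In the pipeline, schema changes are surfaced by the Data Catalog and Amazon EventBridge. To make the idea concrete, the following sketch shows only the underlying comparison, at any nesting level, in plain Python. The function names and sample records are hypothetical:

```python
def schema_of(record: dict, prefix: str = "") -> set:
    """Return the set of dotted attribute paths found in a nested record."""
    paths = set()
    for key, value in record.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            paths |= schema_of(value, path)
        else:
            paths.add(path)
    return paths


def schema_changes(old: dict, new: dict):
    """Return (added, removed) attribute paths between two records."""
    old_paths, new_paths = schema_of(old), schema_of(new)
    return sorted(new_paths - old_paths), sorted(old_paths - new_paths)


if __name__ == "__main__":
    before = {"car": {"id": 1, "device": {"battery": 12}}}
    after = {"car": {"id": 1, "device": {"battery": 12, "tire": {"wear": 0.3}}}}
    added, removed = schema_changes(before, after)
    if added or removed:
        print(f"schema changed: added={added} removed={removed}")
```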

Conclusion

As a result of the four-day Build Lab, the SOCAR team left with a working prototype that is custom fit to their needs, gaining a clear path to production. The Data Lab allowed the SOCAR team to build a new streaming data pipeline, enrich IoT data with operational data, and enhance the existing data pipeline to process complex nested JSON data. This establishes a baseline architecture to support the new fleet management system beyond the car-sharing business.


About the Authors

DoYeun Kim is the Head of Data Engineering at SOCAR. He is a passionate software engineering professional with 19+ years of experience. He leads a team of 10+ engineers who are responsible for the data platform, data warehouse and MLOps engineering, as well as building in-house data products.

SangSu Park is a Lead Data Architect in SOCAR’s cloud DB team. His passion is to keep learning, embrace challenges, and strive for mutual growth through communication. He loves to travel in search of new cities and places.

YoungMin Park is a Lead Architect in SOCAR’s cloud infrastructure team. His philosophy in life is to challenge, fail, learn, and share such experiences, whatever they may be, to build a better tomorrow for the world. He enjoys basketball and building expertise in various fields.

Younggu Yun is a Senior Data Lab Architect at AWS. He works with customers around the APAC region to help them achieve business goals and solve technical problems by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together. In his free time, he and his son enjoy building creative models with Lego blocks.

Vicky Falconer leads the AWS Data Lab program across APAC, offering accelerated joint engineering engagements between teams of customer builders and AWS technical resources to create tangible deliverables that accelerate data analytics modernization and machine learning initiatives.

Retain more for less with tiered storage for Amazon MSK

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/retain-more-for-less-with-tiered-storage-for-amazon-msk/

Organizations are adopting Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK) to capture and analyze data in real time. Amazon MSK allows you to build and run production applications on Apache Kafka without needing Kafka infrastructure management expertise or having to deal with the complex overheads associated with running Apache Kafka on your own. With increasing maturity, customers seek to build sophisticated use cases that combine aspects of real-time and batch processing. For instance, you may want to train machine learning (ML) models based on historic data and then use these models to do real-time inferencing. Or you may want to be able to recompute previous results when the application logic changes, for example, when a new KPI is added to a streaming analytics application or when a bug that caused incorrect output is fixed. These use cases often require storing data for several weeks, months, or even years.

Apache Kafka is well positioned to support these kinds of use cases. Data is retained in the Kafka cluster as long as required by configuring the retention policy. In this way, the most recent data can be processed in real time for low-latency use cases while historic data remains accessible in the cluster and can be processed in a batch fashion.

However, retaining data in a Kafka cluster can become expensive because storage and compute are tightly coupled in a cluster. To scale storage, you need to add more brokers. But adding more brokers with the sole purpose of increasing the storage squanders the rest of the compute resources like CPU and memory. Also, a large cluster with more nodes adds operational complexity with a longer time to recover and rebalance when a broker fails. To avoid that operational complexity and higher cost, you can move your data to Amazon Simple Storage Service (Amazon S3) for long-term access, and with the cost-effective storage classes in Amazon S3 you can optimize your overall storage cost. This solves cost challenges, but now you have to build and maintain that part of the architecture for data movement to a different data store. You also need to build different data processing logic using different APIs for consuming data (Kafka API for streaming, Amazon S3 API for historic reads).

Today, we’re announcing Amazon MSK tiered storage, which brings a virtually unlimited and low-cost storage tier for Amazon MSK, making it simpler and cost-effective for developers to build streaming data applications. Since the launch of Amazon MSK in 2019, we have enabled capabilities such as vertical scaling and automatic scaling of broker storage so you can operate your Kafka workloads in a cost-effective way. Earlier this year, we launched provisioned throughput which enables seamlessly scaling I/O without having to provision additional brokers. Tiered storage makes it even more cost-effective for you to run Kafka workloads. You can now store data in Apache Kafka without worrying about limits. You can effectively balance your performance and costs by using the performance-optimized primary storage for real-time data and the new low-cost tier for the historical data. With a few clicks, you can move streaming data into a lower-cost tier to store data and only pay for what you use.

Tiered storage frees you from making hard trade-offs between supporting the data retention needs of your application teams and the operational complexity that comes with it. This enables you to use the same code to process both real-time and historical data to minimize redundant workflows and simplify architectures. With Amazon MSK tiered storage, you can implement a Kappa architecture – a streaming-first software architecture deployment pattern – to use the same data processing pipeline for correctness and completeness of data over a much longer time horizon for business analysis.

How Amazon MSK tiered storage works

Let’s look at how tiered storage works for Amazon MSK. Apache Kafka stores data in files called log segments. As each segment completes, based on the segment size configured at cluster or topic level, it’s copied to the low-cost storage tier. Data is held in performance-optimized storage for a specified retention time, or up to a specified size, and then deleted. There is a separate time and size limit setting for the low-cost storage, and it must be higher than the limit for the performance-optimized storage tier. If clients request data from segments stored in the low-cost tier, the broker reads the data from it and serves the data in the same way as if it were being served from the performance-optimized storage. The APIs and existing clients work with minimal changes. When your application starts reading data from the low-cost tier, you can expect an increase in read latency for the first few bytes. As you start reading the remaining data sequentially from the low-cost tier, you can expect latencies that are similar to the primary storage tier. With tiered storage, you pay for the amount of data you store and the amount of data you retrieve.
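
The retention behavior described above can be modeled as a small function. This is a simplified sketch that considers only time-based retention for a completed segment and ignores size limits and the active segment; the function name and the tier labels are hypothetical:

```python
def serving_tier(segment_age_ms: int, local_retention_ms: int, retention_ms: int) -> str:
    """Return which tier would serve a completed segment of the given age.

    Simplified model: time-based retention only.
    """
    if retention_ms <= local_retention_ms:
        raise ValueError("low-cost tier retention must be longer than local retention")
    if segment_age_ms <= local_retention_ms:
        return "primary"    # still held in performance-optimized storage
    if segment_age_ms <= retention_ms:
        return "low-cost"   # read through the broker from the low-cost tier
    return "deleted"        # past the overall retention limit


if __name__ == "__main__":
    hour = 60 * 60 * 1000
    day = 24 * hour
    for age in (1 * hour, 2 * day, 8 * day):
        print(age, serving_tier(age, local_retention_ms=4 * hour, retention_ms=7 * day))
```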

For a pricing example, let’s consider a workload where your ingestion rate is 15 MB/s, with a replication factor of 3, and you want to retain data in your Kafka cluster for 7 days. For such a workload, it requires 6x m5.large brokers, with 32.4 TB EBS storage, which costs $4,755. But if you use tiered storage for the same workload with local retention of 4 hours and overall data retention of 7 days, it requires 3x m5.large brokers, with 0.8 TB EBS storage and 9 TB of tiered storage, which costs $1,584. If you want to read all the historic data at once, it costs $13 ($0.0015 per GB retrieval cost). In this example with tiered storage, you save around 66% of your overall cost.
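
The storage figures in this example can be sanity-checked with a few lines of arithmetic. The sketch below computes raw data volumes in decimal units, with hypothetical helper and variable names. The provisioned EBS sizes quoted above (32.4 TB and 0.8 TB) are larger than the raw volumes, presumably because they include headroom; that reading is an assumption.

```python
def raw_storage_tb(ingest_mb_per_s: float, seconds: int, replicas: int = 1) -> float:
    """Raw data volume in TB (1 TB = 1,000,000 MB), before any headroom."""
    return ingest_mb_per_s * seconds * replicas / 1_000_000


DAY = 86_400
HOUR = 3_600

ebs_only_tb = raw_storage_tb(15, 7 * DAY, replicas=3)     # 7 days on EBS, 3 replicas
local_tier_tb = raw_storage_tb(15, 4 * HOUR, replicas=3)  # 4 hours on EBS, 3 replicas
low_cost_tier_tb = raw_storage_tb(15, 7 * DAY)            # 7 days, one copy

savings = 1 - 1584 / 4755            # costs quoted in the example
retrieval_cost = 9 * 1000 * 0.0015   # 9 TB at $0.0015 per GB

print(f"EBS only:      {ebs_only_tb:.2f} TB raw")       # 27.22 TB raw
print(f"Local tier:    {local_tier_tb:.2f} TB raw")     # 0.65 TB raw
print(f"Low-cost tier: {low_cost_tier_tb:.2f} TB raw")  # 9.07 TB raw
print(f"Savings:       {savings:.1%}")                  # 66.7%
print(f"Full re-read:  ${retrieval_cost:.2f}")          # $13.50
```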

Get started using Amazon MSK tiered storage

To enable tiered storage on your existing cluster, upgrade your MSK cluster to Kafka version 2.8.2.tiered and then choose Tiered storage and EBS storage as your cluster storage mode on the Amazon MSK console.

After tiered storage is enabled on the cluster level, run the following command to enable tiered storage on an existing topic. In this example, you’re enabling tiered storage on a topic called msk-ts-topic with 7 days’ retention (local.retention.ms=604800000) for a local high-performance storage tier, setting 180 days’ retention (retention.ms=15552000000) to retain the data in the low-cost storage tier, and updating the log segment size to 48 MB:

bin/kafka-configs.sh --bootstrap-server $bsrv --alter --entity-type topics --entity-name msk-ts-topic --add-config 'remote.storage.enable=true, local.retention.ms=604800000, retention.ms=15552000000, segment.bytes=50331648'
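
Retention values are given in milliseconds and the segment size in bytes, which makes them easy to mistype. As a small aid, the following sketch derives the values from days and mebibytes and assembles the --add-config string; the function names are hypothetical:

```python
MS_PER_DAY = 24 * 60 * 60 * 1000


def days_to_ms(days: int) -> int:
    """Convert whole days to milliseconds."""
    return days * MS_PER_DAY


def mib_to_bytes(mib: int) -> int:
    """Convert mebibytes to bytes."""
    return mib * 1024 * 1024


def tiered_topic_config(local_days: int, total_days: int, segment_mib: int) -> str:
    """Build the value for --add-config when enabling tiered storage on a topic."""
    if total_days <= local_days:
        raise ValueError("overall retention must be longer than local retention")
    return ",".join([
        "remote.storage.enable=true",
        f"local.retention.ms={days_to_ms(local_days)}",
        f"retention.ms={days_to_ms(total_days)}",
        f"segment.bytes={mib_to_bytes(segment_mib)}",
    ])


if __name__ == "__main__":
    print(tiered_topic_config(local_days=7, total_days=180, segment_mib=48))
    # remote.storage.enable=true,local.retention.ms=604800000,retention.ms=15552000000,segment.bytes=50331648
```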

Availability and pricing

Amazon MSK tiered storage is available in all AWS Regions where Amazon MSK is available, excluding the AWS China and AWS GovCloud Regions. This low-cost storage tier scales to virtually unlimited storage and requires no upfront provisioning. You pay only for the volume of data retained and retrieved in the low-cost tier.

For more information about this feature and its pricing, see the Amazon MSK developer guide and Amazon MSK pricing page. For finding the right sizing for your cluster, see the best practices page.

Summary

With Amazon MSK tiered storage, you don’t need to provision storage for the low-cost tier or manage the infrastructure. Tiered storage enables you to scale to virtually unlimited storage. You can access data in the low-cost tier using the same clients you currently use to read data from the high-performance primary storage tier. Apache Kafka’s consumer API, streams API, and connectors consume data from both tiers without changes. You can modify the retention limits on the low-cost storage tier in the same way that you modify the retention limits on the high-performance storage.

Enable tiered storage on your MSK clusters today to retain data longer at a lower cost.


About the Author

Masudur Rahaman Sayem is a Streaming Architect at AWS. He works with AWS customers globally to design and build data streaming architecture to solve real-world business problems. He is passionate about distributed systems. He also likes to read, especially classic comic books.

Use MSK Connect for managed MirrorMaker 2 deployment with IAM authentication

Post Syndicated from Tanner Pratt original https://aws.amazon.com/blogs/big-data/use-msk-connect-for-managed-mirrormaker-2-deployment-with-iam-authentication/

In this post, we show how to use MSK Connect for MirrorMaker 2 deployment with AWS Identity and Access Management (IAM) authentication. We create an MSK Connect custom plugin and IAM role, and then replicate the data between two existing Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters. The goal is to have replication successfully running between two MSK clusters that are using IAM as an authentication mechanism. Note that although we use IAM authentication in this solution, the same approach also works with MSK clusters that use no authentication.

Solution overview

This solution can help Amazon MSK users run MirrorMaker 2 on MSK Connect, which eases the administrative and operational burden because the service handles the underlying resources, enabling you to focus on the connectors and data to ensure correctness. The following diagram illustrates the solution architecture.

Apache Kafka is an open-source platform for streaming data. You can use it to build various workloads like IoT connectivity, data analytic pipelines, or event-based architectures.

Kafka Connect is a component of Apache Kafka that provides a framework to stream data between systems like databases, object stores, and even other Kafka clusters, into and out of Kafka. Connectors are the executable applications that you can deploy on top of the Kafka Connect framework to stream data into or out of Kafka.

MirrorMaker is the cross-cluster data mirroring mechanism that Apache Kafka provides to replicate data between two clusters. You can deploy this mirroring process as a connector in the Kafka Connect framework to improve the scalability, monitoring, and availability of the mirroring application. Replication between two clusters is a common scenario when needing to improve data availability, migrate to a new cluster, aggregate data from edge clusters into a central cluster, copy data between Regions, and more. In KIP-382, MirrorMaker 2 (MM2) is documented with all the available configurations, design patterns, and deployment options available to users. It’s worthwhile to familiarize yourself with the configurations because there are many options that can impact your unique needs.

MSK Connect is a managed Kafka Connect service that allows you to deploy Kafka connectors into your environment with seamless integrations with AWS services like IAM, Amazon MSK, and Amazon CloudWatch.

In the following sections, we walk you through the steps to configure this solution:

  1. Create an IAM policy and role.
  2. Upload your data.
  3. Create a custom plugin.
  4. Create and deploy connectors.

Create an IAM policy and role for authentication

IAM helps users securely control access to AWS resources. In this step, we create an IAM policy and role that has two critical permissions:

A common mistake made when creating an IAM role and policy needed for common Kafka tasks (publishing to a topic, listing topics) is to assume that the AWS managed policy AmazonMSKFullAccess (arn:aws:iam::aws:policy/AmazonMSKFullAccess) will suffice for permissions.

The following is an example of a policy with both full Kafka and Amazon MSK access:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*",
                "kafka:*"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

This policy supports the creation of the cluster within the AWS account infrastructure and grants access to the components that make up the cluster anatomy like Amazon Elastic Compute Cloud (Amazon EC2), Amazon Virtual Private Cloud (Amazon VPC), logs, and kafka:*. There is no managed policy for a Kafka administrator to have full access on the cluster itself.

After you create the KafkaAdminFullAccess policy, create a role and attach the policy to it. You need two entries on the role’s Trust relationships tab:

  • The first statement allows Kafka Connect to assume this role and connect to the cluster.
  • The second statement follows the pattern arn:aws:sts::(YOUR ACCOUNT NUMBER):assumed-role/(YOUR ROLE NAME)/(YOUR ACCOUNT NUMBER). The account number must be the account in which MSK Connect and the role are created, and the role name is the role whose trust policy you’re editing. In the following example code, I’m editing a role called MSKConnectExampleRole in my account. This is so that when MSK Connect assumes the role, the assumed user can assume the role again to publish and consume records on the target cluster.

In the following example trust policy, provide your own account number and role name:

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Principal": {
				"Service": "kafkaconnect.amazonaws.com"
			},
			"Action": "sts:AssumeRole"
		},
		{
			"Effect": "Allow",
			"Principal": {
				"AWS": "arn:aws:sts::123456789101:assumed-role/MSKConnectExampleRole/123456789101"
			},
			"Action": "sts:AssumeRole"
		}
	]
}
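
Because the second principal repeats the account number and embeds the role name, it can help to generate the trust policy rather than edit it by hand. The following is a minimal sketch with a hypothetical function name; it follows the ARN pattern described above:

```python
import json


def build_trust_policy(account_id: str, role_name: str) -> dict:
    """Return a trust policy with the two statements described above."""
    assumed_role_arn = f"arn:aws:sts::{account_id}:assumed-role/{role_name}/{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Lets MSK Connect assume the role.
                "Effect": "Allow",
                "Principal": {"Service": "kafkaconnect.amazonaws.com"},
                "Action": "sts:AssumeRole",
            },
            {
                # Lets the assumed-role session assume the role again.
                "Effect": "Allow",
                "Principal": {"AWS": assumed_role_arn},
                "Action": "sts:AssumeRole",
            },
        ],
    }


if __name__ == "__main__":
    print(json.dumps(build_trust_policy("123456789101", "MSKConnectExampleRole"), indent=2))
```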

Now we’re ready to deploy MirrorMaker 2.

Upload data

MSK Connect custom plugins accept a file or folder with a .jar or .zip ending. For this step, create a dummy folder or file and compress it. Then upload the .zip object to your Amazon Simple Storage Service (Amazon S3) bucket:

mkdir mm2 
zip mm2.zip mm2 
aws s3 cp mm2.zip s3://mytestbucket/

Because Kafka and subsequently Kafka Connect have MirrorMaker libraries built in, you don’t need to add additional JAR files for this functionality. MSK Connect has a prerequisite that a custom plugin needs to be present at connector creation, so we have to create an empty one just for reference. It doesn’t matter what the contents of the file are or what the folder contains, as long as there is an object in Amazon S3 that is accessible to MSK Connect, so MSK Connect has access to MM2 classes.

Create a custom plugin

On the Amazon MSK console, follow the steps to create a custom plugin from the .zip file. Enter the object’s Amazon S3 URI and, for this post, name the plugin Mirror-Maker-2.

Create and deploy connectors

You need to deploy three connectors for a successful mirroring operation:

  • MirrorSourceConnector
  • MirrorHeartbeatConnector
  • MirrorCheckpointConnector

Complete the following steps for each connector:

  1. On the Amazon MSK console, choose Create connector.
  2. For Connector name, enter the name of your first connector.
    connector properties name
  3. Select the target MSK cluster that the data is mirrored to as a destination.
  4. Choose IAM as the authentication mechanism.
    select cluster
  5. Pass the config into the connector.
    connector config

Connector config files are JSON-formatted config maps that the Kafka Connect framework uses to pass configurations to the executable JAR. When using the MSK Connect console, we must convert the JSON config file to a file of single-line key=value entries (with no spaces).

You need to change some values within the configs for deployment, namely bootstrap.servers, sasl.jaas.config, and tasks.max. Note the placeholders in the following code for all three configs.

The following code is for MirrorHeartbeatConnector:

connector.class=org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
source.cluster.alias=source
target.cluster.alias=target
clusters=source,target
source.cluster.bootstrap.servers=(SOURCE BOOTSTRAP SERVERS)
target.cluster.security.protocol=SASL_SSL
target.cluster.producer.security.protocol=SASL_SSL
target.cluster.consumer.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=AWS_MSK_IAM
target.cluster.producer.sasl.mechanism=AWS_MSK_IAM
target.cluster.consumer.sasl.mechanism=AWS_MSK_IAM
target.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
target.cluster.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
target.cluster.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
target.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.security.protocol=SASL_SSL
source.cluster.producer.security.protocol=SASL_SSL
source.cluster.consumer.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=AWS_MSK_IAM
source.cluster.producer.sasl.mechanism=AWS_MSK_IAM
source.cluster.consumer.sasl.mechanism=AWS_MSK_IAM
source.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
source.cluster.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
source.cluster.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
source.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
topics=.*
topics.exclude=.*[-.]internal, .*.replica, __.*, .*-config, .*-status, .*-offset
groups.exclude=console-consumer-.*, connect-.*, __.*
refresh.groups.enabled=true
refresh.groups.interval.seconds=60
emit.checkpoints.enabled=true
consumer.auto.offset.reset=earliest
producer.linger.ms=500
producer.retry.backoff.ms=1000
producer.max.block.ms=10000
replication.factor=3
tasks.max=1
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

The following code is for MirrorCheckpointConnector:

connector.class=org.apache.kafka.connect.mirror.MirrorCheckpointConnector
source.cluster.alias=source
target.cluster.alias=target
clusters=source,target
source.cluster.bootstrap.servers=(Source Bootstrap Servers)
target.cluster.bootstrap.servers=(Target Bootstrap Servers)
target.cluster.security.protocol=SASL_SSL
target.cluster.producer.security.protocol=SASL_SSL
target.cluster.consumer.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=AWS_MSK_IAM
target.cluster.producer.sasl.mechanism=AWS_MSK_IAM
target.cluster.consumer.sasl.mechanism=AWS_MSK_IAM
target.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
target.cluster.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
target.cluster.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
target.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.security.protocol=SASL_SSL
source.cluster.producer.security.protocol=SASL_SSL
source.cluster.consumer.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=AWS_MSK_IAM
source.cluster.producer.sasl.mechanism=AWS_MSK_IAM
source.cluster.consumer.sasl.mechanism=AWS_MSK_IAM
source.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
source.cluster.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
source.cluster.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
source.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
topics=.*
topics.exclude=.*[-.]internal, .*.replica, __.*, .*-config, .*-status, .*-offset
groups.exclude=console-consumer-.*, connect-.*, __.*
refresh.groups.enabled=true
refresh.groups.interval.seconds=60
emit.checkpoints.enabled=true
consumer.auto.offset.reset=earliest
producer.linger.ms=500
producer.retry.backoff.ms=1000
producer.max.block.ms=10000
replication.factor=3
tasks.max=1
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
sync.group.offsets.interval.seconds=5

The following code is for MirrorSourceConnector:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
# See note below about the recommendations
tasks.max=(NUMBER OF TASKS)
clusters=source,target
source.cluster.alias=source
target.cluster.alias=target
source.cluster.bootstrap.servers=(SOURCE BOOTSTRAP-SERVER)
source.cluster.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.producer.security.protocol=SASL_SSL
source.cluster.producer.sasl.mechanism=AWS_MSK_IAM
source.cluster.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
source.cluster.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
source.cluster.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
source.cluster.consumer.security.protocol=SASL_SSL
source.cluster.consumer.sasl.mechanism=AWS_MSK_IAM
source.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
source.cluster.sasl.mechanism=AWS_MSK_IAM
source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.bootstrap.servers=(TARGET BOOTSTRAP-SERVER)
target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
target.cluster.producer.sasl.mechanism=AWS_MSK_IAM
target.cluster.producer.security.protocol=SASL_SSL
target.cluster.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
target.cluster.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.consumer.security.protocol=SASL_SSL
target.cluster.consumer.sasl.mechanism=AWS_MSK_IAM
target.cluster.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::(Your Account Number):role/(Your IAM role)" awsDebugCreds=true;
target.cluster.sasl.mechanism=AWS_MSK_IAM
target.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
refresh.groups.enabled=true
refresh.groups.interval.seconds=60
refresh.topics.interval.seconds=60
topics.exclude=.*[-.]internal,.*.replica,__.*,.*-config,.*-status,.*-offset
emit.checkpoints.enabled=true
topics=.*
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
producer.max.block.ms=10000
producer.linger.ms=500
producer.retry.backoff.ms=1000
sync.topic.configs.enabled=true
sync.topic.configs.interval.seconds=60
refresh.topics.enabled=true
groups.exclude=console-consumer-.*,connect-.*,__.*
consumer.auto.offset.reset=earliest
replication.factor=3
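
Before pasting any of the three configs into the console, it can help to confirm that every placeholder has been filled in. The following is a minimal sketch, assuming you saved a config to a local file; the function name check_placeholders and the file name in the usage comment are hypothetical, and the pattern simply looks for any remaining text in parentheses that starts with a letter.

```shell
# Hypothetical helper: report lines that still contain a "(...)" placeholder.
check_placeholders() {
  # $1 = path to a key=value connector config file
  if grep -n '([A-Za-z][^)]*)' "$1"; then
    echo "Unfilled placeholders remain in $1" >&2
    return 1
  fi
  echo "No placeholders left in $1"
}

# Usage (file name is an assumption):
#   check_placeholders mirror-source.properties
```

The check is deliberately loose: it flags any parenthesized text, so it would also flag a legitimate value that happens to contain parentheses.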

A general guideline for the number of tasks for a MirrorSourceConnector is one task per up to 10 partitions to be mirrored. For example, if a Kafka cluster has 15 topics with 12 partitions each for a total partition count of 180 partitions, we deploy at least 18 tasks for mirroring the workload.

Exceeding the recommended number of tasks for the source connector may lead to offsets that aren’t translated (negative consumer group offsets). For more information about this issue and its workarounds, refer to MM2 may not sync partition offsets correctly.
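
The guideline above amounts to dividing the partition count by 10 and rounding up. A minimal sketch of that arithmetic follows; the function and variable names are illustrative and not part of any Kafka tooling.

```shell
# Guideline from this post: one task per up to 10 partitions.
partitions_per_task=10

tasks_for() {
  # $1 = total number of partitions to mirror; the result is rounded up
  echo $(( ($1 + partitions_per_task - 1) / partitions_per_task ))
}

total_partitions=$(( 15 * 12 ))   # 15 topics x 12 partitions = 180
tasks_for "$total_partitions"     # prints 18
```

The result is the value to use for tasks.max in the MirrorSourceConnector config.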

  1. For the heartbeat and checkpoint connectors, use provisioned scale with one worker, because there is only one task running for each of them.
  2. For the source connector, we set the maximum number of workers to the value decided for the tasks.max property.
    Note that we use the defaults of the auto scaling threshold settings for now.
    worker properties
  3. Although it’s possible to pass custom worker configurations, let’s leave the default option selected.
    worker config
  4. In the Access permissions section, we use the IAM role that we created earlier that has a trust relationship with kafkaconnect.amazonaws.com and kafka-cluster:* permissions. Warning signs display above and below the drop-down menu. These remind you that misconfigured IAM roles and attached policies are a common reason why connectors fail. If you never get any log output upon connector creation, that is a good indicator of an improperly configured IAM role or policy permission problem.
    connect iam role
    On the bottom of this page is a warning box telling us not to use the aptly named AWSServiceRoleForKafkaConnect role. This is an AWS managed service role that MSK Connect needs to perform critical, behind-the-scenes functions upon connector creation. For more information, refer to Using Service-Linked Roles for MSK Connect.
  5. Choose Next.
    Depending on the authentication mechanism chosen when aligning the connector with a specific cluster (we chose IAM), the options in the Security section are preset and unchangeable. If no authentication was chosen and your cluster allows plaintext communication, that option is available under Encryption – in transit.
  6. Choose Next to move to the next page.
    access and encryption
  7. Choose your preferred logging destination for MSK Connect logs. For this post, I select Deliver to Amazon CloudWatch Logs and choose the log group ARN for my MSK Connect logs.
  8. Choose Next.
    logs properties
  9. Review your connector settings and choose Create connector.

A message appears indicating either a successful start to the creation process or immediate failure. You can now navigate to the Log groups page on the CloudWatch console and wait for the log stream to appear.

The CloudWatch logs show whether a connector has succeeded or failed sooner than the Amazon MSK console does. A log stream is created in your chosen log group within a few minutes after you create your connector. If the log stream never appears, your connector config or IAM role is likely misconfigured and the connector won’t work.

cloudwatch

Verify that the connector launched successfully

In this section, we walk through two confirmation steps to determine a successful launch.

Check the log stream

Open the log stream that your connector is writing to. In the log, you can check if the connector has successfully launched and is publishing data to the cluster. In the following screenshot, we can confirm data is being published.

cloudwatch logs

Mirror data

The second step is to create a producer to send data to the source cluster. We use the console producer and consumer that Kafka ships with. You can follow Step 1 from the Apache Kafka quickstart.

  1. On a client machine that can access Amazon MSK, download Kafka from https://kafka.apache.org/downloads and extract it:
    tar -xzf kafka_2.13-3.1.0.tgz
    cd kafka_2.13-3.1.0

  2. Download the latest stable JAR for IAM authentication from the repository. As of this writing, it is 1.1.3:
    cd libs/
    wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.3/aws-msk-iam-auth-1.1.3-all.jar

  3. Next, we need to create our client.properties file that defines our connection properties for the clients. For instructions, refer to Configure clients for IAM access control. Copy the following example of the client.properties file:
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

    You can place this properties file anywhere on your machine. For ease of use and simple referencing, I place mine inside kafka_2.13-3.1.0/bin.
    After we create the client.properties file and place the JAR in the libs directory, we’re ready to create the topic for our replication test.

  4. From the bin folder, run the kafka-topics.sh script:
    ./kafka-topics.sh --bootstrap-server $bss --create --topic MirrorMakerTest --replication-factor 2 --partitions 1 --command-config client.properties

    The details of the command are as follows:
    --bootstrap-server – Your bootstrap server of the source cluster.
    --topic – The topic name you want to create.
    --create – The action for the script to perform.
    --replication-factor – The replication factor for the topic.
    --partitions – Total number of partitions to create for the topic.
    --command-config – Additional configurations needed for successful running. Here is where we pass in the client.properties file we created in the previous step.

  5. We can list all the topics to see that it was successfully created:
    ./kafka-topics.sh --bootstrap-server $bss --list --command-config client.properties

    The $bss variable used in these commands holds the bootstrap servers of the source cluster. When defining bootstrap servers, it’s recommended to use one broker from each Availability Zone. For example:

    export bss=broker1:9098,broker2:9098,broker3:9098

    Similar to the create topic command, the list command in this step simply shows all topics available on the cluster. We can run this same command on our target cluster to see if MirrorMaker has replicated the topic.
    With our topic created, let’s start the consumer. This consumer is consuming from the target cluster. When the topic is mirrored with the default replication policy, its name is prefixed with source. on the target cluster.

  6. For our topic, we consume from source.MirrorMakerTest as shown in the following code:
    ./kafka-console-consumer.sh --bootstrap-server $targetcluster --topic source.MirrorMakerTest --consumer.config client.properties

    The details of the code are as follows:
    --bootstrap-server – Your target MSK bootstrap servers
    --topic – The mirrored topic
    --consumer.config – Where we pass in our client.properties file again to instruct the client how to authenticate to the MSK cluster
    After this step is successful, the consumer keeps running on the console until we either close the client connection or close our terminal session. You won’t see any messages flowing yet because we haven’t started producing to the source topic on the source cluster.

  7. Open a new terminal window, leaving the consumer open, and start the producer:
    ./kafka-console-producer.sh --bootstrap-server $bss --topic MirrorMakerTest --producer.config client.properties

    The details of the code are as follows:
    --bootstrap-server – The source MSK bootstrap servers
    --topic – The topic we’re producing to
    --producer.config – The client.properties file indicating which IAM authentication properties to use

    After this is successful, the console returns >, which indicates that it’s ready to produce what we type. Let’s produce some messages, as shown in the following screenshot. After each message, press Enter to have the client produce to the topic.

    producer input

    Switching back to the consumer’s terminal window, you should see the same messages being replicated and now showing on your console’s output.

    consumer output
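
The client setup from the preceding steps can be sketched as a short script. It writes the client.properties file shown earlier into the current directory and derives the name of the mirrored topic, assuming the default replication policy and the source alias used in the connector configs. The variable names are illustrative.

```shell
# Write the IAM client settings from the earlier step to client.properties.
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF

# With the default replication policy, the mirrored topic name is the
# source cluster alias, a dot, and the original topic name.
source_alias="source"
topic="MirrorMakerTest"
mirrored_topic="${source_alias}.${topic}"
echo "$mirrored_topic"   # prints source.MirrorMakerTest
```

Run it from the directory where you want client.properties to live, for example the Kafka bin folder used in this post.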

Clean up

We can now close the client connections by pressing Ctrl+C or by simply closing the terminal windows.

We can delete the topics on both clusters by running the following code:

./kafka-topics.sh --bootstrap-server $bss --delete --topic MirrorMakerTest --command-config client.properties

Delete the source cluster topic first, then the target cluster topic.
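
The two deletions can be sketched as one script that runs them in that order. This is only a sketch under assumptions: $bss and $targetcluster hold the source and target bootstrap servers as in the earlier commands (the defaults here are placeholders), the mirrored topic carries the source. prefix, and the DRY_RUN switch and delete_topic function are illustrative additions. With DRY_RUN=1, the default, the commands are printed rather than run.

```shell
# Placeholders: export your real bootstrap servers before running.
bss="${bss:-broker1:9098}"
targetcluster="${targetcluster:-target-broker1:9098}"
DRY_RUN="${DRY_RUN:-1}"   # set to 0 to actually run the commands

delete_topic() {
  # $1 = bootstrap servers, $2 = topic name
  set -- ./kafka-topics.sh --bootstrap-server "$1" --delete --topic "$2" --command-config client.properties
  if [ "$DRY_RUN" = "1" ]; then
    printf '%s\n' "$*"    # print the command instead of running it
  else
    "$@"
  fi
}

delete_topic "$bss" MirrorMakerTest                     # source cluster first
delete_topic "$targetcluster" source.MirrorMakerTest    # then the mirrored copy
```

Review the printed commands, then rerun with DRY_RUN=0 from the Kafka bin folder.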

Finally, we can delete the three connectors via the Amazon MSK console by selecting them from the list of connectors and choosing Delete.

Conclusion

In this post, we showed how to use MSK Connect for MM2 deployment with IAM authentication. We successfully deployed the Amazon MSK custom plugin, and created the MM2 connector along with the accompanying IAM role. Then we deployed the MM2 connector onto our MSK Connect instances and watched as data was replicated successfully between two MSK clusters.

Using MSK Connect to deploy MM2 eases the administrative and operational burden of Kafka Connect and MM2, because the service handles the underlying resources, enabling you to focus on the connectors and data. The solution removes the need to have a dedicated infrastructure of a Kafka Connect cluster hosted on Amazon services like Amazon Elastic Compute Cloud (Amazon EC2), AWS Fargate, or Amazon EKS. The solution also automatically scales the resources for you (if configured to do so), which eliminates the need for administrators to check if the resources are scaling to meet demand. Additionally, using the Amazon managed service MSK Connect allows for easier compliance and security adherence for Kafka teams.

If you have any feedback or questions, please leave a comment.


About the Authors

Tanner Pratt is a Practice Manager at Amazon Web Services. Tanner is leading a team of consultants focusing on Amazon streaming services like Managed Streaming for Apache Kafka, Kinesis Data Streams/Firehose and Kinesis Data Analytics.

Ed Berezitsky is a Senior Data Architect at Amazon Web Services. Ed helps customers design and implement solutions using streaming technologies, and specializes in Amazon MSK and Apache Kafka.

Split your monolithic Apache Kafka clusters using Amazon MSK Serverless

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/split-your-monolithic-apache-kafka-clusters-using-amazon-msk-serverless/

Today, many companies are building real-time applications to improve their customer experience and get immediate insights from their data before it loses its value. As a result, companies have been facing increasing demand to provide data streaming services such as Apache Kafka for developers. To meet this demand, companies typically start with a small- or medium-sized, centralized Apache Kafka cluster to build a global streaming service. Over time, they scale the capacity of the cluster to match the demand for streaming. They choose to keep a monolithic cluster to simplify staffing and training by bringing all technical expertise into a single place. This approach also has cost benefits because it reduces the technical debt, overall operational costs, and complexity. In a monolithic cluster, the extra capacity is shared among all applications, therefore it usually reduces the overall streaming infrastructure cost.

In this post, I explain a few challenges with a centralized approach, and introduce two strategies for implementing a decentralized approach, using Amazon MSK Serverless. A decentralized strategy enables you to provision multiple Apache Kafka clusters instead of a monolithic one. I discuss how this strategy helps you optimize clusters per your application’s security, storage, and performance needs. I also discuss the benefits of a decentralized model and how to migrate from a monolithic cluster to a multi-cluster deployment model.

MSK Serverless can reduce the overhead and cost of managing Apache Kafka clusters. It automatically provisions and scales compute and storage resources for Apache Kafka clusters and automatically manages cluster capacity. It monitors how the partitions are distributed across the backend nodes and reassigns the partitions automatically when necessary. It integrates with other AWS services such as Amazon CloudWatch, where you can monitor the health of the cluster. The choice of MSK Serverless in this post is deliberate, even though the concepts can be applied to the Amazon MSK provisioned offering as well.

Overview of solution

Apache Kafka is an open-source, high-performance, fault-tolerant, and scalable platform for building real-time streaming data pipelines and applications. Apache Kafka simplifies producing and consuming streaming data by decoupling producers from the consumers. Producers simply interact with a single data store (Apache Kafka) to send their data. Consumers read the continuously flowing data, independent from the architecture or the programming language of the producers.

Apache Kafka is a popular choice for many use cases, such as:

  • Real-time web and log analytics
  • Transaction and event sourcing
  • Messaging
  • Decoupling microservices
  • Streaming ETL (extract, transform, and load)
  • Metrics and log aggregation

Challenges with a monolithic Apache Kafka cluster

Monolithic Apache Kafka saves companies from having to install and maintain multiple clusters in their data centers. However, this approach comes with common disadvantages:

  • The entire streaming capacity is consolidated in one place, making capacity planning difficult and complicated. You typically need more time to plan and reconfigure the cluster. For example, when preparing for sales or large campaign events, it’s hard to predict and calculate an aggregation of needed capacity across all applications. This can also inhibit the growth of your company because reconfiguring a large cluster for a new workload often takes longer than a small cluster.
  • Organizational conflicts may occur regarding the ownership and maintenance of the Apache Kafka cluster, because a monolithic cluster is a shared resource.
  • The Apache Kafka cluster becomes a single point of failure. Any downtime means the outage of all related applications.
  • If you choose to increase Apache Kafka’s resiliency with a multi-datacenter deployment, then you typically must have an equally large cluster in the other data center, which is expensive.
  • Maintenance and operation activities, such as version upgrades or installing OS patches, take significantly longer for larger clusters due to the distributed nature of Apache Kafka architecture.
  • A faulty application can impact the reliability of the whole cluster and other applications.
  • Version upgrades have to wait until all applications are tested with the new Apache Kafka version. This limits any application from experimenting with Apache Kafka features quickly.
  • This model makes it difficult to attribute the cost of running the cluster to the applications for chargeback purposes.

The following diagram shows a monolithic Apache Kafka architecture.

diagram shows a monolithic Apache Kafka architecture.

Decentralized model

A decentralized Apache Kafka deployment model involves provisioning, configuring, and maintaining multiple clusters. This strategy generally isn’t preferred because managing multiple clusters requires heavy investments in operational excellence, advanced monitoring, infrastructure as code, security, and hardware procurement in on-premises environments.

However, provisioning decentralized Apache Kafka clusters using MSK Serverless doesn’t require those investments. It scales capacity and storage up and down instantly based on application requirements, so you can add new workloads or scale existing ones without complex capacity planning. It also provides a throughput-based pricing model, and you pay for the storage and throughput that your applications use. Moreover, with MSK Serverless, you no longer need to perform standard maintenance tasks such as Apache Kafka version upgrades, partition reassignments, or OS patching.

With MSK Serverless, you benefit from a decentralized deployment without the operational burden that usually comes with a self-managed Apache Kafka deployment. In this strategy, the DevOps managers don’t have to spend time provisioning, configuring, monitoring, and operating multiple clusters. Instead, they can invest in building operational tools to onboard more real-time applications.

In the remainder of this post, I discuss different strategies for implementing a decentralized model. Furthermore, I highlight the benefits and challenges of each strategy so you can decide what works best for your organization.

Write clusters and read clusters

In this strategy, write clusters are responsible for ingesting data from the producers. You can add new workloads by creating new topics or creating new MSK Serverless clusters. If you need to scale the size of current workloads, you simply increase the number of partitions of your topics if the ordering isn’t important. MSK Serverless manages the capacity instantly as per the new configuration.

Each MSK Serverless cluster provides up to 200 MBps of write throughput and 400 MBps of read throughput. It also allocates up to 5 MBps of write throughput and 10 MBps of read throughput per partition.
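
As a rough illustration of how the per-partition figures above translate into a partition count, the following sketch computes the minimum number of partitions a topic needs for a target throughput. It is an assumption-laden lower bound: it considers only the per-partition limits quoted in this post, ignores ordering and consumer parallelism, and does not check the targets against the cluster-level limits. The function names are illustrative.

```shell
# Per-partition limits quoted in this post (MBps).
write_per_partition=5
read_per_partition=10

ceil_div() {
  # Integer division of $1 by $2, rounded up
  echo $(( ($1 + $2 - 1) / $2 ))
}

min_partitions() {
  # $1 = target write MBps, $2 = target read MBps for one topic
  for_write=$(ceil_div "$1" "$write_per_partition")
  for_read=$(ceil_div "$2" "$read_per_partition")
  if [ "$for_write" -gt "$for_read" ]; then
    echo "$for_write"
  else
    echo "$for_read"
  fi
}

min_partitions 60 90   # write needs 12, read needs 9, so this prints 12
```

Whichever of the two requirements is larger determines the result, because every partition contributes to both the write and the read allowance.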

Data consumers within any organization can usually be divided into two main categories:

  • Time-sensitive workloads, which need data with very low latency (such as millisecond or subsecond) and can only tolerate a very short Recovery Time Objective (RTO)
  • Time-insensitive workloads, which can tolerate higher latency (sub-10 seconds to minute-level latency) and longer RTO

Each of these categories also can be further divided into subcategories based on certain conditions, such as data classification, regulatory compliance, or service level agreements (SLAs). Read clusters can be set up according to your business or technical requirements, or even organizational boundaries, which can be used by the specific group of consumers. Finally, the consumers are configured to run against the associated read cluster.

To connect the write clusters to read clusters, a data replication pipeline is necessary. You can build a data replication pipeline many ways. Because MSK Serverless supports the standard Apache Kafka APIs, you can use the standard Apache Kafka tools such as MirrorMaker 2 to set up replications between Apache Kafka clusters.

The following diagram shows the architecture for this strategy.

diagram shows the architecture for this strategy.

This approach has the following benefits:

  • Producers are isolated from the consumers; therefore, your write throughput can scale independently from your read throughput. For example, if you have reached your max read throughput with existing clusters and need to add a new consumer group, you can simply provision a new MSK Serverless cluster and set up replication between the write cluster and the new read cluster.
  • It helps enforce security and regulatory compliance. You can build streaming jobs that can mask or remove the sensitive fields of data events, such as personally identifiable information (PII), while replicating the data.
  • Different clusters can be configured differently in terms of retention. For example, read clusters can be configured with different maximum retention periods, to save on storage cost depending on their requirements.
  • You can prioritize your response time for outages for certain clusters over the others.
  • For implementing increased resiliency, you can have fewer clusters in the backup Region by only replicating the data from the write clusters. Other clusters such as read clusters can be provisioned when the workload failover is invoked. In this model, with the MSK Serverless pricing model, you pay additionally for what you use (lighter replica) in the backup Region.

There are a few important notes to keep in mind when choosing this strategy:

  • It requires setting up multiple replications between clusters, which comes with additional operational and maintenance complexity.
  • Replication tools such as MirrorMaker 2 only support at-least-once processing semantics. This means that during failures and restarts, data events can be duplicated. If you have consumers that can’t tolerate data duplication, I suggest building data pipelines that support the exactly-once processing semantic for replicating the data, such as Apache Flink, instead of using MirrorMaker 2.
  • Because consumers don’t consume data directly from the write clusters, the latency is increased between the writers and the readers.
  • In this strategy, even though there are multiple Apache Kafka clusters, ownership and control still reside with one team, and the resources are in a single AWS account.

Segregating clusters

For some companies, providing access to Apache Kafka through a central data platform can create scaling, ownership, and accountability challenges. Infrastructure teams may not understand the specific business needs of an application, such as data freshness or latency requirements, security, data schemas, or a specific method needed for data ingestion.

You can often reduce these challenges by giving ownership and autonomy to the team who owns the application. You allow them to build and manage their application and needed infrastructure, rather than only being able to use a common central platform. For instance, development teams are responsible for provisioning, configuring, maintaining, and operating their own Apache Kafka clusters. They’re the domain experts of their application requirements, and they can manage their cluster according to their application needs. This reduces overall friction and makes application teams accountable for their advertised SLAs.

As mentioned before, MSK Serverless minimizes the operation and maintenance work associated with Apache Kafka clusters. This enables autonomous application teams to manage their clusters according to industry best practices, without needing to be experts in running highly available Apache Kafka clusters on AWS. If the MSK Serverless cluster is provisioned within their AWS account, they also own all the costs associated with operating their applications and the data streaming services.

The following diagram shows the architecture for this strategy.

This approach has the following benefits:

  • MSK Serverless clusters are managed by different teams; therefore, the overall management work is minimized.
  • Applications are isolated from each other. A faulty application or downtime of a cluster doesn’t impact other applications.
  • Consumers read data directly with low latency from the same cluster where the data is written.
  • Each MSK Serverless cluster scales independently, according to its own write and read throughput.
  • Simple cost attribution means that application teams own their infrastructure and its cost.
  • Total ownership of the streaming infrastructure allows developers to adopt streaming faster and deliver more functionality. It may also help shorten their response time to failures and outages.

Compared to the previous strategy, this approach has the following disadvantages:

  • It’s difficult to enforce unified security or regulatory compliance policies across many teams.
  • Duplicate copies of the same data may be ingested in multiple clusters. This increases the overall cost.
  • To increase resiliency, each team needs to set up replication between MSK Serverless clusters individually.

Moving from a centralized to decentralized strategy

MSK Serverless supports the AWS Command Line Interface (AWS CLI) and AWS CloudFormation templates for provisioning clusters in minutes. You can implement either of the strategies that I described earlier with these tools, and migrate your producers and consumers when the new clusters are ready.
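
The same provisioning can also be scripted through an AWS SDK. The following is a minimal sketch, assuming the AWS SDK for Python (boto3) and its `create_cluster_v2` operation on the `kafka` client. The helper name `serverless_cluster_request`, the cluster name, the subnet and security group IDs, and the `team` tag are all hypothetical placeholders. The function only builds the request; the actual API call is shown as a comment so that the sketch runs without AWS credentials.

```python
from typing import Any, Dict, List


def serverless_cluster_request(
    cluster_name: str,
    subnet_ids: List[str],
    security_group_ids: List[str],
    team: str,
) -> Dict[str, Any]:
    """Build keyword arguments for a serverless cluster creation request."""
    return {
        "ClusterName": cluster_name,
        # Hypothetical tag to help attribute cost to the owning team.
        "Tags": {"team": team},
        "Serverless": {
            "VpcConfigs": [
                {
                    "SubnetIds": subnet_ids,
                    "SecurityGroupIds": security_group_ids,
                }
            ],
            # IAM is the authentication method used with MSK Serverless.
            "ClientAuthentication": {"Sasl": {"Iam": {"Enabled": True}}},
        },
    }


request = serverless_cluster_request(
    cluster_name="orders-team-cluster",        # placeholder
    subnet_ids=["subnet-aaaa", "subnet-bbbb"],  # placeholders
    security_group_ids=["sg-cccc"],            # placeholder
    team="orders",
)

# With boto3 installed and credentials configured, the request could be sent as:
#   import boto3
#   boto3.client("kafka").create_cluster_v2(**request)
```

Keeping the request builder separate from the API call makes it easy to review or version the cluster definition per team before anything is created.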

The following steps provide further guidance on implementation of these strategies:

  1. Begin by focusing on the current issues with the monolithic Apache Kafka cluster. Next, compare those challenges with the benefits and disadvantages listed under each strategy. This helps you decide which strategy serves your company best.
  2. Identify and document each application’s performance, resiliency, SLA, and ownership requirements separately.
  3. Try to group applications that have similar requirements. For example, you may find a few applications that run batch analytics; they’re not sensitive to data freshness and don’t need access to sensitive (or PII) data. If you decide segregating clusters is the right strategy for your company, you may choose to group applications by the team that owns them.
  4. Compare each group of applications’ storage and streaming throughput requirements against the MSK Serverless quotas. This helps you determine whether one MSK Serverless cluster can provide the needed aggregate streaming capacity. If it can’t, divide larger groups into smaller ones.
  5. Create an MSK Serverless cluster for each group you identified earlier via the AWS Management Console, AWS CLI, or CloudFormation templates.
  6. Identify the topics that correspond to each new MSK Serverless cluster.
  7. Choose the best migration pattern to Amazon MSK according to the replication requirements. For example, when you don’t need data transformation, and duplicate data events can be tolerated by applications, you can use Apache Kafka migration tools such as MirrorMaker 2.0.
  8. After you have verified the data is replicating correctly to the new clusters, first restart the consumers against the new cluster. This ensures no data is lost as a result of the migration.
  9. After the consumers resume processing data, restart the producers against the new cluster, and shut down the replication pipeline you created earlier.
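
The grouping and quota check in steps 3 and 4 can be sketched as a small script. This is an illustration under stated assumptions, not an AWS tool: the `AppRequirement` type, the `groups_over_quota` function, and all throughput numbers are hypothetical, and the per-cluster limits are passed in as parameters so you can take the real values from the current MSK Serverless quotas documentation.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class AppRequirement:
    name: str
    group: str          # the candidate cluster this application is assigned to
    write_mbps: float   # expected peak write throughput
    read_mbps: float    # expected peak read throughput


def groups_over_quota(
    apps: Iterable[AppRequirement],
    max_write_mbps: float,
    max_read_mbps: float,
) -> Dict[str, Dict[str, float]]:
    """Return the aggregate throughput of every group that exceeds a limit."""
    totals: Dict[str, Dict[str, float]] = {}
    for app in apps:
        group_total = totals.setdefault(
            app.group, {"write_mbps": 0.0, "read_mbps": 0.0}
        )
        group_total["write_mbps"] += app.write_mbps
        group_total["read_mbps"] += app.read_mbps
    return {
        group: total
        for group, total in totals.items()
        if total["write_mbps"] > max_write_mbps
        or total["read_mbps"] > max_read_mbps
    }


# Hypothetical applications and placeholder limits, for illustration only.
apps = [
    AppRequirement("reporting", "batch", write_mbps=50, read_mbps=100),
    AppRequirement("archiver", "batch", write_mbps=60, read_mbps=120),
    AppRequirement("fraud", "realtime", write_mbps=150, read_mbps=100),
    AppRequirement("alerts", "realtime", write_mbps=80, read_mbps=50),
]
too_large = groups_over_quota(apps, max_write_mbps=200, max_read_mbps=400)
```

With these placeholder numbers, only the `realtime` group is flagged, which suggests splitting it into smaller groups before creating clusters. Storage requirements could be checked the same way by adding another field.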

As of this writing, MSK Serverless only supports AWS Identity and Access Management (IAM) for authentication and access control. For more information, refer to Securing Apache Kafka is easy and familiar with IAM Access Control for Amazon MSK. If your applications use other methods supported by Apache Kafka, you need to modify your application code to use IAM Access Control instead or use the Amazon MSK provisioned offering.
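
For Java-based clients, switching to IAM Access Control is largely a matter of client configuration. The sketch below renders the client properties, assuming the `aws-msk-iam-auth` library is on the client’s classpath; the helper names `iam_client_properties` and `render_properties` are hypothetical, and clients in other languages need their own IAM-capable SASL mechanism instead.

```python
from typing import Dict


def iam_client_properties() -> Dict[str, str]:
    """Kafka client settings for IAM authentication via aws-msk-iam-auth."""
    return {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "AWS_MSK_IAM",
        "sasl.jaas.config": (
            "software.amazon.msk.auth.iam.IAMLoginModule required;"
        ),
        "sasl.client.callback.handler.class": (
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        ),
    }


def render_properties(props: Dict[str, str]) -> str:
    """Render settings in the key=value format of a client.properties file."""
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"


client_properties_text = render_properties(iam_client_properties())
```

The rendered text can be saved as a `client.properties` file and passed to producers, consumers, or the Kafka command line tools. The IAM identity the client runs as still needs a policy that allows it to connect to the cluster and access its topics.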

Summary

MSK Serverless eliminates operational overhead, including the provisioning, configuration, and maintenance of highly available Apache Kafka. In this post, I showed how splitting Apache Kafka clusters helps improve the security, performance, scalability, and reliability of your overall data streaming services and applications. I also described two main strategies for splitting a monolithic Apache Kafka cluster using MSK Serverless. If you’re using the Amazon MSK provisioned offering, these strategies are still relevant when you consider moving from a centralized to a decentralized model. You can choose the right strategy depending on your company’s specific needs.

For further reading on Amazon MSK, visit the official product page.


About the Author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.