All posts by Vikas Bajaj

Multi-tenancy Apache Kafka clusters in Amazon MSK with IAM access control and Kafka Quotas – Part 1

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/multi-tenancy-apache-kafka-clusters-in-amazon-msk-with-iam-access-control-and-kafka-quotas-part-1/

With Amazon Managed Streaming for Apache Kafka (Amazon MSK), you can build and run applications that use Apache Kafka to process streaming data. To process streaming data, organizations run either multiple Kafka clusters aligned to their application groupings, usage scenarios, compliance requirements, and other factors, or a single dedicated Kafka cluster for the entire organization. Whichever pattern is used, Kafka clusters are typically multi-tenant, allowing multiple producer and consumer applications to produce and consume streaming data simultaneously.

With multi-tenant Kafka clusters, however, one of the challenges is to make sure that producer and consumer applications don’t overuse cluster resources. A few poorly behaved applications may monopolize cluster resources and degrade the well-behaved ones. Therefore, teams who manage multi-tenant Kafka clusters need a mechanism to prevent applications from overconsuming cluster resources. This is where Kafka quotas come into play. Kafka quotas control the amount of resources client applications can use within a Kafka cluster.

In Part 1 of this two-part series, we explain the concepts of how to enforce Kafka quotas in MSK multi-tenant Kafka clusters while using AWS Identity and Access Management (IAM) access control for authentication and authorization. In Part 2, we cover detailed implementation steps along with sample Kafka client applications.

Brief introduction to Kafka quotas

Kafka quotas control the amount of resources client applications can use within a Kafka cluster. A multi-tenant Kafka cluster can experience performance degradation or a complete outage if one or more client applications produce or consume large volumes of data, or generate requests at a very high rate for a sustained period, monopolizing the cluster’s resources.

To prevent applications from overwhelming the cluster, Apache Kafka allows configuring quotas that determine how much traffic each client application produces and consumes per Kafka broker in a cluster. Kafka brokers throttle client application requests in accordance with their allocated quotas. Kafka quotas can be configured for specific users, specific client IDs, or both. The client ID is a logical name defined in the application code that Kafka brokers use to identify which application sent messages. The user represents the authenticated user principal of a client application in a secure Kafka cluster with authentication enabled.

There are two types of quotas supported in Kafka:

  • Network bandwidth quotas – These byte-rate thresholds define how much data client applications can produce to and consume from each individual broker in a Kafka cluster, measured in bytes per second.
  • Request rate quotas – These limit the percentage of time each individual broker spends processing client application requests.

Depending on the business requirements, you can use either of these quota configurations. However, network bandwidth quotas are more commonly used because they allow organizations to cap platform resource consumption according to the amount of data produced and consumed by applications per second.

Because this post uses an MSK cluster with IAM access control, we specifically discuss configuring network bandwidth quotas based on the applications’ client IDs and authenticated user principals.

Considerations for Kafka quotas

Keep the following in mind when working with Kafka quotas:

  • Enforcement level – Quotas are enforced at the broker level rather than at the cluster level. Suppose there are six brokers in a Kafka cluster and you specify a 12 MB/sec produce quota for a client ID and user. The producer application using that client ID and user can produce a maximum of 12 MB/sec on each broker, for a total of up to 72 MB/sec across all six brokers. However, if leadership for every partition of a topic resides on one broker, the same producer application can only produce a maximum of 12 MB/sec. Because throttling occurs per broker, it’s essential to maintain an even balance of topic partition leadership across all the brokers.
  • Throttling – When an application reaches its quota, it is throttled, not failed: the broker doesn’t throw an exception. Instead, brokers calculate the amount of delay necessary to bring the client back under its quota and delay responses accordingly. As a result, quota violations are transparent to clients, and clients don’t have to implement any special backoff or retry policies. However, when using an asynchronous producer and sending messages at a rate greater than the broker can accept due to the quota, the messages are queued in the client application’s buffer memory first. The client eventually runs out of buffer space if the send rate continues to exceed the accept rate, causing the next Producer.send() call to block. Producer.send() eventually throws a TimeoutException if the timeout delay isn’t sufficient to allow the broker to catch up to the producer application.
  • Shared quotas – If more than one client application has the same client ID and user, the quota configured for the client ID and user will be shared among all those applications. Suppose you configure a produce quota of 5 MB/sec for the combination of client-id="marketing-producer-client" and user="marketing-app-user". In this case, all producer applications that have marketing-producer-client as a client ID and marketing-app-user as an authenticated user principal will share the 5 MB/sec produce quota, impacting each other’s throughput.
  • Produce throttling – The produce throttling behavior is exposed to producer clients via client metrics such as produce-throttle-time-avg and produce-throttle-time-max. If these are non-zero, it indicates that the destination brokers are slowing the producer down and the quotas configuration should be reviewed.
  • Consume throttling – The consume throttling behavior is exposed to consumer clients via client metrics such as fetch-throttle-time-avg and fetch-throttle-time-max. If these are non-zero, it indicates that the origin brokers are slowing the consumer down and the quotas configuration should be reviewed.

Note that client metrics are metrics exposed by clients connecting to Kafka clusters.

  • Quota configuration – It’s possible to configure Kafka quotas either statically through the Kafka configuration file or dynamically through kafka-configs.sh or the Kafka Admin API (see the sketch following this list). The dynamic configuration mechanism is much more convenient and manageable because it allows quotas for new producer and consumer applications to be configured at any time without restarting brokers. Dynamic configuration changes take effect in real time, even while client applications are producing or consuming data.
  • Configuration keys – With the kafka-configs.sh command-line tool, you can set dynamic consume, produce, and request quotas using the following three configuration keys, respectively: consumer_byte_rate, producer_byte_rate, and request_percentage.
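
For example, the same kind of produce quota shown later with kafka-configs.sh can be applied programmatically through the Kafka Admin API. The following is a minimal sketch; the bootstrap endpoint, IAM client properties, and entity names are placeholders you would replace with your own values:

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class QuotaAdminSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<MSK cluster bootstrap servers IAM endpoint>");
        // Add the same IAM access control client properties used in config_iam.properties here

        try (Admin admin = Admin.create(props)) {
            // The quota entity is the combination of client ID and authenticated user principal
            ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(
                    ClientQuotaEntity.CLIENT_ID, "this-is-me-producerapp-1",
                    ClientQuotaEntity.USER, "<ProducerApp user principal>"));

            // producer_byte_rate = 1024 bytes/sec per broker
            ClientQuotaAlteration.Op op = new ClientQuotaAlteration.Op("producer_byte_rate", 1024.0);
            admin.alterClientQuotas(List.of(new ClientQuotaAlteration(entity, List.of(op)))).all().get();
        }
    }
}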

For more information about Kafka quotas, refer to Kafka documentation.

Enforce network bandwidth quotas with IAM access control

Following our understanding of Kafka quotas, let’s look at how to enforce them in an MSK cluster while using IAM access control for authentication and authorization. IAM access control in Amazon MSK eliminates the need for two separate mechanisms for authentication and authorization.

The following figure shows an MSK cluster that is configured to use IAM access control in the demo account. Each producer and consumer application has a quota that determines how much data it can produce or consume, in bytes per second. For example, ProducerApp-1 has a produce quota of 1024 bytes/sec, and ConsumerApp-1 and ConsumerApp-2 have consume quotas of 5120 and 1024 bytes/sec, respectively. It’s important to note that Kafka quotas are set on the Kafka cluster rather than in the client applications.

The preceding figure illustrates how Kafka client applications (ProducerApp-1, ConsumerApp-1, and ConsumerApp-2) access Topic-B in the MSK cluster by assuming write and read IAM roles. The workflow is as follows:

  • P1 – ProducerApp-1 (via its ProducerApp-1-Role IAM role) assumes the Topic-B-Write-Role IAM role to send messages to Topic-B in the MSK cluster.
  • P2 – With the Topic-B-Write-Role IAM role assumed, ProducerApp-1 begins sending messages to Topic-B.
  • C1 – ConsumerApp-1 (via its ConsumerApp-1-Role IAM role) and ConsumerApp-2 (via its ConsumerApp-2-Role IAM role) assume the Topic-B-Read-Role IAM role to read messages from Topic-B in the MSK cluster.
  • C2 – With the Topic-B-Read-Role IAM role assumed, ConsumerApp-1 and ConsumerApp-2 start consuming messages from Topic-B.

ConsumerApp-1 and ConsumerApp-2 are two separate consumer applications. They do not belong to the same consumer group.

Configuring client IDs and understanding authenticated user principal

As explained earlier, Kafka quotas can be configured for specific users, specific client IDs, or both. Let’s explore client ID and user concepts and configurations required for Kafka quota allocation.

Client ID

A client ID representing an application’s logical name can be configured within an application’s code. In Java applications, for example, you can set the producer’s and consumer’s client IDs using ProducerConfig.CLIENT_ID_CONFIG and ConsumerConfig.CLIENT_ID_CONFIG configurations, respectively. The following code snippet illustrates how ProducerApp-1 sets the client ID to this-is-me-producerapp-1 using ProducerConfig.CLIENT_ID_CONFIG:

Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG,"this-is-me-producerapp-1");
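
Similarly, a consumer application such as ConsumerApp-1 can set its client ID (for example, this-is-me-consumerapp-1, which we use later in this post) using ConsumerConfig.CLIENT_ID_CONFIG:

Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG,"this-is-me-consumerapp-1");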

User

The user refers to an authenticated user principal of the client application in the Kafka cluster with authentication enabled. As shown in the solution architecture, producer and consumer applications assume the Topic-B-Write-Role and Topic-B-Read-Role IAM roles, respectively, to perform write and read operations on Topic-B. Therefore, their authenticated user principal will look like the following IAM identifier:

arn:aws:sts::<AWS Account Id>:assumed-role/<assumed Role Name>/<role session name>

For more information, refer to IAM identifiers.

The role session name is a string identifier that uniquely identifies a session when IAM principals, federated identities, or applications assume an IAM role. In our case, ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 applications assume an IAM role using the AWS Security Token Service (AWS STS) SDK, and provide a role session name in the AWS STS SDK call. For example, if ProducerApp-1 assumes the Topic-B-Write-Role IAM role and uses this-is-producerapp-1-role-session as its role session name, its authenticated user principal will be as follows:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Write-Role/this-is-producerapp-1-role-session

The following is an example code snippet from the ProducerApp-1 application using this-is-producerapp-1-role-session as the role session name while assuming the Topic-B-Write-Role IAM role using the AWS STS SDK:

StsClient stsClient = StsClient.builder().region(region).build();
AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
          .roleArn("<Topic-B-Write-Role ARN>")
          .roleSessionName("this-is-producerapp-1-role-session") //role-session-name string literal
          .build();
// Assume the role; the response carries the temporary credentials the application uses to authenticate to the MSK cluster
AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);

Configure network bandwidth (produce and consume) quotas

The following commands configure the produce and consume quotas dynamically for client applications based on their client ID and authenticated user principal in the MSK cluster configured with IAM access control.

The following code configures the produce quota:

kafka-configs.sh --bootstrap-server <MSK cluster bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "producer_byte_rate=<number of bytes per second>" \
--entity-type clients --entity-name <ProducerApp client Id> \
--entity-type users --entity-name <ProducerApp user principal>

The producer_byte_rate refers to the number of bytes that a producer client identified by its client ID and user is allowed to produce to a single broker per second. The option --command-config points to config_iam.properties, which contains the properties required for IAM access control.

The following code configures the consume quota:

kafka-configs.sh --bootstrap-server <MSK cluster bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "consumer_byte_rate=<number of bytes per second>" \
--entity-type clients --entity-name <ConsumerApp client Id> \
--entity-type users --entity-name <ConsumerApp user principal>

The consumer_byte_rate refers to the number of bytes that a consumer client identified by its client ID and user is allowed to consume from a single broker per second.

Let’s look at some example quota configuration commands for ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 client applications:

  • ProducerApp-1 produce quota configuration – Let’s assume ProducerApp-1 has this-is-me-producerapp-1 configured as the client ID in the application code and uses this-is-producerapp-1-role-session as the role session name when assuming the Topic-B-Write-Role IAM role. The following command sets the produce quota for ProducerApp-1 to 1024 bytes per second per broker:
kafka-configs.sh --bootstrap-server <MSK Cluster Bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "producer_byte_rate=1024" \
--entity-type clients --entity-name this-is-me-producerapp-1 \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Write-Role/this-is-producerapp-1-role-session
  • ConsumerApp-1 consume quota configuration – Let’s assume ConsumerApp-1 has this-is-me-consumerapp-1 configured as the client ID in the application code and uses this-is-consumerapp-1-role-session as the role session name when assuming the Topic-B-Read-Role IAM role. The following command sets the consume quota for ConsumerApp-1 to 5120 bytes per second per broker:
kafka-configs.sh --bootstrap-server <MSK Cluster Bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "consumer_byte_rate=5120" \
--entity-type clients --entity-name this-is-me-consumerapp-1 \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/this-is-consumerapp-1-role-session


  • ConsumerApp-2 consume quota configuration – Let’s assume ConsumerApp-2 has this-is-me-consumerapp-2 configured as the client ID in the application code and uses this-is-consumerapp-2-role-session as the role session name when assuming the Topic-B-Read-Role IAM role. The following command sets the consume quota for ConsumerApp-2 to 1024 bytes per second per broker:

kafka-configs.sh --bootstrap-server <MSK Cluster Bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "consumer_byte_rate=1024" \
--entity-type clients --entity-name this-is-me-consumerapp-2 \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/this-is-consumerapp-2-role-session

As a result of the preceding commands, the ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 client applications will be throttled by the MSK cluster using IAM access control if they exceed their assigned produce and consume quotas, respectively.

Implement the solution

Part 2 of this series showcases the step-by-step detailed implementation of Kafka quotas configuration with IAM access control along with the sample producer and consumer client applications.

Conclusion

Kafka quotas offer teams the ability to set limits for producer and consumer applications. With Amazon MSK, Kafka quotas serve two important purposes: they prevent poorly designed producer or consumer applications from affecting other tenants by capping the resources those applications can consume, and they make it possible to allocate the operational costs of a central streaming data platform across different cost centers and tenants (application and product teams).

In this post, we learned how to configure network bandwidth quotas within Amazon MSK while using IAM access control. We also covered some sample commands and code snippets to clarify how the client ID and authenticated principal are used when configuring quotas. Although we only demonstrated Kafka quotas using IAM access control, you can also configure them using other Amazon MSK-supported authentication mechanisms.

In Part 2 of this series, we demonstrate how to configure network bandwidth quotas with IAM access control in Amazon MSK and provide you with example producer and consumer applications so that you can see them in action.


About the Author

Vikas Bajaj is a Senior Manager, Solutions Architects, Financial Services at Amazon Web Services. Having worked with financial services organizations and digital native customers, he advises financial services customers in Australia on technology decisions, architectures, and product roadmaps.

Multi-tenancy Apache Kafka clusters in Amazon MSK with IAM access control and Kafka quotas – Part 2

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/multi-tenancy-apache-kafka-clusters-in-amazon-msk-with-iam-access-control-and-kafka-quotas-part-2/

Kafka quotas are integral to multi-tenant Kafka clusters. They prevent poorly behaved applications from overconsuming cluster resources and degrading Kafka cluster performance for everyone else. Furthermore, they enable a central streaming data platform to be operated as a multi-tenant platform and used by downstream and upstream applications across multiple business lines. Kafka supports two types of quotas: network bandwidth quotas and request rate quotas. Network bandwidth quotas are byte-rate thresholds that define how much data client applications can produce to and consume from each individual broker in a Kafka cluster, measured in bytes per second. Request rate quotas limit the percentage of time each individual broker spends processing client application requests. Kafka quotas can be set for specific users, specific client IDs, or both.

In Part 1 of this two-part series, we discussed the concepts of how to enforce Kafka quotas in Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters while using AWS Identity and Access Management (IAM) access control.

In this post, we walk you through the step-by-step implementation of setting up Kafka quotas in an MSK cluster while using IAM access control and testing them through sample client applications.

Solution overview

The following figure, which we first introduced in Part 1, illustrates how Kafka client applications (ProducerApp-1, ConsumerApp-1, and ConsumerApp-2) access Topic-B in the MSK cluster by assuming write and read IAM roles. Each producer and consumer client application has a quota that determines how much data they can produce or consume in bytes/second. The ProducerApp-1 quota allows it to produce up to 1024 bytes/second per broker. Similarly, the ConsumerApp-1 and ConsumerApp-2 quotas allow them to consume 5120 and 1024 bytes/second per broker, respectively. The following is a brief explanation of the flow shown in the architecture diagram:

  • P1 – ProducerApp-1 (via its ProducerApp-1-Role IAM role) assumes the Topic-B-Write-Role IAM role to send messages to Topic-B
  • P2 – With the Topic-B-Write-Role IAM role assumed, ProducerApp-1 begins sending messages to Topic-B
  • C1 – ConsumerApp-1 (via its ConsumerApp-1-Role IAM role) and ConsumerApp-2 (via its ConsumerApp-2-Role IAM role) assume the Topic-B-Read-Role IAM role to read messages from Topic-B
  • C2 – With the Topic-B-Read-Role IAM role assumed, ConsumerApp-1 and ConsumerApp-2 start consuming messages from Topic-B

Note that this post uses the AWS Command Line Interface (AWS CLI), AWS CloudFormation templates, and the AWS Management Console for provisioning and modifying AWS resources, and resources provisioned will be billed to your AWS account.

The high-level steps are as follows:

  1. Provision an MSK cluster with IAM access control and Amazon Elastic Compute Cloud (Amazon EC2) instances for client applications.
  2. Create Topic-B on the MSK cluster.
  3. Create IAM roles for the client applications to access Topic-B.
  4. Run the producer and consumer applications without setting quotas.
  5. Configure the produce and consume quotas for the client applications.
  6. Rerun the applications after setting the quotas.

Prerequisites

It is recommended that you read Part 1 of this series before continuing. In order to get started, you need the following:

  • An AWS account that will be referred to as the demo account in this post; we assume its account ID is 111111111111
  • Permissions to create, delete, and modify AWS resources in the demo account

Provision an MSK cluster with IAM access control and EC2 instances

This step involves provisioning an MSK cluster with IAM access control in a VPC in the demo account. Additionally, we create four EC2 instances to make configuration changes to the MSK cluster and host producer and consumer client applications.

Deploy CloudFormation stack

  1. Clone the GitHub repository to download the CloudFormation template files and sample client applications:
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git
  2. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  3. Choose Create stack.
  4. For Prepare template, select Template is ready.
  5. For Template source, select Upload a template file.
  6. Upload the cfn-msk-stack-1.yaml file from amazon-msk-kafka-quotas/cfn-templates directory, then choose Next.
  7. For Stack name, enter MSKStack.
  8. Leave the parameters as default and choose Next.
  9. Scroll to the bottom of the Configure stack options page and choose Next to continue.
  10. Scroll to the bottom of the Review page, select the check box I acknowledge that CloudFormation may create IAM resources, and choose Submit.

It will take approximately 30 minutes for the stack to complete. After the stack has been successfully created, the following resources will be created:

  • A VPC with three private subnets and one public subnet
  • An MSK cluster with three brokers with IAM access control enabled
  • An EC2 instance called MSKAdminInstance for modifying MSK cluster settings as well as creating and modifying AWS resources
  • EC2 instances for ProducerApp-1, ConsumerApp-1, and ConsumerApp-2, one for each client application
  • A separate IAM role for each EC2 instance that hosts the client application, as shown in the architecture diagram
  11. From the stack’s Outputs tab, note the MSKClusterArn value.

Create a topic on the MSK cluster

To create Topic-B on the MSK cluster, complete the following steps:

  1. On the Amazon EC2 console, navigate to your list of running EC2 instances.
  2. Select the MSKAdminInstance EC2 instance and choose Connect.
  3. On the Session Manager tab, choose Connect.
  4. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Add Kafka binaries to the path
sed -i 's|HOME/bin|HOME/bin:~/kafka/bin|' .bash_profile

# Set your AWS region
aws configure set region <AWS Region>
  5. Set the environment variable to point to the MSK cluster brokers IAM endpoint:
MSK_CLUSTER_ARN=<Use the value of MSKClusterArn that you noted earlier>
echo "export BOOTSTRAP_BROKERS_IAM=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerStringSaslIam)" >> .bash_profile
source .bash_profile
echo $BOOTSTRAP_BROKERS_IAM
  6. Take note of the value of BOOTSTRAP_BROKERS_IAM.
  7. Run the following Kafka CLI command to create Topic-B on the MSK cluster:
kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--create --topic Topic-B \
--partitions 3 --replication-factor 3 \
--command-config config_iam.properties

Because the MSK cluster is provisioned with IAM access control, the option --command-config points to config_iam.properties, which contains the properties required for IAM access control, created by the MSKStack CloudFormation stack.
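
For reference, a config_iam.properties file for IAM access control typically contains client properties similar to the following (the file created by the stack may include additional settings):

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler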

The following warnings may appear when you run the Kafka CLI commands, but you may ignore them:

The configuration 'sasl.jaas.config' was supplied but isn't a known config. 
The configuration 'sasl.client.callback.handler.class' was supplied but isn't a known config.
  8. To verify that Topic-B has been created, list all the topics:
kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties --list

Create IAM roles for client applications to access Topic-B

This step involves creating Topic-B-Write-Role and Topic-B-Read-Role as shown in the architecture diagram. Topic-B-Write-Role enables write operations on Topic-B and can be assumed by ProducerApp-1. Similarly, ConsumerApp-1 and ConsumerApp-2 can assume Topic-B-Read-Role to perform read operations on Topic-B. To perform read operations on Topic-B, ConsumerApp-1 and ConsumerApp-2 must also belong to the consumer groups specified during the MSKStack stack update in the subsequent step.
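
For reference, the permissions granted by Topic-B-Read-Role are conceptually similar to the following policy sketch; the CloudFormation template in the steps below creates the actual roles, and the resource ARNs here are placeholders:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:Connect"],
      "Resource": "<MSK cluster ARN>"
    },
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:DescribeTopic", "kafka-cluster:ReadData"],
      "Resource": "<Topic-B ARN>"
    },
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup"],
      "Resource": ["<ConsumerApp-1 consumer group ARN>", "<ConsumerApp-2 consumer group ARN>"]
    }
  ]
}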

Create the roles with the following steps:

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Select MSKStack and choose Update.
  3. For Prepare template, select Replace current template.
  4. For Template source, select Upload a template file.
  5. Upload the cfn-msk-stack-2.yaml file from amazon-msk-kafka-quotas/cfn-templates directory, then choose Next.
  6. Provide the following additional stack parameters:
    • For Topic B ARN, enter the Topic-B ARN.

The ARN must be formatted as arn:aws:kafka:region:account-id:topic/msk-cluster-name/msk-cluster-uuid/Topic-B. Use the cluster name and cluster UUID from the MSK cluster ARN you noted earlier and provide your AWS Region. For more information, refer to IAM access control for Amazon MSK.

    • For ConsumerApp-1 Consumer Group name, enter ConsumerApp-1 consumer group ARN.

It must be formatted as arn:aws:kafka:region:account-id:group/msk-cluster-name/msk-cluster-uuid/consumer-group-name

    • For ConsumerApp-2 Consumer Group name, enter ConsumerApp-2 consumer group ARN.

Use a similar format as the previous ARN.

  7. Choose Next to continue.
  8. Scroll to the bottom of the Configure stack options page and choose Next to continue.
  9. Scroll to the bottom of the Review page, select the check box I acknowledge that CloudFormation may create IAM resources, and choose Update stack.

It will take approximately 3 minutes for the stack to update. After the stack has been successfully updated, the following resources will be created:

  • Topic-B-Write-Role – An IAM role with permission to perform write operations on Topic-B. Its trust policy allows the ProducerApp-1-Role IAM role to assume it.
  • Topic-B-Read-Role – An IAM role with permission to perform read operations on Topic-B. Its trust policy allows the ConsumerApp-1-Role and ConsumerApp-2-Role IAM roles to assume it. Furthermore, ConsumerApp-1 and ConsumerApp-2 must also belong to the consumer groups you specified when updating the stack to perform read operations on Topic-B.
  10. From the stack’s Outputs tab, note the TopicBReadRoleARN and TopicBWriteRoleARN values.

Run the producer and consumer applications without setting quotas

Here, we run ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 without setting their quotas. From the previous steps, you will need the BOOTSTRAP_BROKERS_IAM value, the Topic-B-Write-Role ARN, and the Topic-B-Read-Role ARN. The source code of the client applications and their packaged versions are available in the GitHub repository.

Run the ConsumerApp-1 application

To run the ConsumerApp-1 application, complete the following steps:

  1. On the Amazon EC2 console, select the ConsumerApp-1 EC2 instance and choose Connect.
  2. On the Session Manager tab, choose Connect.
  3. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  4. Run the ConsumerApp-1 application to start consuming messages from Topic-B:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Read-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--consumer-group <ConsumerApp-1 consumer group name> \
--role-session-name <role session name for ConsumerApp-1 to use during STS assume role call> \
--client-id <ConsumerApp-1 client.id> \
--print-consumer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

You can find the source code on GitHub for your reference. The command line parameter details are as follows:

  • --bootstrap-servers – MSK cluster bootstrap brokers IAM endpoint.
  • --assume-role-arn – Topic-B-Read-Role IAM role ARN. Assuming this role, ConsumerApp-1 will read messages from the topic.
  • --region – Region you’re using.
  • --topic-name – Topic name from which ConsumerApp-1 will read messages. The default is Topic-B.
  • --consumer-group – Consumer group name for ConsumerApp-1, as specified during the stack update.
  • --role-session-name – ConsumerApp-1 assumes the Topic-B-Read-Role using the AWS Security Token Service (AWS STS) SDK. ConsumerApp-1 will use this role session name when calling the assumeRole function.
  • --client-id – Client ID for ConsumerApp-1.
  • --print-consumer-quota-metrics – Flag indicating whether client metrics should be printed on the terminal by ConsumerApp-1.
  • --cw-dimension-name – Amazon CloudWatch dimension name that will be used to publish client throttling metrics from ConsumerApp-1.
  • --cw-dimension-value – CloudWatch dimension value that will be used to publish client throttling metrics from ConsumerApp-1.
  • --cw-namespace – Namespace where ConsumerApp-1 will publish CloudWatch metrics in order to monitor throttling.
  5. If you’re satisfied with the rest of the parameters, use the following command and change --assume-role-arn and --region as per your environment:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBReadRole-xxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--consumer-group consumerapp-1-cg \
--role-session-name consumerapp-1-role-session \
--client-id consumerapp-1-client-id \
--print-consumer-quota-metrics Y \
--cw-dimension-name ConsumerApp \
--cw-dimension-value ConsumerApp-1 \
--cw-namespace ConsumerApps

The fetch-throttle-time-avg and fetch-throttle-time-max client metrics should display 0.0, indicating no throttling is occurring for ConsumerApp-1. Remember that we haven’t set the consume quota for ConsumerApp-1 yet. Let it run for a while.
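
For reference, the following minimal sketch shows how a consumer application can read these client metrics, assuming an existing KafkaConsumer instance named consumer (the sample application’s full implementation is in the GitHub repository):

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Print the consumer's fetch throttle metrics; 0.0 means the brokers are not throttling this client
for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
    String name = entry.getKey().name();
    if (name.equals("fetch-throttle-time-avg") || name.equals("fetch-throttle-time-max")) {
        System.out.printf("%s = %s%n", name, entry.getValue().metricValue());
    }
}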

Run the ConsumerApp-2 application

To run the ConsumerApp-2 application, complete the following steps:

  1. On the Amazon EC2 console, select the ConsumerApp-2 EC2 instance and choose Connect.
  2. On the Session Manager tab, choose Connect.
  3. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  4. Run the ConsumerApp-2 application to start consuming messages from Topic-B:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Read-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--consumer-group <ConsumerApp-2 consumer group name> \
--role-session-name <role session name for ConsumerApp-2 to use during STS assume role call> \
--client-id <ConsumerApp-2 client.id> \
--print-consumer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

The command line parameters are similar to those of ConsumerApp-1 discussed previously, except for the following:

  • --consumer-group – Consumer group name for ConsumerApp-2, as specified during the stack update.
  • --role-session-name – ConsumerApp-2 assumes the Topic-B-Read-Role using the AWS STS SDK. ConsumerApp-2 will use this role session name when calling the assumeRole function.
  • --client-id – Client ID for ConsumerApp-2.
  5. If you’re satisfied with the rest of the parameters, use the following command and change --assume-role-arn and --region as per your environment:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBReadRole-xxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--consumer-group consumerapp-2-cg \
--role-session-name consumerapp-2-role-session \
--client-id consumerapp-2-client-id \
--print-consumer-quota-metrics Y \
--cw-dimension-name ConsumerApp \
--cw-dimension-value ConsumerApp-2 \
--cw-namespace ConsumerApps

The fetch-throttle-time-avg and fetch-throttle-time-max client metrics should display 0.0, indicating no throttling is occurring for ConsumerApp-2. Remember that we haven’t set the consume quota for ConsumerApp-2 yet. Let it run for a while.

Run the ProducerApp-1 application

To run the ProducerApp-1 application, complete the following steps:

  1. On the Amazon EC2 console, select the ProducerApp-1 EC2 instance and choose Connect.
  2. On the Session Manager tab, choose Connect.
  3. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  4. Run the ProducerApp-1 application to start sending messages to Topic-B:
java -jar kafka-producer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Write-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--num-messages <Number of events> \
--role-session-name <role session name for ProducerApp-1 to use during STS assume role call> \
--client-id <ProducerApp-1 client.id> \
--producer-type <Producer Type, options are sync or async> \
--print-producer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

You can find the source code on GitHub for your reference. The command line parameter details are as follows:

  • --bootstrap-servers – MSK cluster bootstrap brokers IAM endpoint.
  • --assume-role-arn – Topic-B-Write-Role IAM role ARN. Assuming this role, ProducerApp-1 will write messages to the topic.
  • --topic-name – ProducerApp-1 will send messages to this topic. The default is Topic-B.
  • --region – AWS Region you’re using.
  • --num-messages – Number of messages the ProducerApp-1 application will send to the topic.
  • --role-session-name – ProducerApp-1 assumes the Topic-B-Write-Role using the AWS STS SDK. ProducerApp-1 will use this role session name when calling the assumeRole function.
  • --client-id – Client ID of ProducerApp-1.
  • --producer-type – ProducerApp-1 can be run either synchronously or asynchronously. Options are sync or async.
  • --print-producer-quota-metrics – Flag indicating whether the client metrics should be printed on the terminal by ProducerApp-1.
  • --cw-dimension-name – CloudWatch dimension name that will be used to publish client throttling metrics from ProducerApp-1.
  • --cw-dimension-value – CloudWatch dimension value that will be used to publish client throttling metrics from ProducerApp-1.
  • --cw-namespace – The namespace where ProducerApp-1 will publish CloudWatch metrics in order to monitor throttling.
  5. If you’re satisfied with the rest of the parameters, use the following command and change --assume-role-arn and --region as per your environment. To run a synchronous Kafka producer, it uses the option --producer-type sync:
java -jar kafka-producer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBWriteRole-xxxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--num-messages 10000000 \
--role-session-name producerapp-1-role-session \
--client-id producerapp-1-client-id \
--producer-type sync \
--print-producer-quota-metrics Y \
--cw-dimension-name ProducerApp \
--cw-dimension-value ProducerApp-1 \
--cw-namespace ProducerApps

Alternatively, use --producer-type async to run an asynchronous producer. For more details, refer to Asynchronous send.

The produce-throttle-time-avg and produce-throttle-time-max client metrics should display 0.0, indicating no throttling is occurring for ProducerApp-1. Remember that we haven’t set the produce quota for ProducerApp-1 yet. Check that ConsumerApp-1 and ConsumerApp-2 can consume messages and notice they are not throttled. Stop the consumer and producer client applications by pressing Ctrl+C in their respective browser tabs.

Set produce and consume quotas for client applications

Now that we have run the producer and consumer applications without quotas, we set their quotas and rerun them.

Open the Session Manager terminal for the MSKAdminInstance EC2 instance as described earlier and run the following commands to find the default configuration of one of the brokers in the MSK cluster. MSK clusters are provisioned with the default Kafka quotas configuration.

# Describe Broker-1 default configurations
kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--entity-type brokers \
--entity-name 1 \
--all --describe > broker1_default_configurations.txt
cat broker1_default_configurations.txt | grep quota.consumer.default
cat broker1_default_configurations.txt | grep quota.producer.default

The following screenshot shows the Broker-1 default values for quota.consumer.default and quota.producer.default.

ProducerApp-1 quota configuration

Replace placeholders in all the commands in this section with values that correspond to your account.

According to the architecture diagram discussed earlier, set the ProducerApp-1 produce quota to 1024 bytes/second. For <ProducerApp-1 Client Id> and <ProducerApp-1 Role Session>, make sure you use the same values that you used while running ProducerApp-1 earlier (producerapp-1-client-id and producerapp-1-role-session, respectively):

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'producer_byte_rate=1024' \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

Verify the ProducerApp-1 produce quota using the following command:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

You can remove the ProducerApp-1 produce quota by using the following command, but don’t run the command as we’ll test the quotas next.

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --delete-config producer_byte_rate \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

ConsumerApp-1 quota configuration

Replace placeholders in all the commands in this section with values that correspond to your account.

Let’s set a consume quota of 5120 bytes/second for ConsumerApp-1. For <ConsumerApp-1 Client Id> and <ConsumerApp-1 Role Session>, make sure you use the same values that you used while running ConsumerApp-1 earlier (consumerapp-1-client-id and consumerapp-1-role-session, respectively):

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'consumer_byte_rate=5120' \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

Verify the ConsumerApp-1 consume quota using the following command:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

You can remove the ConsumerApp-1 consume quota by using the following command, but don’t run the command as we’ll test the quotas next.

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --delete-config consumer_byte_rate \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

ConsumerApp-2 quota configuration

Replace placeholders in all the commands in this section with values that correspond to your account.

Let’s set a consume quota of 1024 bytes/second for ConsumerApp-2. For <ConsumerApp-2 Client Id> and <ConsumerApp-2 Role Session>, make sure you use the same values that you used while running ConsumerApp-2 earlier (consumerapp-2-client-id and consumerapp-2-role-session, respectively):

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'consumer_byte_rate=1024' \
--entity-type clients --entity-name <ConsumerApp-2 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-2 Role Session>

Verify the ConsumerApp-2 consume quota using the following command:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ConsumerApp-2 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-2 Role Session>

As with ConsumerApp-1, you can remove the ConsumerApp-2 consume quota using the same command with ConsumerApp-2 client and user details.

Rerun the producer and consumer applications after setting quotas

Let’s rerun the applications to verify the effect of the quotas.

Rerun ProducerApp-1

Rerun ProducerApp-1 in synchronous mode with the same command that you used earlier. The following screenshot illustrates that when ProducerApp-1 reaches its quota on any of the brokers, the produce-throttle-time-avg and produce-throttle-time-max client metric values rise above 0.0, indicating that ProducerApp-1 is throttled. Allow ProducerApp-1 to run for a few seconds and then stop it by using Ctrl+C.

You can also test the effect of the produce quota by rerunning ProducerApp-1 in asynchronous mode (--producer-type async). As with the synchronous run, the following screenshot illustrates that when ProducerApp-1 reaches its quota on any of the brokers, the produce-throttle-time-avg and produce-throttle-time-max client metric values rise above 0.0, indicating that ProducerApp-1 is throttled. Allow the asynchronous ProducerApp-1 to run for a while.

You will eventually see a TimeoutException stating org.apache.kafka.common.errors.TimeoutException: Expiring xxxxx record(s) for Topic-B-2:xxxxxxx ms has passed since batch creation.

When using an asynchronous producer and sending messages at a rate greater than the broker can accept due to the quota, the messages will be queued in the client application memory first. The client will eventually run out of buffer space if the rate of sending messages continues to exceed the rate of accepting messages, causing the next Producer.send() call to be blocked. Producer.send() will eventually throw a TimeoutException if the timeout delay is not sufficient to allow the broker to catch up to the producer application. Stop ProducerApp-1 by using Ctrl+C.
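
For reference, the following minimal sketch shows an asynchronous send with a callback, assuming an existing KafkaProducer<String, String> named producer and message key/value strings; when throttled batches expire, the TimeoutException described above is delivered to the callback (or thrown from Producer.send() itself if the buffer fills up and max.block.ms elapses):

// Asynchronous send: delivery errors such as expired batches arrive in the callback
producer.send(new ProducerRecord<>("Topic-B", messageKey, messageValue), (metadata, exception) -> {
    if (exception != null) {
        // For example: org.apache.kafka.common.errors.TimeoutException: Expiring ... record(s) ...
        System.err.println("Send failed: " + exception);
    }
});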

Rerun ConsumerApp-1

Rerun ConsumerApp-1 with the same command that you used earlier. The following screenshot illustrates that when ConsumerApp-1 reaches its quota, the fetch-throttle-time-avg and fetch-throttle-time-max client metric values rise above 0.0, indicating that ConsumerApp-1 is throttled.

Allow ConsumerApp-1 to run for a few seconds and then stop it by using Ctrl+C.

Rerun ConsumerApp-2

Rerun ConsumerApp-2 with the same command that you used earlier. Similarly, when ConsumerApp-2 reaches its quota, the fetch-throttle-time-avg and fetch-throttle-time-max client metric values rise above 0.0, indicating that ConsumerApp-2 is throttled. Allow ConsumerApp-2 to run for a few seconds and then stop it by pressing Ctrl+C.

Client quota metrics in Amazon CloudWatch

In Part 1, we explained that client metrics are metrics exposed by clients connecting to Kafka clusters. Let’s examine the client metrics in CloudWatch.

  1. On the CloudWatch console, choose All metrics.
  2. Under Custom Namespaces, choose the namespace you provided while running the client applications.
  3. Choose the dimension name and select produce-throttle-time-max, produce-throttle-time-avg, fetch-throttle-time-max, and fetch-throttle-time-avg metrics for all the applications.

These metrics indicate throttling behavior for ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 applications tested with the quota configurations in the previous section. The following screenshots indicate the throttling of ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 based on network bandwidth quotas. ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 applications feed their respective client metrics to CloudWatch. You can find the source code on GitHub for your reference.
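
For reference, the following is a minimal sketch of publishing a single client metric value to CloudWatch with the AWS SDK for Java v2, using the namespace and dimension values passed to the sample applications; the metric value itself would be read from the Kafka client metrics as shown earlier, and the sample applications’ actual implementation is in the GitHub repository:

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;

public class PublishThrottleMetricSketch {
    public static void main(String[] args) {
        CloudWatchClient cloudWatch = CloudWatchClient.builder().region(Region.of("<AWS Region>")).build();

        MetricDatum datum = MetricDatum.builder()
                .metricName("produce-throttle-time-avg")
                .dimensions(Dimension.builder().name("ProducerApp").value("ProducerApp-1").build())
                .unit(StandardUnit.MILLISECONDS)
                .value(42.0) // replace with the value read from the producer's client metrics
                .build();

        cloudWatch.putMetricData(PutMetricDataRequest.builder()
                .namespace("ProducerApps")
                .metricData(datum)
                .build());
    }
}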

Secure client ID and role session name

We discussed how to configure Kafka quotas using an application’s client ID and authenticated user principal. When a client application assumes an IAM role to access Kafka topics on an MSK cluster with IAM authentication enabled, its authenticated user principal is represented in the following format (for more information, refer to IAM identifiers):

arn:aws:sts::111111111111:assumed-role/Topic-B-Write-Role/producerapp-1-role-session

It contains the role session name (in this case, producerapp-1-role-session) used in the client application while assuming an IAM role through the AWS STS SDK. The client application source code is available for your reference. The client ID is a logical name string (for example, producerapp-1-client-id) that is configured in the application code by the application team. Therefore, an application can impersonate another application if it obtains the client ID and role session name of the other application, and if it has permission to assume the same IAM role.

As shown in the architecture diagram, ConsumerApp-1 and ConsumerApp-2 are two separate client applications with their respective quota allocations. Because both have permission to assume the same IAM role (Topic-B-Read-Role) in the demo account, they are allowed to consume messages from Topic-B. Thus, MSK cluster brokers distinguish them based on their client IDs and users (which contain their respective role session name values). If ConsumerApp-2 somehow obtains the ConsumerApp-1 role session name and client ID, it can impersonate ConsumerApp-1 by specifying the ConsumerApp-1 role session name and client ID in the application code.

Let’s assume ConsumerApp-1 uses consumerapp-1-client-id and consumerapp-1-role-session as its client ID and role session name, respectively. Therefore, ConsumerApp-1's authenticated user principal will appear as follows when it assumes the Topic-B-Read-Role IAM role:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-1-role-session

Similarly, ConsumerApp-2 uses consumerapp-2-client-id and consumerapp-2-role-session as its client ID and role session name, respectively. Therefore, ConsumerApp-2's authenticated user principal will appear as follows when it assumes the Topic-B-Read-Role IAM role:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-2-role-session

If ConsumerApp-2 obtains ConsumerApp-1's client ID and role session name and specifies them in its application code, MSK cluster brokers will treat it as ConsumerApp-1 and view its client ID as consumerapp-1-client-id, and the authenticated user principal as follows:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-1-role-session

This allows ConsumerApp-2 to consume data from the MSK cluster at a maximum rate of 5120 bytes per second rather than 1024 bytes per second as per its original quota allocation. Consequently, ConsumerApp-1's throughput will be negatively impacted if ConsumerApp-2 runs concurrently.

Enhanced architecture

You can introduce AWS Secrets Manager and AWS Key Management Service (AWS KMS) into the architecture to secure the applications’ client IDs and role session names. To provide stronger governance, the applications’ client IDs and role session names should be stored as encrypted secrets in Secrets Manager. The IAM resource policies associated with the encrypted secrets and a KMS customer managed key (CMK) allow each application to access and decrypt only its own client ID and role session name. In this way, applications cannot access each other’s client ID and role session name and impersonate one another. The following image shows the enhanced architecture.

The updated flow has the following stages:

  • P1 – ProducerApp-1 retrieves its client-id and role-session-name secrets from Secrets Manager (a retrieval sketch follows this list)
  • P2 – ProducerApp-1 configures the secret client-id as CLIENT_ID_CONFIG in the application code, and assumes Topic-B-Write-Role (via its ProducerApp-1-Role IAM role) by passing the secret role-session-name to the AWS STS SDK assumeRole function call
  • P3 – With the Topic-B-Write-Role IAM role assumed, ProducerApp-1 begins sending messages to Topic-B
  • C1 – ConsumerApp-1 and ConsumerApp-2 retrieve their respective client-id and role-session-name secrets from Secrets Manager
  • C2 – ConsumerApp-1 and ConsumerApp-2 configure their respective secret client-id as CLIENT_ID_CONFIG in their application code, and assume Topic-B-Read-Role (via the ConsumerApp-1-Role and ConsumerApp-2-Role IAM roles, respectively) by passing their secret role-session-name in the AWS STS SDK assumeRole function call
  • C3 – With the Topic-B-Read-Role IAM role assumed, ConsumerApp-1 and ConsumerApp-2 start consuming messages from Topic-B
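
The following is a minimal sketch of the retrieval step (P1/C1) using the AWS SDK for Java v2; the secret names are hypothetical, and the IAM resource policies on the secrets and the KMS CMK would restrict each application to its own entries:

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;

public class ReadClientSecretsSketch {
    public static void main(String[] args) {
        SecretsManagerClient secrets = SecretsManagerClient.builder()
                .region(Region.of("<AWS Region>"))
                .build();

        // Hypothetical secret names for ProducerApp-1
        String clientId = secrets.getSecretValue(GetSecretValueRequest.builder()
                .secretId("producerapp-1/client-id").build()).secretString();
        String roleSessionName = secrets.getSecretValue(GetSecretValueRequest.builder()
                .secretId("producerapp-1/role-session-name").build()).secretString();

        // clientId is then used for ProducerConfig.CLIENT_ID_CONFIG, and
        // roleSessionName is passed to roleSessionName(...) in the AWS STS assumeRole call
    }
}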

Refer to the documentation for AWS Secrets Manager and AWS KMS to get a better understanding of how they fit into the architecture.

Clean up resources

Navigate to the CloudFormation console and delete the MSKStack stack. All resources created during this post will be deleted.

Conclusion

In this post, we covered detailed steps to configure Amazon MSK quotas and demonstrated their effect through sample client applications. In addition, we discussed how you can use client metrics to determine if a client application is throttled. We also highlighted a potential issue with plaintext client IDs and role session names. We recommend implementing Kafka quotas with Amazon MSK using Secrets Manager and AWS KMS as per the revised architecture diagram to ensure a zero-trust architecture.

If you have feedback or questions about this post, including the revised architecture, we’d be happy to hear from you. We hope you enjoyed reading this post.


About the Author

Vikas Bajaj is a Senior Manager, Solutions Architects, Financial Services at Amazon Web Services. With over two decades of experience in financial services and working with digital-native businesses, he advises customers on product design, technology roadmaps, and application architectures.

Introducing Protocol buffers (protobuf) schema support in AWS Glue Schema Registry

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/introducing-protocol-buffers-protobuf-schema-support-in-amazon-glue-schema-registry/

AWS Glue Schema Registry now supports Protocol buffers (protobuf) schemas in addition to JSON and Avro schemas. This allows application teams to use protobuf schemas to govern the evolution of streaming data and centrally control data quality from data streams to data lake. AWS Glue Schema Registry provides an open-source library that includes Apache-licensed serializers and deserializers for protobuf that integrate with Java applications developed for Apache Kafka, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, and Kafka Streams. Similar to Avro and JSON schemas, Protocol buffers schemas also support compatibility modes, schema sourcing via metadata, auto-registration of schemas, and AWS Identity and Access Management (IAM) compatibility.

In this post, we focus on Protocol buffers schema support in AWS Glue Schema Registry and how to use protobuf schemas in stream processing Java applications that integrate with Apache Kafka, Amazon MSK, and Amazon Kinesis Data Streams.

Introduction to Protocol buffers

Protocol buffers is a language- and platform-neutral, extensible mechanism for serializing and deserializing structured data for use in communications protocols and data storage. A protobuf message format is defined in a .proto file. Protobuf is recommended over other data formats when you need language interoperability, faster serialization and deserialization, type safety, schema adherence between data producer and consumer applications, and reduced coding effort. With protobuf, you can use the code generated from the schema by the protobuf compiler (protoc) to easily write and read your data to and from data streams using a variety of languages. You can also use build tool plugins for Maven and Gradle to generate code from protobuf schemas as part of your CI/CD pipelines. We use the following schema for code examples in this post, which defines an employee with a gRPC service definition to find an employee by ID:

Employee.proto

syntax = "proto2";
package gsr.proto.post;

import "google/protobuf/wrappers.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/type/money.proto";

service EmployeeSearch {
    rpc FindEmployee(EmployeeSearchParams) returns (Employee);
}
message EmployeeSearchParams {
    required int32 id = 1;
}
message Employee {
    required int32 id = 1;
    required string name = 2;
    required string address = 3;
    required google.protobuf.Int32Value employee_age = 4;
    required google.protobuf.Timestamp start_date = 5;
    required google.protobuf.Duration total_time_span_in_company = 6;
    required google.protobuf.BoolValue is_certified = 7;
    required Team team = 8;
    required Project project = 9;
    required Role role = 10;
    required google.type.Money total_award_value = 11;
}
message Team {
    required string name = 1;
    required string location = 2;
}
message Project {
    required string name = 1;
    required string state = 2;
}
enum Role {
    MANAGER = 0;
    DEVELOPER = 1;
    ARCHITECT = 2;
}

AWS Glue Schema Registry supports both proto2 and proto3 syntax. The preceding protobuf schema, which uses proto2 syntax, contains four message types—EmployeeSearchParams, Employee, Team, and Project—using scalar, composite, and enumeration data types. Each field in a message definition has a unique number, which identifies the field in the message binary format and should not be changed once your message type is in use. In a proto2 message, a field can be required, optional, or repeated; in proto3, the options are repeated and optional. The package declaration makes sure generated code is namespaced to avoid collisions. In addition to scalar, composite, and enumeration types, AWS Glue Schema Registry also supports protobuf schemas with common types such as Money, PhoneNumber, Timestamp, Duration, and nullable types such as BoolValue and Int32Value. It also supports protobuf schemas with gRPC service definitions, such as EmployeeSearch in the preceding schema, along with their compatibility rules. To learn more about Protocol buffers, refer to its documentation.

Supported Protocol buffers specification and features

AWS Glue Schema Registry supports all the features of Protocol buffers for versions 2 and 3 except for groups, extensions, and importing definitions. The AWS Glue Schema Registry APIs and its open-source library support the latest protobuf runtime version. The protobuf schema operations in AWS Glue Schema Registry are supported via the AWS Management Console, AWS Command Line Interface (AWS CLI), AWS Glue Schema Registry API, AWS SDK, and AWS CloudFormation.

How AWS Glue Schema Registry works

The following diagram illustrates a high-level view of how AWS Glue Schema Registry works. AWS Glue Schema Registry allows you to register and evolve JSON, Apache Avro, and Protocol buffers schemas with compatibility modes. You can register multiple versions of each schema as the business needs or stream processing application’s requirements evolve. The AWS Glue Schema Registry open-source library provides JSON, Avro, and protobuf serializers and deserializers that you configure in producer and consumer stream processing applications, as shown in the following diagram. The open-source library also supports optional compression and caching configuration to save on data transfers.

To accommodate various business use cases, AWS Glue Schema Registry supports multiple compatibility modes. For example, if a consumer application is updated to a new schema version but is still able to consume and process messages based on the previous version of the same schema, the schema is backward-compatible. However, if the producer application has moved to a newer schema version and the consumer application, although not yet updated, can still consume and process both the old and new messages, the schema is configured as forward-compatible. For more information, refer to How the Schema Registry Works.
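When you configure the open-source library programmatically, as the Kinesis Data Streams examples later in this post do, the compatibility mode is one of the settings on GlueSchemaRegistryConfiguration. The following is a minimal sketch that reuses only setters shown elsewhere in this post; the Region, registry name, and chosen mode are illustrative:

    // A minimal sketch (values are illustrative); these setters also appear in the KPL example later in this post.
    GlueSchemaRegistryConfiguration schemaRegistryConfig = new GlueSchemaRegistryConfiguration("us-east-2");
    schemaRegistryConfig.setRegistryName("employee-schema-registry");        // registry that holds the schema
    schemaRegistryConfig.setCompatibilitySetting(Compatibility.BACKWARD);    // compatibility mode enforced on new versions
    schemaRegistryConfig.setSchemaAutoRegistrationEnabled(false);            // require schemas to be registered up front
    schemaRegistryConfig.setCompressionType(AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // optional payload compression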

Create a Protocol buffers schema in AWS Glue Schema Registry

In this section, we create a protobuf schema in AWS Glue Schema Registry via the console and AWS CLI.

Create a schema via the console

Make sure you have the required AWS Glue Schema Registry IAM permissions.

  1. On the AWS Glue console, choose Schema registries in the navigation pane.
  2. Choose Add registry.
  3. For Registry name, enter employee-schema-registry.
  4. Choose Add registry.
  5. After the registry is created, choose Add schema to register a new schema.
  6. For Schema name, enter Employee.proto.

The schema name must be either Employee.proto or Employee if the protobuf schema doesn't set option java_multiple_files = true; or option java_outer_classname = "<Outer class name>"; and you decide to use the protobuf schema-generated code (POJOs) in your stream processing applications. We cover this with an example in a subsequent section of this post. For more information on protobuf options, refer to Options.
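For example, because Employee.proto sets neither of these options, the generated code nests all message types in an outer class named after the schema file, which is why the samples later in this post reference types such as EmployeeOuterClass.Employee. The following is a brief sketch; the class name and main method are purely illustrative, and we assume the default Java package derived from the proto package declaration (gsr.proto.post):

    import gsr.proto.post.EmployeeOuterClass; // default Java package comes from the proto package declaration

    public class GeneratedCodeNamingExample {
        public static void main(String[] args) {
            // Without java_multiple_files, Employee, Team, Project, and Role are nested
            // members of the outer class EmployeeOuterClass (named after Employee.proto).
            EmployeeOuterClass.Team team = EmployeeOuterClass.Team.newBuilder()
                    .setName("Solutions Architects")
                    .setLocation("Australia")
                    .build();
            System.out.println(team.getName());
        }
    }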

  1. For Registry, choose the registry employee-schema-registry.
  2. For Data format, choose Protocol buffers.
  3. For Compatibility mode, choose Backward.

You can choose other compatibility modes as per your use case.

  1. For First schema version, enter the preceding protobuf schema, then choose Create schema and version.

After the schema is registered successfully, its status will be Available, as shown in the following screenshot.

Create a schema via the AWS CLI

Make sure you have IAM credentials with AWS Glue Schema Registry permissions.

  1. Run the following AWS CLI command to create a schema registry employee-schema-registry (for this post, we use the Region us-east-2):
    aws glue create-registry \
    --registry-name employee-schema-registry \
    --region us-east-2

The AWS CLI command returns the ARN of the newly created schema registry in the response.

  1. Copy the RegistryArn value from the response to use in the following AWS CLI command.
  2. In the following command, use the preceding protobuf schema and schema name Employee.proto:
    aws glue create-schema --schema-name Employee.proto \
    --registry-id RegistryArn=<Schema Registry ARN that you copied from response of create registry CLI command> \
    --compatibility BACKWARD \
    --data-format PROTOBUF \
    --schema-definition file:///<project-directory>/Employee.proto \
    --region us-east-2

You can also use AWS CloudFormation to create schemas in AWS Glue Schema Registry.

Using a Protocol buffers schema with Amazon MSK and Kinesis Data Streams

Like Apache Avro’s SpecificRecord and GenericRecord, protobuf also supports working with POJOs to ensure type safety and DynamicMessage to create generic data producer and consumer applications. The following examples showcase the use of a protobuf schema registered in AWS Glue Schema Registry with Kafka and Kinesis Data Streams producer and consumer applications.

Use a protobuf schema with Amazon MSK

Create an Amazon MSK or Apache Kafka cluster with a topic called protobuf-demo-topic. If creating an Amazon MSK cluster, you can use the console. For instructions, refer to Getting Started Using Amazon MSK.

Use protobuf schema-generated POJOs

To use protobuf schema-generated POJOs, complete the following steps:

  1. Install the protobuf compiler (protoc) on your local machine from GitHub and add it to your PATH variable.
  2. Add the following plugin configuration to your application’s pom.xml file. We use the xolstice protobuf Maven plugin for this post to generate code from the protobuf schema.
    <plugin>
       <!-- https://www.xolstice.org/protobuf-maven-plugin/usage.html -->
       <groupId>org.xolstice.maven.plugins</groupId>
       <artifactId>protobuf-maven-plugin</artifactId>
       <version>0.6.1</version>
       <configuration>
           <protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
           <outputDirectory>${basedir}/src/main/java</outputDirectory>
           <clearOutputDirectory>false</clearOutputDirectory>
       </configuration>
       <executions>
           <execution>
               <goals>
                   <goal>compile</goal>
               </goals>
           </execution>
       </executions>
    </plugin>

  3. Add the following dependencies to your application’s pom.xml file:
    <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>3.19.4</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/software.amazon.glue/schema-registry-serde -->
    <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.9</version>
    </dependency>	

  4. Create a schema registry employee-schema-registry in AWS Glue Schema Registry and register the Employee.proto protobuf schema with it. Name your schema Employee.proto (or Employee).
  5. Run the following command to generate the code from Employee.proto. Make sure the schema file is in the ${basedir}/src/main/resources/proto directory, or change the <protoSourceRoot> value in the application's pom.xml to match your directory structure:
    mvn clean compile

Next, we configure the Kafka producer publishing protobuf messages to the Kafka topic on Amazon MSK.

  1. Configure the Kafka producer properties:
private Properties getProducerConfig() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
    props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.PROTOBUF.name());
    props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
    props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "employee-schema-registry");
    props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "Employee.proto");
    props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());
    return props;
}

The VALUE_SERIALIZER_CLASS_CONFIG configuration specifies the AWS Glue Schema Registry serializer, which serializes the protobuf message.

  1. Use the schema-generated code (POJOs) to create a protobuf message:
    public EmployeeOuterClass.Employee createEmployeeRecord(int employeeId){
        EmployeeOuterClass.Employee employee =
                EmployeeOuterClass.Employee.newBuilder()
                        .setId(employeeId)
                        .setName("Dummy")
                        .setAddress("Melbourne, Australia")
                        .setEmployeeAge(Int32Value.newBuilder().setValue(32).build())
                        .setStartDate(Timestamp.newBuilder().setSeconds(235234532434L).build())
                        .setTotalTimeSpanInCompany(Duration.newBuilder().setSeconds(3453245345L).build())
                        .setIsCertified(BoolValue.newBuilder().setValue(true).build())
                        .setRole(EmployeeOuterClass.Role.ARCHITECT)
                        .setProject(EmployeeOuterClass.Project.newBuilder()
                                .setName("Protobuf Schema Demo")
                                .setState("GA").build())
                        .setTotalAwardValue(Money.newBuilder()
                                            .setCurrencyCode("USD")
                                            .setUnits(5)
                                            .setNanos(50000).build())
                        .setTeam(EmployeeOuterClass.Team.newBuilder()
                                .setName("Solutions Architects")
                                .setLocation("Australia").build()).build();
        return employee;
    }

  2. Publish the protobuf messages to the protobuf-demo-topic topic on Amazon MSK:
    public void startProducer() throws InterruptedException {
        String topic = "protobuf-demo-topic";
        KafkaProducer<String, EmployeeOuterClass.Employee> producer = new KafkaProducer<String, EmployeeOuterClass.Employee>(getProducerConfig());
        logger.info("Starting to send records...");
        int employeeId = 0;
        while(employeeId < 100)
        {
            EmployeeOuterClass.Employee person = createEmployeeRecord(employeeId);
            String key = "key-" + employeeId;
            ProducerRecord<String,  EmployeeOuterClass.Employee> record = new ProducerRecord<String,  EmployeeOuterClass.Employee>(topic, key, person);
            producer.send(record, new ProducerCallback());
            employeeId++;
        }
    }
    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetaData, Exception e){
            if (e == null) {
                logger.info("Received new metadata. \n" +
                        "Topic:" + recordMetaData.topic() + "\n" +
                        "Partition: " + recordMetaData.partition() + "\n" +
                        "Offset: " + recordMetaData.offset() + "\n" +
                        "Timestamp: " + recordMetaData.timestamp());
            }
            else {
                logger.info("There's been an error from the Producer side");
                e.printStackTrace();
            }
        }
    }

  3. Start the Kafka producer:
    public static void main(String args[]) throws InterruptedException {
        ProducerProtobuf producer = new ProducerProtobuf();
        producer.startProducer();
    }
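Note that the preceding loop returns without flushing or closing the producer, so records still buffered in the client could be lost when the JVM exits. A minimal variation of startProducer that adds the standard KafkaProducer cleanup calls might look like the following sketch:

    public void startProducer() {
        String topic = "protobuf-demo-topic";
        KafkaProducer<String, EmployeeOuterClass.Employee> producer =
                new KafkaProducer<String, EmployeeOuterClass.Employee>(getProducerConfig());
        try {
            for (int employeeId = 0; employeeId < 100; employeeId++) {
                EmployeeOuterClass.Employee employee = createEmployeeRecord(employeeId);
                ProducerRecord<String, EmployeeOuterClass.Employee> record =
                        new ProducerRecord<String, EmployeeOuterClass.Employee>(topic, "key-" + employeeId, employee);
                producer.send(record, new ProducerCallback());
            }
        } finally {
            producer.flush(); // block until all buffered records have been sent
            producer.close(); // release network threads and serializer resources
        }
    }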

  4. In the Kafka consumer application’s pom.xml, add the same plugin and dependencies as the Kafka producer’s pom.xml.

Next, we configure the Kafka consumer consuming protobuf messages from the Kafka topic on Amazon MSK.

  1. Configure the Kafka consumer properties:
    private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
        props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());
        return props;
    }

The VALUE_DESERIALIZER_CLASS_CONFIG config specifies the AWS Glue Schema Registry deserializer that deserializes the protobuf messages.

  1. Consume the protobuf message (as a POJO) from the protobuf-demo-topic topic on Amazon MSK:
    public void startConsumer() {
        logger.info("starting consumer...");
        String topic = "protobuf-demo-topic";
        KafkaConsumer<String, EmployeeOuterClass.Employee> consumer = new KafkaConsumer<String, EmployeeOuterClass.Employee>(getConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            final ConsumerRecords<String, EmployeeOuterClass.Employee> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, EmployeeOuterClass.Employee> record : records) {
                final EmployeeOuterClass.Employee employee = record.value();
                logger.info("Employee Id: " + employee.getId() + " | Name: " + employee.getName() + " | Address: " + employee.getAddress() +
                        " | Age: " + employee.getEmployeeAge().getValue() + " | Startdate: " + employee.getStartDate().getSeconds() +
                        " | TotalTimeSpanInCompany: " + employee.getTotalTimeSpanInCompany() +
                        " | IsCertified: " + employee.getIsCertified().getValue() + " | Team: " + employee.getTeam().getName() +
                        " | Role: " + employee.getRole().name() + " | Project State: " + employee.getProject().getState() +
                        " | Project Name: " + employee.getProject().getName() + "| Award currency code: " + employee.getTotalAwardValue().getCurrencyCode() +
                        " | Award units : " + employee.getTotalAwardValue().getUnits() + " | Award nanos " + employee.getTotalAwardValue().getNanos());
            }
        }
    }

  2. Start the Kafka consumer:
    public static void main(String args[]){
        ConsumerProtobuf consumer = new ConsumerProtobuf();
        consumer.startConsumer();
    }

Use protobuf’s DynamicMessage

You can use DynamicMessage to create generic producer and consumer applications without generating the code from the protobuf schema. To use DynamicMessage, you first need to create a protobuf schema file descriptor.

  1. Generate a file descriptor from the protobuf schema using the following command:
    protoc --include_imports --proto_path=proto --descriptor_set_out=proto/Employeeproto.desc proto/Employee.proto

The --descriptor_set_out option specifies the name of the descriptor file that this command generates. The protobuf schema Employee.proto is in the proto directory.

  1. Make sure you have created a schema registry and registered the preceding protobuf schema with it.

Now we configure the Kafka producer publishing DynamicMessage to the Kafka topic on Amazon MSK.

  1. Create the Kafka producer configuration. The PROTOBUF_MESSAGE_TYPE configuration is DYNAMIC_MESSAGE instead of POJO.
    private Properties getProducerConfig() {
       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
       props.put(ProducerConfig.ACKS_CONFIG, "-1");
       props.put(ProducerConfig.CLIENT_ID_CONFIG,"protobuf-dynamicmessage-record-producer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,GlueSchemaRegistryKafkaSerializer.class.getName());
       props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.PROTOBUF.name());
       props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
       props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "employee-schema-registry");
       props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "Employee.proto");
       props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());
       return props;
        }

  2. Create protobuf dynamic messages and publish them to the Kafka topic on Amazon MSK:
    public void startProducer() throws Exception {
        Descriptor desc = getDescriptor();
        String topic = "protobuf-demo-topic";
        KafkaProducer<String, DynamicMessage> producer = new KafkaProducer<String, DynamicMessage>(getProducerConfig());
        logger.info("Starting to send records...");
        int i = 0;
        while (i < 100) {
            DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                    .setField(desc.findFieldByName("id"), 1234)
                    .setField(desc.findFieldByName("name"), "Dummy Name")
                    .setField(desc.findFieldByName("address"), "Melbourne, Australia")
                    .setField(desc.findFieldByName("employee_age"), Int32Value.newBuilder().setValue(32).build())
                    .setField(desc.findFieldByName("start_date"), Timestamp.newBuilder().setSeconds(235234532434L).build())
                    .setField(desc.findFieldByName("total_time_span_in_company"), Duration.newBuilder().setSeconds(3453245345L).build())
                    .setField(desc.findFieldByName("is_certified"), BoolValue.newBuilder().setValue(true).build())
    		.setField(desc.findFieldByName("total_award_value"), Money.newBuilder().setCurrencyCode("USD")
    						.setUnits(1).setNanos(50000).build())
                    .setField(desc.findFieldByName("team"), createTeam(desc.findFieldByName("team").getMessageType()))
                    .setField(desc.findFieldByName("project"), createProject(desc.findFieldByName("project").getMessageType()))
                    .setField(desc.findFieldByName("role"), desc.findFieldByName("role").getEnumType().findValueByName("ARCHITECT"))
                    .build();
            String key = "key-" + i;
            ProducerRecord<String, DynamicMessage> record = new ProducerRecord<String, DynamicMessage>(topic, key, dynMessage);
            producer.send(record, new ProtobufProducer.ProducerCallback());
            Thread.sleep(1000);
            i++;
        }
    }
    private static DynamicMessage createTeam(Descriptor desc) {
        DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                .setField(desc.findFieldByName("name"), "Solutions Architects")
                .setField(desc.findFieldByName("location"), "Australia")
                .build();
        return dynMessage;
    }
    
    private static DynamicMessage createProject(Descriptor desc) {
        DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                .setField(desc.findFieldByName("name"), "Protobuf Schema Demo")
                .setField(desc.findFieldByName("state"), "GA")
                .build();
        return dynMessage;
    }
    
    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetaData, Exception e) {
            if (e == null) {
                logger.info("Received new metadata. \n" +
                        "Topic:" + recordMetaData.topic() + "\n" +
                        "Partition: " + recordMetaData.partition() + "\n" +
                        "Offset: " + recordMetaData.offset() + "\n" +
                        "Timestamp: " + recordMetaData.timestamp());
            } else {
                logger.info("There's been an error from the Producer side");
                e.printStackTrace();
            }
        }
    }

  3. Create a descriptor using the Employeeproto.desc file that we generated from the Employee.proto schema file in the previous steps:
    private Descriptor getDescriptor() throws Exception {
        InputStream inStream = ProtobufProducer.class.getClassLoader().getResourceAsStream("proto/Employeeproto.desc");
        DescriptorProtos.FileDescriptorSet fileDescSet = DescriptorProtos.FileDescriptorSet.parseFrom(inStream);
        Map<String, DescriptorProtos.FileDescriptorProto> fileDescProtosMap = new HashMap<String, DescriptorProtos.FileDescriptorProto>();
        List<DescriptorProtos.FileDescriptorProto> fileDescProtos = fileDescSet.getFileList();
        for (DescriptorProtos.FileDescriptorProto fileDescProto : fileDescProtos) {
            fileDescProtosMap.put(fileDescProto.getName(), fileDescProto);
        }
        DescriptorProtos.FileDescriptorProto fileDescProto = fileDescProtosMap.get("Employee.proto");
        FileDescriptor[] dependencies = getProtoDependencies(fileDescProtosMap, fileDescProto);
        FileDescriptor fileDesc = FileDescriptor.buildFrom(fileDescProto, dependencies);
        Descriptor desc = fileDesc.findMessageTypeByName("Employee");
        return desc;
    }
    
    public static FileDescriptor[] getProtoDependencies(Map<String, FileDescriptorProto> fileDescProtos, 
    				  FileDescriptorProto fileDescProto) throws Exception {
    
        if (fileDescProto.getDependencyCount() == 0)
            return new FileDescriptor[0];
    
        ProtocolStringList dependencyList = fileDescProto.getDependencyList();
        String[] dependencyArray = dependencyList.toArray(new String[0]);
        int noOfDependencies = dependencyList.size();
    
        FileDescriptor[] dependencies = new FileDescriptor[noOfDependencies];
        for (int i = 0; i < noOfDependencies; i++) {
            FileDescriptorProto dependencyFileDescProto = fileDescProtos.get(dependencyArray[i]);
            FileDescriptor dependencyFileDesc = FileDescriptor.buildFrom(dependencyFileDescProto, 
    					     getProtoDependencies(fileDescProtos, dependencyFileDescProto));
            dependencies[i] = dependencyFileDesc;
        }
        return dependencies;
    }

  4. Start the Kafka producer:
    public static void main(String args[]) throws Exception {
        ProtobufProducer producer = new ProtobufProducer();
        producer.startProducer();
    }

Now we configure the Kafka consumer consuming dynamic messages from the Kafka topic on Amazon MSK.

  1. Enter the following Kafka consumer configuration:
    private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-record-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
        props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());
        return props;
    }

  2. Consume protobuf dynamic messages from the Kafka topic protobuf-demo-topic. Because we’re using DYNAMIC_MESSAGE, the retrieved objects are of type DynamicMessage.
    public void startConsumer() {
        logger.info("starting consumer...");
        String topic = "protobuf-demo-topic";
        KafkaConsumer<String, DynamicMessage> consumer = new KafkaConsumer<String, DynamicMessage>(getConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            final ConsumerRecords<String, DynamicMessage> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, DynamicMessage> record : records) {
                for (Descriptors.FieldDescriptor field : record.value().getAllFields().keySet()) {
                    logger.info(field.getName() + ": " + record.value().getField(field));
                }
            }
        }
    }

  3. Start the Kafka consumer:
    public static void main(String args[]){
        ConsumerProtobuf consumer = new ConsumerProtobuf();
        consumer.startConsumer();
    }

Use a protobuf schema with Kinesis Data Streams

You can use the protobuf schema-generated POJOs with the Kinesis Producer Library (KPL) and Kinesis Client Library (KCL).

  1. Install the protobuf compiler (protoc) on your local machine from GitHub and add it to your PATH variable.
  2. Add the following plugin configuration to your application’s pom.xml file. We’re using the xolstice protobuf Maven plugin for this post to generate code from the protobuf schema.
    <plugin>
       <!-- https://www.xolstice.org/protobuf-maven-plugin/usage.html -->
       <groupId>org.xolstice.maven.plugins</groupId>
       <artifactId>protobuf-maven-plugin</artifactId>
       <version>0.6.1</version>
       <configuration>
           <protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
           <outputDirectory>${basedir}/src/main/java</outputDirectory>
           <clearOutputDirectory>false</clearOutputDirectory>
       </configuration>
       <executions>
           <execution>
               <goals>
                   <goal>compile</goal>
               </goals>
           </execution>
       </executions>
    </plugin>

  3. Because the KPL and KCL latest versions have the AWS Glue Schema Registry open-source library (schema-registry-serde) and protobuf runtime (protobuf-java) included, you only need to add the following dependencies to your application’s pom.xml:
    <!-- https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-producer -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-producer</artifactId>
        <version>0.14.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/software.amazon.kinesis/amazon-kinesis-client -->
    <dependency>
        <groupId>software.amazon.kinesis</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>2.4.0</version>
    </dependency>

  4. Create a schema registry employee-schema-registry and register the Employee.proto protobuf schema with it. Name your schema Employee.proto (or Employee).
  5. Run the following command to generate the code from Employee.proto. Make sure the schema file is in the ${basedir}/src/main/resources/proto directory, or change the <protoSourceRoot> value in the application's pom.xml to match your directory structure.
    mvn clean compile

The following Kinesis producer code with the KPL uses the Schema Registry open-source library to publish protobuf messages to Kinesis Data Streams.

  1. Start the Kinesis Data Streams producer:
    private static final String PROTO_SCHEMA_FILE = "proto/Employee.proto";
    private static final String SCHEMA_NAME = "Employee.proto";
    private static String REGION_NAME = "us-east-2";
    private static String REGISTRY_NAME = "employee-schema-registry";
    private static String STREAM_NAME = "employee_data_stream";
    private static int NUM_OF_RECORDS = 100;
    private static String REGISTRY_ENDPOINT = "https://glue.us-east-2.amazonaws.com";
    
    public static void main(String[] args) throws Exception {
         ProtobufKPLProducer producer = new ProtobufKPLProducer();
         producer.startProducer();
     }
    }

  2. Configure the Kinesis producer:
public void startProducer() throws Exception {
    logger.info("Starting KPL client with Glue Schema Registry Integration...");
    GlueSchemaRegistryConfiguration schemaRegistryConfig = new GlueSchemaRegistryConfiguration(REGION_NAME);
    schemaRegistryConfig.setCompressionType(AWSSchemaRegistryConstants.COMPRESSION.ZLIB);
    schemaRegistryConfig.setSchemaAutoRegistrationEnabled(false);
    schemaRegistryConfig.setCompatibilitySetting(Compatibility.BACKWARD);
    schemaRegistryConfig.setEndPoint(REGISTRY_ENDPOINT);
    schemaRegistryConfig.setProtobufMessageType(ProtobufMessageType.POJO);
    schemaRegistryConfig.setRegistryName(REGISTRY_NAME);
	
    //Setting Glue Schema Registry configuration in Kinesis Producer Configuration along with other configs
    KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                                        .setRecordMaxBufferedTime(3000)
                                        .setMaxConnections(1)
                                        .setRequestTimeout(60000)
                                        .setRegion(REGION_NAME)
                                        .setRecordTtl(60000)
                                        .setGlueSchemaRegistryConfiguration(schemaRegistryConfig);

    FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
        @Override public void onFailure(Throwable t) {
              t.printStackTrace();
        };
        @Override public void onSuccess(UserRecordResult result) {
            logger.info("record sent successfully. Sequence Number: " + result.getSequenceNumber() + " | Shard Id : " + result.getShardId());
        };
    };
    
	//Creating schema definition object from the Employee.proto schema file.
    Schema gsrSchema = getSchemaDefinition();
    final KinesisProducer producer = new KinesisProducer(config);
    int employeeCount = 1;
    while(true) {
        //Creating and serializing schema generated POJO object (protobuf message)

        EmployeeOuterClass.Employee employee = createEmployeeRecord(employeeCount);
        byte[] serializedBytes = employee.toByteArray();
        ByteBuffer data = ByteBuffer.wrap(serializedBytes);
        Instant timestamp = Instant.now();

        //Publishing protobuf message to the Kinesis Data Stream
        ListenableFuture<UserRecordResult> f =
                    producer.addUserRecord(STREAM_NAME,
                                        Long.toString(timestamp.toEpochMilli()),
                                        new BigInteger(128, new Random()).toString(10),
                                        data,
                                        gsrSchema);
        Futures.addCallback(f, myCallback, MoreExecutors.directExecutor());
        employeeCount++;
        if(employeeCount > NUM_OF_RECORDS)
            break;
    }
    List<Future<UserRecordResult>> putFutures = new LinkedList<>();
    for (Future<UserRecordResult> future : putFutures) {
        UserRecordResult userRecordResult = future.get();
        logger.info(userRecordResult.getShardId() + userRecordResult.getSequenceNumber());
    }
}

  1. Create a protobuf message using schema-generated code (POJOs):
    public EmployeeOuterClass.Employee createEmployeeRecord(int count){
        EmployeeOuterClass.Employee employee =
                EmployeeOuterClass.Employee.newBuilder()
                .setId(count)
                .setName("Dummy")
                .setAddress("Melbourne, Australia")
                .setEmployeeAge(Int32Value.newBuilder().setValue(32).build())
                .setStartDate(Timestamp.newBuilder().setSeconds(235234532434L).build())
                .setTotalTimeSpanInCompany(Duration.newBuilder().setSeconds(3453245345L).build())
                .setIsCertified(BoolValue.newBuilder().setValue(true).build())
                .setRole(EmployeeOuterClass.Role.ARCHITECT)
                .setProject(EmployeeOuterClass.Project.newBuilder()
                            .setName("Protobuf Schema Demo")
                            .setState("GA").build())
                .setTotalAwardValue(Money.newBuilder()
                            .setCurrencyCode("USD")
                            .setUnits(5)
                            .setNanos(50000).build())
                .setTeam(EmployeeOuterClass.Team.newBuilder()
                            .setName("Solutions Architects")
                            .setLocation("Australia").build()).build();
        return employee;
    }

  2. Create the schema definition from Employee.proto:
    private Schema getSchemaDefinition() throws IOException {
        InputStream inputStream = ProtobufKPLProducer.class.getClassLoader().getResourceAsStream(PROTO_SCHEMA_FILE);
        StringBuilder resultStringBuilder = new StringBuilder();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) {
            String line;
            while ((line = br.readLine()) != null) {
                resultStringBuilder.append(line).append("\n");
            }
        }
        String schemaDefinition = resultStringBuilder.toString();
        logger.info("Schema Definition " + schemaDefinition);
        Schema gsrSchema =
                new Schema(schemaDefinition, DataFormat.PROTOBUF.toString(), SCHEMA_NAME);
        return gsrSchema;
    }

The following Kinesis consumer code with the KCL uses the Schema Registry open-source library to consume protobuf messages from Kinesis Data Streams.

  1. Initialize the application:
    public void run(){
        logger.info("Starting KCL client with Glue Schema Registry Integration...");
        Region region = Region.of(ObjectUtils.firstNonNull(REGION_NAME, "us-east-2"));
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
    
        EmployeeRecordProcessorFactory employeeRecordProcessorFactory = new EmployeeRecordProcessorFactory();
        ConfigsBuilder configsBuilder =
                new ConfigsBuilder(STREAM_NAME,
                        APPLICATION_NAME,
                        kinesisClient,
                        dynamoClient,
                        cloudWatchClient,
                        APPLICATION_NAME,
                        employeeRecordProcessorFactory);
    
        //Creating Glue Schema Registry configuration and Glue Schema Registry Deserializer object.
        GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration(region.toString());
        gsrConfig.setEndPoint(REGISTRY_ENDPOINT);
        gsrConfig.setProtobufMessageType(ProtobufMessageType.POJO);
        GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
                new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), gsrConfig);
        /*
         Setting Glue Schema Registry deserializer in the Retrieval Config for
         Kinesis Client Library to use it while deserializing the protobuf messages.
         */
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(STREAM_NAME, kinesisClient));
        retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
    
        Scheduler scheduler = new Scheduler(
                		configsBuilder.checkpointConfig(),
                		configsBuilder.coordinatorConfig(),
               		configsBuilder.leaseManagementConfig(),
                		configsBuilder.lifecycleConfig(),
                		configsBuilder.metricsConfig(),
                		configsBuilder.processorConfig(),
                		retrievalConfig);
    
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    
        logger.info("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
            Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
            logger.info("Waiting up to 20 seconds for shutdown to complete.");
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.info("Interrupted while waiting for graceful shutdown. Continuing.");
        }
        logger.info("Completed, shutting down now.");
    }

  2. Consume protobuf messages from Kinesis Data Streams:
    public static class EmployeeRecordProcessorFactory implements ShardRecordProcessorFactory {
        @Override
        public ShardRecordProcessor shardRecordProcessor() {
            return new EmployeeRecordProcessor();
        }
    }
    public static class EmployeeRecordProcessor implements ShardRecordProcessor {
        private static final Logger logger = Logger.getLogger(EmployeeRecordProcessor.class.getSimpleName());
        public void initialize(InitializationInput initializationInput) {}
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            try {
                logger.info("Processing " + processRecordsInput.records().size() + " record(s)");
                for (KinesisClientRecord r : processRecordsInput.records()) {
    			
                    //Deserializing protobuf message into schema generated POJO
                    EmployeeOuterClass.Employee employee = EmployeeOuterClass.Employee.parseFrom(r.data().array());
                    
                   logger.info("Processed record: " + employee);
                    logger.info("Employee Id: " + employee.getId() + " | Name: "  + employee.getName() + " | Address: " + employee.getAddress() +
                            " | Age: " + employee.getEmployeeAge().getValue() + " | Startdate: " + employee.getStartDate().getSeconds() +
                            " | TotalTimeSpanInCompany: " + employee.getTotalTimeSpanInCompany() +
                            " | IsCertified: " + employee.getIsCertified().getValue() + " | Team: " + employee.getTeam().getName() +
                            " | Role: " + employee.getRole().name() + " | Project State: " + employee.getProject().getState() +
                            " | Project Name: " + employee.getProject().getName() + " | Award currency code: " +    
                           employee.getTotalAwardValue().getCurrencyCode() + " | Award units : " + employee.getTotalAwardValue().getUnits() + 
    		      " | Award nanos " + employee.getTotalAwardValue().getNanos());
                }
            } catch (Exception e) {
                logger.info("Failed while processing records. Aborting" + e);
                Runtime.getRuntime().halt(1);
            }
        }
        public void leaseLost(LeaseLostInput leaseLostInput) {. . .}
        public void shardEnded(ShardEndedInput shardEndedInput) {. . .}
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {. . .}
    }

  3. Start the Kinesis Data Streams consumer:
    private static final Logger logger = Logger.getLogger(ProtobufKCLConsumer.class.getSimpleName());
    private static String REGION_NAME = "us-east-2";
    private static String STREAM_NAME = "employee_data_stream";
    private static final String APPLICATION_NAME =  "protobuf-demo-kinesis-kpl-consumer";
    private static String REGISTRY_ENDPOINT = "https://glue.us-east-2.amazonaws.com";
    
    public static void main(String[] args) throws ParseException {
        new ProtobufKCLConsumer().run();
    }
    

Enhance your protobuf schema

We covered examples of data producer and consumer applications integrating with Amazon MSK, Apache Kafka, and Kinesis Data Streams that use a Protocol buffers schema registered with AWS Glue Schema Registry. You can further enhance these examples with schema evolution, following the compatibility rules supported by AWS Glue Schema Registry. For example, the following protobuf schema is a backward-compatible updated version of Employee.proto. We have added another gRPC service definition, CreateEmployee, under EmployeeSearch and added an optional title field to the Employee message type. If you upgrade the consumer application with this version of the protobuf schema, it can still consume both old and new protobuf messages, as the sketch after the schema illustrates.

Employee.proto (version-2)

syntax = "proto2";
package gsr.proto.post;

import "google/protobuf/wrappers.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "google/type/money.proto";

service EmployeeSearch {
    rpc FindEmployee(EmployeeSearchParams) returns (Employee);
    rpc CreateEmployee(EmployeeSearchParams) returns (google.protobuf.Empty);
}
message EmployeeSearchParams {
    required int32 id = 1;
}
message Employee {
    required int32 id = 1;
    required string name = 2;
    required string address = 3;
    required google.protobuf.Int32Value employee_age = 4;
    required google.protobuf.Timestamp start_date = 5;
    required google.protobuf.Duration total_time_span_in_company = 6;
    required google.protobuf.BoolValue is_certified = 7;
    required Team team = 8;
    required Project project = 9;
    required Role role = 10;
    required google.type.Money total_award_value = 11;
    optional string title = 12;
}
message Team {
    required string name = 1;
    required string location = 2;
}
message Project {
    required string name = 1;
    required string state = 2;
}
enum Role {
    MANAGER = 0;
    DEVELOPER = 1;
    ARCHITECT = 2;
}
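To illustrate what backward compatibility means for the applications themselves, the following hedged sketch shows a consumer that is still compiled against the version 1 POJOs parsing a record produced with version 2 of the schema. The method name and the messageBytes parameter are hypothetical; protobuf carries the unrecognized title field (tag 12) through as an unknown field, so parsing succeeds:

    // Hypothetical illustration: messageBytes holds an Employee record serialized with
    // version 2 of the schema, while EmployeeOuterClass was generated from version 1.
    public EmployeeOuterClass.Employee parseWithVersion1Pojo(byte[] messageBytes) throws Exception {
        // The version 1 generated code still parses the payload; the new optional
        // 'title' field is preserved in the message's unknown field set.
        EmployeeOuterClass.Employee employee = EmployeeOuterClass.Employee.parseFrom(messageBytes);
        System.out.println("Employee name: " + employee.getName());
        return employee;
    }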

Conclusion

In this post, we introduced Protocol buffers schema support in AWS Glue Schema Registry. AWS Glue Schema Registry now supports Apache Avro, JSON, and Protocol buffers schemas with different compatibility modes. The examples in this post demonstrated how to use Protocol buffers schemas registered with AWS Glue Schema Registry in stream processing applications integrated with Apache Kafka, Amazon MSK, and Kinesis Data Streams. We used the schema-generated POJOs for type safety and protobuf's DynamicMessage to create generic producer and consumer applications. The examples in this post contain the basic components of the stream processing pattern; you can adapt them to your use case.

To learn more, refer to the following resources:


About the Author

Vikas Bajaj is a Principal Solutions Architect at AWS. Vikas works with digital native customers and advises them on technology architecture and solutions to meet strategic business objectives.

Validate streaming data over Amazon MSK using schemas in cross-account AWS Glue Schema Registry

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/validate-streaming-data-over-amazon-msk-using-schemas-in-cross-account-aws-glue-schema-registry/

Today’s businesses face an unprecedented growth in the volume of data. A growing portion of the data is generated in real time by IoT devices, websites, business applications, and various other sources. Businesses need to process and analyze this data as soon as it arrives to make business decisions in real time. Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables building and running stream processing applications that use Apache Kafka to collect and process data in real time.

Stream processing applications using Apache Kafka don't communicate with each other directly; they communicate by sending and receiving messages over Kafka topics. For stream processing applications to communicate efficiently and reliably, a message payload structure must be defined in terms of attributes and data types. This structure describes the schema that applications use when sending and receiving messages. However, with a large number of producer and consumer applications, even a small schema change (removing a field, adding a new field, or changing a data type) may cause issues for downstream applications that are difficult to debug and fix.

Traditionally, teams have relied on change management processes (such as approvals and maintenance windows) or other informal mechanisms (documentation, emails, collaboration tools, and so on) to inform one another of data schema changes. However, these mechanisms don’t scale and are prone to mistakes. The AWS Glue Schema Registry allows you to centrally publish, discover, control, validate, and evolve schemas for stream processing applications. With the AWS Glue Schema Registry, you can manage and enforce schemas on data streaming applications using Apache Kafka, Amazon MSK, Amazon Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda.

This post demonstrates how Apache Kafka stream processing applications validate messages using an Apache Avro schema stored in the AWS Glue Schema Registry residing in a central AWS account. We use the AWS Glue Schema Registry SerDe library and Avro SpecificRecord to validate messages in stream processing applications while sending and receiving messages from a Kafka topic on an Amazon MSK cluster. Although we use an Avro schema for this post, the same approach and concepts apply to JSON schemas as well.

Use case

Let’s assume a fictitious rideshare company that offers unicorn rides. To draw actionable insights, they need to process a stream of unicorn ride request messages. They expect rides to be very popular and want to make sure their solution can scale. They’re also building a central data lake where all their streaming and operation data is stored for analysis. They’re customer-obsessed, so they expect to add new fun features to future rides, like choosing the hair color of your unicorn, and will need to reflect these attributes in the ride request messages. To avoid issues in downstream applications due to future schema changes, they need a mechanism to validate messages with a schema hosted in a central schema registry. Having schemas in a central schema registry makes it easier for the application teams to publish, validate, evolve, and maintain schemas in a single place.

Solution overview

The company uses Amazon MSK to capture and distribute the unicorn ride request messages at scale. They define an Avro schema for unicorn ride requests because it provides rich data structures, direct mapping to JSON, and a compact, fast binary data format. Because the schema was agreed upon in advance, they decided to use Avro SpecificRecord. SpecificRecord is an interface from the Avro library that allows the use of an Avro record as a POJO; the Java class (or classes) are generated from the schema using the avro-maven-plugin. They use AWS Identity and Access Management (IAM) cross-account roles to allow producer and consumer applications from the other AWS account to safely and securely access schemas in the central Schema Registry account.
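Although the full producer and consumer code is covered later in this post and is available on GitHub, the serializer configuration for Avro SpecificRecord follows the same pattern as the protobuf examples shown earlier. The following is a rough, hedged sketch; the Region, registry name, and schema name match the resources created later in this post, and the actual sample application may differ in detail:

    // A hedged sketch of the producer-side serializer configuration for Avro SpecificRecord.
    private Properties getProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); // use the schema-generated POJO
        props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry");
        props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro");
        return props;
    }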

The AWS Glue Schema Registry is in Account B, whereas the MSK cluster and Kafka producer and consumer applications are in Account A. We use the following two IAM roles to enable cross-account access to the AWS Glue Schema Registry. Apache Kafka clients in Account A assume a role in Account B using an identity-based policy because the AWS Glue Schema Registry doesn’t support resource-based policies.

  • Account A IAM role – Allows producer and consumer applications to assume an IAM role in Account B.
  • Account B IAM role – Trusts all IAM principals from Account A and allows them to perform read actions on the AWS Glue Schema Registry in Account B. In a real-world scenario, IAM principals that can assume cross-account roles should be scoped more specifically.

The following architecture diagram illustrates the solution:

The solution works as follows:

  1. A Kafka producer running in Account A assumes the cross-account Schema Registry IAM role in Account B by calling the AWS Security Token Service (AWS STS) assumeRole API.
  2. The Kafka producer retrieves the unicorn ride request Avro schema version ID from the AWS Glue Schema Registry for the schema that’s embedded in the unicorn ride request POJO. Fetching the schema version ID is internally managed by the AWS Glue Schema Registry SerDe’s serializer. The serializer has to be configured as part of the Kafka producer configuration.
  3. If the schema exists in the AWS Glue Schema Registry, the serializer decorates the data record with the schema version ID and then serializes it before delivering it to the Kafka topic on the MSK cluster.
  4. The Kafka consumer running in Account A assumes the cross-account Schema Registry IAM role in Account B by calling the AWS STS assumeRole API.
  5. The Kafka consumer starts polling the Kafka topic on the MSK cluster for data records.
  6. The Kafka consumer retrieves the unicorn ride request Avro schema from the AWS Glue Schema Registry, matching the schema version ID that’s encoded in the unicorn ride request data record. Fetching the schema is internally managed by the AWS Glue Schema Registry SerDe’s deserializer. The deserializer has to be configured as part of the Kafka consumer configuration. If the schema exists in the AWS Glue Schema Registry, the deserializer deserializes the data record into the unicorn ride request POJO for the consumer to process it.

The AWS Glue Schema Registry SerDe library also supports optional compression configuration to save on data transfers. For more information about the Schema Registry, see How the Schema Registry works.
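Steps 1 and 4 rely on assuming the cross-account role in Account B with AWS STS. The following is a minimal, hedged sketch using the AWS SDK for Java v2; the role ARN and session name are placeholders, the external ID must match the one configured on the Account B role, and the sample applications on GitHub may wire the resulting credentials into the SerDe differently:

    // Assume the cross-account Schema Registry role in Account B (role ARN and session
    // name below are placeholders) and refresh the temporary credentials automatically.
    StsClient stsClient = StsClient.builder().region(Region.US_EAST_1).build();

    AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
            .roleArn("arn:aws:iam::<Account-B-ID>:role/<cross-account-gsr-role>")
            .roleSessionName("kafka-client-gsr-session")
            .externalId("demo10A") // must match the ExternalId provided when creating the Account B stack
            .build();

    AwsCredentialsProvider credentialsProvider = StsAssumeRoleCredentialsProvider.builder()
            .stsClient(stsClient)
            .refreshRequest(assumeRoleRequest)
            .build();

    // The credentials provider can then be supplied to the Schema Registry SerDe, for example
    // via the two-argument GlueSchemaRegistryDeserializerImpl constructor (provider plus configuration).
    GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("us-east-1");
    GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
            new GlueSchemaRegistryDeserializerImpl(credentialsProvider, gsrConfig);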

Unicorn ride request Avro schema

The following schema (UnicornRideRequest.avsc) defines a record representing a unicorn ride request, which contains ride request attributes along with the customer attributes and system-recommended unicorn attributes:

{
    "type": "record",
    "name": "UnicornRideRequest",
    "namespace": "demo.glue.schema.registry.avro",
    "fields": [
      {"name": "request_id", "type": "int", "doc": "customer request id"},
      {"name": "pickup_address","type": "string","doc": "customer pickup address"},
      {"name": "destination_address","type": "string","doc": "customer destination address"},
      {"name": "ride_fare","type": "float","doc": "ride fare amount (USD)"},
      {"name": "ride_duration","type": "int","doc": "ride duration in minutes"},
      {"name": "preferred_unicorn_color","type": {"type": "enum","name": "UnicornPreferredColor","symbols": ["WHITE","BLACK","RED","BLUE","GREY"]}, "default": "WHITE"},
      {
        "name": "recommended_unicorn",
        "type": {
          "type": "record",
          "name": "RecommendedUnicorn",
          "fields": [
            {"name": "unicorn_id","type": "int", "doc": "recommended unicorn id"},
            {"name": "color","type": {"type": "enum","name": "unicorn_color","symbols": ["WHITE","RED","BLUE"]}},
            {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"}
          ]
        }
      },
      {
        "name": "customer",
        "type": {
          "type": "record",
          "name": "Customer",
          "fields": [
            {"name": "customer_account_no","type": "int", "doc": "customer account number"},
            {"name": "first_name","type": "string"},
            {"name": "middle_name","type": ["null","string"], "default": null},
            {"name": "last_name","type": "string"},
            {"name": "email_addresses","type": ["null", {"type":"array", "items":"string"}]},
            {"name": "customer_address","type": "string","doc": "customer address"},
            {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"},
            {"name": "customer_rating", "type": ["null", "int"], "default": null}
          ]
        }
      }
    ]
  }

Prerequisites

To use this solution, you must have two AWS accounts:

  • Account A – For the MSK cluster, Kafka producer and consumer Amazon Elastic Compute Cloud (Amazon EC2) instances, and AWS Cloud9 environment
  • Account B – For the Schema Registry and schema

For this solution, we use Region us-east-1, but you can change this as per your requirements.

Next, we create the resources in each account using AWS CloudFormation templates.

Create resources in Account B

We create the following resources in Account B:

  • A schema registry
  • An Avro schema
  • An IAM role with the AWSGlueSchemaRegistryReadonlyAccess managed policy and an instance profile, which allows all Account A IAM principals to assume it
  • The UnicornRideRequest.avsc Avro schema shown earlier, which is used as a schema definition in the CloudFormation template

Make sure you have the appropriate permissions to create these resources.

  1. Log in to Account B.
  2. Launch the following CloudFormation stack.
  3. For Stack name, enter SchemaRegistryStack.
  4. For Schema Registry name, enter unicorn-ride-request-registry.
  5. For Avro Schema name, enter unicorn-ride-request-schema-avro.
  6. For the Kafka client’s AWS account ID, enter your Account A ID.
  7. For ExternalId, enter a unique random ID (for example, demo10A), which should be provided by the Kafka clients in Account A while assuming the IAM role in this account.

For more information about cross-account security, see The confused deputy problem.

  1. When the stack is complete, on the Outputs tab of the stack, copy the value for CrossAccountGlueSchemaRegistryRoleArn.

The Kafka producer and consumer applications created in Account A assume this role to access the Schema Registry and schema in Account B.

  1. To verify the resources were created, on the AWS Glue console, choose Schema registries in the navigation pane, and locate unicorn-ride-request-registry.
  2. Choose the registry unicorn-ride-request-registry and verify that it contains unicorn-ride-request-schema-avro in the Schemas section.
  3. Choose the schema to see its content.

The IAM role created by the SchemaRegistryStack stack allows all Account A IAM principals to assume it and perform read actions on the AWS Glue Schema Registry. Let’s look at the trust relationships of the IAM role.

  1. On the SchemaRegistryStack stack Outputs tab, copy the value for CrossAccountGlueSchemaRegistryRoleName.
  2. On the IAM console, search for this role.
  3. Choose Trust relationships and look at its trusted entities to confirm that Account A is listed.
  4. In the Conditions section, confirm that sts:ExternalId has the same unique random ID provided during stack creation.

Create resources in Account A

We create the following resources in Account A:

  • A VPC
  • EC2 instances for the Kafka producer and consumer
  • An AWS Cloud9 environment
  • An MSK cluster

As a prerequisite, create an EC2 keypair and download it on your machine to be able to SSH into EC2 instances. Also create an MSK cluster configuration with default values. You need to have permissions to create the CloudFormation stack, EC2 instances, AWS Cloud9 environment, MSK cluster, MSK cluster configuration, and IAM role.

  1. Log in to Account A.
  2. Launch the following CloudFormation stack to launch the VPC, EC2 instances, and AWS Cloud9 environment.
  3. For Stack name, enter MSKClientStack.
  4. Provide the VPC and subnet CIDR ranges.
  5. For EC2 Keypair, choose an existing EC2 keypair.
  6. For the latest EC2 AMI ID, select the default option.
  7. For the cross-account IAM role ARN, use the value for CrossAccountGlueSchemaRegistryRoleArn (available on the Outputs tab of SchemaRegistryStack).
  8. Wait for the stack to create successfully.
  9. Launch the following CloudFormation stack to create the MSK cluster.
  10. For Stack name, enter MSKClusterStack.
  11. Use Amazon MSK version 2.7.1.
  12. For the MSK cluster configuration ARN, enter the ARN of the MSK cluster configuration that you created as part of the prerequisites.
  13. For the MSK cluster configuration revision number, enter 1 or change it according to your version.
  14. For the client CloudFormation stack name, enter MSKClientStack (the stack name that you created prior to this stack).

Configure the Kafka producer

To configure the Kafka producer accessing the Schema Registry in the central AWS account, complete the following steps:

  1. Log in to Account A.
  2. On the AWS Cloud9 console, choose the Cloud9EC2Bastion environment created by the MSKClientStack stack.
  3. On the File menu, choose Upload Local Files.
  4. Upload the EC2 keypair file that you used earlier while creating the stack.
  5. Open a new terminal and change the EC2 keypair permissions:
    chmod 0400 <keypair PEM file>

  6. SSH into the KafkaProducerInstance EC2 instance and set the Region as per your requirement:
    ssh -i <keypair PEM file> ec2-user@<KafkaProducerInstance Private IP address>
    aws configure set region <region>

  7. Set the environment variable MSK_CLUSTER_ARN pointing to the MSK cluster’s ARN:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters |  jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d \")

Change the .ClusterName value in the code if you used a different name for the MSK cluster CloudFormation stack. The cluster name is the same as the stack name.

  8. Set the environment variable BOOTSTRAP_BROKERS pointing to the bootstrap brokers:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  9. Verify the environment variables:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

  10. Create a Kafka topic called unicorn-ride-request-topic in your MSK cluster, which is used by the Kafka producer and consumer applications later:
    cd ~/kafka
    
    ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS \
    --topic unicorn-ride-request-topic \
    --create --partitions 3 --replication-factor 2
    
    ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --list

The MSKClientStack stack copied the Kafka producer client JAR file called kafka-cross-account-gsr-producer.jar to the KafkaProducerInstance instance. It contains the Kafka producer client that sends messages to the Kafka topic unicorn-ride-request-topic on the MSK cluster and accesses the unicorn-ride-request-schema-avro Avro schema from the unicorn-ride-request-registry schema registry in Account B. The Kafka producer code, which we cover later in this post, is available on GitHub.

  11. Run the following commands and verify kafka-cross-account-gsr-producer.jar exists:
    cd ~
    ls -ls

  12. Run the following command to run the Kafka producer in the KafkaProducerInstance terminal:
    java -jar kafka-cross-account-gsr-producer.jar -bs $BOOTSTRAP_BROKERS \
    -rn <Account B IAM role arn that Kafka producer application needs to assume> \
    -topic unicorn-ride-request-topic \
    -reg us-east-1 \
    -nm 500 \
    -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

The code has the following parameters:

  • -bs – $BOOTSTRAP_BROKERS (the MSK cluster bootstrap brokers)
  • -rn – The CrossAccountGlueSchemaRegistryRoleArn value from the SchemaRegistryStack stack outputs in Account B
  • -topic – The Kafka topic unicorn-ride-request-topic
  • -reg – us-east-1 (change it according to your Region; it’s used for the AWS STS endpoint and the Schema Registry)
  • -nm – 500 (the number of messages the producer application sends to the Kafka topic)
  • -externalId – The same external ID (for example, demo10A) that you used while creating the CloudFormation stack in Account B
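
The sample application parses these flags itself. The exact parsing code is in the GitHub repository; the following is a simplified, hypothetical sketch of how the flags could be mapped onto the fields referenced in the code breakdown in the next section (bootstrapServers, assumeRoleARN, topic, regionName, str_numOfMessages, and externalId). The class name and parsing style are illustrative only:

public class ProducerArgsSketch {
    public static void main(String[] args) {
        String bootstrapServers = null, assumeRoleARN = null, topic = null,
               regionName = null, str_numOfMessages = "500", externalId = null;

        // Walk the command line as "-flag value" pairs
        for (int i = 0; i + 1 < args.length; i += 2) {
            switch (args[i]) {
                case "-bs":         bootstrapServers  = args[i + 1]; break;
                case "-rn":         assumeRoleARN     = args[i + 1]; break;
                case "-topic":      topic             = args[i + 1]; break;
                case "-reg":        regionName        = args[i + 1]; break;
                case "-nm":         str_numOfMessages = args[i + 1]; break;
                case "-externalid": externalId        = args[i + 1]; break;
                default: throw new IllegalArgumentException("Unknown flag: " + args[i]);
            }
        }
        // The values would then be handed to the producer class covered below.
        System.out.printf("bs=%s rn=%s topic=%s reg=%s nm=%s externalid=%s%n",
                bootstrapServers, assumeRoleARN, topic, regionName, str_numOfMessages, externalId);
    }
}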

The following screenshot shows the Kafka producer logs with the message Schema Version Id received..., which means the producer has retrieved the Avro schema unicorn-ride-request-schema-avro from Account B and sent messages to the Kafka topic on the MSK cluster in Account A.

Kafka producer code

The complete Kafka producer implementation is available on GitHub. In this section, we break down the code.

  • getProducerConfig() initializes the producer properties, as shown in the following code:
    • VALUE_SERIALIZER_CLASS_CONFIG – The GlueSchemaRegistryKafkaSerializer.class.getName() AWS serializer implementation that serializes data records (the implementation is available on GitHub)
    • REGISTRY_NAME – The Schema Registry from Account B
    • SCHEMA_NAME – The schema name from Account B
    • AVRO_RECORD_TYPE – AvroRecordType.SPECIFIC_RECORD
private Properties getProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.CLIENT_ID_CONFIG,"msk-cross-account-gsr-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,regionName);
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry");
        props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro");
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
        return props;
}
  • startProducer() assumes the role in Account B to be able to connect with the Schema Registry in Account B and sends messages to the Kafka topic on the MSK cluster:
public void startProducer() {
        assumeGlueSchemaRegistryRole();
        KafkaProducer<String, UnicornRideRequest> producer = 
		new KafkaProducer<String,UnicornRideRequest>(getProducerConfig());
        int numberOfMessages = Integer.valueOf(str_numOfMessages);
        logger.info("Starting to send records...");
        for(int i = 0;i < numberOfMessages;i ++)
        {
            UnicornRideRequest rideRequest = getRecord(i);
            String key = "key-" + i;
            ProducerRecord<String, UnicornRideRequest> record = 
		new ProducerRecord<String, UnicornRideRequest>(topic, key, rideRequest);
            producer.send(record, new ProducerCallback());
        }
 }
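
The send() call above is asynchronous and reports each result through a ProducerCallback. The actual callback class ships with the sample code on GitHub; the following is a minimal sketch of what such a Callback implementation typically looks like (the logging calls are illustrative):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerCallback implements Callback {
    private static final Logger logger = LoggerFactory.getLogger(ProducerCallback.class);

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // The send failed; surface the error so rejected records are visible
            logger.error("Failed to send record", exception);
        } else {
            // The send succeeded; metadata carries the topic, partition, and offset
            logger.info("Record sent to {}-{} at offset {}",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}

Because the sample producer sends a fixed number of messages and then exits, it is also good practice to call producer.flush() and producer.close() after the loop so that any buffered records are delivered before the JVM terminates.
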
  • assumeGlueSchemaRegistryRole(), as shown in the following code, uses AWS STS to assume the cross-account Schema Registry IAM role in Account B. (For more information, see Temporary security credentials in IAM.) The response from stsClient.assumeRole(roleRequest) contains the temporary credentials, which include accessKeyId, secretAccessKey, and a sessionToken. It then sets the temporary credentials in the system properties. The AWS SDK for Java uses these credentials while accessing the Schema Registry (through the Schema Registry serializer). For more information, see Using Credentials.
    public void assumeGlueSchemaRegistryRole() {
        try {
            Region region = Region.of(regionName);
            if (!Region.regions().contains(region))
                throw new RuntimeException("Region : " + regionName + " is invalid.");
            StsClient stsClient = StsClient.builder().region(region).build();
            AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
                    .roleArn(this.assumeRoleARN)
                    .roleSessionName("kafka-producer-cross-account-glue-schemaregistry-demo")
                    .externalId(this.externalId)
                    .build();
            AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
            Credentials myCreds = roleResponse.credentials();
            System.setProperty("aws.accessKeyId", myCreds.accessKeyId());
            System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey());
            System.setProperty("aws.sessionToken", myCreds.sessionToken());
            stsClient.close();
        } catch (StsException e) {
            logger.error(e.getMessage());
            System.exit(1);
        }
    }
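
Setting the aws.accessKeyId, aws.secretAccessKey, and aws.sessionToken system properties works because the default credentials provider chain in the AWS SDK for Java 2.x checks Java system properties before other credential sources, which is how the Schema Registry serializer picks up the assumed-role credentials. If you want to confirm this while debugging, the following small diagnostic sketch (not part of the sample application; the class and method names are illustrative) resolves credentials the same way:

import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;

public class CredentialsCheck {
    // Call this in the same JVM after the aws.* system properties have been set
    // (for example, right after assumeGlueSchemaRegistryRole()).
    public static void check() {
        AwsCredentials creds = SystemPropertyCredentialsProvider.create().resolveCredentials();
        // Log only the access key ID; never print the secret key or session token.
        System.out.println("Resolved access key ID: " + creds.accessKeyId());
    }
}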

  • getRecord() uses the classes generated from the Avro schema (the unicorn ride request schema) to create a SpecificRecord. For this post, the unicorn ride request attribute values are hard-coded in this method. See the following code:
    public UnicornRideRequest getRecord(int requestId){
                /*
                 Initialise UnicornRideRequest object of
                 class that is generated from AVRO Schema
                 */
               UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder()
                .setRequestId(requestId)
                .setPickupAddress("Melbourne, Victoria, Australia")
                .setDestinationAddress("Sydney, NSW, Aus")
                .setRideFare(1200.50F)
                .setRideDuration(120)
                .setPreferredUnicornColor(UnicornPreferredColor.WHITE)
                .setRecommendedUnicorn(RecommendedUnicorn.newBuilder()
                        .setUnicornId(requestId*2)
                        .setColor(unicorn_color.WHITE)
                        .setStarsRating(5).build())
                .setCustomer(Customer.newBuilder()
                        .setCustomerAccountNo(1001)
                        .setFirstName("Dummy")
                        .setLastName("User")
                        .setEmailAddresses(Arrays.asList("[email protected]"))
                        .setCustomerAddress("Flinders Street Station")
                        .setModeOfPayment(ModeOfPayment.CARD)
                        .setCustomerRating(5).build()).build();
                logger.info(rideRequest.toString());
                return rideRequest;
        }

Configure the Kafka consumer

The MSKClientStack stack created the KafkaConsumerInstance instance for the Kafka consumer application. You can view all the instances created by the stack on the Amazon EC2 console.

To configure the Kafka consumer accessing the Schema Registry in the central AWS account, complete the following steps:

  1. Open a new terminal in the Cloud9EC2Bastion AWS Cloud9 environment.
  2. SSH into the KafkaConsumerInstance EC2 instance and set the Region as per your requirement:
    ssh -i <keypair PEM file> ec2-user@<KafkaConsumerInstance Private IP address>
    aws configure set region <region>

  3. Set the environment variable MSK_CLUSTER_ARN pointing to the MSK cluster’s ARN:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters |  jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d \")

Change the .ClusterName value if you used a different name for the MSK cluster CloudFormation stack. The cluster name is the same as the stack name.

  4. Set the environment variable BOOTSTRAP_BROKERS pointing to the bootstrap brokers:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  5. Verify the environment variables:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

The MSKClientStack stack copied the Kafka consumer client JAR file called kafka-cross-account-gsr-consumer.jar to the KafkaConsumerInstance instance. It contains the Kafka consumer client that reads messages from the Kafka topic unicorn-ride-request-topic on the MSK cluster and accesses the unicorn-ride-request-schema-avro Avro schema from the unicorn-ride-request-registry registry in Account B. The Kafka consumer code, which we cover later in this post, is available on GitHub.

  6. Run the following commands and verify kafka-cross-account-gsr-consumer.jar exists:
    cd ~
    ls -ls

  7. Run the following command to run the Kafka consumer in the KafkaConsumerInstance terminal:
    java -jar kafka-cross-account-gsr-consumer.jar -bs $BOOTSTRAP_BROKERS \
    -rn <Account B IAM role arn that Kafka consumer application needs to assume> \
    -topic unicorn-ride-request-topic \
    -reg us-east-1 \
    -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

The code has the following parameters:

  • -bs – $BOOTSTRAP_BROKERS (the MSK cluster bootstrap brokers)
  • -rn – The CrossAccountGlueSchemaRegistryRoleArn value from the SchemaRegistryStack stack outputs in Account B
  • -topic – The Kafka topic unicorn-ride-request-topic
  • -reg – us-east-1 (change it according to your Region; it’s used for the AWS STS endpoint and the Schema Registry)
  • -externalId – The same external ID (for example, demo10A) that you used while creating the CloudFormation stack in Account B

The following screenshot shows the Kafka consumer logs successfully reading messages from the Kafka topic on the MSK cluster in Account A and accessing the Avro schema unicorn-ride-request-schema-avro from the unicorn-ride-request-registry schema registry in Account B.

If you see similar logs, it means the Kafka producer and consumer applications have both successfully connected to the centralized Schema Registry in Account B and can validate messages while producing to and consuming from the MSK cluster in Account A.

Kafka consumer code

The complete Kafka consumer implementation is available on GitHub. In this section, we break down the code.

  • getConsumerConfig() initializes consumer properties, as shown in the following code:
    • VALUE_DESERIALIZER_CLASS_CONFIG – The GlueSchemaRegistryKafkaDeserializer.class.getName() AWS deserializer implementation that deserializes the SpecificRecord as per the encoded schema ID from the Schema Registry (the implementation is available on GitHub).
    • AVRO_RECORD_TYPE – AvroRecordType.SPECIFIC_RECORD
private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
        return props;
}
  • startConsumer() assumes the role in Account B to be able to connect with the Schema Registry in Account B and reads messages from the Kafka topic on the MSK cluster:
public void startConsumer() {
  logger.info("starting consumer...");
  assumeGlueSchemaRegistryRole();
  KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<String, UnicornRideRequest>(getConsumerConfig());
  consumer.subscribe(Collections.singletonList(topic));
  int count = 0;
  while (true) {
            final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, UnicornRideRequest> record : records) {
                final UnicornRideRequest rideRequest = record.value();
                logger.info(String.valueOf(rideRequest.getRequestId()));
                logger.info(rideRequest.toString());
            }
        }
}
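
The poll loop above runs until the process is stopped. If you want the consumer to exit cleanly instead of being killed, a common Kafka pattern, shown here only as a sketch and not part of the sample code, is a JVM shutdown hook that calls consumer.wakeup(); it reuses the consumer and logger from the snippet above:

// Sketch of a graceful shutdown for the poll loop above.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();          // makes the blocked poll() throw WakeupException
    try {
        mainThread.join();      // wait for the poll loop to finish
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(1000));
        for (final ConsumerRecord<String, UnicornRideRequest> record : records) {
            logger.info(record.value().toString());
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected during shutdown; fall through to close the consumer.
} finally {
    consumer.close();           // leaves the consumer group and releases resources
}
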
  • assumeGlueSchemaRegistryRole(), as shown in the following code, uses AWS STS to assume the cross-account Schema Registry IAM role in Account B. The response from stsClient.assumeRole(roleRequest) contains the temporary credentials, which include accessKeyId, secretAccessKey, and a sessionToken. It then sets the temporary credentials in the system properties. The AWS SDK for Java uses these credentials while accessing the Schema Registry (through the Schema Registry deserializer). For more information, see Using Credentials.
public void assumeGlueSchemaRegistryRole() {
    try {
        Region region = Region.of(regionName);
        if (!Region.regions().contains(region))
            throw new RuntimeException("Region : " + regionName + " is invalid.");
        StsClient stsClient = StsClient.builder().region(region).build();
        AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
                .roleArn(this.assumeRoleARN)
                .roleSessionName("kafka-consumer-cross-account-glue-schemaregistry-demo")
                .externalId(this.externalId)
                .build();
        AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
        Credentials myCreds = roleResponse.credentials();
        System.setProperty("aws.accessKeyId", myCreds.accessKeyId());
        System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey());
        System.setProperty("aws.sessionToken", myCreds.sessionToken());
        stsClient.close();
    } catch (StsException e) {
        logger.error(e.getMessage());
        System.exit(1);
    }
}

Compile and generate Avro schema classes

Like any other part of building and deploying your application, schema compilation and the process of generating Avro schema classes should be included in your CI/CD pipeline. There are multiple ways to generate Avro schema classes; we use avro-maven-plugin for this post. The CI/CD process can also use avro-tools to compile the Avro schema and generate the classes. The following code is an example of how you can use avro-tools:

java -jar /path/to/avro-tools-1.10.2.jar compile schema <schema file> <destination>

//compiling unicorn_ride_request.avsc
java -jar avro-tools-1.10.2.jar compile schema unicorn_ride_request.avsc .
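
If your build already runs on the JVM, the same generation step can be scripted against the Avro compiler API instead of shelling out to avro-tools. The following is a small sketch, assuming the org.apache.avro:avro-compiler dependency is on the classpath and unicorn_ride_request.avsc is in the working directory; the class name and output directory are illustrative:

import java.io.File;
import org.apache.avro.compiler.specific.SpecificCompiler;

public class GenerateAvroClasses {
    public static void main(String[] args) throws Exception {
        // Generates the Java classes for UnicornRideRequest and its nested
        // records and enums under ./generated-sources.
        SpecificCompiler.compileSchema(
                new File("unicorn_ride_request.avsc"),
                new File("generated-sources"));
    }
}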

Implementation overview

To recap, we start with defining and registering an Avro schema for the unicorn ride request message in the AWS Glue Schema Registry in Account B, the central data lake account. In Account A, we create an MSK cluster and Kafka producer and consumer EC2 instances, and deploy the respective application code (kafka-cross-account-gsr-producer.jar and kafka-cross-account-gsr-consumer.jar) to them using the CloudFormation stack.

When we run the producer application in Account A, the serializer (GlueSchemaRegistryKafkaSerializer) from the AWS Glue Schema Registry SerDe library provided as the configuration gets the unicorn ride request schema (UnicornRideRequest.avsc) from the central Schema Registry residing in Account B to serialize the unicorn ride request message. It uses the IAM role (temporary credentials) in Account B and Region, schema registry name (unicorn-ride-request-registry), and schema name (unicorn-ride-request-schema-avro) provided as the configuration to connect to the central Schema Registry. After the message is successfully serialized, the producer application sends it to the Kafka topic (unicorn-ride-request-topic) on the MSK cluster.

When we run the consumer application in Account A, the deserializer (GlueSchemaRegistryKafkaDeserializer) from the Schema Registry SerDe library provided as the configuration extracts the encoded schema ID from the message read from the Kafka topic (unicorn-ride-request-topic) and gets the schema for the same ID from the central Schema Registry in Account B. It then deserializes the message. It uses the IAM role (temporary credentials) in Account B and the Region provided as the configuration to connect to the central Schema Registry. The consumer application also configures Avro’s SPECIFIC_RECORD to inform the deserializer that the message is of a specific type (unicorn ride request). After the message is successfully deserialized, the consumer application processes it as per the requirements.

Clean up

The final step is to clean up. To avoid unnecessary charges, you should remove all the resources created by the CloudFormation stacks used for this post. The simplest way to do so is to delete the stacks: first delete MSKClusterStack, followed by MSKClientStack, from Account A, and then delete SchemaRegistryStack from Account B.

Conclusion

In this post, we demonstrated how to use AWS Glue Schema Registry with Amazon MSK and stream processing applications to validate messages using an Avro schema. We created a distributed architecture where the Schema Registry resides in a central AWS account (the data lake account) and the Kafka producer and consumer applications reside in a separate AWS account. We created an Avro schema in the schema registry in the central account to make it efficient for the application teams to maintain schemas in a single place. Because AWS Glue Schema Registry supports identity-based access policies, we used a cross-account IAM role to allow the Kafka producer and consumer applications running in a separate account to securely access the schema from the central account to validate messages. Because the Avro schema was agreed on in advance, we used Avro SpecificRecord to ensure type safety at compile time and avoid runtime schema validation issues on the client side. The code used for this post is available on GitHub for reference.

To learn more about the services and resources in this solution, refer to AWS Glue Schema Registry, the Amazon MSK Developer Guide, the AWS Glue Schema Registry SerDe library, and IAM tutorial: Delegate access across AWS accounts using IAM roles.


About the Author

Vikas Bajaj is a Principal Solutions Architect at Amazon Web Services. Vikas works with digital native customers and advises them on technology architecture, modeling, and options and solutions to meet strategic business objectives. He makes sure designs and solutions are efficient, sustainable, and fit for purpose for current and future business needs. Apart from architecture and technology discussions, he enjoys watching and playing cricket.