Validate streaming data over Amazon MSK using schemas in cross-account AWS Glue Schema Registry

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/validate-streaming-data-over-amazon-msk-using-schemas-in-cross-account-aws-glue-schema-registry/

Today’s businesses face an unprecedented growth in the volume of data. A growing portion of the data is generated in real time by IoT devices, websites, business applications, and various other sources. Businesses need to process and analyze this data as soon as it arrives to make business decisions in real time. Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables building and running stream processing applications that use Apache Kafka to collect and process data in real time.

Stream processing applications using Apache Kafka don’t communicate with each other directly; they communicate via sending and receiving messages over Kafka topics. For stream processing applications to communicate efficiently and confidently, a message payload structure must be defined in terms of attributes and data types. This structure describes the schema applications use when sending and receiving messages. However, with a large number of producer and consumer applications, even a small change in schema (removing a field, adding a new field, or change in data type) may cause issues for downstream applications that are difficult to debug and fix.

Traditionally, teams have relied on change management processes (such as approvals and maintenance windows) or other informal mechanisms (documentation, emails, collaboration tools, and so on) to inform one another of data schema changes. However, these mechanisms don’t scale and are prone to mistakes. The AWS Glue Schema Registry allows you to centrally publish, discover, control, validate, and evolve schemas for stream processing applications. With the AWS Glue Schema Registry, you can manage and enforce schemas on data streaming applications using Apache Kafka, Amazon MSK, Amazon Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda.

This post demonstrates how Apache Kafka stream processing applications validate messages using an Apache Avro schema stored in the AWS Glue Schema registry residing in a central AWS account. We use the AWS Glue Schema Registry SerDe library and Avro SpecificRecord to validate messages in stream processing applications while sending and receiving messages from a Kafka topic on an Amazon MSK cluster. Although we use an Avro schema for this post, the same approach and concept applies to JSON schemas as well.

Use case

Let’s assume a fictitious rideshare company that offers unicorn rides. To draw actionable insights, they need to process a stream of unicorn ride request messages. They expect rides to be very popular and want to make sure their solution can scale. They’re also building a central data lake where all their streaming and operation data is stored for analysis. They’re customer obsessed, so they expect to add new fun features to future rides, like choosing the hair color of your unicorn, and will need to reflect these attributes in the ride request messages. To avoid issues in downstream applications due to future schema changes, they need a mechanism to validate messages with a schema hosted in a central schema registry. Having schemas in a central schema registry makes it easier for the application teams to publish, validate, evolve, and maintain schemas in a single place.

Solution overview

The company uses Amazon MSK to capture and distribute the unicorn ride request messages at scale. They define an Avro schema for unicorn ride requests because it provides rich data structures, supports direct mapping to JSON, as well as a compact, fast, and binary data format. Because the schema was agreed in advance, they decided to use Avro SpecificRecord.SpecificRecord is an interface from the Avro library that allows the use of an Avro record as a POJO. This is done by generating a Java class (or classes) from the schema, by using avro-maven-plugin. They use AWS Identity and Access Management (IAM) cross-account roles to allow producer and consumer applications from the other AWS account to safely and securely access schemas in the central Schema Registry account.

The AWS Glue Schema Registry is in Account B, whereas the MSK cluster and Kafka producer and consumer applications are in Account A. We use the following two IAM roles to enable cross-account access to the AWS Glue Schema Registry. Apache Kafka clients in Account A assume a role in Account B using an identity-based policy because the AWS Glue Schema Registry doesn’t support resource-based policies.

  • Account A IAM role – Allows producer and consumer applications to assume an IAM role in Account B.
  • Account B IAM role – Trusts all IAM principals from Account A and allows them to perform read actions on the AWS Glue Schema Registry in Account B. In a real use case scenario, IAM principals that can assume cross-account roles should be scoped more specifically.

The following architecture diagram illustrates the solution:

The solution works as follows:

  1. A Kafka producer running in Account A assumes the cross-account Schema Registry IAM role in Account B by calling the AWS Security Token Service (AWS STS) assumeRole API.
  2. The Kafka producer retrieves the unicorn ride request Avro schema version ID from the AWS Glue Schema Registry for the schema that’s embedded in the unicorn ride request POJO. Fetching the schema version ID is internally managed by the AWS Glue Schema Registry SerDe’s serializer. The serializer has to be configured as part of the Kafka producer configuration.
  3. If the schema exists in the AWS Glue Schema Registry, the serializer decorates the data record with the schema version ID and then serializes it before delivering it to the Kafka topic on the MSK cluster.
  4. The Kafka consumer running in Account A assumes the cross-account Schema Registry IAM role in Account B by calling the AWS STS assumeRole API.
  5. The Kafka consumer starts polling the Kafka topic on the MSK cluster for data records.
  6. The Kafka consumer retrieves the unicorn ride request Avro schema from the AWS Glue Schema Registry, matching the schema version ID that’s encoded in the unicorn ride request data record. Fetching the schema is internally managed by the AWS Glue Schema Registry SerDe’s deserializer. The deserializer has to be configured as part of the Kafka consumer configuration. If the schema exists in the AWS Glue Schema Registry, the deserializer deserializes the data record into the unicorn ride request POJO for the consumer to process it.

The AWS Glue Schema Registry SerDe library also supports optional compression configuration to save on data transfers. For more information about the Schema Registry, see How the Schema Registry works.

Unicorn ride request Avro schema

The following schema (UnicornRideRequest.avsc) defines a record representing a unicorn ride request, which contains ride request attributes along with the customer attributes and system-recommended unicorn attributes:

{
    "type": "record",
    "name": "UnicornRideRequest",
    "namespace": "demo.glue.schema.registry.avro",
    "fields": [
      {"name": "request_id", "type": "int", "doc": "customer request id"},
      {"name": "pickup_address","type": "string","doc": "customer pickup address"},
      {"name": "destination_address","type": "string","doc": "customer destination address"},
      {"name": "ride_fare","type": "float","doc": "ride fare amount (USD)"},
      {"name": "ride_duration","type": "int","doc": "ride duration in minutes"},
      {"name": "preferred_unicorn_color","type": {"type": "enum","name": "UnicornPreferredColor","symbols": ["WHITE","BLACK","RED","BLUE","GREY"]}, "default": "WHITE"},
      {
        "name": "recommended_unicorn",
        "type": {
          "type": "record",
          "name": "RecommendedUnicorn",
          "fields": [
            {"name": "unicorn_id","type": "int", "doc": "recommended unicorn id"},
            {"name": "color","type": {"type": "enum","name": "unicorn_color","symbols": ["WHITE","RED","BLUE"]}},
            {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"}
          ]
        }
      },
      {
        "name": "customer",
        "type": {
          "type": "record",
          "name": "Customer",
          "fields": [
            {"name": "customer_account_no","type": "int", "doc": "customer account number"},
            {"name": "first_name","type": "string"},
            {"name": "middle_name","type": ["null","string"], "default": null},
            {"name": "last_name","type": "string"},
            {"name": "email_addresses","type": ["null", {"type":"array", "items":"string"}]},
            {"name": "customer_address","type": "string","doc": "customer address"},
            {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"},
            {"name": "customer_rating", "type": ["null", "int"], "default": null}
          ]
        }
      }
    ]
  }

Prerequisites

To use this solution, you must have two AWS accounts:

  • Account A – For the MSK cluster, Kafka producer and consumer Amazon Elastic Compute Cloud (Amazon EC2) instances, and AWS Cloud9 environment
  • Account B – For the Schema Registry and schema

For this solution, we use Region us-east-1, but you can change this as per your requirements.

Next, we create the resources in each account using AWS CloudFormation templates.

Create resources in Account B

We create the following resources in Account B:

  • A schema registry
  • An Avro schema
  • An IAM role with the AWSGlueSchemaRegistryReadonlyAccess managed policy and an instance profile, which allows all Account A IAM principals to assume it
  • The UnicornRideRequest.avsc Avro schema shown earlier, which is used as a schema definition in the CloudFormation template

Make sure you have the appropriate permissions to create these resources.

  1. Log in to Account B.
  2. Launch the following CloudFormation stack.
  3. For Stack name, enter SchemaRegistryStack.
  4. For Schema Registry name, enter unicorn-ride-request-registry.
  5. For Avro Schema name, enter unicorn-ride-request-schema-avro.
  6. For the Kafka client’s AWS account ID, enter your Account A ID.
  7. For ExternalId, enter a unique random ID (for example, demo10A), which should be provided by the Kafka clients in Account A while assuming the IAM role in this account.

For more information about cross-account security, see The confused deputy problem.

  1. When the stack is complete, on the Outputs tab of the stack, copy the value for CrossAccountGlueSchemaRegistryRoleArn.

The Kafka producer and consumer applications created in Account A assume this role to access the Schema Registry and schema in Account B.

  1. To verify the resources were created, on the AWS Glue console, choose Schema registries in the navigation bar, and locate unicorn-ride-request-registry.
  2. Choose the registry unicorn-ride-request-registry and verify that it contains unicorn-ride-request-schema-avro in the Schemas section.
  3. Choose the schema to see its content.

The IAM role created by the SchemaRegistryStack stack allows all Account A IAM principals to assume it and perform read actions on the AWS Glue Schema Registry. Let’s look at the trust relationships of the IAM role.

  1. On the SchemaRegistryStack stack Outputs tab, copy the value for CrossAccountGlueSchemaRegistryRoleName.
  2. On the IAM console, search for this role.
  3. Choose Trust relationships and look at its trusted entities to confirm that Account A is listed.
  4. In the Conditions section, confirm that sts:ExternalId has the same unique random ID provided during stack creation.

Create resources in Account A

We create the following resources in Account A:

  • A VPC
  • EC2 instances for the Kafka producer and consumer
  • An AWS Cloud9 environment
  • An MSK cluster

As a prerequisite, create an EC2 keypair and download it on your machine to be able to SSH into EC2 instances. Also create an MSK cluster configuration with default values. You need to have permissions to create the CloudFormation stack, EC2 instances, AWS Cloud9 environment, MSK cluster, MSK cluster configuration, and IAM role.

  1. Log in to Account A.
  2. Launch the following CloudFormation stack to launch the VPC, EC2 instances, and AWS Cloud9 environment.
  3. For Stack name, enter MSKClientStack.
  4. Provide the VPC and subnet CIDR ranges.
  5. For EC2 Keypair, choose an existing EC2 keypair.
  6. For the latest EC2 AMI ID, select the default option.
  7. For the cross-account IAM role ARN, use the value for CrossAccountGlueSchemaRegistryRoleArn (available on the Outputs tab of SchemaRegistryStack).
  8. Wait for the stack to create successfully.
  9. Launch the following CloudFormation stack to create the MSK cluster.
  10. For Stack name, enter MSKClusterStack.
  11. Use Amazon MSK version 2.7.1.
  12. For the MSK cluster configuration ARN, enter the MSK cluster configuration ARN. One that you created as part of the prerequisite.
  13. For the MSK cluster configuration revision number, enter 1 or change it according to your version.
  14. For the client CloudFormation stack name, enter MSKClientStack (the stack name that you created prior to this stack).

Configure the Kafka producer

To configure the Kafka producer accessing the Schema Registry in the central AWS account, complete the following steps:

  1. Log in to Account A.
  2. On the AWS Cloud9 console, choose the Cloud9EC2Bastion environment created by the MSKClientStack stack.
  3. On the File menu, choose Upload Local Files.
  4. Upload the EC2 keypair file that you used earlier while creating the stack.
  5. Open a new terminal and change the EC2 keypair permissions:
    chmod 0400 <keypair PEM file>

  6. SSH into the KafkaProducerInstance EC2 instance and set the Region as per your requirement:
    ssh -i <keypair PEM file> ec2-user@<KafkaProducerInstance Private IP address>
    aws configure set region <region>

  7. Set the environment variable MSK_CLUSTER_ARN pointing to the MSK cluster’s ARN:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters |  jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d \")

Change the .ClusterName value in the code if you used a different name for the MSK cluster CloudFormation stack. The cluster name is the same as the stack name.

  1. Set the environment variable BOOTSTRAP_BROKERS pointing to the bootstrap brokers:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Verify the environment variables:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

  3. Create a Kafka topic called unicorn-ride-request-topic in your MSK cluster, which is used by the Kafka producer and consumer applications later:
    cd ~/kafka
    
    ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS \
    --topic unicorn-ride-request-topic \
    --create --partitions 3 --replication-factor 2
    
    ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --list

The MSKClientStack stack copied the Kafka producer client JAR file called kafka-cross-account-gsr-producer.jar to the KafkaProducerInstance instance. It contains the Kafka producer client that sends messages to the Kafka topic unicorn-ride-request-topic on the MSK cluster and accesses the unicorn-ride-request-schema-avro Avro schema from the unicorn-ride-request-registry schema registry in Account B. The Kafka producer code, which we cover later in this post, is available on GitHub.

  1. Run the following commands and verify kafka-cross-account-gsr-producer.jar exists:
    cd ~
    ls -ls

  2. Run the following command to run the Kafka producer in the KafkaProducerInstance terminal:
    java -jar kafka-cross-account-gsr-producer.jar -bs $BOOTSTRAP_BROKERS \
    -rn <Account B IAM role arn that Kafka producer application needs to assume> \
    -topic unicorn-ride-request-topic \
    -reg us-east-1 \
    -nm 500 \
    -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

The code has the following parameters:

  • -bs$BOOTSTRAP_BROKERS (the MSK cluster bootstrap brokers)
  • -rn – The CrossAccountGlueSchemaRegistryRoleArn value from the SchemaRegistryStack stack outputs in Account B
  • -topic – the Kafka topic unicorn-ride-request-topic
  • -regus-east-1 (change it according to your Region, it’s used for the AWS STS endpoint and Schema Registry)
  • -nm: 500 (the number of messages the producer application sends to the Kafka topic)
  • -externalId – The same external ID (for example, demo10A) that you used while creating the CloudFormation stack in Account B

The following screenshot shows the Kafka producer logs showing Schema Version Id received..., which means it has retrieved the Avro schema unicorn-ride-request-schema-avro from Account B and messages were sent to the Kafka topic on the MSK cluster in Account A.

Kafka producer code

The complete Kafka producer implementation is available on GitHub. In this section, we break down the code.

  • getProducerConfig() initializes the producer properties, as shown in the following code:
    • VALUE_SERIALIZER_CLASS_CONFIG – The GlueSchemaRegistryKafkaSerializer.class.getName() AWS serializer implementation that serializes data records (the implementation is available on GitHub)
    • REGISTRY_NAME – The Schema Registry from Account B
    • SCHEMA_NAME – The schema name from Account B
    • AVRO_RECORD_TYPEAvroRecordType.SPECIFIC_RECORD
private Properties getProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.CLIENT_ID_CONFIG,"msk-cross-account-gsr-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,regionName);
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry");
        props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro");
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
        return props;
}
  • startProducer() assumes the role in Account B to be able to connect with the Schema Registry in Account B and sends messages to the Kafka topic on the MSK cluster:
public void startProducer() {
        assumeGlueSchemaRegistryRole();
        KafkaProducer<String, UnicornRideRequest> producer = 
		new KafkaProducer<String,UnicornRideRequest>(getProducerConfig());
        int numberOfMessages = Integer.valueOf(str_numOfMessages);
        logger.info("Starting to send records...");
        for(int i = 0;i < numberOfMessages;i ++)
        {
            UnicornRideRequest rideRequest = getRecord(i);
            String key = "key-" + i;
            ProducerRecord<String, UnicornRideRequest> record = 
		new ProducerRecord<String, UnicornRideRequest>(topic, key, rideRequest);
            producer.send(record, new ProducerCallback());
        }
 }
  • assumeGlueSchemaRegistryRole() as shown in the following code uses AWS STS to assume the cross-account Schema Registry IAM role in Account B. (For more information, see Temporary security credentials in IAM.) The response from stsClient.assumeRole(roleRequest) contains the temporary credentials, which include accessKeyId, secretAccessKey, and a sessionToken. It then sets the temporary credentials in the system properties. The AWS SDK for Java uses these credentials while accessing the Schema Registry (through the Schema Registry serializer). For more information, see Using Credentials.
    public void assumeGlueSchemaRegistryRole() {
            try {
    	   Region region = Region.of(regionName);
                if(!Region.regions().contains(region))
                     throw new RuntimeException("Region : " + regionName + " is invalid.");
                StsClient stsClient = StsClient.builder().region(region).build();
                AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
                        .roleArn(this.assumeRoleARN)
                        .roleSessionName("kafka-producer-cross-account-glue-schemaregistry-demo")
    	           .externalId(this.externalId)	
                        .build();
                AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
                Credentials myCreds = roleResponse.credentials();
                System.setProperty("aws.accessKeyId", myCreds.accessKeyId());
                System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey());
                System.setProperty("aws.sessionToken", myCreds.sessionToken());
                stsClient.close();
            } catch (StsException e) {
                logger.error(e.getMessage());
                System.exit(1);
            }
        }

  • createUnicornRideRequest() uses the Avro schema (unicorn ride request schema) generated classes to create a SpecificRecord. For this post, the unicorn ride request attributes values are hard-coded in this method. See the following code:
    public UnicornRideRequest getRecord(int requestId){
                /*
                 Initialise UnicornRideRequest object of
                 class that is generated from AVRO Schema
                 */
               UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder()
                .setRequestId(requestId)
                .setPickupAddress("Melbourne, Victoria, Australia")
                .setDestinationAddress("Sydney, NSW, Aus")
                .setRideFare(1200.50F)
                .setRideDuration(120)
                .setPreferredUnicornColor(UnicornPreferredColor.WHITE)
                .setRecommendedUnicorn(RecommendedUnicorn.newBuilder()
                        .setUnicornId(requestId*2)
                        .setColor(unicorn_color.WHITE)
                        .setStarsRating(5).build())
                .setCustomer(Customer.newBuilder()
                        .setCustomerAccountNo(1001)
                        .setFirstName("Dummy")
                        .setLastName("User")
                        .setEmailAddresses(Arrays.asList("[email protected]"))
                        .setCustomerAddress("Flinders Street Station")
                        .setModeOfPayment(ModeOfPayment.CARD)
                        .setCustomerRating(5).build()).build();
                logger.info(rideRequest.toString());
                return rideRequest;
        }

Configure the Kafka consumer

The MSKClientStack stack created the KafkaConsumerInstance instance for the Kafka consumer application. You can view all the instances created by the stack on the Amazon EC2 console.

To configure the Kafka consumer accessing the Schema Registry in the central AWS account, complete the following steps:

  1. Open a new terminal in the Cloud9EC2Bastion AWS Cloud9 environment.
  2. SSH into the KafkaConsumerInstance EC2 instance and set the Region as per your requirement:
    ssh -i <keypair PEM file> ec2-user@<KafkaConsumerInstance Private IP address>
    aws configure set region <region>

  3. Set the environment variable MSK_CLUSTER_ARN pointing to the MSK cluster’s ARN:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters |  jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d \")

Change the .ClusterName value if you used a different name for the MSK cluster CloudFormation stack. The cluster name is the same as the stack name.

  1. Set the environment variable BOOTSTRAP_BROKERS pointing to the bootstrap brokers:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Verify the environment variables:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

The MSKClientStack stack copied the Kafka consumer client JAR file called kafka-cross-account-gsr-consumer.jar to the KafkaConsumerInstance instance. It contains the Kafka consumer client that reads messages from the Kafka topic unicorn-ride-request-topic on the MSK cluster and accesses the unicorn-ride-request-schema-avro Avro schema from the unicorn-ride-request-registry registry in Account B. The Kafka consumer code, which we cover later in this post, is available on GitHub.

  1. Run the following commands and verify kafka-cross-account-gsr-consumer.jar exists:
    cd ~
    ls -ls

  2. Run the following command to run the Kafka consumer in the KafkaConsumerInstance terminal:
    java -jar kafka-cross-account-gsr-consumer.jar -bs $BOOTSTRAP_BROKERS \
    -rn <Account B IAM role arn that Kafka consumer application needs to assume> \
    -topic unicorn-ride-request-topic \
    -reg us-east-1 \
    -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

The code has the following parameters:

  • -bs$BOOTSTRAP_BROKERS (the MSK cluster bootstrap brokers)
  • -rn – The CrossAccountGlueSchemaRegistryRoleArn value from the SchemaRegistryStack stack outputs in Account B
  • -topic – The Kafka topic unicorn-ride-request-topic
  • -regus-east-1 (change it according to your Region, it’s used for the AWS STS endpoint and Schema Registry)
  • -externalId – The same external ID (for example, demo10A) that you used while creating the CloudFormation stack in Account B

The following screenshot shows the Kafka consumer logs successfully reading messages from the Kafka topic on the MSK cluster in Account A and accessing the Avro schema unicorn-ride-request-schema-avro from the unicorn-ride-request-registry schema registry in Account B.

If you see the similar logs, it means both the Kafka consumer applications have been able to connect successfully with the centralized Schema Registry in Account B and are able to validate messages while sending and consuming messages from the MSK cluster in Account A.

Kafka consumer code

The complete Kafka consumer implementation is available on GitHub. In this section, we break down the code.

  • getConsumerConfig() initializes consumer properties, as shown in the following code:
    • VALUE_DESERIALIZER_CLASS_CONFIG – The GlueSchemaRegistryKafkaDeserializer.class.getName() AWS deserializer implementation that deserializes the SpecificRecord as per the encoded schema ID from the Schema Registry (the implementation is available on GitHub).
    • AVRO_RECORD_TYPEAvroRecordType.SPECIFIC_RECORD
private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
        return props;
}
  • startConsumer() assumes the role in Account B to be able to connect with the Schema Registry in Account B and reads messages from the Kafka topic on the MSK cluster:
public void startConsumer() {
  logger.info("starting consumer...");
  assumeGlueSchemaRegistryRole();
  KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<String, UnicornRideRequest>(getConsumerConfig());
  consumer.subscribe(Collections.singletonList(topic));
  int count = 0;
  while (true) {
            final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, UnicornRideRequest> record : records) {
                final UnicornRideRequest rideRequest = record.value();
                logger.info(String.valueOf(rideRequest.getRequestId()));
                logger.info(rideRequest.toString());
            }
        }
}
  • assumeGlueSchemaRegistryRole() as shown in the following code uses AWS STS to assume the cross-account Schema Registry IAM role in Account B. The response from stsClient.assumeRole(roleRequest) contains the temporary credentials, which include accessKeyId, secretAccessKey, and a sessionToken. It then sets the temporary credentials in the system properties. The SDK for Java uses these credentials while accessing the Schema Registry (through the Schema Registry serializer). For more information, see Using Credentials.
public void assumeGlueSchemaRegistryRole() {
        try {
	Region region = Region.of(regionName);
            if(!Region.regions().contains(region))
                 throw new RuntimeException("Region : " + regionName + " is invalid.");
            StsClient stsClient = StsClient.builder().region(region).build();
            AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
                    .roleArn(this.assumeRoleARN)
                    .roleSessionName("kafka-consumer-cross-account-glue-schemaregistry-demo")
                    .externalId(this.externalId)
                    .build();
            AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
            Credentials myCreds = roleResponse.credentials();
            System.setProperty("aws.accessKeyId", myCreds.accessKeyId());
            System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey());
            System.setProperty("aws.sessionToken", myCreds.sessionToken());
            stsClient.close();
        } catch (StsException e) {
            logger.error(e.getMessage());
            System.exit(1);
        }
    }

Compile and generate Avro schema classes

Like any other part of building and deploying your application, schema compilation and the process of generating Avro schema classes should be included in your CI/CD pipeline. There are multiple ways to generate Avro schema classes; we use avro-maven-plugin for this post. The CI/CD process can also use avro-tools to compile Avro schema to generate classes. The following code is an example of how you can use avro-tools:

java -jar /path/to/avro-tools-1.10.2.jar compile schema <schema file> <destination>

//compiling unicorn_ride_request.avsc
java -jar avro-tools-1.10.2.jar compile schema unicorn_ride_request.avsc .

Implementation overview

To recap, we start with defining and registering an Avro schema for the unicorn ride request message in the AWS Glue Schema Registry in Account B, the central data lake account. In Account A, we create an MSK cluster and Kafka producer and consumer EC2 instances with their respective application code (kafka-cross-account-gsr-consumer.jar and kafka-cross-account-gsr-producer.jar) and deployed in them using the CloudFormation stack.

When we run the producer application in Account A, the serializer (GlueSchemaRegistryKafkaSerializer) from the AWS Glue Schema Registry SerDe library provided as the configuration gets the unicorn ride request schema (UnicornRideRequest.avsc) from the central Schema Registry residing in Account B to serialize the unicorn ride request message. It uses the IAM role (temporary credentials) in Account B and Region, schema registry name (unicorn-ride-request-registry), and schema name (unicorn-ride-request-schema-avro) provided as the configuration to connect to the central Schema Registry. After the message is successfully serialized, the producer application sends it to the Kafka topic (unicorn-ride-request-topic) on the MSK cluster.

When we run the consumer application in Account A, the deserializer (GlueSchemaRegistryKafkaDeserializer) from the Schema Registry SerDe library provided as the configuration extracts the encoded schema ID from the message read from the Kafka topic (unicorn-ride-request-topic) and gets the schema for the same ID from the central Schema Registry in Account B. It then deserializes the message. It uses the IAM role (temporary credentials) in Account B and the Region provided as the configuration to connect to the central Schema Registry. The consumer application also configures Avro’s SPECIFIC_RECORD to inform the deserializer that the message is of a specific type (unicorn ride request). After the message is successfully deserialized, the consumer application processes it as per the requirements.

Clean up

The final step is to clean up. To avoid unnecessary charges, you should remove all the resources created by the CloudFormation stacks used for this post. The simplest way to do so is to delete the stacks. First delete the MSKClusterStack followed by MSKClientStack from Account A. Then delete the SchemaRegistryStack from Account B.

Conclusion

In this post, we demonstrated how to use AWS Glue Schema Registry with Amazon MSK and stream processing applications to validate messages using an Avro schema. We created a distributed architecture where the Schema Registry resides in a central AWS account (data lake account) and Kafka producer and consumer applications reside in a separate AWS account. We created an Avro schema in the schema registry in the central account to make it efficient for the application teams to maintain schemas in a single place. Because AWS Glue Schema Registry supports identity-based access policies, we used the cross-account IAM role to allow the Kafka producer and consumer applications running in a separate account to securely access the schema from the central account to validate messages. Because the Avro schema was agreed in advance, we used Avro SpecificRecord to ensure type safety at compile time and avoid runtime schema validation issues at the client side. The code used for this post is available on GitHub for reference.

To learn more about the services and resources in this solution, refer to AWS Glue Schema Registry, the Amazon MSK Developer Guide, the AWS Glue Schema Registry SerDe library, and IAM tutorial: Delegate access across AWS accounts using IAM roles.


About the Author

Vikas Bajaj is a Principal Solutions Architect at Amazon Web Service. Vikas works with digital native customers and advises them on technology architecture and modeling, and options and solutions to meet strategic business objectives. He makes sure designs and solutions are efficient, sustainable, and fit-for-purpose for current and future business needs. Apart from architecture and technology discussions, he enjoys watching and playing cricket.