Tag Archives: Amazon Managed Streaming for Apache Kafka (Amazon MSK)

Validate, evolve, and control schemas in Amazon MSK and Amazon Kinesis Data Streams with AWS Glue Schema Registry

Post Syndicated from Brian Likosar original https://aws.amazon.com/blogs/big-data/validate-evolve-and-control-schemas-in-amazon-msk-and-amazon-kinesis-data-streams-with-aws-glue-schema-registry/

Data streaming technologies like Apache Kafka and Amazon Kinesis Data Streams capture and distribute data generated by thousands or millions of applications, websites, or machines. These technologies serve as a highly available transport layer that decouples the data-producing applications from data processors. However, the sheer number of applications producing, processing, routing, and consuming data can make it hard to coordinate and evolve data schemas, like adding or removing a data field, without introducing data quality issues and downstream application failures. Developers often build complex tools, write custom code, or rely on documentation, change management, and Wikis to protect against schema changes. This is quite error prone because it relies too heavily on human oversight. A common solution with data streaming technologies is a schema registry that provides for validation of schema changes to allow for safe evolution as business needs adjust over time.

AWS Glue Schema Registry, a serverless feature of AWS Glue, enables you to validate and reliably evolve streaming data against Apache Avro schemas at no additional charge. Through Apache-licensed serializers and deserializers, the Glue Schema Registry integrates with Java applications developed for Apache Kafka, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Kinesis Data Streams, Apache Flink, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda.

This post explains the benefits of using the Glue Schema Registry and provides examples of how to use it with both Apache Kafka and Kinesis Data Streams.

With the Glue Schema Registry, you can eliminate defensive coding and cross-team coordination, improve data quality, reduce downstream application failures, and use a registry that is integrated across multiple AWS services. Each schema can be versioned within the guardrails of a compatibility mode, providing developers the flexibility to reliably evolve schemas. Additionally, the Glue Schema Registry can serialize data into a compressed format, helping you save on data transfer and storage costs.

Although there are many ways to leverage the Glue Schema Registry (including using the API to build your own integrations), in this post, we show two use cases. The Schema Registry is a free feature that can significantly improve data quality and developer productivity. If you use Avro schemas, you should be using the Schema Registry to supplement your solutions built on Apache Kafka (including Amazon MSK) or Kinesis Data Streams. The following diagram illustrates this architecture.

AWS Glue Schema Registry features

Glue Schema Registry has the following features:

  • Schema discovery – When a producer registers a schema change, metadata can be applied as a key-value pair to provide searchable information for administrators or developers. This metadata can indicate the original source of the data (source=MSK_west), the team name to contact (owner=DataEngineering), or AWS tags (environment=Production). You could potentially encrypt a field in your data on the producing client and use metadata to specify to potential consumer clients which public key fingerprint to use for decryption.
  • Schema compatibility – The versioning of each schema is governed by a compatibility mode. If a new version of a schema is requested to be registered that breaks the specified compatibility mode, the request fails and an exception is thrown. Compatibility checks enable developers building downstream applications to have a bounded set of scenarios to build applications against, which helps to prepare for the changes without issue. Commonly used modes are FORWARD, BACKWARD, and FULL. For more information about mode definitions, see Schema Versioning and Compatibility.
  • Schema validation – Glue Schema Registry serializers work to validate that the schema used during data production is compatible. If it isn’t, the data producer receives an exception from the serializer. This ensures that potentially breaking changes are found earlier in development cycles, and can also help prevent unintentional schema changes due to human error.
  • Auto-registration of schemas – If configured to do so, the producer of data can auto-register schema changes as they flow in the data stream. This is especially useful for use cases where the source of the data is change data capture from a database.
  • IAM support – Thanks to integrated AWS Identity and Access Management (IAM) support, only authorized producers can change certain schemas. Furthermore, only those consumers authorized to read the schema can do so. Schema changes are typically performed deliberately and with care, so it’s important to use IAM to control who performs these changes. Additionally, access control to schemas is important in situations where you might have sensitive information included in the schema definition itself. In the examples that follow, IAM roles are inferred via the AWS SDK for Java, so they are inherited from the Amazon Elastic Compute Cloud (Amazon EC2) instance’s role that the application runs in. IAM roles can also be applied to any other AWS service that could contain this code, such as containers or Lambda functions.
  • Integrations and other support – The provided serializers and deserializers are currently for Java clients using Apache Avro for data serialization. The GitHub repo also contains support for Apache Kafka Streams, Apache Kafka Connect, and Apache Flink—all licensed using the Apache License 2.0. We’re already working on additional language and data serialization support, but we need your feedback on what you’d like to see next.
  • Secondary deserializer – If you have already registered schemas in another schema registry, there’s an option for specifying a secondary deserializer when performing schema lookups. This allows for migrations from other schema registries without having to start anew. If the schema ID being used isn’t known to the Glue Schema Registry, it’s looked for in the secondary deserializer.
  • Compression – Using the Avro format already reduces message size due to its compact, binary format. Using a schema registry can further reduce data payload by no longer needing to send and receive schemas with each message. Glue Schema Registry libraries also provide an option for zlib compression, which can reduce data requirements even further by compressing the payload of the message. This varies by use case, but compression can reduce the size of the message significantly.
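To make the compression point concrete, here is a minimal sketch using the zlib module built into Node.js. It does not use the Glue Schema Registry libraries; the payload is a hypothetical JSON stand-in for a serialized message body (real Avro payloads are binary), so the sizes it prints are only illustrative.

```javascript
const zlib = require("zlib");

// Hypothetical payload: repeated customer records encoded as JSON.
// This stands in for a serialized message body; it is not Avro.
const names = [
  ["Ada", "Lovelace"],
  ["Sue", "Black"],
  ["Anita", "Borg"],
  ["Grace", "Hopper"],
  ["Neha", "Narkhede"],
];
const records = Array.from({ length: 100 }, (_, i) => {
  const [first_name, last_name] = names[i % names.length];
  return { first_name, last_name };
});
const payload = Buffer.from(JSON.stringify(records), "utf8");

// Compress with zlib (deflate), then restore the original bytes.
const compressed = zlib.deflateSync(payload);
const restored = zlib.inflateSync(compressed);

console.log(`original: ${payload.length} bytes, compressed: ${compressed.length} bytes`);
console.log(`round trip intact: ${restored.equals(payload)}`);
```

How much you save depends on how repetitive your data is, so it is worth measuring with representative messages before relying on a particular ratio.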

Example schema

For this post, we use the following schema to begin each of our use cases:

{
 "namespace": "Customer.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
 {"name": "first_name", "type": "string"},
 {"name": "last_name", "type": "string"}
 ]
}

Using AWS Glue Schema Registry with Amazon MSK and Apache Kafka

You can use the following Apache Kafka producer code to produce Apache Avro formatted messages to a topic with the preceding schema:

package com.amazon.gsrkafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.errors.SerializationException;
import com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
// Compatibility enum from the Glue model of the AWS SDK for Java v2
import software.amazon.awssdk.services.glue.model.Compatibility;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import java.util.Properties;
import java.io.IOException;
import java.io.File;

public class gsrkafka {
    private static final Properties properties = new Properties();
    private static final String topic = "test";
    // Sample customers as {first_name, last_name} pairs
    private static final String[][] CUSTOMERS = {
        {"Ada", "Lovelace"}, {"Sue", "Black"}, {"Anita", "Borg"},
        {"Grace", "Hopper"}, {"Neha", "Narkhede"}
    };

    public static void main(final String[] args) throws IOException {
        // Set the default synchronous HTTP client to UrlConnectionHttpClient
        System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Records are sent without a key, so the key side uses a plain String serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "liko-schema-registry");
        properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "customer");
        properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
        properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
        Schema schema_customer = new Parser().parse(new File("Customer.avsc"));
        // full_name exists only in the evolved schema shown later in this post;
        // putting a field that the schema does not define throws an exception
        final boolean hasFullName = schema_customer.getField("full_name") != null;

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties)) {
            for (final String[] name : CUSTOMERS) {
                final GenericRecord customer = new GenericData.Record(schema_customer);
                customer.put("first_name", name[0]);
                customer.put("last_name", name[1]);
                if (hasFullName) {
                    customer.put("full_name", name[0] + " " + name[1]);
                }
                producer.send(new ProducerRecord<String, GenericRecord>(topic, customer));
                System.out.println("Sent message");
                Thread.sleep(1000L);
            }
            producer.flush();
            System.out.println("Successfully produced " + CUSTOMERS.length + " messages to a topic called " + topic);
        } catch (final InterruptedException | SerializationException e) {
            e.printStackTrace();
        }
    }
}

Use the following Apache Kafka consumer code to look up schema information while consuming from a topic:

package com.amazon.gsrkafka;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.errors.SerializationException;
import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.generic.GenericRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.io.IOException;

public class gsrkafka {
    private static final Properties properties = new Properties();
    private static final String topic = "test";

    public static void main(final String[] args) throws IOException {
        // Set the default synchronous HTTP client to UrlConnectionHttpClient
        System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "gsr-client");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Keys are plain strings (or absent); only the value is Avro-encoded
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "liko-schema-registry");
        properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

        try (final KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(properties)) {
            consumer.subscribe(Collections.singletonList(topic));
            // Poll until the process is interrupted
            while (true) {
                final ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                for (final ConsumerRecord<String, GenericRecord> record : records) {
                    final GenericRecord value = record.value();
                    System.out.println("Received message: value = " + value);
                }
            }
        } catch (final SerializationException e) {
            e.printStackTrace();
        }
    }
}

Using AWS Glue Schema Registry with Kinesis Data Streams

You can use the following Kinesis Producer Library (KPL) code to publish messages in Apache Avro format to a Kinesis data stream with the preceding schema:

private static final String SCHEMA_DEFINITION = "{\"namespace\": \"Customer.avro\",\n"
    + " \"type\": \"record\",\n"
    + " \"name\": \"Customer\",\n"
    + " \"fields\": [\n"
    + " {\"name\": \"first_name\", \"type\": \"string\"},\n"
    + " {\"name\": \"last_name\", \"type\": \"string\"}\n"
    + " ]\n"
    + "}";

KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setRegion("us-west-1");

//[Optional] configuration for Schema Registry.
GlueSchemaRegistryConfiguration schemaRegistryConfig =
    new GlueSchemaRegistryConfiguration("us-west-1");
schemaRegistryConfig.setCompression(true);
config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
///Optional configuration ends.

final KinesisProducer producer =
    new KinesisProducer(config);

final ByteBuffer data = getDataToSend();

// Fully qualified to avoid a clash with org.apache.avro.Schema
com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
    new com.amazonaws.services.schemaregistry.common.Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");

ListenableFuture<UserRecordResult> f = producer.addUserRecord(
    config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);

// Builds one Avro-encoded Customer record that matches SCHEMA_DEFINITION
private static ByteBuffer getDataToSend() {
    org.apache.avro.Schema avroSchema =
        new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);

    GenericRecord user = new GenericData.Record(avroSchema);
    user.put("first_name", "Ada");
    user.put("last_name", "Lovelace");

    ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
    try {
        new GenericDatumWriter<>(avroSchema).write(user, encoder);
        encoder.flush();
    } catch (IOException e) {
        // Writing to an in-memory stream should not fail; rethrow unchecked
        throw new UncheckedIOException(e);
    }
    return ByteBuffer.wrap(outBytes.toByteArray());
}

On the consumer side, you can use the Kinesis Client Library (KCL) (v2.3 or later) to look up schema information while retrieving messages from a Kinesis data stream:

GlueSchemaRegistryConfiguration schemaRegistryConfig = 
new GlueSchemaRegistryConfiguration(this.region.toString());

 GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = 
new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);

 RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
 retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
 
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
retrievalConfig
);

public void processRecords(ProcessRecordsInput processRecordsInput) {
    MDC.put(SHARD_ID_MDC_KEY, shardId);
    try {
        log.info("Processing {} record(s)", processRecordsInput.records().size());
        processRecordsInput.records().forEach(r ->
            log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.schema()));
    } catch (Throwable t) {
        // Pass the throwable so the stack trace is logged
        log.error("Caught throwable while processing records. Aborting.", t);
        Runtime.getRuntime().halt(1);
    } finally {
        MDC.remove(SHARD_ID_MDC_KEY);
    }
}

// Decodes the record payload using the schema attached to the record
private GenericRecord recordToAvroObj(KinesisClientRecord r) {
    byte[] data = new byte[r.data().remaining()];
    r.data().get(data, 0, data.length);
    org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
    try {
        return datumReader.read(null, binaryDecoder);
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

Example of schema evolution

As a producer, let’s say you want to add an additional field to our schema:

{
 "namespace": "Customer.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
 {"name": "first_name", "type": "string"},
 {"name": "last_name", "type": "string"},
 {"name": "full_name", "type": ["null", "string"], "default": null}
]
}

Regardless of whether you’re following the Apache Kafka or Kinesis Data Streams example, you can use the previously provided producer code to publish new messages using this new schema version with the full_name field. This is simply a concatenation of first_name and last_name.

This schema change added an optional field (full_name), which is indicated by the type field having an option of null in addition to string with a default of null. In adding this optional field, we’ve created a schema evolution. This qualifies as a FORWARD compatible change because the producer has modified the schema and the consumer can read without updating its version of the schema. It’s a good practice to provide a default for a given field. This allows for its eventual removal if necessary. If it’s removed by the producer, the consumer uses the default that it knew for that field from before the removal.

This change is also a BACKWARD compatible change, because if the consumer changes the schema it expects to receive, it can use that default to fill in the value for the field it isn’t receiving. By being both FORWARD and BACKWARD compatible, it is therefore a FULL compatible change. The Glue Schema Registry serializers default to BACKWARD compatible, so we have to add a line declaring it as FULL.
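The reasoning about FORWARD, BACKWARD, and FULL can be sketched as a small check. This is a deliberately simplified model, not the Glue Schema Registry's actual algorithm: it only considers added and removed record fields and ignores type changes and promotions. The function names and the `v2NoDefault` schema are hypothetical.

```javascript
// Names of fields the reader expects that the writer never wrote and that
// have no default. Any such field makes the data unreadable for that reader.
function unreadableFields(readerSchema, writerSchema) {
  const written = new Set(writerSchema.fields.map((f) => f.name));
  return readerSchema.fields
    .filter((f) => !written.has(f.name) && !("default" in f))
    .map((f) => f.name);
}

// BACKWARD: the new schema can read data written with the old schema.
// FORWARD: the old schema can read data written with the new schema.
function checkCompatibility(newSchema, oldSchema) {
  const backward = unreadableFields(newSchema, oldSchema).length === 0;
  const forward = unreadableFields(oldSchema, newSchema).length === 0;
  return { backward, forward, full: backward && forward };
}

const v1 = {
  fields: [
    { name: "first_name", type: "string" },
    { name: "last_name", type: "string" },
  ],
};
const v2 = {
  fields: [...v1.fields, { name: "full_name", type: ["null", "string"], default: null }],
};
// Hypothetical variant: the same new field, but required and without a default.
const v2NoDefault = {
  fields: [...v1.fields, { name: "full_name", type: "string" }],
};

console.log(checkCompatibility(v2, v1));
console.log(checkCompatibility(v2NoDefault, v1));
```

Under this model, adding full_name with a default passes both directions, while adding it without a default fails the BACKWARD check, because a consumer on the new schema would have nothing to fill in when reading older messages.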

In looking at the full option set, you may also find FORWARD_ALL, BACKWARD_ALL, and FULL_ALL. The _ALL variants check a new version against all previously registered versions instead of only the most recent one. They typically only come into play when you want to change data types for a field whose name you don’t change. The most commonly observed compatibility mode is BACKWARD, which is why it’s the default.

As a consumer application, however, you don’t want to have to recompile your application to handle the addition of a new field. If you want to reference the customer by full name, that’s your choice in your app instead of being forced to consume the new field and use it. When you consume the new messages you’ve just produced, your application doesn’t crash or have problems, because it’s still using the prior version of the schema, and that schema change is compatible with your application. To experience this in action, run the consumer code in one window and don’t interrupt it. As you run the producer application again, this time with messages following the new schema, you can still see output without issue, thanks to the Glue Schema Registry.
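To illustrate why the consumer keeps working, here is a minimal sketch of reading a record through the reader's own schema. It models the idea only and is not how the Avro library or the Glue Schema Registry deserializer is implemented; `readWithSchema` is a hypothetical helper that operates on plain objects.

```javascript
// Project a written record onto the reader's schema: keep the fields the
// reader knows, fill in defaults for fields that were not written, and
// ignore fields the reader has never heard of.
function readWithSchema(readerSchema, writtenRecord) {
  const result = {};
  for (const field of readerSchema.fields) {
    if (field.name in writtenRecord) {
      result[field.name] = writtenRecord[field.name];
    } else if ("default" in field) {
      result[field.name] = field.default;
    } else {
      throw new Error(`No value or default for field "${field.name}"`);
    }
  }
  return result;
}

const oldSchema = {
  fields: [
    { name: "first_name", type: "string" },
    { name: "last_name", type: "string" },
  ],
};
const newSchema = {
  fields: [...oldSchema.fields, { name: "full_name", type: ["null", "string"], default: null }],
};

// A consumer still on the old schema reads a message produced with the new one.
const seenByOldConsumer = readWithSchema(oldSchema, {
  first_name: "Grace",
  last_name: "Hopper",
  full_name: "Grace Hopper",
});

// A consumer on the new schema reads a message produced with the old one.
const seenByNewConsumer = readWithSchema(newSchema, {
  first_name: "Ada",
  last_name: "Lovelace",
});

console.log(seenByOldConsumer);
console.log(seenByNewConsumer);
```

The old consumer never sees full_name, and the new consumer receives null for it, which is the behavior a FULL compatible change is meant to guarantee.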

Conclusion

In this post, we discussed the benefits of using the Glue Schema Registry to register, validate, and evolve schemas for data streams as business needs change. We also provided examples of how to use Glue Schema Registry with Apache Kafka and Kinesis Data Streams.

For more information and to get started, see AWS Glue Schema Registry.


About the Authors

Brian Likosar is a Senior Streaming Specialist Solutions Architect at Amazon Web Services. Brian loves helping customers capture value from real-time streaming architectures, because he knows life doesn’t happen in batch. He’s a big fan of open-source collaboration, theme parks, and live music.

 

 

Larry Heathcote is a Senior Product Marketing Manager at Amazon Web Services for data streaming and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys walking his Samoyed Sasha in the mornings so she can look for squirrels to bark at.

 

 

Using self-hosted Apache Kafka as an event source for AWS Lambda

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/using-self-hosted-apache-kafka-as-an-event-source-for-aws-lambda/

Apache Kafka is an open source event streaming platform used to support workloads such as data pipelines and streaming analytics. It is a distributed streaming platform that is conceptually similar to Amazon Kinesis.

With the launch of Kafka as an event source for Lambda, you can now consume messages from a topic in a Lambda function. This makes it easier to integrate your self-hosted Kafka clusters with downstream serverless workflows.

In this blog post, I explain how to set up an Apache Kafka cluster on Amazon EC2 and configure key elements in the networking configuration. I also show how to create a Lambda function to consume messages from a Kafka topic. Although the process is similar to using Amazon Managed Streaming for Apache Kafka (Amazon MSK) as an event source, there are also some important differences.

Overview

Using Kafka as an event source operates in a similar way to using Amazon SQS or Amazon Kinesis. In all cases, the Lambda service internally polls for new records or messages from the event source, and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides the message batches to your function in the event payload.

Lambda is a consumer application for your Kafka topic. It processes records from one or more partitions and sends the payload to the target function. Lambda continues to process batches until there are no more messages in the topic.

Configuring networking for self-hosted Kafka

It’s best practice to deploy the Amazon EC2 instances running Kafka in private subnets. For the Lambda function to poll the Kafka instances, you must ensure that there is a NAT Gateway running in a public subnet of the VPC.

It’s possible to route the traffic to a single NAT Gateway in one AZ for test and development workloads. For redundancy in production workloads, it’s recommended that there is one NAT Gateway available in each Availability Zone. This walkthrough creates the following architecture:

Self-hosted Kafka architecture

  1. Deploy a VPC with public and private subnets and a NAT Gateway that enables internet access. To configure this infrastructure with AWS CloudFormation, deploy this template.
  2. From the VPC console, edit the default security group created by this template to provide inbound access to the following ports:
    • Custom TCP: ports 2888–3888 from all sources.
    • SSH (port 22), restricted to your own IP address.
    • Custom TCP: port 2181 from all sources.
    • Custom TCP: port 9092 from all sources.
    • All traffic from the same security group identifier.

Security Group configuration

Deploying the EC2 instances and installing Kafka

Next, you deploy the EC2 instances using this network configuration and install the Kafka application:

  1. From the EC2 console, deploy an instance running Ubuntu Server 18.04 LTS. Ensure that there is one instance in each private subnet, in different Availability Zones. Assign the default security group configured by the template.
  2. Next, deploy another EC2 instance in either of the public subnets. This is a bastion host used to access the private instances. Assign the default security group configured by the template.
  3. Connect to the bastion host, then SSH to the first private EC2 instance using the method for your preferred operating system. This post explains different methods. Repeat the process in another terminal for the second private instance.
  4. On each instance, install Java:
    sudo add-apt-repository ppa:webupd8team/java
    sudo apt update
    sudo apt install openjdk-8-jdk
    java -version
  5. On each instance, install Kafka:
    wget http://www-us.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz
    tar xzf kafka_2.12-2.3.1.tgz
    ln -s kafka_2.12-2.3.1 kafka

Configure and start Zookeeper

Configure and start the Zookeeper service that manages the Kafka brokers:

  1. On the first instance, configure the Zookeeper ID:
    cd kafka
    mkdir /tmp/zookeeper
    touch /tmp/zookeeper/myid
    echo "1" >> /tmp/zookeeper/myid
  2. Repeat the process on the second instance, using a different ID value:
    cd kafka
    mkdir /tmp/zookeeper
    touch /tmp/zookeeper/myid
    echo "2" >> /tmp/zookeeper/myid
  3. On the first instance, edit the config/zookeeper.properties file, adding the private IP address of the second instance:
    initLimit=5
    syncLimit=2
    tickTime=2000
    # list of servers: <ip>:2888:3888
    server.1=0.0.0.0:2888:3888 
    server.2=<<IP address of second instance>>:2888:3888
    
  4. On the second instance, edit the config/zookeeper.properties file, adding the private IP address of the first instance:
    initLimit=5
    syncLimit=2
    tickTime=2000
    # list of servers: <ip>:2888:3888
    server.1=<<IP address of first instance>>:2888:3888 
    server.2=0.0.0.0:2888:3888
  5. On each instance, start Zookeeper:
    bin/zookeeper-server-start.sh config/zookeeper.properties
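The pattern in the two zookeeper.properties files (each instance lists itself as 0.0.0.0 and its peers by private IP address) can be sketched as a small generator. This is only an illustration of the pattern; `zookeeperServerLines` is a hypothetical helper and the IP addresses are placeholders.

```javascript
// Build the server.N lines for one instance's zookeeper.properties.
// The instance at localIndex binds to 0.0.0.0; all others use their private IP.
function zookeeperServerLines(privateIps, localIndex) {
  return privateIps.map((ip, i) => {
    const host = i === localIndex ? "0.0.0.0" : ip;
    return `server.${i + 1}=${host}:2888:3888`;
  });
}

// Placeholder private IP addresses for the two instances.
const privateIps = ["10.0.1.10", "10.0.2.20"];

console.log("# first instance");
console.log(zookeeperServerLines(privateIps, 0).join("\n"));
console.log("# second instance");
console.log(zookeeperServerLines(privateIps, 1).join("\n"));
```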

Configure and start Kafka

Configure and start the Kafka broker:

  1. On the first instance, edit the config/server.properties file:
    broker.id=1
    zookeeper.connect=0.0.0.0:2181,<<IP address of second instance>>:2181
  2. On the second instance, edit the config/server.properties file:
    broker.id=2
    zookeeper.connect=0.0.0.0:2181,<<IP address of first instance>>:2181
  3. Start Kafka on each instance:
    bin/kafka-server-start.sh config/server.properties

At the end of this process, Zookeeper and Kafka are running on both instances. If you use separate terminals, it looks like this:

Zookeeper and Kafka terminals

Configuring and publishing to a topic

Kafka organizes channels of messages around topics, which are virtual groups of one or many partitions across Kafka brokers in a cluster. Multiple producers can send messages to Kafka topics, which can then be routed to and processed by multiple consumers. Producers publish to the tail of a topic and consumers read the topic at their own pace.
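The idea that producers append to the tail while each consumer reads at its own pace can be modeled with a toy in-memory log. This sketch covers a single partition only and has nothing to do with Kafka's real implementation; the `Topic` and `Consumer` classes are hypothetical.

```javascript
// A single-partition topic modeled as an append-only array.
class Topic {
  constructor() {
    this.log = [];
  }
  // Append to the tail and return the offset of the new message.
  publish(message) {
    this.log.push(message);
    return this.log.length - 1;
  }
  // Read up to max messages starting at the given offset.
  read(offset, max) {
    return this.log.slice(offset, offset + max);
  }
}

// Each consumer tracks its own offset, so consumers never affect each other.
class Consumer {
  constructor(topic) {
    this.topic = topic;
    this.offset = 0;
  }
  poll(max = 10) {
    const batch = this.topic.read(this.offset, max);
    this.offset += batch.length;
    return batch;
  }
}

const topic = new Topic();
["m0", "m1", "m2"].forEach((m) => topic.publish(m));

const fast = new Consumer(topic);
const slow = new Consumer(topic);

const fastBatch = fast.poll(); // reads everything published so far
const slowBatch = slow.poll(1); // reads only the first message

console.log(fastBatch, slowBatch, slow.poll(1));
```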

From either of the two instances:

  1. Create a new topic called test:
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 2 --topic test
  2. Start a producer:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  3. Enter test messages to check for successful publication.

At this point, you can successfully publish messages to your self-hosted Kafka cluster. Next, you configure a Lambda function as a consumer for the test topic on this cluster.

Configuring the Lambda function and event source mapping

You can create the Lambda event source mapping using the AWS CLI or AWS SDK, which provide the CreateEventSourceMapping API. In this walkthrough, you use the AWS Management Console to create the event source mapping.
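If you prefer the API route, the request could look roughly like the object built below. This is a sketch under assumptions: the field names are believed to follow the CreateEventSourceMapping request shape for self-managed Kafka, but check them against the API reference before use. The helper `buildEventSourceMappingParams`, the function name, and all IDs and host names are hypothetical, and the code only builds the parameters without calling AWS.

```javascript
// Assemble request parameters for an event source mapping on a self-hosted
// Kafka cluster reachable through VPC subnets and security groups.
function buildEventSourceMappingParams({
  functionName,
  bootstrapServers,
  topic,
  subnetIds,
  securityGroupIds,
  batchSize = 100,
  startingPosition = "LATEST",
}) {
  return {
    FunctionName: functionName,
    Topics: [topic],
    BatchSize: batchSize,
    StartingPosition: startingPosition,
    SelfManagedEventSource: {
      Endpoints: { KAFKA_BOOTSTRAP_SERVERS: bootstrapServers },
    },
    SourceAccessConfigurations: [
      ...subnetIds.map((id) => ({ Type: "VPC_SUBNET", URI: `subnet:${id}` })),
      ...securityGroupIds.map((id) => ({ Type: "VPC_SECURITY_GROUP", URI: `security_group:${id}` })),
    ],
  };
}

// Placeholder values; replace them with the resources from your own VPC.
const params = buildEventSourceMappingParams({
  functionName: "my-kafka-consumer",
  bootstrapServers: ["broker-1.example.internal:9092", "broker-2.example.internal:9092"],
  topic: "test",
  subnetIds: ["subnet-aaa", "subnet-bbb"],
  securityGroupIds: ["sg-ccc"],
});

console.log(JSON.stringify(params, null, 2));
```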

Create a Lambda function that uses the self-hosted cluster and topic as an event source:

  1. From the Lambda console, select Create function.
  2. Enter a function name, and select Node.js 12.x as the runtime.
  3. Select the Permissions tab, and select the role name in the Execution role panel to open the IAM console.
  4. Choose Add inline policy and create a new policy called SelfHostedKafkaPolicy with the following permissions. Replace the resource example with the ARNs of your instances:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "ec2:CreateNetworkInterface",
                    "ec2:DescribeNetworkInterfaces",
                    "ec2:DescribeVpcs",
                    "ec2:DeleteNetworkInterface",
                    "ec2:DescribeSubnets",
                    "ec2:DescribeSecurityGroups",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": "arn:aws:ec2:<REGION>:<ACCOUNT_ID>:instance/<instance-id>"
            }
        ]
    }
    

    Create policy

  5. Choose Create policy and ensure that the policy appears in Permissions policies.
  6. Back in the Lambda function, select the Configuration tab. In the Designer panel, choose Add trigger.
  7. In the dropdown, select Apache Kafka:
    • For Bootstrap servers, add each of the two instances’ private IPv4 DNS names with port 9092 appended.
    • For Topic name, enter ‘test’.
    • Enter your preferred batch size and starting position values (see this documentation for more information).
    • For VPC, select the VPC created by the template.
    • For VPC subnets, select the two private subnets.
    • For VPC security groups, select the default security group.
    • Choose Add.

Add trigger configuration

The trigger’s status changes to Enabled in the Lambda console after a few seconds. It then takes several minutes for the trigger to receive messages from the Kafka cluster.

Testing the Lambda function

At this point, you have created a VPC with two private subnets, two public subnets, and a NAT Gateway. You have created a Kafka cluster on two EC2 instances in the private subnets. You set up a target Lambda function with the necessary IAM permissions. Next, you publish messages to the test topic in Kafka and see the resulting invocation in the logs for the Lambda function.

  1. In the Function code panel, replace the contents of index.js with the following code and choose Deploy:
    exports.handler = async (event) => {
        // Iterate through keys
        for (let key in event.records) {
          console.log('Key: ', key)
          // Iterate through records
          event.records[key].forEach((record) => {
            console.log('Record: ', record)
            // Decode base64
            const msg = Buffer.from(record.value, 'base64').toString()
            console.log('Message:', msg)
          }) 
        }
    }
  2. Back in the terminal with the producer script running, enter a test message.
  3. In the Lambda function console, select the Monitoring tab then choose View logs in CloudWatch. In the latest log stream, you see the original event and the decoded message.

Using Kafka as an event source for Lambda

The Lambda function target in the event source mapping does not need to be connected to a VPC to receive messages from the private instance hosting Kafka. However, you must provide details of the VPC, subnets, and security groups in the event source mapping for the Kafka cluster.

The Lambda function must have permission to describe VPCs and security groups, and manage elastic network interfaces. The required execution role permissions are:

  • ec2:CreateNetworkInterface
  • ec2:DescribeNetworkInterfaces
  • ec2:DescribeVpcs
  • ec2:DeleteNetworkInterface
  • ec2:DescribeSubnets
  • ec2:DescribeSecurityGroups

The event payload for the Lambda function contains an array of records. Each array item contains details of the topic and Kafka partition identifier, together with a timestamp and base64 encoded message:

Event payload example
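
As a rough sketch of that shape, the object below mimics a Kafka event carrying one record. The field names follow the description above; the topic name, offset, and timestamp are made-up sample values, and decodeRecords is a hypothetical helper rather than part of any AWS library.

```javascript
// Hypothetical sample event; the values are illustrative, not captured from a real invocation.
const sampleEvent = {
  records: {
    // Each key combines a topic name with a partition number.
    'test-0': [
      {
        topic: 'test',
        partition: 0,
        offset: 15,
        timestamp: 1607000000000,
        // Base64 form of the string "Hello from Kafka"
        value: Buffer.from('Hello from Kafka').toString('base64'),
      },
    ],
  },
};

// Hypothetical helper: walks every partition's records and decodes each value.
function decodeRecords(event) {
  const decoded = [];
  for (const records of Object.values(event.records)) {
    for (const record of records) {
      decoded.push({
        topic: record.topic,
        partition: record.partition,
        message: Buffer.from(record.value, 'base64').toString('utf8'),
      });
    }
  }
  return decoded;
}

console.log(decodeRecords(sampleEvent));
```

Grouping records under a topic-partition key is why the handler shown earlier loops over keys first and records second.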

There is an important difference in the way the Lambda service connects to the self-hosted Kafka cluster compared with Amazon MSK. MSK encrypts data in transit by default so the broker connection defaults to using TLS. With a self-hosted cluster, TLS authentication is not supported when using the Apache Kafka event source. Instead, if accessing brokers over the internet, the event source uses SASL/SCRAM authentication, which can be configured in the event source mapping:

SASL/SCRAM configuration

To learn how to configure SASL/SCRAM authentication for your self-hosted Kafka cluster, see this documentation.
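
If you create the mapping through the API instead of the console, the request might be assembled along these lines. This is a minimal sketch: the parameter names reflect my reading of the Lambda CreateEventSourceMapping API and should be checked against the API reference, buildSelfManagedKafkaMapping is a hypothetical helper, and every value in the usage example is a placeholder.

```javascript
// Hypothetical helper: assembles request parameters for a self-hosted Kafka
// event source mapping that authenticates with SASL/SCRAM.
function buildSelfManagedKafkaMapping({ functionName, brokers, topic, secretArn }) {
  return {
    FunctionName: functionName,
    Topics: [topic],
    StartingPosition: 'LATEST',
    // Bootstrap brokers of the self-hosted cluster, as host:port strings.
    SelfManagedEventSource: {
      Endpoints: { KAFKA_BOOTSTRAP_SERVERS: brokers },
    },
    // The URI points at a secret holding the SASL/SCRAM user name and password.
    SourceAccessConfigurations: [
      { Type: 'SASL_SCRAM_512_AUTH', URI: secretArn },
    ],
  };
}

// Placeholder values for illustration only.
const saslParams = buildSelfManagedKafkaMapping({
  functionName: 'my-kafka-function',
  brokers: ['broker1.example.com:9092', 'broker2.example.com:9092'],
  topic: 'test',
  secretArn: 'arn:aws:secretsmanager:us-east-1:123456789012:secret:my-kafka-secret',
});

console.log(JSON.stringify(saslParams, null, 2));
```

Keeping the credentials in a secret and passing only its ARN means the user name and password never appear in the mapping itself.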

Conclusion

Lambda now supports self-hosted Kafka as an event source so you can invoke Lambda functions from messages in Kafka topics to integrate into other downstream serverless workflows.

This post shows how to configure a self-hosted Kafka cluster on EC2 and set up the network configuration. I also cover how to set up the event source mapping in Lambda and test a function to decode the messages sent from Kafka.

To learn more about how to use this feature, read the documentation. For more serverless learning resources, visit Serverless Land.

Using Amazon MSK as an event source for AWS Lambda

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed, highly available service that uses Apache Kafka to process real-time streaming data. Many producers can send messages to Kafka, which can then be routed to and processed by multiple consumers. Lambda now supports Amazon MSK as an event source, so it can consume messages and integrate with downstream serverless workflows.

Apache Kafka is a distributed streaming platform that is similar to Amazon Kinesis. Amazon MSK simplifies the setup, scaling, and management of clusters running Kafka. It makes it easier to configure the application for multiple Availability Zones and to secure it with IAM. It’s fully compatible with Kafka and supports familiar community-built tools such as MirrorMaker, Apache Flink, and Prometheus.

In this blog post, I explain how to set up a test Amazon MSK cluster and configure key elements in the networking configuration. I also show how to create a Lambda function that is invoked by messages in Amazon MSK topics.

Overview

Using Amazon MSK as an event source operates in a similar way to using Amazon SQS or Amazon Kinesis. In all cases, the Lambda service internally polls for new records or messages from the event source, and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.

Lambda is a consumer application for your Kafka topic. It processes records from one or more partitions and sends the payload to the target function. Lambda continues to process batches until there are no more messages in the topic.

The Lambda function’s event payload contains an array of records. Each array item contains details of the topic and Kafka partition identifier, together with a timestamp and base64 encoded message:

MSK Lambda event payload
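
Because one invocation can carry records from several partitions, it can help to summarize a batch before processing it. The sketch below assumes the payload shape described above; summarizeBatch is a hypothetical helper and the sample event is invented for illustration.

```javascript
// Hypothetical helper: reports how many records arrived per topic-partition
// and the offset range they cover.
function summarizeBatch(event) {
  return Object.entries(event.records).map(([topicPartition, records]) => ({
    topicPartition,
    count: records.length,
    firstOffset: records.length > 0 ? records[0].offset : null,
    lastOffset: records.length > 0 ? records[records.length - 1].offset : null,
  }));
}

// Invented sample batch: two records from partition 0, one from partition 1.
const sampleBatch = {
  records: {
    'mytopic-0': [
      { topic: 'mytopic', partition: 0, offset: 10, value: 'TWVzc2FnZSAjMQ==' },
      { topic: 'mytopic', partition: 0, offset: 11, value: 'TWVzc2FnZSAjMQ==' },
    ],
    'mytopic-1': [
      { topic: 'mytopic', partition: 1, offset: 4, value: 'TWVzc2FnZSAjMQ==' },
    ],
  },
};

console.log(summarizeBatch(sampleBatch));
```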

Network configuration overview

Amazon MSK is a highly available service, so it must be configured to run in a minimum of two Availability Zones in your preferred Region. To comply with security best practice, the brokers are usually configured in private subnets in each Availability Zone.

For Amazon MSK to invoke Lambda, you must ensure that there is a NAT Gateway running in a public subnet of the VPC. It’s possible to route the traffic to a single NAT Gateway in one Availability Zone for test and development workloads. For redundancy in production workloads, it’s recommended that there is one NAT Gateway available in each Availability Zone.

Amazon MSK network architecture

The Lambda function target in the event source mapping does not need to be running in a VPC to receive messages from Amazon MSK. To learn more about configuring the private subnet table to use a NAT Gateway, see this Premium Support response. For an example of how to configure the infrastructure with AWS CloudFormation, see this template.

Required Lambda function permissions

The Lambda function must have permission to describe VPCs and security groups, and manage elastic network interfaces. These execution role permissions are:

  • ec2:CreateNetworkInterface
  • ec2:DescribeNetworkInterfaces
  • ec2:DescribeVpcs
  • ec2:DeleteNetworkInterface
  • ec2:DescribeSubnets
  • ec2:DescribeSecurityGroups

To access the Amazon MSK data stream, the Lambda function also needs two Kafka permissions: kafka:DescribeCluster and kafka:GetBootstrapBrokers. The policy template AWSLambdaMSKExecutionRole includes these permissions.
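
For readers who prefer to see the permissions as one policy document, the following sketch gathers the actions listed above into a single statement. It is an illustration, not a copy of the managed policy: the use of Resource '*' is an assumption made for brevity, and the real AWSLambdaMSKExecutionRole policy may contain additional actions.

```javascript
// Actions named in this section: six EC2 networking actions plus two Kafka actions.
const requiredActions = [
  'ec2:CreateNetworkInterface',
  'ec2:DescribeNetworkInterfaces',
  'ec2:DescribeVpcs',
  'ec2:DeleteNetworkInterface',
  'ec2:DescribeSubnets',
  'ec2:DescribeSecurityGroups',
  'kafka:DescribeCluster',
  'kafka:GetBootstrapBrokers',
];

// Illustrative policy document; Resource '*' is an assumption, not a recommendation.
const mskExecutionPolicy = {
  Version: '2012-10-17',
  Statement: [
    { Effect: 'Allow', Action: requiredActions, Resource: '*' },
  ],
};

console.log(JSON.stringify(mskExecutionPolicy, null, 2));
```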

In AWS SAM templates, you can configure a Lambda function with an Amazon MSK event source mapping and the necessary permissions. For example:

Resources:
  ProcessMSKfunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: code/
      Timeout: 3
      Handler: app.lambdaHandler
      Runtime: nodejs12.x
      Events:
        MSKEvent:
          Type: MSK
          Properties:
            StartingPosition: LATEST
            Stream: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1
            Topics:
              - MyTopic
            VpcConfig:
              SecurityGroupIds: "sg-ab1234ab"
              SubnetIds: "subnet-12345678,subnet-abcd1234"
      Policies:
        - AWSLambdaMSKExecutionRole: {}

Setting up an Amazon MSK cluster

Before starting, configure a new VPC with public and private subnets in two Availability Zones using this AWS CloudFormation template. In this configuration, the private subnets are set up to use a NAT Gateway.

To create the Amazon MSK cluster:

  1. From the Amazon MSK console, choose Create Cluster.
  2. Select Create cluster with custom settings, apply the name “my-MSK-cluster”, and keep the recommended Apache Kafka version.

    Create MSK cluster

  3. In the Configuration panel, keep the “Use the MSK default configuration” option selected.

    MSK configuration dialog box

  4. In Networking, select the new VPC and choose “2” for Number of Availability Zones. From the dropdowns, select the two Availability Zones in the VPC, and choose the private subnets for each.

    MSK networking configuration dialog box

  5. In the Brokers panel, choose kafka.t3.small for Broker instance type, and enter “1” for Number of brokers per Availability Zone.

    Brokers dialog box

  6. For Storage, enter “1” for EBS storage volume per broker.
  7. Keep the existing defaults and choose Create cluster. It takes up to 15 minutes to create the cluster and the status is displayed in the Cluster Summary panel.

    Creating MSK cluster summary

Configuring the Lambda event source mapping

You can create the Lambda event source mapping using the AWS CLI or AWS SDK, which provide the CreateEventSourceMapping API. In this walkthrough, you use the AWS Management Console to create the event source mapping.
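
For comparison, the API route might look roughly like the sketch below, which only assembles the request parameters. The parameter names reflect my understanding of CreateEventSourceMapping and should be confirmed in the API reference; buildMskMapping is a hypothetical helper and the function name is a placeholder.

```javascript
// Hypothetical helper: assembles CreateEventSourceMapping parameters for an MSK cluster.
function buildMskMapping({ clusterArn, functionName, topic }) {
  return {
    EventSourceArn: clusterArn,
    FunctionName: functionName,
    Topics: [topic],
    // Start with messages published after the mapping is created.
    StartingPosition: 'LATEST',
  };
}

// Placeholder values; the ARN format follows the example used in the SAM template.
const mskParams = buildMskMapping({
  clusterArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1',
  functionName: 'my-msk-function',
  topic: 'mytopic',
});

console.log(JSON.stringify(mskParams, null, 2));
```

An object like this could then be handed to the SDK's create-event-source-mapping call, or translated into the equivalent AWS CLI options.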

First, you must create an Amazon MSK topic:

  1. Launch an EC2 instance, selecting t2.micro as the instance size. Connect to the instance once it is available.
  2. Follow these instructions in the Amazon MSK documentation to create a topic. Use the name “mytopic” and a replication-factor of 2, since there are only two Availability Zones in this test.

Now create a Lambda function that uses the Amazon MSK cluster and topic as an event source:

  1. From the Lambda console, select Create function.
  2. Enter a function name, and select Node.js 12.x as the runtime.
  3. Select the Permissions tab, and select the role name in the Execution role panel to open the IAM console.
  4. Choose Attach policies.

    Attach policies to IAM role

  5. In the filter box, enter “MSK”, then check the AWSLambdaMSKExecutionRole policy name.

    AWSLambdaMSKExecutionRole role

  6. Choose Attach policy.

    IAM role summary

  7. Back in the Lambda function, select the Configuration tab. In the Designer panel, choose Add trigger.
  8. In the dropdown, select MSK. In the MSK cluster box, select the cluster you configured earlier. Enter “mytopic” for Topic name and choose “Latest” for Starting Position. Choose Add.

    Add trigger
    Note: it takes several minutes for the trigger status to update from Creating to Enabled.

  9. In the Function code panel, replace the contents of index.js with:
    exports.handler = async (event) => {
        // Iterate through keys
        for (let key in event.records) {
          console.log('Key: ', key)
          // Iterate through records
          event.records[key].map((record) => {
            console.log('Record: ', record)
            // Decode base64
            const msg = Buffer.from(record.value, 'base64').toString()
            console.log('Message:', msg)
          }) 
        }
    }
  10. Choose Save.

Testing the event source mapping

At this point, you have created a VPC with two private and public subnets and a NAT Gateway. You have placed a new Amazon MSK cluster in the two private subnets. You set up a target Lambda function in the same VPC and private subnets, with the necessary IAM permissions. Next, you publish messages to the Amazon MSK topic and see the resulting invocation in the Lambda function’s logs.

  1. From the EC2 instance you created earlier, follow these instructions to produce messages. In the Kafka console producer script running in the terminal, enter “Message #1”:

    Produce Kafka messages from terminal

  2. Back in the Lambda function, select the Monitoring tab and choose View logs in CloudWatch. Select the first log stream.

    CloudWatch log streams

  3. In the Log events panel, expand the entries to see the message sent from the Amazon MSK topic. Note that the value attribute containing the message is base64 encoded.

    MSK event log in Lambda
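
To see why the logged value does not look like the text you typed, here is a small round trip using Node.js's built-in Buffer. It uses the same test string as the walkthrough and makes no assumptions beyond UTF-8 text.

```javascript
// Encode the test message the way it appears in the event's value attribute.
const encodedValue = Buffer.from('Message #1', 'utf8').toString('base64');

// Decode it again, as the Lambda function in this walkthrough does.
const decodedValue = Buffer.from(encodedValue, 'base64').toString('utf8');

console.log('Encoded:', encodedValue);
console.log('Decoded:', decodedValue);
```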

Conclusion

Amazon MSK provides a fully managed, highly available service that uses Kafka to process real-time streaming data. Now that Lambda supports Amazon MSK as an event source, you can invoke Lambda functions from messages in Kafka topics to integrate into your downstream serverless workflows.

In this post, I give an overview of how the integration compares with other event source mappings. I show how to create a test Amazon MSK cluster, configure the networking, and create the event source mapping with Lambda. I also show how to set up the Lambda function in the AWS Management Console, and refer to the equivalent AWS SAM syntax to simplify deployment.

To learn more about how to use this feature, read the documentation.

Stream Twitter data into Amazon Redshift using Amazon MSK and AWS Glue streaming ETL

Post Syndicated from Jobin George original https://aws.amazon.com/blogs/big-data/stream-twitter-data-into-amazon-redshift-using-amazon-msk-and-aws-glue-streaming-etl/

Real-time analytics provide a point-in-time view for a variety of use cases. At the heart of any real-time solution is streaming data processing, especially when dynamic new content is being continually regenerated. Organizations might start using streaming data for simple analytics from logs or basic arithmetic dashboards, but eventually develop applications to perform more sophisticated forms of analysis, including machine learning, and extract deeper insights.

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. In addition to capabilities such as a Data Catalog, automated schema discovery, automated code generation, and deduplication of data, AWS Glue is serverless, so you don’t have to provision resources and you pay only for what you use. AWS Glue recently released serverless streaming ETL, which makes it easy to set up continuous ingestion pipelines that stream data from various sources using Amazon Kinesis and Apache Kafka and load data to data lakes, data warehouses, and other data stores while cleaning and enriching the data as needed.

This post demonstrates how customers, system integrator (SI) partners, and developers can use the serverless streaming ETL capabilities of AWS Glue with Amazon Managed Streaming for Apache Kafka (Amazon MSK) to stream data to a data warehouse such as Amazon Redshift. We also show you how to view Twitter streaming data on Amazon QuickSight via Amazon Redshift.

Background

Before AWS Glue streaming ETL, you had to stitch multiple components together. For example, to stream real-time data from a social media feed, you needed to use either Amazon MSK or Kinesis to load data, using a combination of AWS Lambda and Amazon Simple Storage Service (Amazon S3) with multiple staging buckets. With AWS Glue streaming ETL, you can now simplify your pipeline with reduced touchpoints that better allow you to focus on business outcomes rather than pipeline management.

For more information about how AWS Glue streaming ETL integrates with Amazon Kinesis, see New – Serverless Streaming ETL with AWS Glue.

The following architecture shows an end-to-end implementation of a streaming solution using Amazon MSK, AWS Glue streaming ETL, Amazon Redshift, and QuickSight.

To illustrate how to set up this architecture, we’ll walk you through the following steps:

  1. Deploying an AWS CloudFormation template to launch a three-node MSK cluster and a Kafka client with an instance of Apache NiFi running on it
  2. Creating a Kafka topic and accessing the Apache NiFi UI
  3. Configuring data streams from Twitter to Kafka using Apache NiFi
  4. Creating an Amazon Redshift cluster and a table to persist streaming data
  5. Creating Amazon MSK and Amazon Redshift tables in the AWS Glue Data Catalog
  6. Authoring an AWS Glue streaming ETL job to load data to Amazon Redshift
  7. Visualizing data from Amazon Redshift in QuickSight
  8. Cleaning up your resources

By default, the CloudFormation template launches the cluster on kafka.m5.large nodes and the client instance on m5.2xlarge, but you may choose to configure it with the instance type appropriate for your use case.

Prerequisites

Make sure to complete the following steps as prerequisites:

Launching your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your stack in us-east-1:

  2. On the Quick create stack page, for Stack Name, enter Twitter-MSK-Glue-Redshift-Blog.
  3. For KeyName, choose KeyPair.
  4. For SSH location, enter your IP to log in to the Kafka client instance.

To find your IP, use checkip.amazonaws.com.

  5. Choose Create stack.

The stack creation can take up to 15 minutes to complete.

  6. When the stack creation is complete, on the Stack Outputs tab, record the values of the following:
    1. KafkaNiFiEC2Instance
    2. MSKSecurityGroupID
    3. PrivateSubnetOne
    4. RedshiftEndpoint

Creating a Kafka topic and accessing the NiFi UI

With the .pem key, you can now SSH to the NiFi node and do a local port forwarding to access the web interface on your local computer. We use Apache NiFi to easily configure and pull data from Twitter and publish it to Amazon MSK without dealing with coding. You may use any tool to poll data from Twitter and publish to Amazon MSK.

  1. To access the NiFi UI from your local system, use the following command to set up an SSH tunnel to port 8888 on the NiFi instance (replace KafkaNiFiEC2Instance with the value from the AWS CloudFormation output):
ssh -i ~/Downloads/KeyPair.pem -L 8888:Localhost:8888 [email protected]: <KafkaNiFiEC2Instance> 

This command allows you to access NiFi via a browser running on your local system. Leave that terminal open to remain connected.

  2. To create a Kafka topic, enter the following code in the terminal (replace ZookeeperConnectString with your Amazon MSK cluster ZooKeeper URL):
/opt/kafka/bin/kafka-topics.sh --create --zookeeper <ZookeeperConnectString> --replication-factor 3 --partitions 1 --topic CovidTweets 

For instructions on finding your ZooKeeper URL, see Getting the Apache ZooKeeper Connection String for an Amazon MSK Cluster.

The following screenshot shows the resulting output.

  3. While the terminal is connected, launch your browser and use the following URL to access NiFi UI: http://localhost:8888/nifi/.

You should be able to view the NiFi cluster UI (see the following screenshot). You can see four processors already added on to the canvas.

NiFi supports user authentication via client certificates, username and password, Apache Knox, or OpenID Connect. For this post, we keep it open without security configuration, but make sure you have robust security in place for your NiFi instances when used for your own use cases. For more information, see Security Configuration.

Configuring data streams from Twitter to Amazon MSK using Apache NiFi

The following steps take you through connecting to Twitter and pulling data related to Twitter handles without coding.

  1. Choose the GetTwitter processor.
  2. On the Properties tab, enter the following Twitter credentials:
    1. API key
    2. API secret key
    3. Access token
    4. Access token secret
  3. Choose Apply.

For security purposes, you can’t read the credentials entered. Additionally, in these configurations, you filter Tweets based on the term COVID19. You can add a comma-separated list if you want to customize these values.

  4. Choose the PublishKafka processor.
  5. On the Properties tab, for Kafka Brokers, enter your comma-separated SSL Amazon MSK bootstrap URL (running on port 9094).

For instructions on finding the broker URL, see Getting the Bootstrap Brokers for an Amazon MSK cluster.

The Topic Name value is already set to CovidTweets, but you can change the topic name if you prefer a different name.

  6. Choose Apply.

You’re now ready to start the flow and stream data from the Twitter API into the MSK cluster. However, before you start streaming Twitter data, create the Data Catalog tables and author the AWS Glue streaming job.

Creating an Amazon Redshift cluster and target table

As part of the AWS CloudFormation deployment, you create a single-node Amazon Redshift cluster. To create the target table for storing relevant fields extracted from Tweets, connect to the cluster and complete the following steps:

  1. On the Amazon Redshift console, connect to the query editor.
  2. Enter the following credentials:
    1. Cluster – Choose the cluster with endpoint noted earlier
    2. Database name – streaming-data
    3. Database user – awsuser
    4. Database password – Str0ngPas$wd
  3. On the query editor page, enter the following DDL command to create a table named msk_tweets:
create table msk_tweets(created_at VARCHAR(max),id_str VARCHAR(100),text VARCHAR(max), source VARCHAR(max),user_location VARCHAR(1000),hashtags1 VARCHAR(1000),hashtags2 VARCHAR(1000), lang VARCHAR(max))

Modify the table and its fields to suit your own use case.

Creating the Amazon MSK and Amazon Redshift Data Catalog tables in AWS Glue

This section walks you through creating your connections to Amazon MSK and Amazon Redshift, crawling Amazon Redshift, and creating Data Catalog tables to use as the target for the AWS Glue streaming ETL job.

Creating the Amazon MSK connection

To create your Amazon MSK connection, complete the following steps:

  1. On the AWS Glue console, choose Catalog.
  2. Choose Connection.
  3. Choose Add connection.
  4. On the Set up your connection’s properties page, for Connection name, enter MSK_Connection.
  5. For Connection type, choose Kafka.
  6. For Kafka bootstrap server URLs, enter your Amazon MSK SSL bootstrap URL running on port 9094.

For instructions on finding your broker URL, see Getting the Bootstrap Brokers for an Amazon MSK Cluster.

  7. Choose Next.
  8. On the Set up access to your data store page, for VPC, choose the VPC containing the name MSK-GLUE-VPC.
  9. For Subnet, choose the subnet containing the name MMPrivateSubnetOne.
  10. For Security groups, select the group with the prefix MSK-Glue-Redshift-MSKSecurityGroup.
  11. Choose Next.
  12. Review the information and choose Finish.

Creating the Amazon Redshift connection

You’re now ready to create the Amazon Redshift connection.

  1. On the AWS Glue Data Catalog Connection page, choose Add connection.
  2. For Connection name, enter Redshift_Connection.
  3. For Connection type, choose Amazon Redshift.
  4. Choose Next.
  5. On the next page, choose the cluster you created as part of the CloudFormation stack.
  6. Enter the following information:
    1. Database name – streaming-data
    2. Username – awsuser
    3. Password – Str0ngPas$wd
  7. Choose Next.
  8. Review the details and choose Finish.

You can test the connection when creation is complete.

Crawling the database

You can now create an AWS Glue Data Catalog for your Amazon Redshift table by crawling the database using the connection you just created.

  1. On the AWS Glue Data Catalog Crawlers page, choose Add crawlers.
  2. For Crawler name, enter Redshift_Crawler.
  3. Choose Next.
  4. Choose Data stores.
  5. Choose Next.
  6. On the Add a data store page, for Choose a data store, choose JDBC.
  7. For Connection, choose Redshift_Connection.
  8. For Include path, enter streaming-data.
  9. Choose Next.

  10. On the page asking if you want to add additional data stores, choose No.
  11. Choose Next.
  12. On the Choose IAM role page, choose Glue_Service Role (you created this as part of the CloudFormation stack).
  13. Choose Next.
  14. For Frequency, choose Run on demand.
  15. Choose Next.
  16. On the next page, select an AWS Glue database and choose Next.

If you don’t have a database, choose Add database and create one.

  17. Review the information and choose Finish.

When you’re prompted to run the crawler, choose the prompt. It may take a few minutes for the crawler to finish, after which you can verify in the Data Catalog table section that you have a table called streaming_data_public_msk_tweets with Amazon Redshift classification.

Creating your table

You can now create a table for the Amazon MSK topic.

  1. On the Catalog page, choose Tables.
  2. Choose Add tables.
  3. Choose Add tables manually.
  4. For Table name, enter msk_covidtweets.
  5. Choose the database you created earlier.
  6. Choose Next.
  7. On the Add a data store page, for Select the type of source, select Kafka.
  8. For Topic name, enter CovidTweets.
  9. For Connection, enter MSK_Connection.
  10. Choose Next.

  11. On the next page, for Classification, choose JSON.
  12. Choose Next.
  13. On the Define schema page, choose Add column.
  14. Add the following columns, with Data type as string:
    1. id_str
    2. created_at
    3. source
    4. text
    5. location
    6. hashtags[0].text
    7. hashtags[1].text
    8. lang

  15. Choose Next.
  16. Review and choose Finish.

Authoring an AWS Glue streaming ETL job

In the following steps, you author a streaming ETL job.

  1. On the AWS Glue console, choose Jobs.
  2. Choose Add job.
  3. For Name, enter MSK-Glue-Redshift.
  4. For IAM role, choose Glue_Service_role.
  5. For Type, choose Spark Streaming.
  6. For This job runs, select A proposed script generated by AWS Glue.
  7. Leave other fields at their default.
  8. Choose Next.

  9. On the Choose a data source page, select msk_covidtweets.
  10. Choose Next.
  11. On the Choose a data target page, select streaming_data_public_msk_tweets.
  12. Choose Next.
  13. On the page that maps source columns to target columns, verify that the columns are mapped correctly.
  14. Choose Save job and edit script.
  15. On the next page, verify that on the last line of the script, windowSize is set to 5 seconds.
  16. Choose Save.
  17. Choose Run job.

The AWS Glue streaming ETL job may take a few minutes to start running, after which the streaming from Amazon MSK starts.
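
The column mapping that the job applies can be pictured with a small sketch. This is not the script AWS Glue generates: the pairing of location with user_location and of the two hashtag columns with hashtags1 and hashtags2 is inferred from the column names in this post, the record is assumed to be flat with keys named exactly like the Data Catalog columns, and toRedshiftRow is a hypothetical helper.

```javascript
// Assumed pairing of Data Catalog source columns (keys) to Redshift target columns (values).
const columnMapping = {
  created_at: 'created_at',
  id_str: 'id_str',
  text: 'text',
  source: 'source',
  location: 'user_location',
  'hashtags[0].text': 'hashtags1',
  'hashtags[1].text': 'hashtags2',
  lang: 'lang',
};

// Hypothetical helper: renames the fields of one source record; missing fields become null.
function toRedshiftRow(record) {
  const row = {};
  for (const [sourceColumn, targetColumn] of Object.entries(columnMapping)) {
    row[targetColumn] = record[sourceColumn] === undefined ? null : record[sourceColumn];
  }
  return row;
}

// Invented sample record for illustration.
const sampleRecord = {
  id_str: '1',
  created_at: 'Mon Nov 02 10:00:00 +0000 2020',
  text: 'sample tweet',
  location: 'Seattle',
  'hashtags[0].text': 'COVID19',
  lang: 'en',
};

console.log(toRedshiftRow(sampleRecord));
```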

While you’re waiting, you can start the NiFi Twitter flow to publish messages to Amazon MSK.

Starting the NiFi flow to stream Twitter data

To start your NiFi flow to stream data from Twitter and publish it to Amazon MSK, complete the following steps:

  1. Navigate back to the NiFi UI running on http://localhost:8888/nifi/.
  2. Choose the canvas (right-click) and choose Start.

After the NiFi flow starts, you can see that the Twitter data is flowing from GetTwitter and is pushed to the MSK cluster using the PublishKafka processor. When the publish is successful, the data is pending in the success queue and is truncated after 60 seconds.

After the flow begins, data is published to Amazon MSK. The AWS Glue streaming ETL job loads the data into the Amazon Redshift table msk_tweets. You may notice that the data is being queued up in the success connection of Publish_to_MSK, indicating that the data was successfully published to Amazon MSK.

Visualizing Twitter data from Amazon Redshift using QuickSight

This section reviews the steps to visualize the data from the Twitter feed.

  1. Create a new analysis in QuickSight.
  2. Create a new dataset with Amazon Redshift as the source.
  3. Choose msk_tweets as the Amazon Redshift table.
  4. Choose the Custom SQL option.
  5. Enter the following query:
select cast(created_at as timestamp) as create_timestamp, extract(minute from cast(created_at as timestamp)) as extracted_minutes,id_str,text,split_part(split_part(source,'>',2),'<',1) as formatted_source,user_location,hashtags1,hashtags2,lang from msk_tweets;
  6. Choose the Directly query your data option to query real-time data directly from the database.
  7. Choose Visualize.
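
The nested split_part call in the query is what produces formatted_source. The sketch below reproduces that logic outside the database to show what it extracts. The sample source string is an invented example of an HTML anchor, and splitPart is a hypothetical stand-in that imitates the SQL function's 1-based indexing.

```javascript
// Imitates split_part: 1-based position, empty string when the part does not exist.
function splitPart(text, delimiter, position) {
  const parts = text.split(delimiter);
  return position >= 1 && position <= parts.length ? parts[position - 1] : '';
}

// Same nesting as the query: take the text after the first '>' and before the next '<'.
function formattedSource(source) {
  return splitPart(splitPart(source, '>', 2), '<', 1);
}

// Invented sample value for the source column.
const sampleSource = '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>';

console.log(formattedSource(sampleSource));
```

A source value with no angle brackets yields an empty string, which is worth keeping in mind when grouping on this column in a visual.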

You can select various visual types such as stacked area line chart, pie chart, hash cloud, and bar charts on QuickSight to build the following dashboard.

For instructions on building a QuickSight dashboard, see Tutorial: Create a Dashboard. For more information about improving dashboard performance, see Speed up your ELT and BI queries with Amazon Redshift materialized views.

Cleaning up

To clean up your resources, delete the AWS Glue database, tables, crawlers, job, and service role.

Additionally, be sure to clean up all other AWS resources that you created using AWS CloudFormation. You can delete these resources on the AWS CloudFormation console or via the AWS Command Line Interface (AWS CLI) by deleting the stack named Twitter-MSK-Glue-Redshift-Blog.

Conclusion

In this post, we demonstrated a use case for building a serverless and cost-effective ETL pipeline for streaming, which allows you to focus on the outcomes of your analytics. The CloudFormation template gives you an easy way to set up the process, which you can further modify to meet your specific use case needs. You can also modify your serverless AWS Glue ETL code with transformations and mappings to ensure that only valid data gets loaded to your data store. With this solution, you can use AWS Glue streaming as a mechanism to solve your streaming ETL use cases.

Please let us know if you have comments about this post!

About the Authors

Jobin George is a Sr. Partner Solutions Architect at AWS. He has more than a decade of experience with designing and implementing large scale Big Data and Analytics solutions. He provides technical guidance, design advice and thought leadership to some of the key AWS customers and Big Data partners.

Mahesh Goyal is a Data Architect in Big Data at AWS. He works with customers in their journey to the cloud with a focus on big data and data warehouses. In his spare time, Mahesh likes to listen to music and explore new food places with his family.

Dilip Rajan is a Partner Solutions Architect at AWS. His role is to help partners and customers design and build solutions at scale on AWS. Before AWS, he helped Amazon Fulfillment Operations migrate their Oracle Data Warehouse to Redshift while designing the next generation big data analytics platform using AWS technologies.

Vortexa delivers real-time insights on Amazon MSK with Lenses.io

Post Syndicated from Andrew Stevenson original https://aws.amazon.com/blogs/big-data/vortexa-delivers-real-time-insights-on-amazon-msk-with-lenses-io/

This is a guest post by Lenses.io. In their own words, “Lenses.io is a group of engineers, designers, developers, data analysts, tinkerers, writers, and open-source contributors. We specialize in the democratizing of data technologies. That’s why we developed Lenses.io (Lenses-eye-oh) to help enterprises simplify their operations by making data work for teams, not the other way around.”

This post discusses how Vortexa harnesses the power of Apache Kafka to improve real-time data accuracy and accelerate time-to-market by using a combination of Lenses.io for greater observability and Amazon Managed Streaming for Apache Kafka (Amazon MSK) to create clusters on demand.

Vortexa provides a real-time analytics platform for seaborne oil supply, which enables energy market players to make smarter trading and shipping decisions. Their service needs to process tens of millions of data points per day, and instant insights on the slightest changes in ship movements are crucial to their customers.

That’s why Vortexa’s data analytics team relies on powerful data streaming technologies such as Apache Kafka to make their real-time applications work.

However, as they scaled their self-managed Apache Kafka environment, they saw several challenges emerge:

  • Debugging incidents took days. Investigation of incidents involved a series of in-house developed scripts and open-source tools.
  • The smallest mistake could bring down a whole cluster. The team ran the risk of downtime due to difficulties in manual configuration and a nebulous command line.
  • Slowdowns affected production. This meant more time spent on everyday tasks, such as assessing the data quality, and less time shipping new features to market.
  • Specialist skills were difficult to come by. Having a specialist in-house for every data platform permutation—like someone who understood how to both build and operate Apache Kafka—wasn’t wise or sustainable.

And so Fridays for Vortexa became “Kafka Fridays.” The fragilities and complexities in an all-important Apache Kafka infrastructure cost the team their Fridays and often their weekends, and also ate into their release schedules.

Self-healing and observability save the day

To address this, the Vortexa team evaluated several managed solutions, and found that Amazon MSK best met their requirements and budget. Amazon MSK continuously monitors cluster performance, is self-healing, and automatically replaces unhealthy nodes without downtime.

Already users of multiple other AWS services, Vortexa benefited from Amazon MSK integrating with other AWS services such as Amazon Virtual Private Cloud (Amazon VPC), AWS Key Management Service (AWS KMS), and AWS Identity and Access Management (IAM). Amazon MSK can also integrate with Amazon CloudWatch, AWS CloudTrail, and Lenses.io for more effective, integrated monitoring and logging of issues.

However, Apache Kafka was still a black box when it came to understanding their deployed flows. Vortexa uses Lenses.io as their data operations portal, which gives them Apache Kafka monitoring, data observability and governance control for their real-time applications that run across their Amazon MSK and Kubernetes platform. With a clear UI, they can troubleshoot a problem on a streaming application within a few minutes compared to hours taken before, which led to an improvement of at least 30-fold.

“Lenses.io gives us a complete vision and extra management capabilities for our streaming applications in Apache Kafka clusters,” says Nguyen Lam Phuc, Senior DevOps Engineer at Vortexa. “Testing as well as operations suddenly became painless and felt much more intuitive than ever before.”

Previously, the team also needed to manually run commands, add connectors involving trial and error, and implement complex deployment procedures—and cross their fingers that this would work.

“In our four-year history, we’ve had at least five different ways of operating, and four different ways of monitoring Apache Kafka,” says Jakub Korzeniowski, Head of Data Services at Vortexa. “The combination of Lenses.io and Amazon MSK has been the only solution that allows us to focus on business logic instead of concentrating the majority of our efforts on just meeting SLAs.”

Now, through a single dashboard, Vortexa can use Lenses.io to view their application topology and expose consumer lag metrics that they can visualize and alert in Lenses.io before forwarding to third-party tools such as Slack and PagerDuty.

Open monitoring with Amazon MSK and Lenses.io

You can monitor the health of the Amazon MSK infrastructure within external tools using the AWS open monitoring framework. Vortexa uses open monitoring with Prometheus to access JMX metrics that Amazon MSK brokers generate. The following diagram illustrates this architecture.

Value in data quality and release velocity for Vortexa

Maksym Schipka, Vortexa’s CTO, explains how Amazon MSK and Lenses.io saved 15% of working hours in Apache Kafka management: “At least twice a week, we would experience issues with Kafka. We were spending so long trying to understand the issue that it was more efficient for us to replace the cluster with a completely new one.

“Amazon MSK and Lenses.io have been pivotal technologies for Vortexa, enabling us to shift significant efforts from maintaining and stabilizing a complex and fragile Kafka infrastructure to focusing more on the quality of analytics and market insights that directly impact the value we deliver to our customers.”

“We can now confidently and frequently update our applications at scale, run complex Kafka streams topologies with ease, and debug applications instead of infrastructure. Our revenue and market share are directly impacted by the speed at which we deliver accurate data to the market, and Amazon MSK and Lenses.io are key enablers of that.”

Fast-tracking to production in minutes, not days

Vortexa can now build new data pipelines that integrate data across their different technologies, including Amazon Elasticsearch Service and Redis, and deploy into production in minutes.

Everyday tasks, such as inspecting messages on a stream with SQL, understanding the profile of a topic, or verifying the replication factor, are now easy and take under 10 seconds, with no need for specific in-house Apache Kafka expertise.

“Amazon MSK with Lenses SQL is one step towards making Kafka feel like any other database,” says Romain Guion, Head of Signal Processing and Enrichment at Vortexa. “It allows us to verify the outputs of our models well, which recently resulted in a 90% decrease in AIS data affected by signal noise and spoofing.”

The Vortexa team sees a real-time topological view of their entire data landscape, including their flows to Elasticsearch and Redis and inside their Apache Kafka streams applications.

Conclusion

Using Amazon MSK and Lenses.io helped Vortexa to:

  • Save 15% of working hours on Apache Kafka management
  • Deploy new Amazon MSK clusters into production in minutes, not days
  • Increase Apache Kafka users (both operators and application developers) by 200%
  • Reduce automatic identification system signal noise by 90%, aided by quality assurance done exclusively in Lenses.io

To learn how to get greater visibility into your data streams and streaming platform, visit Lenses.io. For information on how to build and run production applications on Apache Kafka without needing Apache Kafka infrastructure management expertise, visit Amazon Managed Streaming for Apache Kafka.

 


About the author

Andrew Stevenson is the Chief Technology Officer and co-founder of Lenses.io. He leads Lenses.io’s world-class engineering team and technical strategy.

 

 

Best practices from Delhivery on migrating from Apache Kafka to Amazon MSK

Post Syndicated from Akash Deep Verma original https://aws.amazon.com/blogs/big-data/best-practices-from-delhivery-on-migrating-from-apache-kafka-to-amazon-msk/

This is a guest post by Delhivery.

In this post, we describe the steps Delhivery took to migrate from self-managed Apache Kafka running on Amazon Elastic Compute Cloud (Amazon EC2) to Amazon Managed Streaming for Apache Kafka (Amazon MSK). “We’ve been in production for over a year now,” said Akash Deep Verma, Senior Technical Architect, at Delhivery. “We have over 350+ applications running on Amazon MSK that are producing and consuming data every second of every day. Overall, it’s a happy experience to be working with Amazon MSK!”

Delhivery is India’s leading fulfilment platform for digital commerce. With its nationwide network extending beyond 18,000 pin codes and 2,500 cities, the company provides a full suite of logistics services like express parcel transportation, LTL and FTL freight, reverse logistics, cross-border, B2B and B2C warehousing, and technology services.

“Our vision is to become the operating system for commerce in India, through a combination of world-class infrastructure, logistics operations of the highest quality, and cutting-edge engineering and technology capabilities,” Verma says. “Our team has successfully fulfilled over 650 million orders to more than 120 million households across India. We operate 24 automated sort centers, 75 fulfilment centers, 70 hubs, over 2,500+ direct delivery centers, over 8,000+ partner centers, and more than 14,000+ vehicles. Over 40,000 team members make it possible to deliver a million packages a day, 24 hours a day, 7 days a week, 365 days a year.”

Self-managing Apache Kafka was difficult

We process close to 1 TB of data per day to serve various analytical functions. The data comes from shipment tracking, order tracking, GPS, biometrics, handheld devices, sorters, weights, clients, and facilities. It moves through various systems and services using several real-time and batch pipelines. The data is processed and enriched to serve both business and technical use cases. Due to the nature of our business, messages and events on Apache Kafka arrived at a steady pace with intermittent spikes, with 10,000–12,000 messages coming in per second and 50,000–55,000 messages going out per second.

Apache Kafka serves as the critical messaging and events backbone to these dependent systems and services.

We were self-managing Apache Kafka brokers and their associated components, like Apache ZooKeeper, on Amazon EC2 instances.

With the growth in our business, managing these components and ensuring uptime became a significant resource-intensive operation. We had to allocate two developers on a constant basis to manage our Apache Kafka infrastructure and maintain uptime. This undifferentiated heavy lifting caused productivity loss because the developers couldn’t contribute effectively towards business feature development.

“We wanted a managed Apache Kafka service to reduce the time and resources we used for infrastructure management,” says Verma. “This would enable us to reprioritize our technical team to focus on feature development that added more business value.”

Getting time back by migrating to Amazon MSK

We looked at several options to replace our self-hosted Apache Kafka on EC2 instances, and chose Amazon MSK. With Amazon MSK, we could continue to use native Apache Kafka APIs and run our existing applications on AWS without changing the code. It provisions, configures, and maintains Apache Kafka clusters and Apache ZooKeeper nodes for us. This enables us to shift developers from managing infrastructure to writing more creative applications for our business.

On advice from the AWS team, we took the following steps:

  1. Size the MSK cluster
  2. Migrate individual Apache Kafka topics to Amazon MSK
  3. Monitor on Amazon MSK

Sizing the MSK cluster

To properly size our MSK cluster, we needed to understand our existing workload dynamics. We retrieved the following metrics from our existing Amazon EC2-based Apache Kafka clusters:

  • Ingestion rate from producers – We took the broker-level metric BytesInPerSec, averaged it for each broker, and summed the averages across all brokers in the cluster to estimate the net ingestion rate (not to be confused with ReplicationBytesInPerSec, which gives the ingestion rate from other brokers).
  • Consumption rate from consumers – We took the broker-level metric BytesOutPerSec, averaged it for each broker, and summed the averages across all brokers in the cluster to estimate the net consumption rate (not to be confused with ReplicationBytesOutPerSec, which gives the consumption rate to other brokers).
  • Data replication strategy – We took the highest replication factor in use, comparing the cluster-wide default.replication.factor parameter with the values set on individual topics.
  • Data retention strategy and target percentage of disk utilization – We took the longest data retention period in use, comparing the cluster-wide log.retention.hours parameter with the values set on individual topics. We also specified the target percentage of used storage and estimated the headroom that our use case needed.
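As a rough illustration of how these four inputs combine, the following Python sketch sums per-broker averages into a cluster-wide rate and estimates the storage the cluster would need. The formula, function names, and sample numbers are assumptions made for illustration; this is not the calculation used in the Amazon MSK Sizing and Pricing spreadsheet.

```python
def cluster_rate(per_broker_avg_bytes_per_sec):
    """Sum per-broker averages (for example, BytesInPerSec) into a cluster-wide rate."""
    return sum(per_broker_avg_bytes_per_sec)


def required_storage_bytes(ingest_bytes_per_sec, retention_hours,
                           replication_factor, target_disk_utilization):
    """Estimate storage as retained data x replicas, scaled up for headroom.

    Assumed formula: ingest rate * retention period * replication factor,
    divided by the fraction of disk we are willing to fill.
    """
    if not 0 < target_disk_utilization <= 1:
        raise ValueError("target_disk_utilization must be in (0, 1]")
    retained = ingest_bytes_per_sec * retention_hours * 3600
    return retained * replication_factor / target_disk_utilization


# Hypothetical inputs: three brokers averaging 4 MB/s of producer traffic each.
ingest = cluster_rate([4_000_000, 4_000_000, 4_000_000])
total = required_storage_bytes(ingest, retention_hours=24,
                               replication_factor=3,
                               target_disk_utilization=0.8)
print(f"ingest: {ingest / 1e6:.1f} MB/s, storage: {total / 1e12:.2f} TB")
```

Dividing by the target utilization is one simple way to express headroom: a lower target leaves more free disk for traffic spikes.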

AWS provided us with an Amazon MSK Sizing and Pricing spreadsheet to help us estimate the number of brokers that we needed in our MSK cluster. We subsequently performed POCs in our environment and found the suggested cluster sizing from the spreadsheet accurate. The spreadsheet also helped us to easily estimate the cluster pricing well in advance. For more information, see Numbers of brokers per cluster.

Migrating individual Apache Kafka topics to Amazon MSK

We considered several options to migrate individual topics to Amazon MSK:

  • MirrorMaker 1.0, which ships with Apache Kafka and is a standalone tool that can move data from self-managed Apache Kafka clusters to Amazon MSK with minimal downtime.
  • Using consumers to read data from self-managed Apache Kafka clusters and write to Amazon MSK. This migration approach required some application downtime.
  • Other replication tools that we had experience with.

We used a combination of the first two approaches for our migration. For time-critical topics, we used MirrorMaker 1.0 to migrate data to Amazon MSK. For non-time-critical topics, our internal SLAs allowed us to perform a redirection of application traffic from our self-managed Apache Kafka clusters to Amazon MSK.

The MirrorMaker option involved setting up a MirrorMaker 1.0 daemon on a self-managed EC2 instance to consume messages from the source cluster and republish them to the target MSK cluster. Each MirrorMaker 1.0 thread has its own consumer instance, and all threads share one common producer. The process is as follows:

  1. The MirrorMaker 1.0 instance spawns a consumer process that interacts with the Apache ZooKeeper ensemble supporting the source Apache Kafka cluster for topic discovery.
  2. The consumer process reads payload from the concerned topic.
  3. MirrorMaker 1.0 spawns a producer process that interacts with the managed Apache ZooKeeper fleet of Amazon MSK via the Apache ZooKeeper endpoint.
  4. The producer process relays the payload retrieved by the consumer process to the respective topic of Amazon MSK via the broker endpoint.

The following diagram shows our migration topology.

The MSK cluster creation process requires subnet IDs as input so that the broker and Apache ZooKeeper nodes can map to the customer VPC. This mapping is achieved by creating ENIs within these subnets, each with a primary private IPv4 address. The broker and Apache ZooKeeper endpoints attached to the MSK cluster resolve to these private IPv4 addresses.

We used the following command to mirror all topics from the Amazon EC2-based source cluster to Amazon MSK:

kafka-mirror-maker.sh \
  --consumer.config config/mirrormaker-consumer.properties \
  --producer.config config/mirrormaker-producer.properties \
  --whitelist '.*'

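The command contains the following details:

  • --consumer.config – The consumer properties file, whose settings point MirrorMaker at the source cluster
  • --producer.config – The producer properties file, whose settings point MirrorMaker at the target MSK cluster
  • --whitelist – A Java regular expression that selects the topics to mirror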

With acceptance of KIP-382, Amazon MSK can now support MirrorMaker 2.0 and benefit from MirrorMaker 2.0 advantages. For instructions, configuration files, sample code, and labs using MirrorMaker 2.0 to migrate a self-managed Apache Kafka cluster to an MSK cluster, see the MirrorMaker2 on Amazon EC2 workshop.

You can migrate an existing Apache Kafka cluster to Amazon MSK using Amazon Kinesis Data Analytics, a fully managed service for Apache Flink. This enables you to use fully managed Apache Flink applications to process streaming data stored in Amazon MSK. For more information about using Amazon Kinesis Data Analytics with Amazon MSK, see Tutorial: Using a Kinesis Data Analytics application to Replicate Data from One MSK Cluster to Another in a VPC and the Clickstream Lab.

At a steady state, our MSK cluster in production uses the following configuration:

  • Broker nodes – 6 x m5.4xlarge
  • Replication factor – 3
  • Number of producers – 110+
  • Number of consumers – 300+
  • Topics – 500+
  • Kstreams running on broker – 55+

Monitoring on Amazon MSK

Amazon MSK provides Amazon CloudWatch metrics under three levels of granularity:

  • DEFAULT
  • PER_BROKER
  • PER_TOPIC_PER_BROKER

For our use case, we used the highest granularity PER_TOPIC_PER_BROKER for monitoring with optimum visibility. To automate the detection of operational issues, we developed custom CloudWatch alarms with the following metrics.

Metric Name | Alarm Trigger | Significance

UnderReplicatedPartitions | When not 0 | If partition replicas fall too far behind their leaders, the follower partition is removed from the ISR pool. A corresponding increase in the IsrShrinksPerSec metric is also expected.
OfflinePartitionsCount | When not 0 | Defines the number of partitions without an active leader. Any partition without an active leader is completely inaccessible, and both consumers and producers of that partition are blocked until a leader becomes available.
IsrShrinksPerSec | When increases without an increase in IsrExpandsPerSec | A replica could be removed from the ISR pool for a couple of reasons:

  • The replica is too far behind the leader’s offset.
  • The replica hasn’t contacted the leader for some time (configurable with the replica.socket.timeout.ms).
  • More brokers are being added to the cluster (IsrExpandsPerSec should increase as well).
  • Partitions are removed (IsrExpandsPerSec should increase as well).

UncleanLeaderElectionsPerSec | When not 0 | An unclean leader election is a special case in which no available replicas are in sync. Because each topic must have a leader, an election is held among the out-of-sync replicas and a leader is chosen, meaning any messages that weren’t synced prior to the loss of the former leader are lost forever.
ActiveControllerCount | When not 1 | The controller in an Apache Kafka cluster is responsible for maintaining the list of partition leaders and coordinating leadership transitions (in the event a partition leader becomes unavailable).
KafkaDataLogsDiskUsed | Set an alarm when the disk utilization exceeds 85%, and add storage either via API or using the console such that disk utilization drops to 70% or below | Percentage of space utilized in the EBS volume per broker.
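To make the trigger conditions concrete, the following Python sketch evaluates them locally against two consecutive metric samples. It only illustrates the logic of the table; it is not how the CloudWatch alarms themselves were defined. The function name, the shape of the input dictionaries, and the assumption that ActiveControllerCount is a cluster-wide total are all hypothetical.

```python
def firing_alarms(prev, curr):
    """Return the metric names whose alarm condition is met.

    `prev` and `curr` are hypothetical dicts mapping metric names to the
    values from the previous and the current sample.
    """
    rules = {
        "UnderReplicatedPartitions": curr["UnderReplicatedPartitions"] != 0,
        "OfflinePartitionsCount": curr["OfflinePartitionsCount"] != 0,
        # Shrinks went up while expands did not.
        "IsrShrinksPerSec": (
            curr["IsrShrinksPerSec"] > prev["IsrShrinksPerSec"]
            and curr["IsrExpandsPerSec"] <= prev["IsrExpandsPerSec"]
        ),
        "UncleanLeaderElectionsPerSec": curr["UncleanLeaderElectionsPerSec"] != 0,
        # Assumes the value is the cluster-wide controller count.
        "ActiveControllerCount": curr["ActiveControllerCount"] != 1,
        # Disk utilization is a percentage; alarm above 85%.
        "KafkaDataLogsDiskUsed": curr["KafkaDataLogsDiskUsed"] > 85,
    }
    return [name for name, firing in rules.items() if firing]


healthy = {
    "UnderReplicatedPartitions": 0,
    "OfflinePartitionsCount": 0,
    "IsrShrinksPerSec": 0.0,
    "IsrExpandsPerSec": 0.0,
    "UncleanLeaderElectionsPerSec": 0,
    "ActiveControllerCount": 1,
    "KafkaDataLogsDiskUsed": 60.0,
}
# Hypothetical degraded sample: ISR shrinking and a nearly full disk.
degraded = dict(healthy, IsrShrinksPerSec=2.0, KafkaDataLogsDiskUsed=88.0)

print(firing_alarms(healthy, healthy))
print(firing_alarms(healthy, degraded))
```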

Amazon MSK also supports metrics exposed via port 11001 (for the JMX Exporter) and port 11002 (for the Node Exporter) to be captured using monitoring tools such as Prometheus and a variety of third-party tools compatible with Prometheus-formatted metrics like Datadog, New Relic, and SumoLogic. For more information, see Open Monitoring with Prometheus. For instructions on configuring open monitoring, see the Open Monitoring lab.

Conclusion

Self-managed Apache Kafka brokers meant that we had to make sure Apache ZooKeeper always maintained quorum, the network connectivity between brokers was monitored, and different auxiliary Apache Kafka processes like LogCleaner were monitored. All these parts failed at some point, so we were constantly fixing issues with our infrastructure and monitoring. The amount of time that we spent taking care of Apache Kafka could have been better utilized delivering actual business value.

Amazon MSK enabled us to be more productive by reducing the time we spent maintaining our infrastructure, finding and fixing issues, and maintaining our brokers. It takes care of Apache Kafka maintenance in the background, gives us the level of monitoring that we needed, and frees our team to take on more activities to improve our applications and provide value to our customers.

 


About the Authors

Akash Deep Verma is a Senior Technical Architect at Delhivery. Akash joined Delhivery in 2016 and worked on many projects related to Big Data and Analytics. Most recently, he led architecture initiatives related to Democratization of Data. In his free time, he enjoys playing table tennis and watching mind-boggling movies.

 

 

 

Dipta S. Bhattacharya is an Enterprise Solutions Architect at AWS. Dipta joined AWS in 2018. He works with large startup customers to design and develop architectures on AWS and support their journey on the cloud.

 

 

 

 

Nikhil Khokhar is a Solutions Architect at AWS. He joined AWS in 2016 and specializes in building and supporting data streaming solutions that help customers analyze and get value out of their data. In his free time, he makes use of his 3D printing skills to solve everyday problems.

Streaming web content with a log-based architecture with Amazon MSK

Post Syndicated from Sebastian Doell original https://aws.amazon.com/blogs/big-data/streaming-web-content-with-a-log-based-architecture-with-amazon-msk/

Content such as breaking news or sports scores requires updates in near-real time. To stay up to date, you may be constantly refreshing your browser or mobile app. Building APIs to deliver this content at speed and scale can be challenging. In this post, I present an alternative to an API-based approach. I outline the concept and virtues of a log-based architecture, a software architecture that uses a commit log to capture data changes to easily build new services and databases on other services’ full datasets. These data changes can also be content for the web. The architecture enables you to stream this content in real time. It’s simple and easy to scale.

The following video clip shows you an example of this architecture in action.

In this post, I show you how you can use Amazon Managed Streaming for Apache Kafka (Amazon MSK) to build a log-based architecture, and the other technologies you need to stream content on the web. I also show you an example microblogging service that puts everything into action. For more information, see the GitHub repo. It contains a web application, the backend service that runs on AWS Fargate on Amazon Elastic Container Service (Amazon ECS), and the AWS CloudFormation templates to create the infrastructure on the AWS Cloud. You can walk through running the example on AWS Cloud9 or your local machine.

Benefits of a push model

Most websites and mobile applications distribute content by pulling it from an API. The client has to pull any new content or updates to the content by making a new request on the backend. A common behavior is a browser window refresh, or a pull-to-refresh on a mobile app. Another behavior is to poll the server for new content in defined intervals. This is known as the pull model.

When clients use this model, they create a new request to the server every time they update the content. Every request creates stress on your application. You need to check for updates in a database or cache and send data to the client. This consumes CPU and memory, beyond the eventual need to create new connections to your services.

A different approach is to update the client from the server side, known as the push model. The server pushes new content or updates to the client. It’s an asynchronous communication between the server and the client. The following diagram illustrates the architecture for publish/subscribe (pub/sub) messaging, a common pattern for this asynchronous communication.

In the pub/sub pattern, new or updated content is an event. It originates from the publisher of the event and gets distributed to the subscriber. This pattern is used with the reader and the pub/sub service, whereby the reader subscribes to the service and waits for the service to publish new articles. The pub/sub service subscribes to the article log to consume the articles that the editor published to it. A reader that is subscribed to the pub/sub service is only required to keep the connection to the service open and wait for new articles to flow in. In the example of the microblogging service, the reader uses a simple React app to read the articles. The web application keeps the connection to the backend service open, waits for new articles to be published, and updates the displayed articles on publication.

When a new article is published to the log, the article is stored as a message. A message is a key-value pair, and the log preserves the order in which messages are stored. For the example use case, the message stored here is an article, but it can be any kind of data because messages are stored as serialized bytes. The most common formats are plain text, JSON, Protobuf, and Avro. The pub/sub service consumes the messages as they flow in and publishes them to connected clients. Again, this can be a web application in a browser or a mobile application on iOS or Android. In the example, it is a React app in the browser. The subscribers receive the new articles without the need to pull for the content.
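The push model described here can be sketched in a few lines of Python. This is an in-memory stand-in, not the code of the microblogging service: the PubSub class, its method names, and the sample articles are hypothetical, and a real service would push over a network connection rather than call a function.

```python
class PubSub:
    """Minimal in-memory pub/sub service: publishing pushes to every subscriber."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        # A subscriber registers once, then waits to be called.
        self._subscribers.append(callback)

    def publish(self, key, value):
        # Messages are key-value pairs; the value is serialized bytes.
        for callback in self._subscribers:
            callback(key, value)


received = []
service = PubSub()
# The reader deserializes each pushed message as it arrives.
service.subscribe(lambda key, value: received.append((key, value.decode("utf-8"))))

service.publish("article-1", "Breaking news".encode("utf-8"))
service.publish("article-2", "Final score".encode("utf-8"))
print(received)
```

The subscriber never asks for new content; it receives each article in publication order because the publisher drives the delivery.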

This behavior of pushing the content to the clients is called content streaming, because the client waits to read from a stream of messages. These messages contain the published content. It’s similar to video streaming, in which videos are continuously delivered in pieces to the viewer.

The virtues of log-based architectures

A common approach to give consumers access to your content is building APIs. For a long time, these have been RESTful APIs; more recently, GraphQL-based APIs have gained momentum. However, API-based architectures have several issues:

  • Many content producers define their own schema for representing the data. The names and fields vary between them, especially as the number of endpoints increases and the API evolves.
  • The endpoints made available differ in behavior. They use different semantics and have different request parameters.
  • Many APIs don’t include a feature to notify clients about new content or updates. They need additional notification services or extensive polling mechanisms to offer this behavior.

This post’s approach is to use a log-based architecture to stream content on the web and mobile. The idea is to use a log as a generic mechanism to store and distribute content. For more information, see Turning the database inside-out with Apache Samza and Designing Data-Intensive Applications.

Although many different log technologies exist, Apache Kafka has become an industry standard and has created a rich ecosystem. Amazon MSK became generally available in 2019, and is a fully managed service to build applications that use Apache Kafka. The sample microblogging service you run in this post uses Amazon MSK. It appends the published posts to the log in a partition of a topic. Partitions allow you to parallelize the orderly processing of messages in a topic by splitting the data in a topic. The topic in the microblogging example has only one partition because the global order of articles should be guaranteed. With multiple partitions, only the order in a partition is guaranteed. The pub/sub service in the example consumes the articles by reading the log in chronological order and publishing them in that order to the subscribers.
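The ordering guarantee can be illustrated with a small Python sketch that models a topic as a list of partitions. It is a simplification under stated assumptions: the helper names are hypothetical, and CRC32 stands in for the hash function only to keep the example deterministic; it is not presented as the partitioner Apache Kafka uses.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a key to a partition; the same key always lands in the same one."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def append(topic, key, value):
    """Append a record to the partition chosen by its key."""
    topic[partition_for(key, len(topic))].append((key, value))


single = [[]]          # one partition: global order is preserved
multi = [[], [], []]   # three partitions: order is preserved per partition only

for i, key in enumerate(["sports", "news", "sports", "weather"]):
    append(single, key, f"article-{i}")
    append(multi, key, f"article-{i}")

print(single[0])
for number, partition in enumerate(multi):
    print(number, partition)
```

With one partition, reading the log returns the articles in exactly the order they were published. With three, records that share a key stay in order relative to each other, but there is no defined order across partitions.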

Using a log to store the articles has advantages compared to a traditional database system. First, logs are schema-less. They store simple key-value pairs with some additional metadata. The value can be any kind of binary-encoded data, which means that the producer and consumer are responsible for serializing and deserializing the stored value. Therefore, it’s easy to change the data stored over time without any downtime.

A log is also easy to back up and restore. You can consume and store the single messages in the log as objects to Amazon Simple Storage Service (Amazon S3) and restore the messages by republishing them to a new log. The replay of the messages happens from the object store, which is cost-effective and secure, and uses the capabilities of Amazon S3 like object versioning and lifecycle management.

Replication is also much easier compared to traditional database systems. Because the log is in chronological order, replicas are also always in order, and it’s easy to determine if they are in sync or not.

Furthermore, building derived stores (for example, the latest articles) is much easier this way. The log represents everything needed to build up those stores, whereas databases represent only the latest state. Log-based architectures are inherently consistent.

A log is an ordered representation of all events that happened to the system. The events themselves are the changes to the system. This can be a new article or an updated article. The log is used to create materialized views. This can be a NoSQL database like Amazon DynamoDB, which drives a GraphQL API via AWS AppSync, or any other specific view on the data. The data in a log can materialize in any kind of view because the needed state can always be recreated by replaying the messages in the log. These views are used to eventually consume the data. Every service can have its own data store and view of the data. They can expose as much or as little of the data as they need for their view on it. The databases of these services can be more purposeful and are easier to maintain. They can use DynamoDB in one instance or Amazon Aurora in another. The example of the microblogging service, however, doesn’t use a materialized view to show the posts. It publishes and consumes directly to and from the log.
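The idea that a view can always be recreated by replaying the log can be sketched as follows. This is a minimal Python illustration with hypothetical names and sample data; a plain dictionary stands in for a store such as DynamoDB.

```python
def materialize_latest(log):
    """Replay the log in order and keep the latest value for each key."""
    view = {}
    for key, value in log:
        view[key] = value  # a later event for the same key overwrites the earlier one
    return view


# Hypothetical log: one article is published, then updated.
log = [
    ("article-1", "Draft headline"),
    ("article-2", "Match preview"),
    ("article-1", "Final headline"),
]

view = materialize_latest(log)
# If the view is lost, replaying the same log rebuilds the same state.
rebuilt = materialize_latest(log)
print(view)
```

The log keeps both versions of the first article, while the view keeps only the most recent one. A different service could replay the same log into a different view, such as a full edit history.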

Log-based architectures are appealing to content providers because there is no distinction between accessing current and future data. Consumers are always replaying the data; they get current and all future data from where they start to read the log. The current position of the consumer is the offset. It’s a simple integer number that points to the last record the consumer read from the log. The number of messages between the offset and the latest message in the log is the lag. When a consumer starts to read messages from the offset, everything from this point on fuses to a message stream. If this is combined with a protocol that provides the capability to push data to the browser, you can stream these messages to the web.
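The relationship between offset and lag can be shown with a short Python sketch. It follows the definition used in this post, where the offset points to the last record read; Apache Kafka clients conventionally track the position of the next record instead. The Consumer class and its methods are hypothetical and work on an in-memory list rather than a real topic.

```python
class Consumer:
    """Reads an in-memory log, tracking the offset of the last record read."""

    def __init__(self, log, offset=-1):
        self.log = log
        self.offset = offset  # -1 means nothing has been read yet

    def lag(self):
        """Number of records between the offset and the latest record."""
        return (len(self.log) - 1) - self.offset

    def poll(self):
        """Yield every record after the offset, advancing the offset as it goes."""
        while self.offset < len(self.log) - 1:
            self.offset += 1
            yield self.log[self.offset]


log = ["article-1", "article-2", "article-3"]
consumer = Consumer(log, offset=0)   # the first article was already read

lag_before = consumer.lag()
caught_up = list(consumer.poll())    # existing data after the offset

log.append("article-4")              # a new article is published later
lag_after = consumer.lag()
future = list(consumer.poll())       # the same call now returns the new data

print(lag_before, caught_up, lag_after, future)
```

The consumer uses the same poll call for records that already existed and for records published afterward, which is the sense in which there is no distinction between current and future data.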

Streaming content with gRPC

gRPC is a high-performance RPC framework that can run in any environment. It’s often used to connect services in distributed backend systems, but is also applicable to connecting devices, mobile applications, and browsers in the last mile. An RPC framework allows applications to call a function in a remote process. gRPC uses protocol buffers, a powerful binary serialization tool and language, to define a service and its remote calls. Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data and sending it over the wire. You define the structure of your data and use source code generation to create the code to easily read and write data in your data structures. You don’t need to write object model wrappers in your services or on the client, and protocol buffers support many popular programming languages.

The microblogging example for this post uses the Go support for the backend service, and grpc-web in the React app, which is the client for the service. The code for both languages is created from one Protobuf definition. If you’re developing a mobile application, there is support for Swift and Android. The elimination of object model wrappers to unwrap JSON into an object is a big advantage.

The most important feature for streaming content is bidirectional streaming with HTTP/2-based transport. The example uses a server-side streaming RPC to list the published articles. The client sends a request to the server and gets a stream to read the sequence of articles. Because the connection is kept alive, the server continues to push all future articles. gRPC guarantees message ordering within an individual RPC call, which is important for our example, because the articles are directly consumed from the log, and the articles in the log are in chronological order.

Microblogging example service

Learning about the virtues of log-based architectures is one thing; building a service on top of this pattern is another. This post provides a complementary microblogging service that uses the pattern and gRPC to exemplify everything that you learn. For more information, see the GitHub repo.

The service has two main components:

  • A simple React app that connects to the backend service via gRPC to list all published articles and create new articles
  • A backend that implements the gRPC service that the app calls

The React app publishes new articles to a topic in Amazon MSK and lists these articles as they are appended to the log. Listing the articles is a call to the ListArticles remote procedure, which subscribes to the topic of articles and reads the log from the beginning. The articles are pushed to the web application as they are read from the log.

The GitHub repo also contains the CloudFormation templates to create the needed infrastructure and deploy the services. The following diagram illustrates the architecture of the services and infrastructure.

The service uses Amazon Elastic Container Registry (Amazon ECR) to store the Docker images for Fargate. The pub/sub service uses Amazon MSK for its commit log. The service is discovered via AWS Cloud Map. Clients connect to the service via an Application Load Balancer. You use this load balancer because there can be a longer idle timeout on the connection, so that data can be pushed from the service to the client without managing the connection in the client.

Running on AWS Cloud9

You can deploy and run the service from your development machine. If you want to try it out and experiment with it, you can also run it using AWS Cloud9. AWS Cloud9 is a cloud-based integrated development environment (IDE) that lets you write, run, and debug your code with just a browser.

The only prerequisite for completing this walkthrough is an AWS account. For instructions on creating one, see How do I create and activate a new AWS account?

Creating the environment and running the example consumes AWS resources. Make sure you remove all resources when you’re finished to avoid ongoing charges to your AWS account.

Creating the AWS Cloud9 environment

To create your AWS Cloud9 environment, complete the following steps:

  1. On the AWS Cloud9 console, choose Create environment.
  2. Name the environment mskworkshop.
  3. Choose Next step.
  4. For Instance type, choose small.
  5. Accept all default values and choose Next Step.
  6. On the summary page, review your inputs and choose Create environment.

AWS Cloud9 provides a default auto-hibernation setting of 30 minutes for your Amazon Elastic Compute Cloud (Amazon EC2) instances created through it. With this setting, your EC2 instances automatically stop 30 minutes after you close the IDE. They only start again when you reopen the environment.

  1. When your environment is ready, close the Welcome tab.
  2. From the remaining tab, choose New Terminal.

Your environment should look like the following screenshot.

For more information, see Working with Environments in AWS Cloud9.

Preparing the AWS Cloud9 environment

To proceed with the walkthrough, you have to install the most current version of the AWS Command Line Interface (AWS CLI), install the needed tools, and resize the AWS Cloud9 environment.

  1. To view your current version of AWS CLI, enter the following code:

Bash $ > aws --version

  1. To update to the latest version, enter the following code:

Bash $ > pip install --user --upgrade awscli

You can ignore warnings about the outdated pip version. To confirm the upgrade, check the installed AWS CLI version again. The example scripts also need the jq command, a lightweight and flexible command-line JSON processor.

  1. Install the tool with the following code:

Bash $ > sudo yum install -y jq

The client runs in the Cloud9 environment. It needs a current version of Node.js and the Yarn package manager to be installed.

  1. To manage the installed Node.js, use the Node Version Manager (nvm). See the following code:

Bash $ > curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.34.0/install.sh | bash

  1. Activate the version manager in your environment with the following code:

Bash $ > . ~/.nvm/nvm.sh

  1. Use nvm to install the current version of Node.js:

Bash $ > nvm install node

  1. Use npm to install the latest version of the Yarn package manager:

Bash $ > npm install yarn -g

You have finished preparing your AWS Cloud9 environment. Next, you clone the example repository and set up the example.

Cloning and setting up the example repository

You first need to change into the folder you want to clone the repository to. See the following code:

Bash $ > cd ~/environment

Clone the repository and enter the folder with the source code:

Bash $ > git clone https://github.com/aws-samples/aws-msk-content-streaming aws-msk-content-streaming && cd $_

You have successfully cloned the source code in your Cloud9 environment.

Resizing the environment

By default, AWS Cloud9 has 8 GB of storage attached. The example service needs various Docker containers that the provided scripts build, and they consume more than the default storage. Therefore, you have to increase the size of the attached storage. See the following code:

Bash $ > make resize

This resizes the attached storage to 20 GB, which is sufficient for the microblogging service. If you encounter an error that the /dev/nvme0n1 device doesn’t exist, you’re not running on a Nitro-based architecture. For instructions on replacing the devices, see Moving an environment or resizing an Amazon EBS volume.

The microblogging service contains the option of a bastion host, which is a special-purpose computer on a network specifically designed to securely access resources in this network. You can access this host via SSH.

Creating an SSH key

To generate an SSH key, enter the following code (if needed, you can use this key to access Amazon MSK and your Fargate containers):

Bash $ > ssh-keygen

Choose Enter three times to take the default choices. Next, upload the public key to your Amazon EC2 Region with the following code:

Bash $ > aws ec2 import-key-pair --key-name ${C9_PROJECT} --public-key-material file://~/.ssh/id_rsa.pub

Set the key as the KEY_PAIR environment variable for the scripts to use. You’re now ready to deploy the example application to your AWS account.
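
The export itself could look like the following minimal sketch. It assumes the key pair was imported under the name held in C9_PROJECT, as in the import-key-pair command above; the fallback to mskworkshop (the environment name chosen earlier) is only illustrative.

```shell
# Assumption: the key pair name matches the --key-name used with import-key-pair.
# The fallback value "mskworkshop" is illustrative and applies only when
# C9_PROJECT is not set (for example, outside AWS Cloud9).
export KEY_PAIR="${C9_PROJECT:-mskworkshop}"
echo "Using key pair: ${KEY_PAIR}"
```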

Deploying the application

Before you can run the web application, you have to deploy the needed services to your AWS account. The deployment scripts create two CloudFormation stacks. One stack contains the core services: the VPC, NAT gateway, internet gateway, and Amazon MSK. The other stack is the application stack, with the Docker containers and the Application Load Balancer.

To deploy these stacks, enter the following code:

Bash $ > make deploy

The deployment process may take some time. When the deployment process is complete, you can start the web application. It starts a local development server that is also exposed to the public. See the following code:

Bash $ > make start

The build process is finished when you see a Compiled successfully! message and the URLs to the development server. You access the preview of the web application by choosing Preview, Preview Running Application in the toolbar. This opens a split view with a window and the web application.

Unfortunately, you can’t post or read any content in this preview because you didn’t configure a custom domain for the Application Load Balancer, and therefore the deployed service is only accessible via HTTP. However, AWS Cloud9 is a secure environment and expects content to be served via HTTPS.

To make the example work, you can either copy the full URL from the preview (https://12345678910.vfs.cloud9.eu-west-1.amazonaws.com/) or choose the Pop Out Into New Window icon next to the browser bar.

In the URL, replace https with http. You can now access the service with HTTP.

You can test the example by creating a new item. Give it a title and add some content. When you’re finished, choose Create Post. If you see an error because of a connection problem, refresh your browser.

Cleaning up

When you’re finished exploring the example and discovering the deployed resources, the last step is to clean up your account. The following code deletes all the resources you created:

Bash $ > make delete

Running on your machine

If you want to run the example on your local machine, you need to install the required tools:

  • Docker
  • AWS CLI
  • Node.js and Yarn package manager
  • Linux userland with bash
  • GNU Make

Installing them enables you to build the needed Docker containers for the pub/sub service, create the needed infrastructure and deploy the containers, and run the React app to create and read posts. For more information, see the GitHub repo.

To run the infrastructure and access the example, you also clone the repository to your machine and enter the folder. See the following code:

bash $ > git clone https://github.com/aws-samples/aws-msk-content-streaming aws-msk-content-streaming && cd $_

In the next step, you build the backend service, the envoy proxy for the gRPC calls, and the needed infrastructure for the service. The envoy proxy is needed to bridge the gRPC-Web client in the web app to the gRPC server in the backend service. The calls from the web app are text-encoded, while the backend service uses the binary protobuf format. You have to set the following environment variables for the deployment to work:

bash $ > export PROJECT_NAME=<YOUR_PROJECT_NAME>

bash $ > export AWS_ACCOUNT_ID=<YOUR_ACCOUNT_ID>

bash $ > export AWS_DEFAULT_REGION=<YOUR_AWS_REGION>

bash $ > export KEY_PAIR=<YOUR_AWS_EC2_KEY_PAIR>

Replace the placeholders in angle brackets with your details: the name of the project, the account you want to deploy the infrastructure to, the Region you want to deploy the stack to, and the name of your Amazon EC2 key pair.
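
Because a missing variable tends to surface only partway through a deployment, it can help to check the four variables first. The following is a small sketch; check_env is a hypothetical helper and not part of the example repository.

```shell
# Hypothetical helper: succeed only if every named variable is set and non-empty.
check_env() {
  missing=0
  for name in "$@"; do
    # Indirect lookup of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "Missing environment variable: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage (run before deploying):
#   check_env PROJECT_NAME AWS_ACCOUNT_ID AWS_DEFAULT_REGION KEY_PAIR && make deploy
```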

To deploy the same CloudFormation stacks as in the AWS Cloud9 environment, enter the following code:

bash $ > make deploy

To start a development webserver at localhost:3030 and open a browser window with this URL in your default browser, enter the following code:

bash $ > make start

The client is configured through the environment variable REACT_APP_ENDPOINT, which is set to the URL of the Application Load Balancer.

Create your articles

You can now create a new article with a title and content and publish it to the log. The list of articles should then update automatically as the new article is pushed to the client. You can also test this by duplicating the tab and creating a new article in the new tab.

The following diagram illustrates the behavior of the solution from the client perspective:

The remote call to the pub/sub service subscribes the client to the list of articles in the article log. A unidirectional gRPC stream is created. The pub/sub service pushes all available articles and all new articles to the client. The envoy proxy filters the gRPC-Web calls, which are in text format, and translates them into the binary gRPC calls for the pub/sub service. The insert of an article is an additional unary gRPC call to the pub/sub service.

Summary

This post discussed log-based architectures and how you can use them to stream content on the web. We talked about the virtues of gRPC to stream the content to the browser or to mobile devices. Furthermore, you experimented with a log-based architecture with a microblogging service built on Amazon MSK. You also saw how to deploy the needed infrastructure and run it with the sample code.

You can use the underlying idea and the provided example to build your own solution. Please share what you built or your questions regarding running log-based architectures on the AWS Cloud.

 


About the Author

Sebastian Doell is a Startup Solutions Architect at AWS. He helps startups execute on their ideas at speed and at scale. Sebastian also maintains a number of open-source projects and is an advocate of Dart and Flutter.

 

How Goldman Sachs builds cross-account connectivity to their Amazon MSK clusters with AWS PrivateLink

Post Syndicated from Robert L. Cossin original https://aws.amazon.com/blogs/big-data/how-goldman-sachs-builds-cross-account-connectivity-to-their-amazon-msk-clusters-with-aws-privatelink/

This guest post presents patterns for accessing an Amazon Managed Streaming for Apache Kafka cluster across your AWS account or Amazon Virtual Private Cloud (Amazon VPC) boundaries using AWS PrivateLink. In addition, the post discusses the pattern that the Transaction Banking team at Goldman Sachs (TxB) chose for their cross-account access, the reasons behind their decision, and how TxB satisfies its security requirements with Amazon MSK. Using Goldman Sachs’s implementation as a use case, this post aims to provide you with general guidance that you can use when implementing an Amazon MSK environment.

Overview

Amazon MSK is a fully managed service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. When you create an MSK cluster, the cluster resources are available to participants within the same Amazon VPC. This allows you to launch the cluster within specific subnets of the VPC, associate it with security groups, and attach IP addresses from your VPC’s address space through elastic network interfaces (ENIs). Network traffic between clients and the cluster stays within the AWS network, with internet access to the cluster not possible by default.

You may need to allow clients access to an MSK cluster in a different VPC within the same or a different AWS account. You have options such as VPC peering or a transit gateway that allow for resources in either VPC to communicate with each other as if they’re within the same network. For more information about access options, see Accessing an Amazon MSK Cluster.

Although these options are valid, this post focuses on a different approach, which uses AWS PrivateLink. Therefore, before we dive deep into the actual patterns, let’s briefly discuss when AWS PrivateLink is a more appropriate strategy for cross-account and cross-VPC access.

VPC peering, illustrated below, is a bidirectional networking connection between two VPCs that enables you to route traffic between them using private IPv4 addresses or IPv6 addresses.

VPC peering is more suited for environments that have a high degree of trust between the parties that are peering their VPCs. This is because, after a VPC peering connection is established, the two VPCs can have broad access to each other, with resources in either VPC capable of initiating a connection. You’re responsible for implementing fine-grained network access controls with security groups to make sure that only specific resources intended to be reachable are accessible between the peered VPCs.

You can only establish VPC peering connections across VPCs that have non-overlapping CIDRs. This can pose a challenge when you need to peer VPCs with overlapping CIDRs, such as when peering across accounts from different organizations.
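
To make the overlap condition concrete, here is a small sketch that tests whether two IPv4 CIDR blocks overlap. The helper names ip_to_int and cidrs_overlap are hypothetical, and the check covers IPv4 only.

```shell
# Convert a dotted-quad IPv4 address to a 32-bit integer.
ip_to_int() {
  old_ifs=$IFS
  IFS=.
  set -- $1
  IFS=$old_ifs
  echo $(( ($1 << 24) | ($2 << 16) | ($3 << 8) | $4 ))
}

# Succeed (exit status 0) when two IPv4 CIDR blocks overlap.
# Two blocks overlap exactly when they agree on the first N bits,
# where N is the shorter of the two prefix lengths.
cidrs_overlap() {
  ip1=${1%/*}; len1=${1#*/}
  ip2=${2%/*}; len2=${2#*/}
  if [ "$len1" -lt "$len2" ]; then min=$len1; else min=$len2; fi
  if [ "$min" -eq 0 ]; then
    mask=0
  else
    mask=$(( (4294967295 << (32 - min)) & 4294967295 ))
  fi
  [ $(( $(ip_to_int "$ip1") & mask )) -eq $(( $(ip_to_int "$ip2") & mask )) ]
}

# Usage:
#   cidrs_overlap 10.0.0.0/16 10.0.128.0/20 && echo "overlap: peering not possible"
#   cidrs_overlap 10.0.0.0/16 10.1.0.0/16   || echo "no overlap"
```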

Additionally, if you’re running at scale, you can have hundreds of Amazon VPCs, and VPC peering has a limit of 125 peering connections to a single Amazon VPC. You can use a network hub like transit gateway, which, although highly scalable in enabling you to connect thousands of Amazon VPCs, requires similar bidirectional trust and non-overlapping CIDRs as VPC peering.

In contrast, AWS PrivateLink provides fine-grained network access control to specific resources in a VPC instead of all resources by default, and is therefore more suited for environments that want to follow a lower trust model approach, thus reducing their risk surface. The following diagram shows a service provider VPC that has a service running on Amazon Elastic Compute Cloud (Amazon EC2) instances, fronted by a Network Load Balancer (NLB). The service provider creates a configuration called a VPC endpoint service in the service provider VPC, pointing to the NLB. You can share this endpoint service with another Amazon VPC (service consumer VPC), which can use an interface VPC endpoint powered by AWS PrivateLink to connect to the service. The service consumers use this interface endpoint to reach the end application or service directly.

AWS PrivateLink makes sure that the connections initiated to a specific set of network resources are unidirectional: the connection can only originate from the service consumer VPC and flow into the service provider VPC, not the other way around. Outside of the network resources backed by the interface endpoint, no other resources in the service provider VPC are exposed. AWS PrivateLink also allows VPC CIDR ranges to overlap, and it scales better than VPC peering because thousands of Amazon VPCs can consume each service.

VPC peering and AWS PrivateLink are therefore two connectivity options suited for different trust models and use cases.

Transaction Banking’s micro-account strategy

An AWS account is a strong isolation boundary that provides both access control and reduced blast radius for issues that may occur due to deployment and configuration errors. This strong isolation is possible because you need to deliberately and proactively configure flows that cross an account boundary. TxB designed a strategy that moves each of their systems into its own AWS account, each of which is called a TxB micro-account. This strategy allows TxB to minimize the chances of a misconfiguration exposing multiple systems. For more information about TxB micro-accounts, see the video AWS re:Invent 2018: Policy Verification and Enforcement at Scale with AWS on YouTube.

To complement the isolation gained from the TxB micro-account segmentation, TxB chose AWS PrivateLink for cross-account and cross-VPC access to their systems. AWS PrivateLink allows TxB service providers to expose their services as an endpoint service and use whitelisting to explicitly configure which other AWS accounts can create interface endpoints to these services. This also allows for fine-grained control of the access patterns for each service. The endpoint service definition only allows access to resources attached to the NLBs and thereby makes it easy to understand the scope of access overall. The one-way initiation of connection from a service consumer to a service provider makes sure that all connectivity is controlled on a point-to-point basis. Furthermore, AWS PrivateLink allows the CIDR blocks of VPCs to overlap between the TxB micro-accounts. The use of AWS PrivateLink thus sets TxB up for future growth as a part of their default setup, because thousands of TxB micro-account VPCs can consume each service if needed.

MSK broker access patterns using AWS PrivateLink

As a part of their micro-account strategy, TxB runs an MSK cluster in its own dedicated AWS account, and clients that interact with this cluster are in their respective micro-accounts. Considering this setup and the preference to use AWS PrivateLink for cross-account connectivity, TxB evaluated the following two patterns for broker access across accounts.

Pattern 1: Front each MSK broker with a unique dedicated interface endpoint

In this pattern, each MSK broker is fronted with a unique dedicated NLB in the TxB MSK account hosting the MSK cluster. The TxB MSK account contains an endpoint service for every NLB and is shared with the client account. The client account contains interface endpoints corresponding to the endpoint services. Finally, DNS entries identical to the broker DNS names point to the respective interface endpoint. The following diagram illustrates this pattern in the US East (Ohio) Region.

High-level flow

After setup, clients from their own accounts talk to the brokers using their provisioned default DNS names as follows:

  1. The client resolves the broker DNS name to the interface endpoint IP address inside the client VPC.
  2. The client initiates a TCP connection to the interface endpoint IP over port 9094.
  3. With AWS PrivateLink technology, this TCP connection is routed to the dedicated NLB setup for the respective broker listening on the same port within the TxB MSK account.
  4. The NLB routes the connection to the single broker IP registered behind it on TCP port 9094.

High-level setup

The setup steps in this section are shown for the US East (Ohio) Region. Modify them if you use another Region. In the TxB MSK account, complete the following:

  1. Create a target group with target type as IP, protocol TCP, port 9094, and in the same VPC as the MSK cluster.
    • Register the MSK broker as a target by its IP address.
  2. Create an NLB with a listener of TCP port 9094 and forwarding to the target group created in the previous step.
    • Enable the NLB for the same AZ and subnet as the MSK broker it fronts.
  3. Create an endpoint service configuration for each NLB that requires acceptance and grant permissions to the client account so it can create a connection to this endpoint service.
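
The three steps above can be sketched with the AWS CLI as follows. This is an illustrative outline under stated assumptions, not TxB's automation: front_broker_with_nlb is a hypothetical helper, the resource names are placeholders, and the function handles a single broker in a single subnet without error handling.

```shell
# Hypothetical helper: front one MSK broker with its own NLB and endpoint service.
# Arguments: <name> <vpc-id> <subnet-id> <broker-ip> <client-account-id>
front_broker_with_nlb() {
  name=$1; vpc_id=$2; subnet_id=$3; broker_ip=$4; client_account=$5

  # Step 1: IP target group on TCP 9094, with the broker registered by IP address.
  tg_arn=$(aws elbv2 create-target-group --name "$name-tg" \
    --protocol TCP --port 9094 --target-type ip --vpc-id "$vpc_id" \
    --query 'TargetGroups[0].TargetGroupArn' --output text)
  aws elbv2 register-targets --target-group-arn "$tg_arn" \
    --targets "Id=$broker_ip" >/dev/null

  # Step 2: internal NLB in the broker's subnet with a TCP 9094 listener.
  nlb_arn=$(aws elbv2 create-load-balancer --name "$name-nlb" \
    --type network --scheme internal --subnets "$subnet_id" \
    --query 'LoadBalancers[0].LoadBalancerArn' --output text)
  aws elbv2 create-listener --load-balancer-arn "$nlb_arn" \
    --protocol TCP --port 9094 \
    --default-actions "Type=forward,TargetGroupArn=$tg_arn" >/dev/null

  # Step 3: endpoint service that requires acceptance, shared with the client account.
  service_id=$(aws ec2 create-vpc-endpoint-service-configuration \
    --network-load-balancer-arns "$nlb_arn" --acceptance-required \
    --query 'ServiceConfiguration.ServiceId' --output text)
  aws ec2 modify-vpc-endpoint-service-permissions --service-id "$service_id" \
    --add-allowed-principals "arn:aws:iam::$client_account:root" >/dev/null

  # Print the endpoint service ID so the client account can reference it.
  echo "$service_id"
}
```

Calling the function once per broker yields one endpoint service per broker, which matches the one-NLB-per-broker layout of this pattern.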

In the client account, complete the following:

  1. Create an interface endpoint in the same VPC the client is in (this connection request needs to be accepted within the TxB MSK account).
  2. Create a Route 53 private hosted zone, with the domain name kafka.us-east-2.amazonaws.com, and associate it with the same VPC as the clients are in.
  3. Create A-Alias records identical to the broker DNS names to avoid any TLS handshake failures, and point them to the interface endpoints of the respective brokers.
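
As a sketch of step 3, the following helper prints a Route 53 change batch for one alias record. The function name alias_change_batch is hypothetical. It assumes you have already looked up the interface endpoint's DNS name and hosted zone ID, for example from the DnsEntries field returned by aws ec2 describe-vpc-endpoints.

```shell
# Hypothetical helper: print a Route 53 change batch that aliases a broker DNS
# name to an interface endpoint.
# Arguments: <broker-dns-name> <endpoint-dns-name> <endpoint-hosted-zone-id>
alias_change_batch() {
  cat <<EOF
{
  "Changes": [{
    "Action": "UPSERT",
    "ResourceRecordSet": {
      "Name": "$1",
      "Type": "A",
      "AliasTarget": {
        "DNSName": "$2",
        "HostedZoneId": "$3",
        "EvaluateTargetHealth": false
      }
    }
  }]
}
EOF
}

# Usage (the zone ID variable is a placeholder for your private hosted zone):
#   aws route53 change-resource-record-sets \
#     --hosted-zone-id "$PRIVATE_ZONE_ID" \
#     --change-batch "$(alias_change_batch "$BROKER_DNS" "$ENDPOINT_DNS" "$ENDPOINT_ZONE_ID")"
```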

Pattern 2: Front all MSK brokers with a single shared interface endpoint

In this second pattern, all brokers in the cluster are fronted with a single unique NLB that has cross-zone load balancing enabled. You make this possible by modifying each MSK broker’s advertised.listeners config to advertise a unique port. You create a unique NLB listener-target group pair for each broker and a single shared listener-target group pair for all brokers. You create an endpoint service configuration for this single NLB and share it with the client account. In the client account, you create an interface endpoint corresponding to the endpoint service. Finally, you create DNS entries identical to the broker DNS names that point to the single interface. The following diagram illustrates this pattern in the US East (Ohio) Region.

High-level flow

After setup, clients from their own accounts talk to the brokers using their provisioned default DNS names as follows:

  1. The client resolves the broker DNS name to the interface endpoint IP address inside the client VPC.
  2. The client initiates a TCP connection to the interface endpoint over port 9094.
  3. The NLB listener within the TxB MSK account on port 9094 receives the connection.
  4. The NLB listener’s corresponding target group load balances the request to one of the brokers registered to it (Broker 1). In response, Broker 1 sends back the advertised DNS name and port (9001) to the client.
  5. The client resolves the broker endpoint address again to the interface endpoint IP and initiates a connection to the same interface endpoint over TCP port 9001.
  6. This connection is routed to the NLB listener for TCP port 9001.
  7. This NLB listener’s corresponding target group is configured to receive the traffic on TCP port 9094, and forwards the request on the same port to the only registered target, Broker 1.

High-level setup

The setup steps in this section are shown for the US East (Ohio) Region. Modify them if you use another Region. In the TxB MSK account, complete the following:

  1. Modify the port that the MSK broker is advertising by running the following command against each running broker. The following example command shows changing the advertised port on a specific broker b-1 to 9001. For each broker you run the below command against, you must change the values of bootstrap-server, entity-name, CLIENT_SECURE, REPLICATION and REPLICATION_SECURE. Please note that while modifying the REPLICATION and REPLICATION_SECURE values, -internal has to be appended to the broker name and the ports 9093 and 9095 shown below should not be changed.
    ./kafka-configs.sh \
    --bootstrap-server b-1.exampleClusterName.abcde.c2.kafka.us-east-2.amazonaws.com:9094 \
    --entity-type brokers \
    --entity-name 1 \
    --alter \
    --command-config kafka_2.12-2.2.1/bin/client.properties \
    --add-config advertised.listeners=[\
    CLIENT_SECURE://b-1.exampleClusterName.abcde.c2.kafka.us-east-2.amazonaws.com:9001,\
    REPLICATION://b-1-internal.exampleClusterName.abcde.c2.kafka.us-east-2.amazonaws.com:9093,\
    REPLICATION_SECURE://b-1-internal.exampleClusterName.abcde.c2.kafka.us-east-2.amazonaws.com:9095]

  2. Create a target group with target type as IP, protocol TCP, port 9094, and in the same VPC as the MSK cluster. The preceding diagram represents this as B-ALL.
    • Register all MSK brokers to B-ALL as targets by their IP addresses.
  3. Create target groups dedicated for each broker (B1, B2) with the same properties as B-ALL.
    • Register the respective MSK broker to each target group by its IP address.
  4. Perform the same steps for additional brokers if needed, and create a unique listener-target group pair corresponding to the advertised port for each broker.
  5. Create an NLB that is enabled for the same subnets that the MSK brokers are in and with cross-zone load balancing enabled.
    • Create a TCP listener for every broker’s advertised port (9001, 9002) that forwards to the corresponding target group you created (B1, B2).
    • Create a special TCP listener 9094 that forwards to the B-ALL target group.
  6. Create an endpoint service configuration for the NLB that requires acceptance and grant permissions to the client account to create a connection to this endpoint service.
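
Because step 1 has to be repeated for every broker, it can help to generate the advertised.listeners value rather than edit it by hand. The following sketch assumes that broker N advertises client port 9000 + N, which matches the b-1 to 9001 example above but is otherwise an arbitrary convention. The helper names are hypothetical.

```shell
# Assumption: broker N advertises client port 9000 + N (b-1 -> 9001, b-2 -> 9002).
advertised_port() {
  echo $(( 9000 + $1 ))
}

# Build the advertised.listeners value for one broker.
# Arguments: <broker-id> <cluster-domain>
# The replication ports 9093 and 9095 are left unchanged, as required above.
advertised_listeners() {
  id=$1; domain=$2
  printf '[CLIENT_SECURE://b-%s.%s:%s,REPLICATION://b-%s-internal.%s:9093,REPLICATION_SECURE://b-%s-internal.%s:9095]\n' \
    "$id" "$domain" "$(advertised_port "$id")" "$id" "$domain" "$id" "$domain"
}

# Print the port each broker would advertise in a two-broker cluster.
for id in 1 2; do
  echo "broker $id -> NLB listener port $(advertised_port "$id")"
done
```

The output of advertised_listeners can be passed to the --add-config advertised.listeners= argument of the kafka-configs.sh command shown in step 1, and the same port numbers drive the per-broker NLB listeners in step 5.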

In the client account, complete the following:

  1. Create an interface endpoint in the same VPC the client is in (this connection request needs to be accepted within the TxB MSK account).
  2. Create a Route 53 private hosted zone, with the domain name kafka.us-east-2.amazonaws.com and associate it with the same VPC as the client is in.
  3. Under this hosted zone, create A-Alias records identical to the broker DNS names to avoid any TLS handshake failures, and point them to the interface endpoint.

Both patterns in this post use TLS on TCP port 9094 to talk to the MSK brokers. If your security posture allows the use of plaintext communication between the clients and brokers, these patterns apply in that scenario as well, using TCP port 9092.

With both of these patterns, if Amazon MSK detects a broker failure, it mitigates the failure by replacing the unhealthy broker with a new one. In addition, the new MSK broker retains the same IP address and has the same Kafka properties, such as any modified advertised.listeners configuration.

Amazon MSK allows clients to communicate with the service on TCP ports 9092, 9094, and 2181. As a byproduct of modifying advertised.listeners in Pattern 2, clients are automatically asked to speak with the brokers on the advertised port. If clients in the same account as Amazon MSK need to access the brokers, you should create a new Route 53 hosted zone in the Amazon MSK account with identical broker DNS names pointing to the NLB DNS name. The Route 53 record sets override the MSK broker DNS and allow all traffic to the brokers to go via the NLB.

Transaction Banking’s MSK broker access pattern

For broker access across TxB micro-accounts, TxB chose Pattern 1, where one interface endpoint per broker is exposed to the client account. TxB streamlined this overall process by automating the creation of the endpoint service within the TxB MSK account and the interface endpoints within the client accounts without any manual intervention.

At the time of cluster creation, the bootstrap broker configuration is retrieved by calling the Amazon MSK APIs and stored in AWS Systems Manager Parameter Store in the client account so that they can be retrieved on application startup. This enables clients to be agnostic of the Kafka broker’s DNS names being launched in a completely different account.
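
A minimal sketch of this hand-off with the AWS CLI follows. It is not TxB's implementation. The helper names, the parameter name, and the use of a named CLI profile for the client account are assumptions for illustration.

```shell
# Hypothetical helper: copy the TLS bootstrap broker string into Parameter Store.
# Arguments: <cluster-arn> <parameter-name> <client-account-profile>
# Assumption: <client-account-profile> is an AWS CLI profile with write access
# to Parameter Store in the client account.
publish_bootstrap_brokers() {
  cluster_arn=$1; param_name=$2; client_profile=$3
  brokers=$(aws kafka get-bootstrap-brokers --cluster-arn "$cluster_arn" \
    --query 'BootstrapBrokerStringTls' --output text)
  aws ssm put-parameter --profile "$client_profile" --name "$param_name" \
    --type String --value "$brokers" --overwrite >/dev/null
  echo "$brokers"
}

# On application startup in the client account, read the value back.
# Argument: <parameter-name>
read_bootstrap_brokers() {
  aws ssm get-parameter --name "$1" --query 'Parameter.Value' --output text
}
```

The application only needs the parameter name, so the broker DNS names can change without any change to client configuration.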

A key driver for TxB choosing Pattern 1 is that it avoids having to modify a broker property like the advertised port. Pattern 2 creates the need for TxB to track which broker is advertising which port and make sure new brokers aren’t reusing the same port. This adds the overhead of having to modify and track the advertised port of new brokers being launched live and having to create a corresponding listener-target group pair for these brokers. TxB avoided this additional overhead by choosing Pattern 1.

On the other hand, Pattern 1 requires the creation of additional dedicated NLBs and interface endpoint connections when more brokers are added to the cluster. TxB limits this management overhead through automation, which requires additional engineering effort.

Also, using Pattern 1 costs more compared to Pattern 2, because each broker in the cluster has a dedicated NLB and an interface endpoint. For a single broker, it costs $37.80 per month to keep the end-to-end connectivity infrastructure up. The breakdown of the monthly connectivity costs is as follows:

  • NLB running cost – 1 NLB x $0.0225 x 720 hours/month = $16.20/month
  • 1 VPC endpoint spread across three AZs – 1 VPCE x 3 ENIs x $0.01 x 720 hours/month = $21.60/month

Additional charges for NLB capacity used and AWS PrivateLink data processed apply. For more information about pricing, see Elastic Load Balancing pricing and AWS PrivateLink pricing.
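
The fixed cost scales linearly with the number of brokers. As a worked example, the following sketch reproduces the arithmetic above using the hourly rates quoted in this post. The function name is hypothetical, and actual prices vary by Region and over time.

```shell
# Monthly fixed connectivity cost in USD for N brokers under Pattern 1.
# Arguments: <broker-count> [availability-zones-per-endpoint, default 3]
# Rates are the ones quoted above: NLB $0.0225/hour, endpoint ENI $0.01/hour,
# 720 hours per month. Usage-based charges are not included.
pattern1_monthly_cost() {
  LC_ALL=C awk -v brokers="$1" -v azs="${2:-3}" 'BEGIN {
    nlb  = 0.0225 * 720        # one NLB per broker
    vpce = azs * 0.01 * 720    # one endpoint ENI per Availability Zone
    printf "%.2f\n", brokers * (nlb + vpce)
  }'
}

# Usage:
#   pattern1_monthly_cost 1   # single broker, as in the breakdown above
#   pattern1_monthly_cost 3   # three-broker cluster
```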

To summarize, Pattern 1 is best applicable when:

  • You want to minimize the management overhead associated with modifying broker properties, such as advertised port
  • You have automation that takes care of adding and removing infrastructure when new brokers are created or destroyed
  • Simplified and uniform deployments are primary drivers, with cost as a secondary concern

Transaction Banking’s security requirements for Amazon MSK

The TxB micro-account provides a strong application isolation boundary, and accessing MSK brokers through AWS PrivateLink with Pattern 1 allows for tightly controlled connection flows between these TxB micro-accounts. TxB further builds on this foundation through additional infrastructure and data protection controls available in Amazon MSK. For more information, see Security in Amazon Managed Streaming for Apache Kafka.

The following are the core security tenets that TxB’s internal security team requires for using Amazon MSK:

  • Encryption at rest using Customer Master Key (CMK) – TxB uses the Amazon MSK managed offering of encryption at rest. Amazon MSK integrates with AWS Key Management Service (AWS KMS) to offer transparent server-side encryption to always encrypt your data at rest. When you create an MSK cluster, you can specify the AWS KMS CMK that AWS KMS uses to generate data keys that encrypt your data at rest. For more information, see Using CMKs and data keys.
  • Encryption in transit – Amazon MSK uses TLS 1.2 for encryption in transit. TxB makes client-broker encryption and encryption between the MSK brokers mandatory.
  • Client authentication with TLS – Amazon MSK uses AWS Certificate Manager Private Certificate Authority (ACM PCA) for client authentication. The ACM PCA can either be a root Certificate Authority (CA) or a subordinate CA. If it’s a root CA, you need to install a self-signed certificate. If it’s a subordinate CA, you can choose its parent to be an ACM PCA root, a subordinate CA, or an external CA. This external CA can be your own CA that issues the certificate and becomes part of the certificate chain when installed as the ACM PCA certificate. TxB takes advantage of this capability and uses certificates signed by ACM PCA that are distributed to the client accounts.
  • Authorization using Kafka Access Control Lists (ACLs) – Amazon MSK allows you to use the Distinguished Name of a client’s TLS certificates as the principal of the Kafka ACL to authorize client requests. To enable Kafka ACLs, you must first have client authentication using TLS enabled. TxB uses the Kafka Admin API to create Kafka ACLs for each topic using the certificate names of the certificates deployed on the consumer and producer client instances. For more information, see Apache Kafka ACLs.
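
TxB creates its ACLs through the Kafka Admin API. For experimentation, a similar rule can be sketched with the kafka-acls.sh tool that ships with Apache Kafka. This is a swapped-in technique, not TxB's method. The helper name, the client.properties file, and the example values are assumptions, and a consumer typically also needs Read permission on its consumer group.

```shell
# Assumptions: kafka-acls.sh from the Kafka distribution is on PATH (override
# with KAFKA_ACLS), and client.properties holds the TLS client settings.
KAFKA_ACLS=${KAFKA_ACLS:-kafka-acls.sh}

# Allow the client whose certificate has the given Distinguished Name to read a topic.
# Arguments: <bootstrap-server> <distinguished-name> <topic>
grant_topic_read() {
  "$KAFKA_ACLS" --bootstrap-server "$1" --command-config client.properties \
    --add --allow-principal "User:$2" --operation Read --topic "$3"
}

# Usage (all values are placeholders):
#   grant_topic_read "$BOOTSTRAP_BROKERS" "CN=consumer.example.com" payments
```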

Conclusion

This post illustrated how the Transaction Banking team at Goldman Sachs approaches an application isolation boundary through the TxB micro-account strategy and how AWS PrivateLink complements this strategy. Additionally, this post discussed how the TxB team builds connectivity to their MSK clusters across TxB micro-accounts and how Amazon MSK takes the undifferentiated heavy lifting away from TxB by allowing them to achieve their core security requirements. You can use this post as a reference to build a similar approach when implementing an Amazon MSK environment.

 


About the Authors

Robert L. Cossin is a Vice President at Goldman Sachs in New York. Rob joined Goldman Sachs in 2004 and has worked on many projects within the firm’s cash and securities flows. Most recently, Rob is a technical architect on the Transaction Banking team, focusing on cloud enablement and security.

 

 

 

Harsha W. Sharma is a Solutions Architect with AWS in New York. Harsha joined AWS in 2016 and works with Global Financial Services customers to design and develop architectures on AWS, and support their journey on the cloud.