Tag Archives: Amazon Managed Streaming for Apache Kafka (Amazon MSK)

AWS Weekly Roundup – Amazon Bedrock Is Now Generally Available, Attend AWS Innovate Online, and More – Oct 2, 2023

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-amazon-bedrock-is-now-generally-available-attend-aws-innovate-online-and-more-oct-2-2023/

Last week I attended the AWS Summit Johannesburg. This was the first summit to be hosted in my own country and my own city since 2019 so it was very special to have the opportunity to attend. It was great to get to meet with so many of our customers and hear how they are building on AWS.

Now on to the AWS updates. I’ve compiled a few announcements and upcoming events you need to know about. Let’s get started!

Last Week’s Launches
Amazon Bedrock Is Now Generally Available – Amazon Bedrock was announced in preview in April of this year as part of a set of new tools for building with generative AI on AWS. Last week’s announcement of this service being generally available was received with a lot of excitement and customers have already been sharing what they are building with Amazon Bedrock. I quite enjoyed this lighthearted post from AWS Serverless Hero Jones Zachariah Noel about the “Bengaluru with traffic-filled roads” image he produced using Stability AI’s Stable Diffusion XL image generation model on Amazon Bedrock.

Amazon MSK Introduces Managed Data Delivery from Apache Kafka to Your Data Lake – Amazon MSK was released in 2019 to help our customers reduce the work needed to set up, scale, and manage Apache Kafka in production. Now you can continuously load data from an Apache Kafka cluster to Amazon Simple Storage Service (Amazon S3).

Other AWS News
A few more news items and blog posts you might have missed:

The Community.AWS Blog is where builders share and learn with the community of cloud enthusiasts. Contributors to this blog include AWS employees, AWS Heroes, AWS Community Builders, and other members of the AWS Community. Last week, AWS Hero Johannes Koch published this awesome post on how to build a simple website using Flutter that interacts with a serverless backend powered by AppSync-merged APIs.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Upcoming AWS Events
We have the following upcoming events:

AWS Cloud Days (October 10, 24) – Connect and collaborate with other like-minded folks while learning about AWS at the AWS Cloud Day in Athens and Prague.

AWS Innovate Online (October 19)Register for AWS Innovate Online to learn how you can build, run, and scale next-generation applications on the most extensive cloud platform. There will be 80+ sessions delivered in five languages and you’ll receive a certificate of attendance to showcase all you’ve learned.

We’re focused on improving our content to provide a better customer experience, and we need your feedback to do so. Take this quick survey to share insights on your experience with the AWS Blog. Note that this survey is hosted by an external company, so the link doesn’t lead to our website. AWS handles your information as described in the AWS Privacy Notice.

Veliswa

Non-JSON ingestion using Amazon Kinesis Data Streams, Amazon MSK, and Amazon Redshift Streaming Ingestion

Post Syndicated from M Mehrtens original https://aws.amazon.com/blogs/big-data/non-json-ingestion-using-amazon-kinesis-data-streams-amazon-msk-and-amazon-redshift-streaming-ingestion/

Organizations are grappling with the ever-expanding spectrum of data formats in today’s data-driven landscape. From Avro’s binary serialization to the efficient and compact structure of Protobuf, the landscape of data formats has expanded far beyond the traditional realms of CSV and JSON. As organizations strive to derive insights from these diverse data streams, the challenge lies in seamlessly integrating them into a scalable solution.

In this post, we dive into Amazon Redshift Streaming Ingestion to ingest, process, and analyze non-JSON data formats. Amazon Redshift Streaming Ingestion allows you to connect to Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK) directly through materialized views, in real time and without the complexity associated with staging the data in Amazon Simple Storage Service (Amazon S3) and loading it into the cluster. These materialized views not only provide a landing zone for streaming data, but also offer the flexibility of incorporating SQL transforms and blending into your extract, load, and transform (ELT) pipeline for enhanced processing. For a deeper exploration on configuring and using streaming ingestion in Amazon Redshift, refer to Real-time analytics with Amazon Redshift streaming ingestion.

JSON data in Amazon Redshift

Amazon Redshift enables storage, processing, and analytics on JSON data through the SUPER data type, PartiQL language, materialized views, and data lake queries. The base construct to access streaming data in Amazon Redshift provides metadata from the source stream (attributes like stream timestamp, sequence numbers, refresh timestamp, and more) and the raw binary data from the stream itself. For streams that contain the raw binary data encoded in JSON format, Amazon Redshift provides a variety of tools for parsing and managing the data. For more information about the metadata of each stream format, refer to Getting started with streaming ingestion from Amazon Kinesis Data Streams and Getting started with streaming ingestion from Amazon Managed Streaming for Apache Kafka.

At the most basic level, Amazon Redshift allows parsing the raw data into distinct columns. The JSON_EXTRACT_PATH_TEXT and JSON_EXTRACT_ARRAY_ELEMENT_TEXT functions enable the extraction of specific details from JSON objects and arrays, transforming them into separate columns for analysis. When the structure of the JSON documents and specific reporting requirements are defined, these methods allow for pre-computing a materialized view with the exact structure needed for reporting, with improved compression and sorting for analytics.

In addition to this approach, the Amazon Redshift JSON functions allow storing and analyzing the JSON data in its original state using the adaptable SUPER data type. The function JSON_PARSE allows you to extract the binary data in the stream and convert it into the SUPER data type. With the SUPER data type and PartiQL language, Amazon Redshift extends its capabilities for semi-structured data analysis. It uses the SUPER data type for JSON data storage, offering schema flexibility within a column. For more information on using the SUPER data type, refer to Ingesting and querying semistructured data in Amazon Redshift. This dynamic capability simplifies data ingestion, storage, transformation, and analysis of semi-structured data, enriching insights from diverse sources within the Redshift environment.

Streaming data formats

Organizations using alternative serialization formats must explore different deserialization methods. In the next section, we dive into the optimal approach for deserialization. In this section, we take a closer look at the diverse formats and strategies organizations use to effectively manage their data. This understanding is key in determining the data parsing approach in Amazon Redshift.

Many organizations use a format other than JSON for their streaming use cases. JSON is a self-describing serialization format, where the schema of the data is stored alongside the actual data itself. This makes JSON flexible for applications, but this approach can lead to increased data transmission between applications due to the additional data contained in the JSON keys and syntax. Organizations seeking to optimize their serialization and deserialization performance, and their network communication between applications, may opt to use a format like Avro, Protobuf, or even a custom proprietary format to serialize application data into binary format in an optimized way. This provides the advantage of an efficient serialization where only the message values are packed into a binary message. However, this requires the consumer of the data to know what schema and protocol was used to serialize the data to deserialize the message. There are several ways that organizations can solve this problem, as illustrated in the following figure.

Visualization of different binary message serialization approaches

Embedded schema

In an embedded schema approach, the data format itself contains the schema information alongside the actual data. This means that when a message is serialized, it includes both the schema definition and the data values. This allows anyone receiving the message to directly interpret and understand its structure without needing to refer to an external source for schema information. Formats like JSON, MessagePack, and YAML are examples of embedded schema formats. When you receive a message in this format, you can immediately parse it and access the data with no additional steps.

Assumed schema

In an assumed schema approach, the message serialization contains only the data values, and there is no schema information included. To interpret the data correctly, the receiving application needs to have prior knowledge of the schema that was used to serialize the message. This is typically achieved by associating the schema with some identifier or context, like a stream name. When the receiving application reads a message, it uses this context to retrieve the corresponding schema and then decodes the binary data accordingly. This approach requires an additional step of schema retrieval and decoding based on context. This generally requires setting up a mapping in-code or in an external database so that consumers can dynamically retrieve the schemas based on stream metadata (such as the AWS Glue Schema Registry).

One drawback of this approach is in tracking schema versions. Although consumers can identify the relevant schema from the stream name, they can’t identify the particular version of the schema that was used. Producers need to ensure that they are making backward-compatible changes to schemas to ensure consumers aren’t disrupted when using a different schema version.

Embedded schema ID

In this case, the producer continues to serialize the data in binary format (like Avro or Protobuf), similar to the assumed schema approach. However, an additional step is involved: the producer adds a schema ID at the beginning of the message header. When a consumer processes the message, it starts by extracting the schema ID from the header. With this schema ID, the consumer then fetches the corresponding schema from a registry. Using the retrieved schema, the consumer can effectively parse the rest of the message. For example, the AWS Glue Schema Registry provides Java SDK SerDe libraries, which can natively serialize and deserialize messages in a stream using embedded schema IDs. Refer to How the schema registry works for more information about using the registry.

The usage of an external schema registry is common in streaming applications because it provides a number of benefits to consumers and developers. This registry contains all the message schemas for the applications and associates them with a unique identifier to facilitate schema retrieval. In addition, the registry may provide other functionalities like schema version change handling and documentation to facilitate application development.

The embedded schema ID in the message payload can contain version information, ensuring publishers and consumers are always using the same schema version to manage data. When schema version information isn’t available, schema registries can help enforce producers making backward-compatible changes to avoid causing issues in consumers. This helps decouple producers and consumers, provides schema validation at both the publisher and consumer stage, and allows for more flexibility in stream usage to allow for a variety of application requirements. Messages can be published with one schema per stream, or with multiple schemas inside a single stream, allowing consumers to dynamically interpret messages as they arrive.

For a deeper dive into the benefits of a schema registry, refer to Validate streaming data over Amazon MSK using schemas in cross-account AWS Glue Schema Registry.

Schema in file

For batch processing use cases, applications may embed the schema used to serialize the data into the data file itself to facilitate data consumption. This is an extension of the embedded schema approach but is less costly because the data file is generally larger, so the schema accounts for a proportionally smaller amount of the overall data. In this case, the consumers can process the data directly without additional logic. Amazon Redshift supports loading Avro data that has been serialized in this manner using the COPY command.

Convert non-JSON data to JSON

Organizations aiming to use non-JSON serialization formats need to develop an external method for parsing their messages outside of Amazon Redshift. We recommend using an AWS Lambda-based external user-defined function (UDF) for this process. Using an external Lambda UDF allows organizations to define arbitrary deserialization logic to support any message format, including embedded schema, assumed schema, and embedded schema ID approaches. Although Amazon Redshift supports defining Python UDFs natively, which may be a viable alternative for some use cases, we demonstrate the Lambda UDF approach in this post to cover more complex scenarios. For examples of Amazon Redshift UDFs, refer to AWS Samples on GitHub.

The basic architecture for this solution is as follows.

See the following code:

-- Step 1
CREATE OR REPLACE EXTERNAL FUNCTION fn_lambda_decode_avro_binary(varchar)
RETURNS varchar IMMUTABLE LAMBDA 'redshift-avro-udf';

-- Step 2
CREATE EXTERNAL SCHEMA kds FROM KINESIS

-- Step 3
CREATE MATERIALIZED VIEW {name} AUTO REFRESH YES AS
SELECT
    -- Step 4
   t.kinesis_data AS binary_avro,
   to_hex(binary_avro) AS hex_avro,
   -- Step 5
   fn_lambda_decode_avro_binary('{stream-name}', hex_avro) AS json_string,
   -- Step 6
   JSON_PARSE(json_string) AS super_data,
   t.sequence_number,
   t.refresh_time,
   t.approximate_arrival_timestamp,
   t.shard_id
FROM kds.{stream_name} AS t

Let’s explore each step in more detail.

Create the Lambda UDF

The overall goal is to develop a method that can accept the raw data as input and produce JSON-encoded data as an output. This aligns with the Amazon Redshift ability to natively process JSON into the SUPER data type. The specifics of the function depend on the serialization and streaming approach. For example, using the assumed schema approach with Avro format, your Lambda function may complete the following steps:

  1. Take in the stream name and hexadecimal-encoded data as inputs.
  2. Use the stream name to perform a lookup to identify the schema for the given stream name.
  3. Decode the hexadecimal data into binary format.
  4. Use the schema to deserialize the binary data into readable format.
  5. Re-serialize the data into JSON format.

The f_glue_schema_registry_avro_to_json AWS samples example illustrates the process of decoding Avro using the assumed schema approach using the AWS Glue Schema Registry in a Lambda UDF to retrieve and use Avro schemas by stream name. For other approaches (such as embedded schema ID), you should author your Lambda function to handle deserialization as defined by your serialization process and schema registry implementation. If your application depends on an external schema registry or table lookup to process the message schema, we recommend that you implement caching for schema lookups to help reduce the load on the external systems and reduce the average Lambda function invocation duration.

When creating the Lambda function, make sure you accommodate the Amazon Redshift input event format and ensure compliance with the expected Amazon Redshift event output format. For details, refer to Creating a scalar Lambda UDF.

After you create and test the Lambda function, you can define it as a UDF in Amazon Redshift. For effective integration within Amazon Redshift, designate this Lambda function UDF as IMMUTABLE. This classification supports incremental materialized view updates. This treats the Lambda function as idempotent and minimizes the Lambda function costs for the solution, because a message doesn’t need to be processed if it has been processed before.

Configure the baseline Kinesis data stream

Regardless of your messaging format or approach (embedded schema, assumed schema, and embedded schema ID), you begin with setting up the external schema for streaming ingestion from your messaging source into Amazon Redshift. For more information, refer to Streaming ingestion.

CREATE EXTERNAL SCHEMA kds FROM KINESIS

IAM_ROLE 'arn:aws:iam::0123456789:role/redshift-streaming-role';

Create the raw materialized view

Next, you define your raw materialized view. This view contains the raw message data from the streaming source in Amazon Redshift VARBYTE format.

Convert the VARBYTE data to VARCHAR format

External Lambda function UDFs don’t support VARBYTE as an input data type. Therefore, you must convert the raw VARBYTE data from the stream into VARCHAR format to pass to the Lambda function. The best way to do this in Amazon Redshift is using the TO_HEX built-in method. This converts the binary data into hexadecimal-encoded character data, which can be sent to the Lambda UDF.

Invoke the Lambda function to retrieve JSON data

After the UDF has been defined, we can invoke the UDF to convert our hexadecimal-encoded data into JSON-encoded VARCHAR data.

Use the JSON_PARSE method to convert the JSON data to SUPER data type

Finally, we can use the Amazon Redshift native JSON parsing methods like JSON_PARSE, JSON_EXTRACT_PATH_TEXT, and more to parse the JSON data into a format that we can use for analytics.

Considerations

Consider the following when using this strategy:

  • Cost – Amazon Redshift invokes the Lambda function in batches to improve scalability and reduce the overall number of Lambda invocations. The cost of this solution depends on the number of messages in your stream, the frequency of the refresh, and the invocation time required to process the messages in a batch from Amazon Redshift. Using the IMMUTABLE UDF type in Amazon Redshift can also help minimize costs by utilizing the incremental refresh strategy for the materialized view.
  • Permissions and network access – The AWS Identity and Access Management (IAM) role used for the Amazon Redshift UDF must have permissions to invoke the Lambda function, and you must deploy the Lambda function such that it has access to invoke its external dependencies (for example, you may need to deploy it in a VPC to access private resources like a schema registry).
  • Monitoring – Use Lambda function logging and metrics to identify errors in deserialization, connection to the schema registry, and data processing. For details on monitoring the UDF Lambda function, refer to Embedding metrics within logs and Monitoring and troubleshooting Lambda functions.

Conclusion

In this post, we dove into different data formats and ingestion methods for a streaming use case. By exploring strategies for handling non-JSON data formats, we examined the use of Amazon Redshift streaming to seamlessly ingest, process, and analyze these formats in near-real time using materialized views.

Furthermore, we navigated through schema-per-stream, embedded schema, assumed schema, and embedded schema ID approaches, highlighting their merits and considerations. To bridge the gap between non-JSON formats and Amazon Redshift, we explored the creation of Lambda UDFs for data parsing and conversion. This approach offers a comprehensive means to integrate diverse data streams into Amazon Redshift for subsequent analysis.

As you navigate the ever-evolving landscape of data formats and analytics, we hope this exploration provides valuable guidance to derive meaningful insights from your data streams. We welcome any thoughts or questions in the comments section.


About the Authors

M Mehrtens has been working in distributed systems engineering throughout their career, working as a Software Engineer, Architect, and Data Engineer. In the past, M has supported and built systems to process terrabytes of streaming data at low latency, run enterprise Machine Learning pipelines, and created systems to share data across teams seamlessly with varying data toolsets and software stacks. At AWS, they are a Sr. Solutions Architect supporting US Federal Financial customers.

Sindhu Achuthan is a Sr. Solutions Architect with Federal Financials at AWS. She works with customers to provide architectural guidance on analytics solutions using AWS Glue, Amazon EMR, Amazon Kinesis, and other services. Outside of work, she loves DIYs, to go on long trails, and yoga.

Build event-driven architectures with Amazon MSK and Amazon EventBridge

Post Syndicated from Florian Mair original https://aws.amazon.com/blogs/big-data/build-event-driven-architectures-with-amazon-msk-and-amazon-eventbridge/

Based on immutable facts (events), event-driven architectures (EDAs) allow businesses to gain deeper insights into their customers’ behavior, unlocking more accurate and faster decision-making processes that lead to better customer experiences. In EDAs, modern event brokers, such as Amazon EventBridge and Apache Kafka, play a key role to publish and subscribe to events. EventBridge is a serverless event bus that ingests data from your own apps, software as a service (SaaS) apps, and AWS services and routes that data to targets. Although there is overlap in their role as the backbone in EDAs, both solutions emerged from different problem statements and provide unique features to solve specific use cases. With a solid understanding of both technologies and their primary use cases, developers can create easy-to-use, maintainable, and evolvable EDAs.

If the use case is well defined and directly maps to one event bus, such as event streaming and analytics with streaming events (Kafka) or application integration with simplified and consistent event filtering, transformation, and routing on discrete events (EventBridge), the decision for a particular broker technology is straightforward. However, organizations and business requirements are often more complex and beyond the capabilities of one broker technology. In almost any case, choosing an event broker should not be a binary decision. Combining complementary broker technologies and embracing their unique strengths is a solid approach to build easy-to-use, maintainable, and evolvable EDAs. To make the integration between Kafka and EventBridge even smoother, AWS open-sourced the EventBridge Connector based on Apache Kafka. This allows you to consume from on-premises Kafka deployments and avoid point-to-point communication, while using the existing knowledge and toolset of Kafka Connect.

Streaming applications enable stateful computations over unbound datasets. This allows real-time use cases such as anomaly detection, event-time computations, and much more. These applications can be built using frameworks such as Apache Flink, Apache Spark, or Kafka Streams. Although some of those frameworks support sending events to downstream systems other than Apache Kafka, there is no standardized way of doing so across frameworks. It would require each application owner to build their own logic to send events downstream. In an EDA, the preferred way to handle such a scenario is to publish events to an event bus and then send them downstream.

There are two ways to send events from Apache Kafka to EventBridge: the preferred method using Amazon EventBridge Pipes or the EventBridge sink connector for Kafka Connect. In this post, we explore when to use which option and how to build an EDA using the EventBridge sink connector and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

EventBridge sink connector vs. EventBridge Pipes

EventBridge Pipes connects sources to targets with a point-to-point integration, supporting event filtering, transformations, enrichment, and event delivery to over 14 AWS services and external HTTPS-based targets using API Destinations. This is the preferred and most easy method to send events from Kafka to EventBridge as it simplifies the setup and operations with a delightful developer experience.

Alternatively, under the following circumstances you might want to choose the EventBridge sink connector to send events from Kafka directly to EventBridge Event Buses:

  • You are have already invested in processes and tooling around the Kafka Connect framework as the platform of your choice to integrate with other systems and services
  • You need to integrate with a Kafka-compatible schema registry e.g., the AWS Glue Schema Registry, supporting Avro and Protobuf data formats for event serialization and deserialization
  • You want to send events from on-premises Kafka environments directly to EventBridge Event Buses

Overview of solution

In this post, we show you how to use Kafka Connect and the EventBridge sink connector to send Avro-serialized events from Amazon Managed Streaming for Apache Kafka (Amazon MSK) to EventBridge. This enables a seamless and consistent data flow from Apache Kafka to dozens of supported EventBridge AWS and partner targets, such as Amazon CloudWatch, Amazon SQS, AWS Lambda, and HTTPS targets like Salesforce, Datadog, and Snowflake using EventBridge API destinations. The following diagram illustrates the event-driven architecture used in this blog post based on Amazon MSK and EventBridge.

Architecture Diagram

The workflow consists of the following steps:

  1. The demo application generates credit card transactions, which are sent to Amazon MSK using the Avro format.
  2. An analytics application running on Amazon Elastic Container Service (Amazon ECS) consumes the transactions and analyzes them if there is an anomaly.
  3. If an anomaly is detected, the application emits a fraud detection event back to the MSK notification topic.
  4. The EventBridge connector consumes the fraud detection events from Amazon MSK in Avro format.
  5. The connector converts the events to JSON and sends them to EventBridge.
  6. In EventBridge, we use JSON filtering rules to filter our events and send them to other services or another Event Bus. In this example, fraud detection events are sent to Amazon CloudWatch Logs for auditing and introspection, and to a third-party SaaS provider to showcase how easy it is to integrate with third-party APIs, such as Salesforce.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Deploy the AWS CDK stack

This walkthrough requires you to deploy an AWS CDK stack to your account. You can deploy the full stack end to end or just the required resources to follow along with this post.

  1. In your terminal, run the following command:
    git clone https://github.com/awslabs/eventbridge-kafka-connector/

  2. Navigate to the cdk directory:
    cd eventbridge-kafka-connector/cdk

  3. Deploy the AWS CDK stack based on your preferences:
  4. If you want to see the complete setup explained in this post, run the following command:
    cdk deploy —context deployment=FULL

  5. If you want to deploy the connector on your own but have the required resources already, including the MSK cluster, AWS Identity and Access Management (IAM) roles, security groups, data generator, and so on, run the following command:
    cdk deploy —context deployment=PREREQ

Deploy the EventBridge sink connector on Amazon MSK Connect

If you deployed the CDK stack in FULL mode, you can skip this section and move on to Create EventBridge rules.

The connector needs an IAM role that allows reading the data from the MSK cluster and sending records downstream to EventBridge.

Upload connector code to Amazon S3

Complete the following steps to upload the connector code to Amazon Simple Storage Service (Amazon S3):

  1. Navigate to the GitHub repo.
  2. Download the release 1.0.0 with the AWS Glue Schema Registry dependencies included.
  3. On the Amazon S3 console, choose Buckets in the navigation pane.
  4. Choose Create bucket.
  5. For Bucket name, enter eventbridgeconnector-bucket-${AWS_ACCOUNT_ID}.

As Because S3 buckets must be globally unique, replace ${AWS_ACCOUNT_ID} with your actual AWS account ID. For example, eventbridgeconnector-bucket-123456789012.

  1. Open the bucket and choose Upload.
  2. Select the .jar file that you downloaded from the GitHub repository and choose Upload.

S3 File Upload Console

Create a custom plugin

We now have our application code in Amazon S3. As a next step, we create a custom plugin in Amazon MSK Connect. Complete the following steps:

  1. On the Amazon MSK console, choose Custom plugins in the navigation pane under MSK Connect.
  2. Choose Create custom plugin.
  3. For S3 URI – Custom plugin object, browse to the file named kafka-eventbridge-sink-with-gsr-dependencies.jar in the S3 bucket eventbridgeconnector-bucket-${AWS_ACCOUNT_ID} for the EventBridge connector.
  4. For Custom plugin name, enter msk-eventBridge-sink-plugin-v1.
  5. Enter an optional description.
  6. Choose Create custom plugin.

MSK Connect Plugin Screen

  1. Wait for plugin to transition to the status Active.

Create a connector

Complete the following steps to create a connector in MSK Connect:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Choose Create connector.
  3. Select Use existing custom plugin and under Custom plugins, select the plugin msk-eventBridge-sink-plugin-v1 that you created earlier.
  4. Choose Next.
  5. For Connector name, enter msk-eventBridge-sink-connector.
  6. Enter an optional description.
  7. For Cluster type, select MSK cluster.
  8. For MSK clusters, select the cluster you created earlier.
  9. For Authentication, choose IAM.
  10. Under Connector configurations, enter the following settings (for more details on the configuration, see the GitHub repository):
    auto.offset.reset=earliest
    connector.class=software.amazon.event.kafkaconnector.EventBridgeSinkConnector
    topics=notifications
    aws.eventbridge.connector.id=avro-test-connector
    aws.eventbridge.eventbus.arn=arn:aws:events:us-east-1:123456789012:event-bus/eventbridge-sink-eventbus
    aws.eventbridge.region=us-east-1
    tasks.max=1
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    value.converter.region=us-east-1
    value.converter.registry.name=default-registry
    value.converter.avroRecordType=GENERIC_RECORD

  11. Make sure to replace aws.eventbridge.eventbus.arn, aws.eventbridge.region, and value.converter.region with the values from the prerequisite stack.
  12. In the Connector capacity section, select Autoscaled for Capacity type.
  13. Leave the default value of 1 for MCU count per worker.
  14. Keep all default values for Connector capacity.
  15. For Worker configuration, select Use the MSK default configuration.
  16. Under Access permissions, choose the custom IAM role KafkaEventBridgeSinkStack-connectorRole, which you created during the AWS CDK stack deployment.
  17. Choose Next.
  18. Choose Next again.
  19. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  20. For Log group, choose /aws/mskconnect/eventBridgeSinkConnector.
  21. Choose Next.
  22. Under Review and Create, validate all the settings and choose Create connector.

The connector will be now in the state Creating. It can take up to several minutes for the connector to transition into the status Running.

Create EventBridge rules

Now that the connector is forwarding events to EventBridge, we can use EventBridge rules to filter and send events to other targets. Complete the following steps to create a rule:

  1. On the EventBridge console, choose Rules in the navigation pane.
  2. Choose Create rule.
  3. Enter eb-to-cloudwatch-logs-and-webhook for Name.
  4. Select eventbridge-sink-eventbus for Event bus.
  5. Choose Next.
  6. Select Custom pattern (JSON editor), choose Insert, and replace the event pattern with the following code:
    {
      "detail": {
        "value": {
          "eventType": ["suspiciousActivity"],
          "source": ["transactionAnalyzer"]
        }
      },
      "detail-type": [{
        "prefix": "kafka-connect-notification"
      }]
    }
    

  7. Choose Next.
  8. For Target 1, select CloudWatch log group and enter kafka-events for Log Group.
  9. Choose Add another target.
  10. (Optional: Create an API destination) For Target 2, select EventBridge API destination for Target types.
  11. Select Create a new API destination.
  12. Enter a descriptive name for Name.
  13. Add the URL and enter it as API destination endpoint. (This can be the URL of your Datadog, Salesforce, etc. endpoint)
  14. Select POST for HTTP method.
  15. Select Create a new connection for Connection.
  16. For Connection Name, enter a name.
  17. Select Other as Destination type and select API Key as Authorization Type.
  18. For API key name and Value, enter your keys.
  19. Choose Next.
  20. Validate your inputs and choose Create rule.

EventBridge Rule

The following screenshot of the CloudWatch Logs console shows several events from EventBridge.

CloudWatch Logs

Run the connector in production

In this section, we dive deeper into the operational aspects of the connector. Specifically, we discuss how the connector scales and how to monitor it using CloudWatch.

Scale the connector

Kafka connectors scale through the number of tasks. The code design of the EventBridge sink connector doesn’t limit the number of tasks that it can run. MSK Connect provides the compute capacity to run the tasks, which can be from Provisioned or Autoscaled type. During the deployment of the connector, we choose the capacity type Autoscaled and 1 MCU per worker (which represents 1vCPU and 4GiB of memory). This means MSK Connect will scale the infrastructure to run tasks but not the number of tasks. The number of tasks is defined by the connector. By default, the connector will start with the number of tasks defined in tasks.max in the connector configuration. If this value is higher than the partition count of the processed topic, the number of tasks will be set to the number of partitions during the Kafka Connect rebalance.

Monitor the connector

MSK Connect emits metrics to CloudWatch for monitoring by default. Besides MSK Connect metrics, the offset of the connector should also be monitored in production. Monitoring the offset gives insights if the connector can keep up with the data produced in the Kafka cluster.

Clean up

To clean up your resources and avoid ongoing charges, complete the following the steps:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Select the connectors you created and choose Delete.
  3. Choose Clusters in the navigation pane.
  4. Select the cluster you created and choose Delete on the Actions menu.
  5. On the EventBridge console, choose Rules in the navigation pane.
  6. Choose the event bus eventbridge-sink-eventbus.
  7. Select all the rules you created and choose Delete.
  8. Confirm the removal by entering delete, then choose Delete.

If you deployed the AWS CDK stack with the context PREREQ, delete the .jar file for the connector.

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Navigate to the bucket where you uploaded your connector and delete the kafka-eventbridge-sink-with-gsr-dependencies.jar file.

Independent from the chosen deployment mode, all other AWS resources can be deleted by using AWS CDK or AWS CloudFormation. Run cdk destroy from the repository directory to delete the CloudFormation stack.

Alternatively, on the AWS CloudFormation console, select the stack KafkaEventBridgeSinkStack and choose Delete.

Conclusion

In this post, we showed how you can use MSK Connect to run the AWS open-sourced Kafka connector for EventBridge, how to configure the connector to forward a Kafka topic to EventBridge, and how to use EventBridge rules to filter and forward events to CloudWatch Logs and a webhook.

To learn more about the Kafka connector for EventBridge, refer to Amazon EventBridge announces open-source connector for Kafka Connect, as well as the MSK Connect Developer Guide and the code for the connector on the GitHub repo.


About the Authors

Florian Mair is a Senior Solutions Architect and data streaming expert at AWS. He is a technologist that helps customers in Germany succeed and innovate by solving business challenges using AWS Cloud services. Besides working as a Solutions Architect, Florian is a passionate mountaineer, and has climbed some of the highest mountains across Europe.

Benjamin Meyer is a Senior Solutions Architect at AWS, focused on Games businesses in Germany to solve business challenges by using AWS Cloud services. Benjamin has been an avid technologist for 7 years, and when he’s not helping customers, he can be found developing mobile apps, building electronics, or tending to his cacti.

Amazon MSK Introduces Managed Data Delivery from Apache Kafka to Your Data Lake

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/amazon-msk-introduces-managed-data-delivery-from-apache-kafka-to-your-data-lake/

I’m excited to announce today a new capability of Amazon Managed Streaming for Apache Kafka (Amazon MSK) that allows you to continuously load data from an Apache Kafka cluster to Amazon Simple Storage Service (Amazon S3). We use Amazon Kinesis Data Firehose—an extract, transform, and load (ETL) service—to read data from a Kafka topic, transform the records, and write them to an Amazon S3 destination. Kinesis Data Firehose is entirely managed and you can configure it with just a few clicks in the console. No code or infrastructure is needed.

Kafka is commonly used for building real-time data pipelines that reliably move massive amounts of data between systems or applications. It provides a highly scalable and fault-tolerant publish-subscribe messaging system. Many AWS customers have adopted Kafka to capture streaming data such as click-stream events, transactions, IoT events, and application and machine logs, and have applications that perform real-time analytics, run continuous transformations, and distribute this data to data lakes and databases in real time.

However, deploying Kafka clusters is not without challenges.

The first challenge is to deploy, configure, and maintain the Kafka cluster itself. This is why we released Amazon MSK in May 2019. MSK reduces the work needed to set up, scale, and manage Apache Kafka in production. We take care of the infrastructure, freeing you to focus on your data and applications. The second challenge is to write, deploy, and manage application code that consumes data from Kafka. It typically requires coding connectors using the Kafka Connect framework and then deploying, managing, and maintaining a scalable infrastructure to run the connectors. In addition to the infrastructure, you also must code the data transformation and compression logic, manage the eventual errors, and code the retry logic to ensure no data is lost during the transfer out of Kafka.

Today, we announce the availability of a fully managed solution to deliver data from Amazon MSK to Amazon S3 using Amazon Kinesis Data Firehose. The solution is serverless–there is no server infrastructure to manage–and requires no code. The data transformation and error-handling logic can be configured with a few clicks in the console.

The architecture of the solution is illustrated by the following diagram.

Amazon MSK to Amazon S3 architecture diagram

Amazon MSK is the data source, and Amazon S3 is the data destination while Amazon Kinesis Data Firehose manages the data transfer logic.

When using this new capability, you no longer need to develop code to read your data from Amazon MSK, transform it, and write the resulting records to Amazon S3. Kinesis Data Firehose manages the reading, the transformation and compression, and the write operations to Amazon S3. It also handles the error and retry logic in case something goes wrong. The system delivers the records that can not be processed to the S3 bucket of your choice for manual inspection. The system also manages the infrastructure required to handle the data stream. It will scale out and scale in automatically to adjust to the volume of data to transfer. There are no provisioning or maintenance operations required on your side.

Kinesis Data Firehose delivery streams support both public and private Amazon MSK provisioned or serverless clusters. It also supports cross-account connections to read from an MSK cluster and to write to S3 buckets in different AWS accounts. The Data Firehose delivery stream reads data from your MSK cluster, buffers the data for a configurable threshold size and time, and then writes the buffered data to Amazon S3 as a single file. MSK and Data Firehose must be in the same AWS Region, but Data Firehose can deliver data to Amazon S3 buckets in other Regions.

Kinesis Data Firehose delivery streams can also convert data types. It has built-in transformations to support JSON to Apache Parquet and Apache ORC formats. These are columnar data formats that save space and enable faster queries on Amazon S3. For non-JSON data, you can use AWS Lambda to transform input formats such as CSV, XML, or structured text into JSON before converting the data to Apache Parquet/ORC. Additionally, you can specify data compression formats from Data Firehose, such as GZIP, ZIP, and SNAPPY, before delivering the data to Amazon S3, or you can deliver the data to Amazon S3 in its raw form.

Let’s See How It Works
To get started, I use an AWS account where there’s an Amazon MSK cluster already configured and some applications streaming data to it. To get started and to create your first Amazon MSK cluster, I encourage you to read the tutorial.

Amazon MSK - List of existing clusters

For this demo, I use the console to create and configure the data delivery stream. Alternatively, I can use the AWS Command Line Interface (AWS CLI), AWS SDKs, AWS CloudFormation, or Terraform.

I navigate to the Amazon Kinesis Data Firehose page of the AWS Management Console and then choose Create delivery stream.

Kinesis Data Firehose - Main console page

I select Amazon MSK as a data Source and Amazon S3 as a delivery Destination. For this demo, I want to connect to a private cluster, so I select Private bootstrap brokers under Amazon MSK cluster connectivity.

I need to enter the full ARN of my cluster. Like most people, I cannot remember the ARN, so I choose Browse and select my cluster from the list.

Finally, I enter the cluster Topic name I want this delivery stream to read from.

Configure the delivery stream

After the source is configured, I scroll down the page to configure the data transformation section.

On the Transform and convert records section, I can choose whether I want to provide my own Lambda function to transform records that aren’t in JSON or to transform my source JSON records to one of the two available pre-built destination data formats: Apache Parquet or Apache ORC.

Apache Parquet and ORC formats are more efficient than JSON format to query data from Amazon S3. You can select these destination data formats when your source records are in JSON format. You must also provide a data schema from a table in AWS Glue.

These built-in transformations optimize your Amazon S3 cost and reduce time-to-insights when downstream analytics queries are performed with Amazon Athena, Amazon Redshift Spectrum, or other systems.

Configure the data transformation in the delivery stream

Finally, I enter the name of the destination Amazon S3 bucket. Again, when I cannot remember it, I use the Browse button to let the console guide me through my list of buckets. Optionally, I enter an S3 bucket prefix for the file names. For this demo, I enter aws-news-blog. When I don’t enter a prefix name, Kinesis Data Firehose uses the date and time (in UTC) as the default value.

Under the Buffer hints, compression and encryption section, I can modify the default values for buffering, enable data compression, or select the KMS key to encrypt the data at rest on Amazon S3.

When ready, I choose Create delivery stream. After a few moments, the stream status changes to ✅  available.

Select the destination S3 bucket

Assuming there’s an application streaming data to the cluster I chose as a source, I can now navigate to my S3 bucket and see data appearing in the chosen destination format as Kinesis Data Firehose streams it.

S3 bucket browsers shows the files streamed from MSK

As you see, no code is required to read, transform, and write the records from my Kafka cluster. I also don’t have to manage the underlying infrastructure to run the streaming and transformation logic.

Pricing and Availability.
This new capability is available today in all AWS Regions where Amazon MSK and Kinesis Data Firehose are available.

You pay for the volume of data going out of Amazon MSK, measured in GB per month. The billing system takes into account the exact record size; there is no rounding. As usual, the pricing page has all the details.

I can’t wait to hear about the amount of infrastructure and code you’re going to retire after adopting this new capability. Now go and configure your first data stream between Amazon MSK and Amazon S3 today.

— seb

Stitch Fix seamless migration: Transitioning from self-managed Kafka to Amazon MSK

Post Syndicated from Karthik Kondamudi original https://aws.amazon.com/blogs/big-data/stitch-fix-seamless-migration-transitioning-from-self-managed-kafka-to-amazon-msk/

This post is co-written with Karthik Kondamudi and Jenny Thompson from Stitch Fix.

Stitch Fix is a personalized clothing styling service for men, women, and kids. At Stitch Fix, we have been powered by data science since its foundation and rely on many modern data lake and data processing technologies. In our infrastructure, Apache Kafka has emerged as a powerful tool for managing event streams and facilitating real-time data processing. At Stitch Fix, we have used Kafka extensively as part of our data infrastructure to support various needs across the business for over six years. Kafka plays a central role in the Stitch Fix efforts to overhaul its event delivery infrastructure and build a self-service data integration platform.

If you’d like to know more background about how we use Kafka at Stitch Fix, please refer to our previously published blog post, Putting the Power of Kafka into the Hands of Data Scientists. This post includes much more information on business use cases, architecture diagrams, and technical infrastructure.

In this post, we will describe how and why we decided to migrate from self-managed Kafka to Amazon Managed Streaming for Apache Kafka (Amazon MSK). We’ll start with an overview of our self-managed Kafka, why we chose to migrate to Amazon MSK, and ultimately how we did it.

  1. Kafka clusters overview
  2. Why migrate to Amazon MSK
  3. How we migrated to Amazon MSK
  4. Navigating challenges and lessons learned
  5. Conclusion

Kafka Clusters Overview

At Stitch Fix, we rely on several different Kafka clusters dedicated to specific purposes. This allows us to scale these clusters independently and apply more stringent SLAs and message delivery guarantees per cluster. This also reduces overall risk by minimizing the impact of changes and upgrades and allows us to isolate and fix any issues that occur within a single cluster.

Our main Kafka cluster serves as the backbone of our data infrastructure. It handles a multitude of critical functions, including managing business events, facilitating microservice communication, supporting feature generation for machine learning workflows, and much more. The stability, reliability, and performance of this cluster are of utmost importance to our operations.

Our logging cluster plays a vital role in our data infrastructure. It serves as a centralized repository for various application logs, including web server logs and Nginx server logs. These logs provide valuable insights for monitoring and troubleshooting purposes. The logging cluster ensures smooth operations and efficient analysis of log data.

Why migrate to Amazon MSK

In the past six years, our data infrastructure team has diligently managed our Kafka clusters. While our team has acquired extensive knowledge in maintaining Kafka, we have also faced challenges such as rolling deployments for version upgrades, applying OS patches, and the overall operational overhead.

At Stitch Fix, our engineers thrive on creating new features and expanding our service offerings to delight our customers. However, we recognized that allocating significant resources to Kafka maintenance was taking away precious time from innovation. To overcome this challenge, we set out to find a managed service provider that could handle maintenance tasks like upgrades and patching while granting us complete control over cluster operations, including partition management and rebalancing. We also sought an effortless scaling solution for storage volumes, keeping our costs in check while being ready to accommodate future growth.

After thorough evaluation of multiple options, we found the perfect match in Amazon MSK because it allows us to offload cluster maintenance to the highly skilled Amazon engineers. With Amazon MSK in place, our teams can now focus their energy on developing innovative applications unique and valuable to Stitch Fix, instead of getting caught up in Kafka administration tasks.

Amazon MSK streamlines the process, eliminating the need for manual configurations, additional software installations, and worries about scaling. It simply works, enabling us to concentrate on delivering exceptional value to our cherished customers.

How we migrated to Amazon MSK

While planning our migration, we desired to switch specific services to Amazon MSK individually with no downtime, ensuring that only a specific subset of services would be migrated at a time. The overall infrastructure would run in a hybrid environment where some services connect to Amazon MSK and others to the existing Kafka infrastructure.

We decided to start the migration with our less critical logging cluster first and then proceed to migrating the main cluster. Although the logs are essential for monitoring and troubleshooting purposes, they hold relatively less significance to the core business operations. Additionally, the number and types of consumers and producers for the logging cluster is smaller, making it an easier choice to start with. Then, we were able to apply our learnings from the logging cluster migration to the main cluster. This deliberate choice enabled us to execute the migration process in a controlled manner, minimizing any potential disruptions to our critical systems.

Over the years, our experienced data infrastructure team has employed Apache Kafka MirrorMaker 2 (MM2) to replicate data between different Kafka clusters. Currently, we rely on MM2 to replicate data from two different production Kafka clusters. Given its proven track record within our organization, we decided to use MM2 as the primary tool for our data migration process.

The general guidance for MM2 is as follows:

  1. Begin with less critical applications.
  2. Perform active migrations.
  3. Familiarize yourself with key best practices for MM2.
  4. Implement monitoring to validate the migration.
  5. Accumulate essential insights for migrating other applications.

MM2 offers flexible deployment options, allowing it to function as a standalone cluster or be embedded within an existing Kafka Connect cluster. For our migration project, we deployed a dedicated Kafka Connect cluster operating in distributed mode.

This setup provided the scalability we needed, allowing us to easily expand the standalone cluster if necessary. Depending on specific use cases such as geoproximity, high availability (HA), or migrations, MM2 can be configured for active-active replication, active-passive replication, or both. In our case, as we migrated from self-managed Kafka to Amazon MSK, we opted for an active-passive configuration, where MirrorMaker was used for migration purposes and subsequently taken offline upon completion.

MirrorMaker configuration and replication policy

By default, MirrorMaker renames replication topics by prefixing the name of the source Kafka cluster to the destination cluster. For instance, if we replicate topic A from the source cluster “existing” to the new cluster “newkafka,” the replicated topic would be named “existing.A” in “newkafka.” However, this default behavior can be modified to maintain consistent topic names within the newly created MSK cluster.

To maintain consistent topic names in the newly created MSK cluster and avoid downstream issues, we utilized the CustomReplicationPolicy jar provided by AWS. This jar, included in our MirrorMaker setup, allowed us to replicate topics with identical names in the MSK cluster. Additionally, we utilized MirrorCheckpointConnector to synchronize consumer offsets from the source cluster to the target cluster and MirrorHeartbeatConnector to ensure connectivity between the clusters.

Monitoring and metrics

MirrorMaker comes equipped with built-in metrics to monitor replication lag and other essential parameters. We integrated these metrics into our MirrorMaker setup, exporting them to Grafana for visualization. Since we have been using Grafana to monitor other systems, we decided to use it during migration as well. This enabled us to closely monitor the replication status during the migration process. The specific metrics we monitored will be described in more detail below.

Additionally, we monitored the MirrorCheckpointConnector included with MirrorMaker, as it periodically emits checkpoints in the destination cluster. These checkpoints contained offsets for each consumer group in the source cluster, ensuring seamless synchronization between the clusters.

Network layout

At Stitch Fix, we use several virtual private clouds (VPCs) through Amazon Virtual Private Cloud (Amazon VPC) for environment isolation in each of our AWS accounts. We have been using separate production and staging VPCs since we initially started using AWS. When necessary, peering of VPCs across accounts is handled through AWS Transit Gateway. To maintain the strong isolation between environments we have been using all along, we created separate MSK clusters in their respective VPCs for production and staging environments.

Side note: It will be easier now to quickly connect Kafka clients hosted in different virtual private clouds with recently announced Amazon MSK multi-VPC private connectivity, which was not available at the time of our migration.

Migration steps: High-level overview

In this section, we outline the high-level sequence of events for the migration process.

Kafka Connect setup and MM2 deploy

First, we deployed a new Kafka Connect cluster on an Amazon Elastic Compute Cloud (Amazon EC2) cluster as an intermediary between the existing Kafka cluster and the new MSK cluster. Next, we deployed the 3 MirrorMaker connectors to this Kafka Connect cluster. Initially, this cluster was configured to mirror all the existing topics and their configurations into the destination MSK cluster. (We eventually changed this configuration to be more granular, as described in the “Navigating challenges and lessons learned” section below.)

Monitor replication progress with MM metrics

Take advantage of the JMX metrics offered by MirrorMaker to monitor the progress of data replication. In addition to comprehensive metrics, we primarily focused on key metrics, namely replication-latency-ms and checkpoint-latency-ms. These metrics provide invaluable insights into the replication status, including crucial aspects such as replication lag and checkpoint latency. By seamlessly exporting these metrics to Grafana, you gain the ability to visualize and closely track the progress of replication, ensuring the successful reproduction of both historical and new data by MirrorMaker.

Evaluate usage metrics and provisioning

Analyze the usage metrics of the new MSK cluster to ensure proper provisioning. Consider factors such as storage, throughput, and performance. If required, resize the cluster to meet the observed usage patterns. While resizing may introduce additional time to the migration process, it is a cost-effective measure in the long run.

Sync consumer offsets between source and target clusters

Ensure that consumer offsets are synchronized between the source in-house clusters and the target MSK clusters. Once the consumer offsets are in sync, redirect the consumers of the existing in-house clusters to consume data from the new MSK cluster. This step ensures a seamless transition for consumers and allows uninterrupted data flow during the migration.

Update producer applications

After confirming that all consumers are successfully consuming data from the new MSK cluster, update the producer applications to write data directly to the new cluster. This final step completes the migration process, ensuring that all data is now being written to the new MSK cluster and taking full advantage of its capabilities.

Navigating challenges and lessons learned

During our migration, we encountered three challenges that required careful attention: scalable storage, more granular configuration of replication configuration, and memory allocation.

Initially, we faced issues with auto scaling Amazon MSK storage. We learned storage auto scaling requires a 24-hour cool-off period before another scaling event can occur. We observed this when migrating the logging cluster, and we applied our learnings from this and factored in the cool-off period during production cluster migration.

Additionally, to optimize MirrorMaker replication speed, we updated the original configuration to divide the replication jobs into batches based on volume and allocated more tasks to high-volume topics.

During the initial phase, we initiated replication using a single connector to transfer all topics from the source to target clusters, encompassing a significant number of tasks. However, we encountered challenges such as increasing replication lag for high-volume topics and slower replication for specific topics. Upon careful examination of the metrics, we adopted an alternative approach by segregating high-volume topics into multiple connectors. In essence, we divided the topics into categories of high, medium, and low volumes, assigning them to respective connectors and adjusting the number of tasks based on replication latency. This strategic adjustment yielded positive outcomes, allowing us to achieve faster and more efficient data replication across the board.

Lastly, we encountered Java virtual machine heap memory exhaustion, resulting in missing metrics while running MirrorMaker replication. To address this, we increased memory allocation and restarted the MirrorMaker process.

Conclusion

Stitch Fix’s migration from self-managed Kafka to Amazon MSK has allowed us to shift our focus from maintenance tasks to delivering value for our customers. It has reduced our infrastructure costs by 40 percent and given us the confidence that we can easily scale the clusters in the future if needed. By strategically planning the migration and using Apache Kafka MirrorMaker, we achieved a seamless transition while ensuring high availability. The integration of monitoring and metrics provided valuable insights during the migration process, and Stitch Fix successfully navigated challenges along the way. The migration to Amazon MSK has empowered Stitch Fix to maximize the capabilities of Kafka while benefiting from the expertise of Amazon engineers, setting the stage for continued growth and innovation.

Further reading


About the Authors

Karthik Kondamudi is an Engineering Manager in the Data and ML Platform Group at StitchFix. His interests lie in Distributed Systems and large-scale data processing. Beyond work, he enjoys spending time with family and hiking. A dog lover, he’s also passionate about sports, particularly cricket, tennis, and football.

Jenny Thompson is a Data Platform Engineer at Stitch Fix. She works on a variety of systems for Data Scientists, and enjoys making things clean, simple, and easy to use. She also likes making pancakes and Pavlova, browsing for furniture on Craigslist, and getting rained on during picnics.

Rahul Nammireddy is a Senior Solutions Architect at AWS, focusses on guiding digital native customers through their cloud native transformation. With a passion for AI/ML technologies, he works with customers in industries such as retail and telecom, helping them innovate at a rapid pace. Throughout his 23+ years career, Rahul has held key technical leadership roles in a diverse range of companies, from startups to publicly listed organizations, showcasing his expertise as a builder and driving innovation. In his spare time, he enjoys watching football and playing cricket.

Todd McGrath is a data streaming specialist at Amazon Web Services where he advises customers on their streaming strategies, integration, architecture, and solutions. On the personal side, he enjoys watching and supporting his 3 teenagers in their preferred activities as well as following his own pursuits such as fishing, pickleball, ice hockey, and happy hour with friends and family on pontoon boats. Connect with him on LinkedIn.

Externalize Amazon MSK Connect configurations with Terraform

Post Syndicated from Ramc Venkatasamy original https://aws.amazon.com/blogs/big-data/externalize-amazon-msk-connect-configurations-with-terraform/

Managing configurations for Amazon MSK Connect, a feature of Amazon Managed Streaming for Apache Kafka (Amazon MSK), can become challenging, especially as the number of topics and configurations grows. In this post, we address this complexity by using Terraform to optimize the configuration of the Kafka topic to Amazon S3 Sink connector. By adopting this strategic approach, you can establish a robust and automated mechanism for handling MSK Connect configurations, eliminating the need for manual intervention or connector restarts. This efficient solution will save time, reduce errors, and provide better control over your Kafka data streaming processes. Let’s explore how Terraform can simplify and enhance the management of MSK Connect configurations for seamless integration with your infrastructure.

Solution overview

At a well-known AWS customer, the management of their constantly growing MSK Connect S3 Sink connector topics has become a significant challenge. The challenges lie in the overhead of managing configurations, as well as dealing with patching and upgrades. Manually handling Kubernetes (K8s) configs and restarting connectors can be cumbersome and error-prone, making it difficult to keep track of changes and updates. At the time of writing this post, MSK Connect does not offer native mechanisms to easily externalize the Kafka topic to S3 Sink configuration.

To address these challenges, we introduce Terraform, an infrastructure as code (IaC) tool. Terraform’s declarative approach and extensive ecosystem make it an ideal choice for managing MSK Connect configurations.

By externalizing Kafka topic to S3 configurations, organizations can achieve the following:

  • Scalability – Effortlessly manage a growing number of topics, ensuring the system can handle increasing data volumes without difficulty
  • Flexibility – Seamlessly integrate MSK Connect configurations with other infrastructure components and services, enabling adaptability to changing business needs
  • Automation – Automate the deployment and management of MSK Connect configurations, reducing manual intervention and streamlining operational tasks
  • Centralized management – Achieve improved governance with centralized management, version control, auditing, and change tracking, ensuring better control and visibility over the configurations

In the following sections, we provide a detailed guide on establishing Terraform for MSK Connect configuration management, defining and decentralizing Topic configurations, and deploying and updating configurations using Terraform.

Prerequisites

Before proceeding with the solution, ensure you have the following resources and access:

  • You need access to an AWS account with sufficient permissions to create and manage resources, including AWS Identity and Access Management (IAM) roles and MSK clusters.
  • To simplify the setup, use the provided AWS CloudFormation template. This template will create the necessary MSK cluster and required resources for this post.
  • For this post, we are using the latest Terraform version (1.5.6).

By ensuring you have these prerequisites in place, you will be ready to follow the instructions and streamline your MSK Connect configurations with Terraform. Let’s get started!

Setup

Setting up Terraform for MSK Connect configuration management includes the following:

  • Installation of Terraform and setting up the environment
  • Setting up the necessary authentication and permissions

Defining and decentralizing topic configurations using Terraform includes the following:

  • Understanding the structure of Terraform configuration files
  • Determining the required variables and resources
  • Utilizing Terraform’s modules and interpolation for flexibility

The decision to externalize the configuration was primarily driven by the customer’s business requirement. They anticipated the need to add topics periodically and wanted to avoid the need to bring down and write specific code each time. Given the limitations of MSK Connect (as of this writing), it’s important to note that MSK Connect can handle up to 300 workers. For this proof of concept (POC), we opted for a configuration with 100 topics directed to a single Amazon Simple Storage Service (Amazon S3) bucket. To ensure compatibility within the 300-worker limit, we set the MCU count to 1 and configured auto scaling with a maximum of 2 workers. This ensures that the configuration remains within the bounds of the 300-worker maximum.

To make the configuration more flexible, we specify the variables that can be utilized in the code.(variables.tf):

variable "aws_region" {
description = "The AWS region to deploy resources in."
type = string
}

variable "s3_bucket_name" {
description = "s3_bucket_name."
type = string
}

variable "topics" {
description = "topics"
type = string
}

variable "msk_connect_name" {
description = "Name of the MSK Connect instance."
type = string
}

variable "msk_connect_description" {
description = "Description of the MSK Connect instance."
type = string
}

# Rest of the variables...

To set up the AWS MSK Connector for the S3 Sink, we need to provide various configurations. Let’s examine the connector_configuration block in the code snippet provided in the main.tf file in more detail:

connector_configuration = {
"connector.class" = "io.confluent.connect.s3.S3SinkConnector"
"s3.region" = "us-east-1"
"flush.size" = "5"
"schema.compatibility" = "NONE"
"tasks.max" = "1"
"topics" = var.topics
"format.class" = "io.confluent.connect.s3.format.json.JsonFormat"
"partitioner.class" = "io.confluent.connect.storage.partitioner.DefaultPartitioner"
"value.converter.schemas.enable" = "false"
"value.converter" = "org.apache.kafka.connect.json.JsonConverter"
"storage.class" = "io.confluent.connect.s3.storage.S3Storage"
"key.converter" = "org.apache.kafka.connect.storage.StringConverter"
"s3.bucket.name" = var.s3_bucket_name
"topics.dir" = "cxdl-data/KairosTelemetry"
}

The kafka_cluster block in the code snippet defines the Kafka cluster details, including the bootstrap servers and VPC settings. You can reference the variables to specify the appropriate values:

kafka_cluster {
apache_kafka_cluster {
bootstrap_servers = var.bootstrap_servers

vpc {
security_groups = [var.security_groups]
subnets = [var.aws_subnet_example1_id, var.aws_subnet_example2_id, var.aws_subnet_example3_id]
}
}
}

To secure the connection between Kafka and the connector, the code snippet includes configurations for authentication and encryption:

  • The kafka_cluster_client_authentication block sets the authentication type to IAM, enabling the use of IAM for authentication
  • The kafka_cluster_encryption_in_transit block enables TLS encryption for data transfer between Kafka and the connector
  kafka_cluster_client_authentication {
    authentication_type = "IAM"
  }

  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }

You can externalize the variables and provide dynamic values using a var.tfvars file. Let’s assume the content of the var.tfvars file is as follows:

aws_region = "us-east-1"
msk_connect_name = "confluentinc-MSK-connect-s3-2"
msk_connect_description = "My MSK Connect instance"
s3_bucket_name = "msk-lab-xxxxxxxxxxxx-target-bucket"
topics = "salesdb.salesdb.CUSTOMER,salesdb.salesdb.CUSTOMER_SITE,salesdb.salesdb.PRODUCT,salesdb.salesdb.PRODUCT_CATEGORY,salesdb.salesdb.SALES_ORDER,salesdb.salesdb.SALES_ORDER_ALL,salesdb.salesdb.SALES_ORDER_DETAIL,salesdb.salesdb.SALES_ORDER_DETAIL_DS,salesdb.salesdb.SUPPLIER"
bootstrap_servers = "b-2.mskclustermskconnectl.4xwlfx.c11.kafka.us-east-1.amazonaws.com:9098,b-3.mskclustermskconnectl.4xwlfx.c11.kafka.us-east-1.amazonaws.com:9098,b-1.mskclustermskconnectl.4xwlfx.c11.kafka.us-east-1.amazonaws.com:9098“
aws_subnet_example1_id = "subnet-016ef7bb5f5db5759"
aws_subnet_example2_id = "subnet-0114c390d379134fa"
aws_subnet_example3_id = "subnet-0f6352ad89a1454f2"
security_groups = "sg-07eb8f8e4559334e7"
aws_mskconnect_custom_plugin_example_arn = "arn:aws:kafkaconnect:us-east-1:xxxxxxxxxxxx:custom-plugin/confluentinc-kafka-connect-s3-10-0-3/e9aeb52e-d172-4dba-9de5-f5cf73f1cb9e-2"
aws_mskconnect_custom_plugin_example_latest_revision = "1"
aws_iam_role_example_arn = "arn:aws:iam::xxxxxxxxxxxx:role/msk-connect-lab-S3ConnectorIAMRole-3LBTU7YAV9CM"

Deploy and update configurations using Terraform

Once you’ve defined your MSK Connect infrastructure using Terraform, applying these configurations is a straightforward process for creating or updating your infrastructure. This becomes particularly convenient when a new topic needs to be added. Thanks to the externalized configuration, incorporating this change is now a seamless task. The steps are as follows:

  1. Download and install Terraform from the official website (https://www.terraform.io/downloads.html) for your operating system.
  2. Confirm the installation by running the terraform version command on your command line interface.
  3. Ensure that you have configured your AWS credentials using the AWS Command Line Interface (AWS CLI) or by setting environment variables. You can use the aws configure command to configure your credentials if you’re using the AWS CLI.
  4. Place the main.tf, variables.tf, and var.tfvars files in the same Terraform directory.
  5. Open a command line interface, navigate to the directory containing the Terraform files, and run the command terraform init to initialize Terraform and download the required providers.
  6. Run the command terraform plan -var-file="var.tfvars" to review the run plan.

This command shows the changes that Terraform will make to the infrastructure based on the provided variables. This step is optional but is often used as a preview of the changes Terraform will make.

  1. If the plan looks correct, run the command terraform apply -var-file="var.tfvars" to apply the configuration.

Terraform will create the MSK_Connect in your AWS account. This will prompt you for confirmation before proceeding.

  1. After the terraform apply command is complete, verify the infrastructure has been created or updated on the console.
  2. For any changes or updates, modify your Terraform files (main.tf, variables.tf, var.tfvars) as needed, and then rerun the terraform plan and terraform apply commands.
  3. When you no longer need the infrastructure, you can use terraform destroy -var-file="var.tfvars" to remove all resources created by your Terraform files.

Be careful with this command because it will delete all the resources defined in your Terraform files.

Conclusion

In this post, we addressed the challenges faced by a customer in managing MSK Connect configurations and described a Terraform-based solution. By externalizing Kafka topic to Amazon S3 configurations, you can streamline your configuration management processes, achieve scalability, enhance flexibility, automate deployments, and centralize management. We encourage you to use Terraform to optimize your MSK Connect configurations and explore further possibilities in managing your streaming data pipelines efficiently.

To get started with externalizing MSK Connect configurations using Terraform, refer to the provided implementation steps and the Getting Started with Terraform guide, MSK Connect documentation, Terraform documentation, and example GitHub repository.

Using Terraform to externalize the Kafka topic to Amazon S3 Sink configuration in MSK Connect offers a powerful solution for managing and scaling your streaming data pipelines. By automating the deployment, updating, and central management of configurations, you can ensure efficiency, flexibility, and scalability in your data processing workflows.


About the Author

RamC Venkatasamy is a Solutions Architect based in Bloomington, Illinois. He helps AWS Strategic customers transform their businesses in the cloud. With a fervent enthusiasm for Serverless, Event-Driven Architecture and GenAI.

Securely process near-real-time data from Amazon MSK Serverless using an AWS Glue streaming ETL job with IAM authentication

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/securely-process-near-real-time-data-from-amazon-msk-serverless-using-an-aws-glue-streaming-etl-job-with-iam-authentication/

Streaming data has become an indispensable resource for organizations worldwide because it offers real-time insights that are crucial for data analytics. The escalating velocity and magnitude of collected data has created a demand for real-time analytics. This data originates from diverse sources, including social media, sensors, logs, and clickstreams, among others. With streaming data, organizations gain a competitive edge by promptly responding to real-time events and making well-informed decisions.

In streaming applications, a prevalent approach involves ingesting data through Apache Kafka and processing it with Apache Spark Structured Streaming. However, managing, integrating, and authenticating the processing framework (Apache Spark Structured Streaming) with the ingesting framework (Kafka) poses significant challenges, necessitating a managed and serverless framework. For example, integrating and authenticating a client like Spark streaming with Kafka brokers and zookeepers using a manual TLS method requires certificate and keystore management, which is not an easy task and requires a good knowledge of TLS setup.

To address these issues effectively, we propose using Amazon Managed Streaming for Apache Kafka (Amazon MSK), a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data. In this post, we use Amazon MSK Serverless, a cluster type for Amazon MSK that makes it possible for you to run Apache Kafka without having to manage and scale cluster capacity. To further enhance security and streamline authentication and authorization processes, MSK Serverless enables you to handle both authentication and authorization using AWS Identity and Access Management (IAM) in your cluster. This integration eliminates the need for separate mechanisms for authentication and authorization, simplifying and strengthening data protection. For example, when a client tries to write to your cluster, MSK Serverless uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster.

To process data effectively, we use AWS Glue, a serverless data integration service that uses the Spark Structured Streaming framework and enables near-real-time data processing. An AWS Glue streaming job can handle large volumes of incoming data from MSK Serverless with IAM authentication. This powerful combination ensures that data is processed securely and swiftly.

The post demonstrates how to build an end-to-end implementation to process data from MSK Serverless using an AWS Glue streaming extract, transform, and load (ETL) job with IAM authentication to connect MSK Serverless from the AWS Glue job and query the data using Amazon Athena.

Solution overview

The following diagram illustrates the architecture that you implement in this post.

The workflow consists of the following steps:

  1. Create an MSK Serverless cluster with IAM authentication and an EC2 Kafka client as the producer to ingest sample data into a Kafka topic. For this post, we use the kafka-console-producer.sh Kafka console producer client.
  2. Set up an AWS Glue streaming ETL job to process the incoming data. This job extracts data from the Kafka topic, loads it into Amazon Simple Storage Service (Amazon S3), and creates a table in the AWS Glue Data Catalog. By continuously consuming data from the Kafka topic, the ETL job ensures it remains synchronized with the latest streaming data. Moreover, the job incorporates the checkpointing functionality, which tracks the processed records, enabling it to resume processing seamlessly from the point of interruption in the event of a job run failure.
  3. Following the data processing, the streaming job stores data in Amazon S3 and generates a Data Catalog table. This table acts as a metadata layer for the data. To interact with the data stored in Amazon S3, you can use Athena, a serverless and interactive query service. Athena enables the run of SQL-like queries on the data, facilitating seamless exploration and analysis.

For this post, we create the solution resources in the us-east-1 Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configure resources with AWS CloudFormation

In this post, you use the following two CloudFormation templates. The advantage of using two different templates is that you can decouple the resource creation of ingestion and processing part according to your use case and if you have requirements to create specific process resources only.

  • vpc-mskserverless-client.yaml – This template sets up data the ingestion service resources such as a VPC, MSK Serverless cluster, and S3 bucket
  • gluejob-setup.yaml – This template sets up the data processing resources such as the AWS Glue table, database, connection, and streaming job

Create data ingestion resources

The vpc-mskserverless-client.yaml stack creates a VPC, private and public subnets, security groups, S3 VPC Endpoint, MSK Serverless cluster, EC2 instance with Kafka client, and S3 bucket. To create the solution resources for data ingestion, complete the following steps:

  1. Launch the stack vpc-mskserverless-client using the CloudFormation template:
  2. Provide the parameter values as listed in the following table.
Parameters Description Sample Value
EnvironmentName Environment name that is prefixed to resource names .
PrivateSubnet1CIDR IP range (CIDR notation) for the private subnet in the first Availability Zone .
PrivateSubnet2CIDR IP range (CIDR notation) for the private subnet in the second Availability Zone .
PublicSubnet1CIDR IP range (CIDR notation) for the public subnet in the first Availability Zone .
PublicSubnet2CIDR IP range (CIDR notation) for the public subnet in the second Availability Zone .
VpcCIDR IP range (CIDR notation) for this VPC .
InstanceType Instance type for the EC2 instance t2.micro
LatestAmiId AMI used for the EC2 instance /aws/service/ami-amazon-linux- latest/amzn2-ami-hvm-x86_64-gp2
  1. When the stack creation is complete, retrieve the EC2 instance PublicDNS from the vpc-mskserverless-client stack’s Outputs tab.

The stack creation process can take around 15 minutes to complete.

  1. On the Amazon EC2 console, access the EC2 instance that you created using the CloudFormation template.
  2. Choose the EC2 instance whose InstanceId is shown on the stack’s Outputs tab.

Next, you log in to the EC2 instance using Session Manager, a capability of AWS Systems Manager.

  1. On the Amazon EC2 console, select the instanceid and on the Session Manager tab, choose Connect.


After you log in to the EC2 instance, you create a Kafka topic in the MSK Serverless cluster from the EC2 instance.

  1. In the following export command, provide the MSKBootstrapServers value from the vpc-mskserverless- client stack output for your endpoint:
    $ sudo su – ec2-user
    $ BS=<your-msk-serverless-endpoint (e.g.) boot-xxxxxx.yy.kafka-serverless.us-east-1.a>

  2. Run the following command on the EC2 instance to create a topic called msk-serverless-blog. The Kafka client is already installed in the ec2-user home directory (/home/ec2-user).
    $ /home/ec2-user/kafka_2.12-2.8.1/bin/kafka-topics.sh \
    --bootstrap-server $BS \
    --command-config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
    --create –topic msk-serverless-blog \
    --partitions 1
    
    Created topic msk-serverless-blog

After you confirm the topic creation, you can push the data to the MSK Serverless.

  1. Run the following command on the EC2 instance to create a console producer to produce records to the Kafka topic. (For source data, we use nycflights.csv downloaded at the ec2-user home directory /home/ec2-user.)
$ /home/ec2-user/kafka_2.12-2.8.1/bin/kafka-console-producer.sh \
--broker-list $BS \
--producer.config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
--topic msk-serverless-blog < nycflights.csv

Next, you set up the data processing service resources, specifically AWS Glue components like the database, table, and streaming job to process the data.

Create data processing resources

The gluejob-setup.yaml CloudFormation template creates a database, table, AWS Glue connection, and AWS Glue streaming job. Retrieve the values for VpcId, GluePrivateSubnet, GlueconnectionSubnetAZ, SecurityGroup, S3BucketForOutput, and S3BucketForGlueScript from the vpc-mskserverless-client stack’s Outputs tab to use in this template. Complete the following steps:

  1. Launch the stack gluejob-setup:

  1. Provide parameter values as listed in the following table.
Parameters Description Sample value
EnvironmentName Environment name that is prefixed to resource names. Gluejob-setup
VpcId ID of the VPC for security group. Use the VPC ID created with the first stack. Refer to the first stack’s output.
GluePrivateSubnet Private subnet used for creating the AWS Glue connection. Refer to the first stack’s output.
SecurityGroupForGlueConnection Security group used by the AWS Glue connection. Refer to the first stack’s output.
GlueconnectionSubnetAZ Availability Zone for the first private subnet used for the AWS Glue connection. .
GlueDataBaseName Name of the AWS Glue Data Catalog database. glue_kafka_blog_db
GlueTableName Name of the AWS Glue Data Catalog table. blog_kafka_tbl
S3BucketNameForScript Bucket Name for Glue ETL script. Use the S3 bucket name from the previous stack. For example, aws-gluescript-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
GlueWorkerType Worker type for AWS Glue job. For example, G.1X. G.1X
NumberOfWorkers Number of workers in the AWS Glue job. 3
S3BucketNameForOutput Bucket name for writing data from the AWS Glue job. aws-glueoutput-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
TopicName MSK topic name that needs to be processed. msk-serverless-blog
MSKBootstrapServers Kafka bootstrap server. boot-30vvr5lg.c1.kafka-serverless.us- east-1.amazonaws.com:9098

The stack creation process can take around 1–2 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

In the gluejob-setup stack, we created a Kafka type AWS Glue connection, which consists of broker information like the MSK bootstrap server, topic name, and VPC in which the MSK Serverless cluster is created. Most importantly, it specifies the IAM authentication option, which helps AWS Glue authenticate and authorize using IAM authentication while consuming the data from the MSK topic. For further clarity, you can examine the AWS Glue connection and the associated AWS Glue table generated through AWS CloudFormation.

After successfully creating the CloudFormation stack, you can now proceed with processing data using the AWS Glue streaming job with IAM authentication.

Run the AWS Glue streaming job

To process the data from the MSK topic using the AWS Glue streaming job that you set up in the previous section, complete the following steps:

  1. On the CloudFormation console, choose the stack gluejob-setup.
  2. On the Outputs tab, retrieve the name of the AWS Glue streaming job from the GlueJobName row. In the following screenshot, the name is GlueStreamingJob-glue-streaming-job.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Search for the AWS Glue streaming job named GlueStreamingJob-glue-streaming-job.
  3. Choose the job name to open its details page.
  4. Choose Run to start the job.
  5. On the Runs tab, confirm if the job ran without failure.

  1. Retrieve the OutputBucketName from the gluejob-setup template outputs.
  2. On the Amazon S3 console, navigate to the S3 bucket to verify the data.

  1. On the AWS Glue console, choose the AWS Glue streaming job you ran, then choose Stop job run.

Because this is a streaming job, it will continue to run indefinitely until manually stopped. After you verify the data is present in the S3 output bucket, you can stop the job to save cost.

Validate the data in Athena

After the AWS Glue streaming job has successfully created the table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database and table that the AWS Glue streaming job created.
  4. To validate the data, run the following query to find the flight number, origin, and destination that covered the highest distance in a year:
SELECT distinct(flight),distance,origin,dest,year from "glue_kafka_blog_db"."output" where distance= (select MAX(distance) from "glue_kafka_blog_db"."output")

The following screenshot shows the output of our example query.

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack gluejob-setup.
  2. Delete the CloudFormation stack vpc-mskserverless-client.

Conclusion

In this post, we demonstrated a use case for building a serverless ETL pipeline for streaming with IAM authentication, which allows you to focus on the outcomes of your analytics. You can also modify the AWS Glue streaming ETL code in this post with transformations and mappings to ensure that only valid data gets loaded to Amazon S3. This solution enables you to harness the prowess of AWS Glue streaming, seamlessly integrated with MSK Serverless through the IAM authentication method. It’s time to act and revolutionize your streaming processes.

Appendix

This section provides more information about how to create the AWS Glue connection on the AWS Glue console, which helps establish the connection to the MSK Serverless cluster and allow the AWS Glue streaming job to authenticate and authorize using IAM authentication while consuming the data from the MSK topic.

  1. On the AWS Glue console, in the navigation pane, under Data catalog, choose Connections.
  2. Choose Create connection.
  3. For Connection name, enter a unique name for your connection.
  4. For Connection type, choose Kafka.
  5. For Connection access, select Amazon managed streaming for Apache Kafka (MSK).
  6. For Kafka bootstrap server URLs, enter a comma-separated list of bootstrap server URLs. Include the port number. For example, boot-xxxxxxxx.c2.kafka-serverless.us-east- 1.amazonaws.com:9098.

  1. For Authentication, choose IAM Authentication.
  2. Select Require SSL connection.
  3. For VPC, choose the VPC that contains your data source.
  4. For Subnet, choose the private subnet within your VPC.
  5. For Security groups, choose a security group to allow access to the data store in your VPC subnet.

Security groups are associated to the ENI attached to your subnet. You must choose at least one security group with a self-referencing inbound rule for all TCP ports.

  1. Choose Save changes.

After you create the AWS Glue connection, you can use the AWS Glue streaming job to consume data from the MSK topic using IAM authentication.


About the authors

Shubham Purwar is a Cloud Engineer (ETL) at AWS Bengaluru specialized in AWS Glue and Amazon Athena. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS with a specialization in AWS Glue. He is dedicated to assisting customers in resolving issues related to their ETL workloads and creating scalable data processing and analytics pipelines on AWS.

Build streaming data pipelines with Amazon MSK Serverless and IAM authentication

Post Syndicated from Marvin Gersho original https://aws.amazon.com/blogs/big-data/build-streaming-data-pipelines-with-amazon-msk-serverless-and-iam-authentication/

Currently, MSK Serverless only directly supports IAM for authentication using Java. This example shows how to use this mechanism. Additionally, it provides a pattern creating a proxy that can easily be integrated into solutions built in languages other than Java.

The rising trend in today’s tech landscape is the use of streaming data and event-oriented structures. They are being applied in numerous ways, including monitoring website traffic, tracking industrial Internet of Things (IoT) devices, analyzing video game player behavior, and managing data for cutting-edge analytics systems.

Apache Kafka, a top-tier open-source tool, is making waves in this domain. It’s widely adopted by numerous users for building fast and efficient data pipelines, analyzing streaming data, merging data from different sources, and supporting essential applications.

Amazon’s serverless Apache Kafka offering, Amazon Managed Streaming for Apache Kafka (Amazon MSK) Serverless, is attracting a lot of interest. It’s appreciated for its user-friendly approach, ability to scale automatically, and cost-saving benefits over other Kafka solutions. However, a hurdle encountered by many users is the requirement of MSK Serverless to use AWS Identity and Access Management (IAM) access control. At the time of writing, the Amazon MSK library for IAM is exclusive to Kafka libraries in Java, creating a challenge for users of other programming languages. In this post, we aim to address this issue and present how you can use Amazon API Gateway and AWS Lambda to navigate around this obstacle.

SASL/SCRAM authentication vs. IAM authentication

Compared to the traditional authentication methods like Salted Challenge Response Authentication Mechanism (SCRAM), the IAM extension into Apache Kafka through MSK Serverless provides a lot of benefits. Before we delve into those, it’s important to understand what SASL/SCRAM authentication is. Essentially, it’s a traditional method used to confirm a user’s identity before giving them access to a system. This process requires users or clients to provide a user name and password, which the system then cross-checks against stored credentials (for example, via AWS Secrets Manager) to decide whether or not access should be granted.

Compared to this approach, IAM simplifies permission management across AWS environments, enables the creation and strict enforcement of detailed permissions and policies, and uses temporary credentials rather than the typical user name and password authentication. Another benefit of using IAM is that you can use IAM for both authentication and authorization. If you use SASL/SCRAM, you have to additionally manage ACLs via a separate mechanism. In IAM, you can use the IAM policy attached to the IAM principal to define the fine-grained access control for that IAM principal. All of these improvements make the IAM integration a more efficient and secure solution for most use cases.

However, for applications not built in Java, utilizing MSK Serverless becomes tricky. The standard SASL/SCRAM authentication isn’t available, and non-Java Kafka libraries don’t have a way to use IAM access control. This calls for an alternative approach to connect to MSK Serverless clusters.

But there’s an alternative pattern. Without having to rewrite your existing application in Java, you can employ API Gateway and Lambda as a proxy in front of a cluster. They can handle API requests and relay them to Kafka topics instantly. API Gateway takes in producer requests and channels them to a Lambda function, written in Java using the Amazon MSK IAM library. It then communicates with the MSK Serverless Kafka topic using IAM access control. After the cluster receives the message, it can be further processed within the MSK Serverless setup.

You can also utilize Lambda on the consumer side of MSK Serverless topics, bypassing the Java requirement on the consumer side. You can do this by setting Amazon MSK as an event source for a Lambda function. When the Lambda function is triggered, the data sent to the function includes an array of records from the Kafka topic—no need for direct contact with Amazon MSK.

Solution overview

This example walks you through how to build a serverless real-time stream producer application using API Gateway and Lambda.

For testing, this post includes a sample AWS Cloud Development Kit (AWS CDK) application. This creates a demo environment, including an MSK Serverless cluster, three Lambda functions, and an API Gateway that consumes the messages from the Kafka topic.

The following diagram shows the architecture of the resulting application including its data flows.

The data flow contains the following steps:

  1. The infrastructure is defined in an AWS CDK application. By running this application, a set of AWS CloudFormation templates is created.
  2. AWS CloudFormation creates all infrastructure components, including a Lambda function that runs during the deployment process to create a topic in the MSK Serverless cluster and to retrieve the authentication endpoint needed for the producer Lambda function. On destruction of the CloudFormation stack, the same Lambda function gets triggered again to delete the topic from the cluster.
  3. An external application calls an API Gateway endpoint.
  4. API Gateway forwards the request to a Lambda function.
  5. The Lambda function acts as a Kafka producer and pushes the message to a Kafka topic using IAM authentication.
  6. The Lambda event source mapping mechanism triggers the Lambda consumer function and forwards the message to it.
  7. The Lambda consumer function logs the data to Amazon CloudWatch.

Note that we don’t need to worry about Availability Zones. MSK Serverless automatically replicates the data across multiple Availability Zones to ensure high availability of the data.

The demo additionally shows how to use Lambda Powertools for Java to streamline logging and tracing and the IAM authenticator for the simple authentication process outlined in the introduction.

The following sections take you through the steps to deploy, test, and observe the example application.

Prerequisites

The example has the following prerequisites:

  • An AWS account. If you haven’t signed up, complete the following steps:
  • The following software installed on your development machine, or use an AWS Cloud9 environment, which comes with all requirements preinstalled:
  • Appropriate AWS credentials for interacting with resources in your AWS account.

Deploy the solution

Complete the following steps to deploy the solution:

  1. Clone the project GitHub repository and change the directory to subfolder serverless-kafka-iac:
git clone https://github.com/aws-samples/apigateway-lambda-msk-serverless-integration
cd apigateway-lambda-msk-serverless-integration/serverless-kafka-iac
  1. Configure environment variables:
export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query 'Account' --output text)
export CDK_DEFAULT_REGION=$(aws configure get region)
  1. Prepare the virtual Python environment:
python3 -m venv .venv

source .venv/bin/activate

pip3 install -r requirements.txt
  1. Bootstrap your account for AWS CDK usage:
cdk bootstrap aws://$CDK_DEFAULT_ACCOUNT/$CDK_DEFAULT_REGION
  1. Run cdk synth to build the code and test the requirements (ensure docker daemon is running on your machine):
cdk synth
  1. Run cdk deploy to deploy the code to your AWS account:
cdk deploy --all

Test the solution

To test the solution, we generate messages for the Kafka topics by sending calls through the API Gateway from our development machine or AWS Cloud9 environment. We then go to the CloudWatch console to observe incoming messages in the log files of the Lambda consumer function.

  1. Open a terminal on your development machine to test the API with the Python script provided under /serverless_kafka_iac/test_api.py:
python3 test-api.py

  1. On the Lambda console, open the Lambda function named ServerlessKafkaConsumer.

  1. On the Monitor tab, choose View CloudWatch logs to access the logs of the Lambda function.

  1. Choose the latest log stream to access the log files of the last run.

You can review the log entry of the received Kafka messages in the log of the Lambda function.

Trace a request

All components integrate with AWS X-Ray. With AWS X-Ray, you can trace the entire application, which is useful to identify bottlenecks when load testing. You can also trace method runs at the Java method level.

Lambda Powertools for Java allows you to shortcut this process by adding the @Trace annotation to a method to see traces on the method level in X-Ray.

To trace a request end to end, complete the following steps:

  1. On the CloudWatch console, choose Service map in the navigation pane.
  2. Select a component to investigate (for example, the Lambda function where you deployed the Kafka producer).
  3. Choose View traces.

  1. Choose a single Lambda method invocation and investigate further at the Java method level.

Implement a Kafka producer in Lambda

Kafka natively supports Java. To stay open, cloud native, and without third-party dependencies, the producer is written in that language. Currently, the IAM authenticator is only available to Java. In this example, the Lambda handler receives a message from an API Gateway source and pushes this message to an MSK topic called messages.

Typically, Kafka producers are long-living and pushing a message to a Kafka topic is an asynchronous process. Because Lambda is ephemeral, you must enforce a full flush of a submitted message until the Lambda function ends by calling producer.flush():

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
package software.amazon.samples.kafka.lambda;
 
// This class is part of the AWS samples package and specifically deals with Kafka integration in a Lambda function.
// It serves as a simple API Gateway to Kafka Proxy, accepting requests and forwarding them to a Kafka topic.
public class SimpleApiGatewayKafkaProxy implements RequestHandler<APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent> {
 
    // Specifies the name of the Kafka topic where the messages will be sent
    public static final String TOPIC_NAME = "messages";
 
    // Logger instance for logging events of this class
    private static final Logger log = LogManager.getLogger(SimpleApiGatewayKafkaProxy.class);
    
    // Factory to create properties for Kafka Producer
    public KafkaProducerPropertiesFactory kafkaProducerProperties = new KafkaProducerPropertiesFactoryImpl();
    
    // Instance of KafkaProducer
    private KafkaProducer<String, String>[KT1]  producer;
 
    // Overridden method from the RequestHandler interface to handle incoming API Gateway proxy events
    @Override
    @Tracing
    @Logging(logEvent = true)
    public APIGatewayProxyResponseEvent handleRequest(APIGatewayProxyRequestEvent input, Context context) {
        
        // Creating a response object to send back 
        APIGatewayProxyResponseEvent response = createEmptyResponse();
        try {
            // Extracting the message from the request body
            String message = getMessageBody(input);
 
            // Create a Kafka producer
            KafkaProducer<String, String> producer = createProducer();
 
            // Creating a record with topic name, request ID as key and message as value 
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_NAME, context.getAwsRequestId(), message);
 
            // Sending the record to Kafka topic and getting the metadata of the record
            Future<RecordMetadata>[KT2]  send = producer.send(record);
            producer.flush();
 
            // Retrieve metadata about the sent record
            RecordMetadata metadata = send.get();
 
            // Logging the partition where the message was sent
            log.info(String.format("Message was send to partition %s", metadata.partition()));
 
            // If the message was successfully sent, return a 200 status code
            return response.withStatusCode(200).withBody("Message successfully pushed to kafka");
        } catch (Exception e) {
            // In case of exception, log the error message and return a 500 status code
            log.error(e.getMessage(), e);
            return response.withBody(e.getMessage()).withStatusCode(500);
        }
    }
 
    // Creates a Kafka producer if it doesn't already exist
    @Tracing
    private KafkaProducer<String, String> createProducer() {
        if (producer == null) {
            log.info("Connecting to kafka cluster");
            producer = new KafkaProducer<String, String>(kafkaProducerProperties.getProducerProperties());
        }
        return producer;
    }
 
    // Extracts the message from the request body. If it's base64 encoded, it's decoded first.
    private String getMessageBody(APIGatewayProxyRequestEvent input) {
        String body = input.getBody();
 
        if (input.getIsBase64Encoded()) {
            body = decode(body);
        }
        return body;
    }
 
    // Creates an empty API Gateway proxy response event with predefined headers.
    private APIGatewayProxyResponseEvent createEmptyResponse() {
        Map<String, String> headers = new HashMap<>();
        headers.put("Content-Type", "application/json");
        headers.put("X-Custom-Header", "application/json");
        APIGatewayProxyResponseEvent response = new APIGatewayProxyResponseEvent().withHeaders(headers);
        return response;
    }
}

Connect to Amazon MSK using IAM authentication

This post uses IAM authentication to connect to the respective Kafka cluster. For information about how to configure the producer for connectivity, refer to IAM access control.

Because you configure the cluster via IAM, grant Connect and WriteData permissions to the producer so that it can push messages to Kafka:

{
    “Version”: “2012-10-17”,
    “Statement”: [
        {            
            “Effect”: “Allow”,
            “Action”: [
                “kafka-cluster:Connect”
            ],
            “Resource”: “arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid “
        }
    ]
}
 
 
{
    “Version”: “2012-10-17”,
    “Statement”: [
        {            
            “Effect”: “Allow”,
            “Action”: [
                “kafka-cluster:Connect”,
                “kafka-cluster: DescribeTopic”,
            ],
            “Resource”: “arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name“
        }
    ]
}

This shows the Kafka excerpt of the IAM policy, which must be applied to the Kafka producer. When using IAM authentication, be aware of the current limits of IAM Kafka authentication, which affect the number of concurrent connections and IAM requests for a producer. Refer to Amazon MSK quota and follow the recommendation for authentication backoff in the producer client:

        Map<String, String> configuration = Map.of(
                “key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”,
                “value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”,
                “bootstrap.servers”, getBootstrapServer(),
                “security.protocol”, “SASL_SSL”,
                “sasl.mechanism”, “AWS_MSK_IAM”,
                “sasl.jaas.config”, “software.amazon.msk.auth.iam.IAMLoginModule required;”,
                “sasl.client.callback.handler.class”,
				“software.amazon.msk.auth.iam.IAMClientCallbackHandler”,
                “connections.max.idle.ms”, “60”,
                “reconnect.backoff.ms”, “1000”
        );

Additional considerations

Each MSK Serverless cluster can handle 100 requests per second. To reduce IAM authentication requests from the Kafka producer, place it outside of the handler. For frequent calls, there is a chance that Lambda reuses the previously created class instance and only reruns the handler.

For bursting workloads with a high number of concurrent API Gateway requests, this can lead to dropped messages. Although this might be tolerable for some workloads, for others this might not be the case.

In these cases, you can extend the architecture with a buffering technology like Amazon Simple Queue Service (Amazon SQS) or Amazon Kinesis Data Streams between API Gateway and Lambda.

To reduce latency, reduce cold start times for Java by changing the tiered compilation level to 1, as described in Optimizing AWS Lambda function performance for Java. Provisioned concurrency ensures that polling Lambda functions don’t need to warm up before requests arrive.

Conclusion

In this post, we showed how to create a serverless integration Lambda function between API Gateway and MSK Serverless as a way to do IAM authentication when your producer is not written in Java. You also learned about the native integration of Lambda and Amazon MSK on the consumer side. Additionally, we showed how to deploy such an integration with the AWS CDK.

The general pattern is suitable for many use cases where you want to use IAM authentication but your producers or consumers are not written in Java, but you still want to take advantage of the benefits of MSK Serverless, like its ability to scale up and down with unpredictable or spikey workloads or its little to no operational overhead of running Apache Kafka.

You can also use MSK Serverless to reduce operational complexity by automating provisioning and the management of capacity needs, including the need to constantly monitor brokers and storage.

For more serverless learning resources, visit Serverless Land.

For more information on MSK Serverless, check out the following:


About the Authors

Philipp Klose is a Global Solutions Architect at AWS based in Munich. He works with enterprise FSI customers and helps them solve business problems by architecting serverless platforms. In this free time, Philipp spends time with his family and enjoys every geek hobby possible.

Daniel Wessendorf is a Global Solutions Architect at AWS based in Munich. He works with enterprise FSI customers and is primarily specialized in machine learning and data architectures. In his free time, he enjoys swimming, hiking, skiing, and spending quality time with his family.

Marvin Gersho is a Senior Solutions Architect at AWS based in New York City. He works with a wide range of startup customers. He previously worked for many years in engineering leadership and hands-on application development, and now focuses on helping customers architect secure and scalable workloads on AWS with a minimum of operational overhead. In his free time, Marvin enjoys cycling and strategy board games.

Nathan Lichtenstein is a Senior Solutions Architect at AWS based in New York City. Primarily working with startups, he ensures his customers build smart on AWS, delivering creative solutions to their complex technical challenges. Nathan has worked in cloud and network architecture in the media, financial services, and retail spaces. Outside of work, he can often be found at a Broadway theater.

Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion

Post Syndicated from Muthu Pitchaimani original https://aws.amazon.com/blogs/big-data/introducing-amazon-msk-as-a-source-for-amazon-opensearch-ingestion/

Ingesting a high volume of streaming data has been a defining characteristic of operational analytics workloads with Amazon OpenSearch Service. Many of these workloads involve either self-managed Apache Kafka or Amazon Managed Streaming for Apache Kafka (Amazon MSK) to satisfy their data streaming needs. Consuming data from Amazon MSK and writing to OpenSearch Service has been a challenge for customers. AWS Lambda, custom code, Kafka Connect, and Logstash have been used for ingesting this data. These methods involve tools that must be built and maintained. In this post, we introduce Amazon MSK as a source to Amazon OpenSearch Ingestion, a serverless, fully managed, real-time data collector for OpenSearch Service that makes this ingestion even easier.

Solution overview

The following diagram shows the flow from data sources to Amazon OpenSearch Service.

The flow contains the following steps:

  1. Data sources produce data and send that data to Amazon MSK
  2. OpenSearch Ingestion consumes the data from Amazon MSK.
  3. OpenSearch Ingestion transforms, enriches, and writes the data into OpenSearch Service.
  4. Users search, explore, and analyze the data with OpenSearch Dashboards.

Prerequisites

You will need a provisioned MSK cluster created with appropriate data sources. The sources, as producers, write data into Amazon MSK. The cluster should be created with the appropriate Availability Zone, storage, compute, security and other configurations to suit your workload needs. To provision your MSK cluster and have your sources producing data, see Getting started using Amazon MSK.

As of this writing, OpenSearch Ingestion supports Amazon MSK provisioned, but not Amazon MSK Serverless. However, OpenSearch Ingestion can reside in the same or different account where Amazon MSK is present. OpenSearch Ingestion uses AWS PrivateLink to read data, so you must turn on multi-VPC connectivity on your MSK cluster. For more information, see Amazon MSK multi-VPC private connectivity in a single Region. OpenSearch Ingestion can write data to Amazon Simple Storage Service (Amazon S3), provisioned OpenSearch Service, and Amazon OpenSearch Service. In this solution, we use a provisioned OpenSearch Service domain as a sink for OSI. Refer to Getting started with Amazon OpenSearch Service to create a provisioned OpenSearch Service domain. You will need appropriate permission to read data from Amazon MSK and write data to OpenSearch Service. The following sections outline the required permissions.

Permissions required

To read from Amazon MSK and write to Amazon OpenSearch Service, you need to create a an AWS Identity and Access Management (IAM) role used by Amazon OpenSearch Ingestion. In this post we use a role called pipeline-Role for this purpose. To create this role please see Creating IAM roles.

Reading from Amazon MSK

OpenSearch Ingestion will need permission to create a PrivateLink connection and other actions that can be performed on your MSK cluster. Edit your MSK cluster policy to include the following snippet with appropriate permissions. If your OpenSearch Ingestion pipeline resides in an account different from your MSK cluster, you will need a second section to allow this pipeline. Use proper semantic conventions when providing the cluster, topic, and group permissions and remove the comments from the policy before using.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "osis-pipelines.aws.internal"
      },
      "Action": [
        "kafka:CreateVpcConnection",
        "kafka:GetBootstrapBrokers",
        "kafka:DescribeCluster"
      ],
      # Change this to your msk arn
      "Resource": "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:cluster/test-cluster/xxxxxxxx-xxxx-xx"
    },    
    ### Following permissions are required if msk cluster is in different account than osi pipeline
    {
      "Effect": "Allow",
      "Principal": {
        # Change this to your sts role arn used in the pipeline
        "AWS": "arn:aws:iam:: XXXXXXXXXXXX:role/PipelineRole"
      },
      "Action": [
        "kafka-cluster:*",
        "kafka:*"
      ],
      "Resource": [
        # Change this to your msk arn
        "arn:aws:kafka:us-east-1: XXXXXXXXXXXX:cluster/test-cluster/xxxxxxxx-xxxx-xx",
        # Change this as per your cluster name & kafka topic name
        "arn:aws:kafka:us-east-1: XXXXXXXXXXXX:topic/test-cluster/xxxxxxxx-xxxx-xx/*",
        # Change this as per your cluster name
        "arn:aws:kafka:us-east-1: XXXXXXXXXXXX:group/test-cluster/*"
      ]
    }
  ]
}

Edit the pipeline role’s inline policy to include the following permissions. Ensure that you have removed the comments before using the policy.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka:DescribeClusterV2",
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": [
                # Change this to your msk arn
                "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:cluster/test-cluster/xxxxxxxx-xxxx-xx"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                # Change this to your kafka topic and cluster name
                "arn:aws:kafka:us-east-1: XXXXXXXXXXXX:topic/test-cluster/xxxxxxxx-xxxx-xx/topic-to-consume"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                # change this as per your cluster name
                "arn:aws:kafka:us-east-1: XXXXXXXXXXXX:group/test-cluster/*"
            ]
        }
    ]
}

Writing to OpenSearch Service

In this section, you provide the pipeline role with necessary permissions to write to OpenSearch Service. As a best practice, we recommend using fine-grained access control in OpenSearch Service. Use OpenSearch dashboards to map a pipeline role to an appropriate backend role. For more information on mapping roles to users, see Managing permissions. For example, all_access is a built-in role that grants administrative permission to all OpenSearch functions. When deploying to a production environment, ensure that you use a role with enough permissions to write to your OpenSearch domain.

Creating OpenSearch Ingestion pipelines

The pipeline role now has the correct set of permissions to read from Amazon MSK and write to OpenSearch Service. Navigate to the OpenSearch Service console, choose Pipelines, then choose Create pipeline.

Choose a suitable name for the pipeline. and se the pipeline capacity with appropriate minimum and maximum OpenSearch Compute Unit (OCU). Then choose ‘AWS-MSKPipeline’ from the dropdown menu as shown below.

Use the provided template to fill in all the required fields. The snippet in the following section shows the fields that needs to be filled in red.

Configuring Amazon MSK source

The following sample configuration snippet shows every setting you need to get the pipeline running:

msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true                     # Default is false  
      topics: 
         - name: "<topic name>" 
           group_id: "<consumer group id>" 
           serde_format: json                   # Remove, if Schema Registry is used. (Other option is plaintext)  
 
           # Below defaults can be tuned as needed 
           # fetch_max_bytes: 52428800          Optional 
           # fetch_max_wait: 500                Optional (in msecs) 
           # fetch_min_bytes: 1                 Optional (in MB) 
           # max_partition_fetch_bytes: 1048576 Optional 
           # consumer_max_poll_records: 500     Optional                                
           # auto_offset_reset: "earliest"      Optional (other option is "earliest") 
           # key_mode: include_as_field         Optional (other options are include_as_field, discard)  
 
       
           serde_format: json                   # Remove, if Schema Registry is used. (Other option is plaintext)   
 
      # Enable this configuration if Glue schema registry is used            
      # schema:                                 
      #   type: aws_glue 
 
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        # sts_role_arn: "arn:aws:iam::XXXXXXXXXXXX:role/Example-Role" 
        # Provide the region of the domain. 
        # region: "us-west-2" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:us-west-2:XXXXXXXXXXXX:cluster/msk-prov-1/id" 
 
  sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          # hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
          # sts_role_arn: "arn:aws:iam::XXXXXXXXXXXX:role/Example-Role" 
          # Provide the region of the domain. 
          # region: "us-east-1" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_${getMetadata(\"kafka_topic\")}-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          # distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in Ohan S3 bucket 
          # dlq: 
            # s3: 
            # Provide an S3 bucket 

We use the following parameters:

  • acknowledgements – Set to true for OpenSearch Ingestion to ensure that the data is delivered to the sinks before committing the offsets in Amazon MSK. The default value is set to false.
  • name – This specifies topic OpenSearch Ingestion can read from. You can read a maximum of four topics per pipeline.
  • group_id – This parameter specifies that the pipeline is part of the consumer group. With this setting, a single consumer group can be scaled to as many pipelines as needed for very high throughput.
  • serde_format – Specifies a deserialization method to be used for the data read from Amazon MSK. The options are JSON and plaintext.
  • AWS sts_role_arn and OpenSearch sts_role_arn – Specifies the role OpenSearch Ingestion uses for reading and writing. Specify the ARN of the role you created from the last section. OpenSearch Ingestion currently uses the same role for reading and writing.
  • MSK arn – Specifies the MSK cluster to consume data from.
  • OpenSearch host and index – Specifies the OpenSearch domain URL and where the index should write.

When you have configured the Kafka source, choose the network access type and log publishing options. Public pipelines do not involve PrivateLink and they will not incur a cost associated with PrivateLink. Choose Next and review all configurations. When you are satisfied, choose Create pipeline.

Log in to OpenSearch Dashboards to see your indexes and search the data.

Recommended compute units (OCUs) for the MSK pipeline

Each compute unit has one consumer per topic. Brokers will balance partitions among these consumers for a given topic. However, when the number of partitions is greater than the number of consumers, Amazon MSK will host multiple partitions on every consumer. OpenSearch Ingestion has built-in auto scaling to scale up or down based on CPU usage or number of pending records in the pipeline. For optimal performance, partitions should be distributed across many compute units for parallel processing. If topics have a large number of partitions, for example, more than 96 (maximum OCUs per pipeline), we recommend configuring a pipeline with 1–96 OCUs because it will auto scale as needed. If a topic has a low number of partitions, for example, less than 96, then keep the maximum compute unit to same as the number of partitions. When pipeline has more than one topic, user can pick a topic with highest number of partitions as a reference to configure maximum computes units. By adding another pipeline with a new set of OCUs to the same topic and consumer group, you can scale the throughput almost linearly.

Clean up

To avoid future charges, clean up any unused resources from your AWS account.

Conclusion

In this post, you saw how to use Amazon MSK as a source for OpenSearch Ingestion. This not only addresses the ease of data consumption from Amazon MSK, but it also relieves you of the burden of self-managing and manually scaling consumers for varying and unpredictable high-speed, streaming operational analytics data. Please refer to the ‘sources’ list under ‘supported plugins’ section for exhaustive list of sources from which you can ingest data.


About the authors

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.

Arjun Nambiar is a Product Manager with Amazon OpenSearch Service. He focusses on ingestion technologies that enable ingesting data from a wide variety of sources into Amazon OpenSearch Service at scale. Arjun is interested in large scale distributed systems and cloud-native technologies and is based out of Seattle, Washington.

Raj Sharma is a Sr. SDM with Amazon OpenSearch Service. He builds large-scale distributed applications and solutions. Raj is interested in the topics of Analytics, databases, networking and security, and is based out of Palo Alto, California.

Best practices for implementing event-driven architectures in your organization

Post Syndicated from Emanuele Levi original https://aws.amazon.com/blogs/architecture/best-practices-for-implementing-event-driven-architectures-in-your-organization/

Event-driven architectures (EDA) are made up of components that detect business actions and changes in state, and encode this information in event notifications. Event-driven patterns are becoming more widespread in modern architectures because:

  • they are the main invocation mechanism in serverless patterns.
  • they are the preferred pattern for decoupling microservices, where asynchronous communications and event persistence are paramount.
  • they are widely adopted as a loose-coupling mechanism between systems in different business domains, such as third-party or on-premises systems.

Event-driven patterns have the advantage of enabling team independence through the decoupling and decentralization of responsibilities. This decentralization trend in turn, permits companies to move with unprecedented agility, enhancing feature development velocity.

In this blog, we’ll explore the crucial components and architectural decisions you should consider when adopting event-driven patterns, and provide some guidance on organizational structures.

Division of responsibilities

The communications flow in EDA (see What is EDA?) is initiated by the occurrence of an event. Most production-grade event-driven implementations have three main components, as shown in Figure 1: producers, message brokers, and consumers.

Three main components of an event-driven architecture

Figure 1. Three main components of an event-driven architecture

Producers, message brokers, and consumers typically assume the following roles:

Producers

Producers are responsible for publishing the events as they happen. They are the owners of the event schema (data structure) and semantics (meaning of the fields, such as the meaning of the value of an enum field). As this is the only contract (coupling) between producers and the downstream components of the system, the schema and its semantics are crucial in EDA. Producers are responsible for implementing a change management process, which involves both non-breaking and breaking changes. With introduction of breaking changes, consumers are able to negotiate the migration process with producers.

Producers are “consumer agnostic”, as their boundary of responsibility ends when an event is published.

Message brokers

Message brokers are responsible for the durability of the events, and will keep an event available for consumption until it is successfully processed. Message brokers ensure that producers are able to publish events for consumers to consume, and they regulate access and permissions to publish and consume messages.

Message brokers are largely “events agnostic”, and do not generally access or interpret the event content. However, some systems provide a routing mechanism based on the event payload or metadata.

Consumers

Consumers are responsible for consuming events, and own the semantics of the effect of events. Consumers are usually bounded to one business context. This means the same event will have different effect semantics for different consumers. Crucial architectural choices when implementing a consumer involve the handling of unsuccessful message deliveries or duplicate messages. Depending on the business interpretation of the event, when recovering from failure a consumer might permit duplicate events, such as with an idempotent consumer pattern.

Crucially, consumers are “producer agnostic”, and their boundary of responsibility begins when an event is ready for consumption. This allows new consumers to onboard into the system without changing the producer contracts.

Team independence

In order to enforce the division of responsibilities, companies should organize their technical teams by ownership of producers, message brokers, and consumers. Although the ownership of producers and consumers is straightforward in an EDA implementation, the ownership of the message broker may not be. Different approaches can be taken to identify message broker ownership depending on your organizational structure.

Decentralized ownership

Ownership of the message broker in a decentralized ownership organizational structure

Figure 2. Ownership of the message broker in a decentralized ownership organizational structure

In a decentralized ownership organizational structure (see Figure 2), the teams producing events are responsible for managing their own message brokers and the durability and availability of the events for consumers.

The adoption of topic fanout patterns based on Amazon Simple Queue Service (SQS) and Amazon Simple Notification Service (SNS) (see Figure 3), can help companies implement a decentralized ownership pattern. A bus-based pattern using Amazon EventBridge can also be similarly utilized (see Figure 4).

Topic fanout pattern based on Amazon SQS and Amazon SNS

Figure 3. Topic fanout pattern based on Amazon SQS and Amazon SNS

Events bus pattern based on Amazon EventBridge

Figure 4. Events bus pattern based on Amazon EventBridge

The decentralized ownership approach has the advantage of promoting team independence, but it is not a fit for every organization. In order to be implemented effectively, a well-established DevOps culture is necessary. In this scenario, the producing teams are responsible for managing the message broker infrastructure and the non-functional requirements standards.

Centralized ownership

Ownership of the message broker in a centralized ownership organizational structure

Figure 5. Ownership of the message broker in a centralized ownership organizational structure

In a centralized ownership organizational structure, a central team (we’ll call it the platform team) is responsible for the management of the message broker (see Figure 5). Having a specialized platform team offers the advantage of standardized implementation of non-functional requirements, such as reliability, availability, and security. One disadvantage is that the platform team is a single point of failure in both the development and deployment lifecycle. This could become a bottleneck and put team independence and operational efficiency at risk.

Streaming pattern based on Amazon MSK and Kinesis Data Streams

Figure 6. Streaming pattern based on Amazon MSK and Kinesis Data Streams

On top of the implementation patterns mentioned in the previous section, the presence of a dedicated team makes it easier to implement streaming patterns. In this case, a deeper understanding on how the data is partitioned and how the system scales is required. Streaming patterns can be implemented using services such as Amazon Managed Streaming for Apache Kafka (MSK) or Amazon Kinesis Data Streams (see Figure 6).

Best practices for implementing event-driven architectures in your organization

The centralized and decentralized ownership organizational structures enhance team independence or standardization of non-functional requirements respectively. However, they introduce possible limits to the growth of the engineering function in a company. Inspired by the two approaches, you can implement a set of best practices which are aimed at minimizing those limitations.

Best practices for implementing event-driven architectures

Figure 7. Best practices for implementing event-driven architectures

  1. Introduce a cloud center of excellence (CCoE). A CCoE standardizes non-functional implementation across engineering teams. In order to promote a strong DevOps culture, the CCoE should not take the form of an external independent team, but rather be a collection of individual members representing the various engineering teams.
  2. Decentralize team ownership. Decentralize ownership and maintenance of the message broker to producing teams. This will maximize team independence and agility. It empowers the team to use the right tool for the right job, as long as they conform to the CCoE guidelines.
  3. Centralize logging standards and observability strategies. Although it is a best practice to decentralize team ownership of the components of an event-driven architecture, logging standards and observability strategies should be centralized and standardized across the engineering function. This centralization provides for end-to-end tracing of requests and events, which are powerful diagnosis tools in case of any failure.

Conclusion

In this post, we have described the main architectural components of an event-driven architecture, and identified the ownership of the message broker as one of the most important architectural choices you can make. We have described a centralized and decentralized organizational approach, presenting the strengths of the two approaches, as well as the limits they impose on the growth of your engineering organization. We have provided some best practices you can implement in your organization to minimize these limitations.

Further reading:
To start your journey building event-driven architectures in AWS, explore the following:

Orca Security’s journey to a petabyte-scale data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/orca-securitys-journey-to-a-petabyte-scale-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Eliad Gat and Oded Lifshiz from Orca Security.

With data becoming the driving force behind many industries today, having a modern data architecture is pivotal for organizations to be successful. One key component that plays a central role in modern data architectures is the data lake, which allows organizations to store and analyze large amounts of data in a cost-effective manner and run advanced analytics and machine learning (ML) at scale.

Orca Security is an industry-leading Cloud Security Platform that identifies, prioritizes, and remediates security risks and compliance issues across your AWS Cloud estate. Orca connects to your environment in minutes with patented SideScanning technology to provide complete coverage across vulnerabilities, malware, misconfigurations, lateral movement risk, weak and leaked passwords, overly permissive identities, and more.

The Orca Platform is powered by a state-of-the-art anomaly detection system that uses cutting-edge ML algorithms and big data capabilities to detect potential security threats and alert customers in real time, ensuring maximum security for their cloud environment. At the core of Orca’s anomaly detection system is its transactional data lake, which enables the company’s data scientists, analysts, data engineers, and ML specialists to extract valuable insights from vast amounts of data and deliver innovative cloud security solutions to its customers.

In this post, we describe Orca’s journey building a transactional data lake using Amazon Simple Storage Service (Amazon S3), Apache Iceberg, and AWS Analytics. We explore why Orca chose to build a transactional data lake and examine the key considerations that guided the selection of Apache Iceberg as the preferred table format.

In addition, we describe the Orca Platform architecture and the technologies used. Lastly, we discuss the challenges encountered throughout the project, present the solutions used to address them, and share valuable lessons learned.

Why did Orca build a data lake?

Prior to the creation of the data lake, Orca’s data was distributed among various data silos, each owned by a different team with its own data pipelines and technology stack. This setup led to several issues, including scaling difficulties as the data size grew, maintaining data quality, ensuring consistent and reliable data access, high costs associated with storage and processing, and difficulties supporting streaming use cases. Moreover, running advanced analytics and ML on disparate data sources proved challenging. To overcome these issues, Orca decided to build a data lake.

A data lake is a centralized data repository that enables organizations to store and manage large volumes of structured and unstructured data, eliminating data silos and facilitating advanced analytics and ML on the entire data. By decoupling storage and compute, data lakes promote cost-effective storage and processing of big data.

Why did Orca choose Apache Iceberg?

Orca considered several table formats that have evolved in recent years to support its transactional data lake. Amongst the options, Apache Iceberg stood out as the ideal choice because it met all of Orca’s requirements.

First, Orca sought a transactional table format that ensures data consistency and fault tolerance. Apache Iceberg’s transactional and ACID guarantees, which allow concurrent read and write operations while ensuring data consistency and simplified fault handling, fulfill this requirement. Furthermore, Apache Iceberg’s support for time travel and rollback capabilities makes it highly suitable for addressing data quality issues by reverting to a previous state in a consistent manner.

Second, a key requirement was to adopt an open table format that integrates with various processing engines. This was to avoid vendor lock-in and allow teams to choose the processing engine that best suits their needs. Apache Iceberg’s engine-agnostic and open design meets this requirement by supporting all popular processing engines, including Apache Spark, Amazon Athena, Apache Flink, Trino, Presto, and more.

In addition, given the substantial data volumes handled by the system, an efficient table format was required that can support querying petabytes of data very fast. Apache Iceberg’s architecture addresses this need by efficiently filtering and reducing scanned data, resulting in accelerated query times.

An additional requirement was to allow seamless schema changes without impacting end-users. Apache Iceberg’s range of features, including schema evolution, hidden partitions, and partition evolution, addresses this requirement.

Lastly, it was important for Orca to choose a table format that is widely adopted. Apache Iceberg’s growing and active community aligned with the requirement for a popular and community-backed table format.

Solution overview

Orca’s data lake is based on open-source technologies that seamlessly integrate with Apache Iceberg. The system ingests data from various sources such as cloud resources, cloud activity logs, and API access logs, and processes billions of messages, resulting in terabytes of data daily. This data is sent to Apache Kafka, which is hosted on Amazon Managed Streaming for Apache Kafka (Amazon MSK). It is then processed using Apache Spark Structured Streaming running on Amazon EMR and stored in the data lake. Amazon EMR streamlines the process of loading all required Iceberg packages and dependencies, ensuring that the data is stored in Apache Iceberg format and ready for consumption as quickly as possible.

The data lake is built on top of Amazon S3 using Apache Iceberg table format with Apache Parquet as the underlying file format. In addition, the AWS Glue Data Catalog enables data discovery, and AWS Identity and Access Management (IAM) enforces secure access controls for the lake and its operations.

The data lake serves as the foundation for a variety of capabilities that are supported by different engines.

Data pipelines built on Apache Spark and Athena SQL analyze and process the data stored in the data lake. These data pipelines generate valuable insights and curated data that are stored in Apache Iceberg tables for downstream usage. This data is then used by various applications for streaming analytics, business intelligence, and reporting.

Amazon SageMaker is used to build, train, and deploy a range of ML models. Specifically, the system uses Amazon SageMaker Processing jobs to process the data stored in the data lake, employing the AWS SDK for Pandas (previously known as AWS Wrangler) for various data transformation operations, including cleaning, normalization, and feature engineering. This ensures that the data is suitable for training purposes. Additionally, SageMaker training jobs are employed for training the models. After the models are trained, they are deployed and used to identify anomalies and alert customers in real time to potential security threats. The following diagram illustrates the solution architecture.

Orca security Data Lake Architecture

Challenges and lessons learned

Orca faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing EMR streaming ingestion for high throughput
  • Taming the small files problem for fast reads
  • Maximizing performance with Athena version 3
  • Maintaining Apache Iceberg tables
  • Managing data retention
  • Monitoring the data lake infrastructure and operations
  • Mitigating data quality issues

In this section, we describe each of these challenges and the solutions implemented to address them.

Determining optimal table partitioning

Determining optimal partitioning for each table is very important in order to optimize query performance and minimize the impact on teams querying the tables when partitioning changes. Apache Iceberg’s hidden partitions combined with partition transformations proved to be valuable in achieving this goal because it allowed for transparent changes to partitioning without impacting end-users. Additionally, partition evolution enables experimentation with various partitioning strategies to optimize cost and performance without requiring a rewrite of the table’s data every time.

For example, with these features, Orca was able to easily change several of its table partitioning from DAY to HOUR with no impact on user queries. Without this native Iceberg capability, they would have needed to coordinate the new schema with all the teams that query the tables and rewrite the entire data, which would have been a costly, time-consuming, and error-prone process.

Optimizing EMR streaming ingestion for high throughput

As mentioned previously, the system ingests billions of messages daily, resulting in terabytes of data processed and stored each day. Therefore, optimizing the EMR clusters for this type of load while maintaining high throughput and low costs has been an ongoing challenge. Orca addressed this in several ways.

First, Orca chose to use instance fleets with its EMR clusters because they allow optimized resource allocation by combining different instance types and sizes. Instance fleets improve resilience by allowing multiple Availability Zones to be configured. As a result, the cluster will launch in an Availability Zone with all the required instance types, preventing capacity limitations. Additionally, instance fleets can use both Amazon Elastic Compute Cloud (Amazon EC2) On-Demand and Spot instances, resulting in cost savings.

The process of sizing the cluster for high throughput and lower costs involved adjusting the number of core and task nodes, selecting suitable instance types, and fine-tuning CPU and memory configurations. Ultimately, Orca was able to find an optimal configuration consisting of on-demand core nodes and spot task nodes of varying sizes, which provided high throughput but also ensured compliance with SLAs.

Orca also found that using different Kafka Spark Structured Streaming properties, such as minOffsetsPerTrigger, maxOffsetsPerTrigger, and minPartitions, provided higher throughput and better control of the load. Using minPartitions, which enables better parallelism and distribution across a larger number of tasks, was particularly useful for consuming high lags quickly.

Lastly, when dealing with a high data ingestion rate, Amazon S3 may throttle the requests and return 503 errors. To address this scenario, Iceberg offers a table property called write.object-storage.enabled, which incorporates a hash prefix into the stored S3 object path. This approach effectively mitigates throttling problems.

Taming the small files problem for fast reads

A common challenge often encountered when ingesting streaming data into the data lake is the creation of many small files. This can have a negative impact on read performance when querying the data with Athena or Apache Spark. Having a high number of files leads to longer query planning and runtimes due to the need to process and read each file, resulting in overhead for file system operations and network communication. Additionally, this can result in higher costs due to the large number of S3 PUT and GET requests required.

To address this challenge, Apache Spark Structured Streaming provides the trigger mechanism, which can be used to tune the rate at which data is committed to Apache Iceberg tables. The commit rate has a direct impact on the number of files being produced. For instance, a higher commit rate, corresponding to a shorter time interval, results in lots of data files being produced.

In certain cases, launching the Spark cluster on an hourly basis and configuring the trigger to AvailableNow facilitated the processing of larger data batches and reduced the number of small files created. Although this approach led to cost savings, it did involve a trade-off of reduced data freshness. However, this trade-off was deemed acceptable for specific use cases.

In addition, to address preexisting small files within the data lake, Apache Iceberg offers a data files compaction operation that combines these smaller files into larger ones. Running this operation on a schedule is highly recommended to optimize the number and size of the files. Compaction also proves valuable in handling late-arriving data and enables the integration of this data into consolidated files.

Maximizing performance with Athena version 3

Orca was an early adopter of Athena version 3, Amazon’s implementation of the Trino query engine, which provides extensive support for Apache Iceberg. Whenever possible, Orca preferred using Athena over Apache Spark for data processing. This preference was driven by the simplicity and serverless architecture of Athena, which led to reduced costs and easier usage, unlike Spark, which typically required provisioning and managing a dedicated cluster at higher costs.

In addition, Orca used Athena as part of its model training and as the primary engine for ad hoc exploratory queries conducted by data scientists, business analysts, and engineers. However, for maintaining Iceberg tables and updating table properties, Apache Spark remained the more scalable and feature-rich option.

Maintaining Apache Iceberg tables

Ensuring optimal query performance and minimizing storage overhead became a significant challenge as the data lake grew to a petabyte scale. To address this challenge, Apache Iceberg offers several maintenance procedures, such as the following:

  • Data files compaction – This operation, as mentioned earlier, involves combining smaller files into larger ones and reorganizing the data within them. This operation not only reduces the number of files but also enables data sorting based on different columns or clustering similar data using z-ordering. Using Apache Iceberg’s compaction results in significant performance improvements, especially for large tables, making a noticeable difference in query performance between compacted and uncompacted data.
  • Expiring old snapshots – This operation provides a way to remove outdated snapshots and their associated data files, enabling Orca to maintain low storage costs.

Running these maintenance procedures efficiently and cost-effectively using Apache Spark, particularly the compaction operation, which operates on terabytes of data daily, requires careful consideration. This entails appropriately sizing the Spark cluster running on EMR and adjusting various settings such as CPU and memory.

In addition, using Apache Iceberg’s metadata tables proved to be very helpful in identifying issues related to the physical layout of Iceberg’s tables, which can directly impact query performance. Metadata tables offer insights into the physical data storage layout of the tables and offer the convenience of querying them with Athena version 3. By accessing the metadata tables, crucial information about tables’ data files, manifests, history, partitions, snapshots, and more can be obtained, which aids in understanding and optimizing the table’s data layout.

For instance, the following queries can uncover valuable information about the underlying data:

  • The number of files and their average size per partition:
    >SELECT partition, file_count, (total_size / file_count) AS avg_file_size FROM "db"."table$partitions"

  • The number of data files pointed to by each manifest:
    SELECT path, added_data_files_count + existing_data_files_count AS number_of_data_files FROM "db"."table$manifests"

  • Information about the data files:
    SELECT file_path, file_size_in_bytes FROM "db"."table$files"

  • Information related to data completeness:
    SELECT record_count, partition FROM "db"."table$partitions"

Managing data retention

Effective management of data retention in a petabyte-scale data lake is crucial to ensure low storage costs as well as to comply with GDPR. However, implementing such a process can be challenging when dealing with Iceberg data stored in S3 buckets, because deleting files based on simple S3 lifecycle policies could potentially cause table corruption. This is because Iceberg’s data files are referenced in manifest files, so any changes to data files must also be reflected in the manifests.

To address this challenge, certain considerations must be taken into account while handling data retention properly. Apache Iceberg provides two modes for handling deletes, namely copy-on-write (CoW), and merge-on-read (MoR). In CoW mode, Iceberg rewrites data files at the time of deletion and creates new data files, whereas in MoR mode, instead of rewriting the data files, a delete file is written that lists the position of deleted records in files. These files are then reconciled with the remaining data during read time.

In favor of faster read times, CoW mode is preferable and when used in conjunction with the expiring old snapshots operation, it allows for the hard deletion of data files that have exceeded the set retention period.

In addition, by storing the data sorted based on the field that will be utilized for deletion (for example, organizationID), it’s possible to reduce the number of files that require rewriting. This optimization significantly enhances the efficiency of the deletion process, resulting in improved deletion times.

Monitoring the data lake infrastructure and operations

Managing a data lake infrastructure is challenging due to the various components it encompasses, including those responsible for data ingestion, storage, processing, and querying.

Effective monitoring of all these components involves tracking resource utilization, data ingestion rates, query runtimes, and various other performance-related metrics, and is essential for maintaining optimal performance and detecting issues as soon as possible.

Monitoring Amazon EMR was crucial because it played a vital role in the system for data ingestion, processing, and maintenance. Orca monitored the cluster status and resource usage of Amazon EMR by utilizing the available metrics through Amazon CloudWatch. Furthermore, it used JMX Exporter and Prometheus to scrape specific Apache Spark metrics and create custom metrics to further improve the pipelines’ observability.

Another challenge emerged when attempting to further monitor the ingestion progress through Kafka lag. Although Kafka lag tracking is the standard method for monitoring ingestion progress, it posed a challenge because Spark Structured Streaming manages its offsets internally and doesn’t commit them back to Kafka. To overcome this, Orca utilized the progress of the Spark Structured Streaming Query Listener (StreamingQueryListener) to monitor the processed offsets, which were then committed to a dedicated Kafka consumer group for lag monitoring.

In addition, to ensure optimal query performance and identify potential performance issues, it was essential to monitor Athena queries. Orca addressed this by using key metrics from Athena and the AWS SDK for Pandas, specifically TotalExecutionTime and ProcessedBytes. These metrics helped identify any degradation in query performance and keep track of costs, which were based on the size of the data scanned.

Mitigating data quality issues

Apache Iceberg’s capabilities and overall architecture played a key role in mitigating data quality challenges.

One of the ways Apache Iceberg addresses these challenges is through its schema evolution capability, which enables users to modify or add columns to a table’s schema without rewriting the entire data. This feature prevents data quality issues that may arise due to schema changes, because the table’s schema is managed as part of the manifest files, ensuring safe changes.

Furthermore, Apache Iceberg’s time travel feature provides the ability to review a table’s history and roll back to a previous snapshot. This functionality has proven to be extremely useful in identifying potential data quality issues and swiftly resolving them by reverting to a previous state with known data integrity.

These robust capabilities ensure that data within the data lake remains accurate, consistent, and reliable.

Conclusion

Data lakes are an essential part of a modern data architecture, and now it’s easier than ever to create a robust, transactional, cost-effective, and high-performant data lake by using Apache Iceberg, Amazon S3, and AWS Analytics services such as Amazon EMR and Athena.

Since building the data lake, Orca has observed significant improvements. The data lake infrastructure has allowed Orca’s platform to have seamless scalability while reducing the cost of running its data pipelines by over 50% utilizing Amazon EMR. Additionally, query costs were reduced by more than 50% using the efficient querying capabilities of Apache Iceberg and Athena version 3.

Most importantly, the data lake has made a profound impact on Orca’s platform and continues to play a key role in its success, supporting new use cases such as change data capture (CDC) and others, and enabling the development of cutting-edge cloud security solutions.

If Orca’s journey has sparked your interest and you are considering implementing a similar solution in your organization, here are some strategic steps to consider:

  • Start by thoroughly understanding your organization’s data needs and how this solution can address them.
  • Reach out to experts, who can provide you with guidance based on their own experiences. Consider engaging in seminars, workshops, or online forums that discuss these technologies. The following resources are recommended for getting started:
  • An important part of this journey would be to implement a proof of concept. This hands-on experience will provide valuable insights into the complexities of a transactional data lake.

Embarking on a journey to a transactional data lake using Amazon S3, Apache Iceberg, and AWS Analytics can vastly improve your organization’s data infrastructure, enabling advanced analytics and machine learning, and unlocking insights that drive innovation.


About the Authors

Eliad Gat is a Big Data & AI/ML Architect at Orca Security. He has over 15 years of experience designing and building large-scale cloud-native distributed systems, specializing in big data, analytics, AI, and machine learning.

Oded Lifshiz is a Principal Software Engineer at Orca Security. He enjoys combining his passion for delivering innovative, data-driven solutions with his expertise in designing and building large-scale machine learning pipelines.

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan also leads the Apache Iceberg Israel community.

Carlos Rodrigues is a Big Data Specialist Solutions Architect at Amazon Web Services. He helps customers worldwide build transactional data lakes on AWS using open table formats like Apache Hudi and Apache Iceberg.

Sofia Zilberman is a Sr. Analytics Specialist Solutions Architect at Amazon Web Services. She has a track record of 15 years of creating large-scale, distributed processing systems. She remains passionate about big data technologies and architecture trends, and is constantly on the lookout for functional and technological innovations.

Let’s Architect! Open-source technologies on AWS

Post Syndicated from Vittorio Denti original https://aws.amazon.com/blogs/architecture/lets-architect-open-source-technologies-on-aws/

We brought you a Let’s Architect! blog post about open-source on AWS that covered some technologies with development led by AWS/Amazon, as well as well-known solutions available on managed AWS services. Today, we’re following the same approach to share more insights about the process itself for developing open-source. That’s why the first topic we discuss in this post is a re:Invent talk from Heitor Lessa, Principal Solutions Architect at AWS, explaining some interesting approaches for developing and scaling successful open-source projects.

This edition of Let’s Architect! also touches on observability with Open Telemetry, Apache Kafka on AWS, and Infrastructure as Code with an hands-on workshop on AWS Cloud Development Kit (AWS CDK).

Powertools for AWS Lambda: Lessons from the road to 10 million downloads

Powertools for AWS Lambda is an open-source library to help engineering teams implement serverless best practices. In two years, Powertools went from an initial prototype to a fast-growing project in the open-source world. Rapid growth along with support from a wide community led to challenges from balancing new features with operational excellence to triaging bug reports and RFCs and scaling and redesigning documentation.

In this session, you can learn about Powertools for AWS Lambda to understand what it is and the problems it solves. Moreover, there are many valuable lessons to learn how to create and scale a successful open-source project. From managing the trade-off between releasing new features and achieving operational stability to measuring the impact of the project, there are many challenges in open-source projects that require careful thought.

Take me to this video!

Heitor Lessa describing one the key lessons: development and releasing new features should be as important as the other activities (governance, operational excellence, and more)

Heitor Lessa describing one of the key lessons: development and releasing new features should be as important as the other activities (governance, operational excellence, and more).

Observability the open-source way

The recent blog post Let’s Architect! Monitoring production systems at scale talks about the importance of monitoring. Setting up observability is critical to maintain application and infrastructure health, but instrumenting applications to collect monitoring signals such as metrics and logs can be challenging when using vendor-specific SDKs.

This video introduces you to OpenTelemetry, an open-source observability framework. OpenTelemetry provides a flexible, single vendor-agnostic SDK based on open-source specifications that developers can use to instrument and collect signals from applications. This resource explains how it works in practice and how to monitor microservice-based applications with the OpenTelemetry SDK.

Take me to this video!

With AWS Distro for OpenTelemetry, you can collect data from your AWS resources.

With AWS Distro for OpenTelemetry, you can collect data from your AWS resources.

Best practices for right-sizing your Apache Kafka clusters to optimize performance and cost

Apache Kafka is an open-source streaming data store that decouples applications producing streaming data (producers) into its data store from applications consuming streaming data (consumers) from its data store. Amazon Managed Streaming for Apache Kafka (Amazon MSK) allows you to use the open-source version of Apache Kafka with the service managing infrastructure and operations for you.

This blog post explains how the underlying infrastructure configuration can affect Apache Kafka performance. You can learn strategies on how to size the clusters to meet the desired throughput, availability, and latency requirements. This resource helps you discover strategies to find the optimal sizing for your resources, and learn the mental models adopted to conduct the investigation and derive the conclusions.

Take me to this blog!

Comparisons of put latencies for three clusters with different broker sizes

Comparisons of put latencies for three clusters with different broker sizes

AWS Cloud Development Kit workshop

AWS Cloud Development Kit (AWS CDK) is an open-source software development framework that allows you to provision cloud resources programmatically (Infrastructure as Code or IaC) by using familiar programming languages such as Python, Typescript, Javascript, Java, Go, and C#/.Net.

CDK allows you to create reusable template and assets, test your infrastructure, make deployments repeatable, and make your cloud environment stable by removing manual (and error-prone) operations. This workshop introduces you to CDK, where you can learn how to provision an initial simple application as well as become familiar with more advanced concepts like CDK constructs.

Take me to this workshop!

This construct can be attached to any Lambda function that is used as an API Gateway backend. It counts how many requests were issued to each URL.

This construct can be attached to any Lambda function that is used as an API Gateway backend. It counts how many requests were issued to each URL.

See you next time!

Thanks for joining our conversation! To find all the blogs from this series, you can check out the Let’s Architect! list of content on the AWS Architecture Blog.

Multi-tenancy Apache Kafka clusters in Amazon MSK with IAM access control and Kafka Quotas – Part 1

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/multi-tenancy-apache-kafka-clusters-in-amazon-msk-with-iam-access-control-and-kafka-quotas-part-1/

With Amazon Managed Streaming for Apache Kafka (Amazon MSK), you can build and run applications that use Apache Kafka to process streaming data. To process streaming data, organizations either use multiple Kafka clusters based on their application groupings, usage scenarios, compliance requirements, and other factors, or a dedicated Kafka cluster for the entire organization. It doesn’t matter what pattern is used, Kafka clusters are typically multi-tenant, allowing multiple producer and consumer applications to consume and produce streaming data simultaneously.

With multi-tenant Kafka clusters, however, one of the challenges is to make sure that data consumer and producer applications don’t overuse cluster resources. There is a possibility that a few poorly behaved applications may overuse cluster resources, affecting the well-behaved applications as a result. Therefore, teams who manage multi-tenant Kafka clusters need a mechanism to prevent applications from overconsuming cluster resources in order to avoid issues. This is where Kafka quotas come into play. Kafka quotas control the amount of resources client applications can use within a Kafka cluster.

In Part 1 of this two-part series, we explain the concepts of how to enforce Kafka quotas in MSK multi-tenant Kafka clusters while using AWS Identity and Access Management (IAM) access control for authentication and authorization. In Part 2, we cover detailed implementation steps along with sample Kafka client applications.

Brief introduction to Kafka quotas

Kafka quotas control the amount of resources client applications can use within a Kafka cluster. It’s possible for the multi-tenant Kafka cluster to experience performance degradation or a complete outage due to resource constraints if one or more client applications produce or consume large volumes of data or generate requests at a very high rate for a continuous period of time, monopolizing Kafka cluster’s resources.

To prevent applications from overwhelming the cluster, Apache Kafka allows configuring quotas that determine how much traffic each client application produces and consumes per Kafka broker in a cluster. Kafka brokers throttle the client applications’ requests in accordance with their allocated quotas. Kafka quotas can be configured for specific users, or specific client IDs, or both. The client ID is a logical name defined in the application code that Kafka brokers use to identify which application sent messages. The user represents the authenticated user principal of a client application in a secure Kafka cluster with authentication enabled.

There are two types of quotas supported in Kafka:

  • Network bandwidth quotas – The byte-rate thresholds define how much data client applications can produce to and consume from each individual broker in a Kafka cluster measured in bytes per second.
  • Request rate quotas – This limits the percentage of time each individual broker spends processing client applications requests.

Depending on the business requirements, you can use either of these quota configurations. However, the use of network bandwidth quotas is common because it allows organizations to cap platform resources consumption according to the amount of data produced and consumed by applications per second.

Because this post uses an MSK cluster with IAM access control, we specifically discuss configuring network bandwidth quotas based on the applications’ client IDs and authenticated user principals.

Considerations for Kafka quotas

Keep the following in mind when working with Kafka quotas:

  • Enforcement level – Quotas are enforced at the broker level rather than at the cluster level. Suppose there are six brokers in a Kafka cluster and you specify a 12 MB/sec produce quota for a client ID and user. The producer application using the client ID and user can produce a max of 12MB/sec on each broker at the same time, for a total of max 72 MB/sec across all six brokers. However, if leadership for every partition of a topic resides on one broker, the same producer application can only produce a max of 12 MB/sec. Due to the fact that throttling occurs per broker, it’s essential to maintain an even balance of topics’ partitions leadership across all the brokers.
  • Throttling – When an application reaches its quota, it is throttled, not failed, meaning the broker doesn’t throw an exception. Clients who reach their quota on a broker will begin to have their requests throttled by the broker to prevent exceeding the quota. Instead of sending an error when a client exceeds a quota, the broker attempts to slow it down. Brokers calculate the amount of delay necessary to bring clients under quotas and delay responses accordingly. As a result of this approach, quota violations are transparent to clients, and clients don’t have to implement any special backoff or retry policies. However, when using an asynchronous producer and sending messages at a rate greater than the broker can accept due to quota, the messages will be queued in the client application memory first. The client will eventually run out of buffer space if the rate of sending messages continues to exceed the rate of accepting messages, causing the next Producer.send() call to be blocked. Producer.send() will eventually throw a TimeoutException if the timeout delay isn’t sufficient to allow the broker to catch up to the producer application.
  • Shared quotas – If more than one client application has the same client ID and user, the quota configured for the client ID and user will be shared among all those applications. Suppose you configure a produce quota of 5 MB/sec for the combination of client-id="marketing-producer-client" and user="marketing-app-user". In this case, all producer applications that have marketing-producer-client as a client ID and marketing-app-user as an authenticated user principal will share the 5 MB/sec produce quota, impacting each other’s throughput.
  • Produce throttling – The produce throttling behavior is exposed to producer clients via client metrics such as produce-throttle-time-avg and produce-throttle-time-max. If these are non-zero, it indicates that the destination brokers are slowing the producer down and the quotas configuration should be reviewed.
  • Consume throttling – The consume throttling behavior is exposed to consumer clients via client metrics such as fetch-throttle-time-avg and fetch-throttle-time-max. If these are non-zero, it indicates that the origin brokers are slowing the consumer down and the quotas configuration should be reviewed.

Note that client metrics are metrics exposed by clients connecting to Kafka clusters.

  • Quota configuration – It’s possible to configure Kafka quotas either statically through the Kafka configuration file or dynamically through kafka-config.sh or the Kafka Admin API. The dynamic configuration mechanism is much more convenient and manageable because it allows quotas for the new producer and consumer applications to be configured at any time without having to restart brokers. Even while application clients are producing or consuming data, dynamic configuration changes take effect in real time.
  • Configuration keys – With the kafka-config.sh command-line tool, you can set dynamic consume, produce, and request quotas using the following three configuration keys, respectively: consumer_byte_rate, producer_byte_rate, and request_percentage.

For more information about Kafka quotas, refer to Kafka documentation.

Enforce network bandwidth quotas with IAM access control

Following our understanding of Kafka quotas, let’s look at how to enforce them in an MSK cluster while using IAM access control for authentication and authorization. IAM access control in Amazon MSK eliminates the need for two separate mechanisms for authentication and authorization.

The following figure shows an MSK cluster that is configured to use IAM access control in the demo account. Each producer and consumer application has a quota that determines how much data they can produce or consume in bytes per second. For example, ProducerApp-1 has a produce quota of 1024 bytes/sec, and ConsumerApp-1 and ConsumerApp-2 each have a consume quota of 5120 and 1024 bytes/sec, respectively. It’s important to note that Kafka quotas are set on the Kafka cluster rather than in the client applications.

The preceding figure illustrates how Kafka client applications (ProducerApp-1, ConsumerApp-1, and ConsumerApp-2) access Topic-B in the MSK cluster by assuming write and read IAM roles. The workflow is as follows:

  • P1ProducerApp-1 (via its ProducerApp-1-Role IAM role) assumes the Topic-B-Write-Role IAM role to send messages to Topic-B in the MSK cluster.
  • P2 – With the Topic-B-Write-Role IAM role assumed, ProducerApp-1 begins sending messages to Topic-B.
  • C1ConsumerApp-1 (via its ConsumerApp-1-Role IAM role) and ConsumerApp-2 (via its ConsumerApp-2-Role IAM role) assume the Topic-B-Read-Role IAM role to read messages from Topic-B in the MSK cluster.
  • C2 – With the Topic-B-Read-Role IAM role assumed, ConsumerApp-1 and ConsumerApp-2 start consuming messages from Topic-B.

ConsumerApp-1 and ConsumerApp-2 are two separate consumer applications. They do not belong to the same consumer group.

Configuring client IDs and understanding authenticated user principal

As explained earlier, Kafka quotas can be configured for specific users, specific client IDs, or both. Let’s explore client ID and user concepts and configurations required for Kafka quota allocation.

Client ID

A client ID representing an application’s logical name can be configured within an application’s code. In Java applications, for example, you can set the producer’s and consumer’s client IDs using ProducerConfig.CLIENT_ID_CONFIG and ConsumerConfig.CLIENT_ID_CONFIG configurations, respectively. The following code snippet illustrates how ProducerApp-1 sets the client ID to this-is-me-producerapp-1 using ProducerConfig.CLIENT_ID_CONFIG:

Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG,"this-is-me-producerapp-1");

User

The user refers to an authenticated user principal of the client application in the Kafka cluster with authentication enabled. As shown in the solution architecture, producer and consumer applications assume the Topic-B-Write-Role and Topic-B-Read-Role IAM roles, respectively, to perform write and read operations on Topic-B. Therefore, their authenticated user principal will look like the following IAM identifier:

arn:aws:sts::<AWS Account Id>:assumed-role/<assumed Role Name>/<role session name>

For more information, refer to IAM identifiers.

The role session name is a string identifier that uniquely identifies a session when IAM principals, federated identities, or applications assume an IAM role. In our case, ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 applications assume an IAM role using the AWS Security Token Service (AWS STS) SDK, and provide a role session name in the AWS STS SDK call. For example, if ProducerApp-1 assumes the Topic-B-Write-Role IAM role and uses this-is-producerapp-1-role-session as its role session name, its authenticated user principal will be as follows:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Write-Role/this-is-producerapp-1-role-session

The following is an example code snippet from the ProducerApp-1 application using this-is-producerapp-1-role-session as the role session name while assuming the Topic-B-Write-Role IAM role using the AWS STS SDK:

StsClient stsClient = StsClient.builder().region(region).build();
AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
          .roleArn("<Topic-B-Write-Role ARN>")
          .roleSessionName("this-is-producerapp-1-role-session") //role-session-name string literal
          .build();

Configure network bandwidth (produce and consume) quotas

The following commands configure the produce and consume quotas dynamically for client applications based on their client ID and authenticated user principal in the MSK cluster configured with IAM access control.

The following code configures the produce quota:

kafka-configs.sh --bootstrap-server <MSK cluster bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "producer_byte_rate=<number of bytes per second>" \
--entity-type clients --entity-name <ProducerApp client Id> \
--entity-type users --entity-name <ProducerApp user principal>

The producer_byes_rate refers to the number of messages, in bytes, that a producer client identified by client ID and user is allowed to produce to a single broker per second. The option --command-config points to config_iam.properties, which contains the properties required for IAM access control.

The following code configures the consume quota:

kafka-configs.sh --bootstrap-server <MSK cluster bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "consumer_byte_rate=<number of bytes per second>" \
--entity-type clients --entity-name <ConsumerApp client Id> \
--entity-type users --entity-name <ConsumerApp user principal>

The consumer_bytes_rate refers to the number of messages, in bytes, that a consumer client identified by client ID and user allowed to consume from a single broker per second.

Let’s look at some example quota configuration commands for ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 client applications:

  • ProducerApp-1 produce quota configuration – Let’s assume ProducerApp-1 has this-is-me-producerapp-1 configured as the client ID in the application code and uses this-is-producerapp-1-role-session as the role session name when assuming the Topic-B-Write-Role IAM role. The following command sets the produce quota for ProducerApp-1 to 1024 bytes per second:
kafka-configs.sh --bootstrap-server <MSK Cluster Bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "producer_byte_rate=1024" \
--entity-type clients --entity-name this-is-me-producerapp-1 \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Write-Role/this-is-producerapp-1-role-session
  • ConsumerApp-1 consume quota configuration – Let’s assume ConsumerApp-1 has this-is-me-consumerapp-1 configured as the client ID in the application code and uses this-is-consumerapp-1-role-session as the role session name when assuming the Topic-B-Read-Role IAM role. The following command sets the consume quota for ConsumerApp-1 to 5120 bytes per second:
kafka-configs.sh --bootstrap-server <MSK Cluster Bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "consumer_byte_rate=5120" \
--entity-type clients --entity-name this-is-me-consumerapp-1 \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/this-is-consumerapp-1-role-session


ConsumerApp-2 consume quota configuration
– Let’s assume ConsumerApp-2 has this-is-me-consumerapp-2 configured as the client ID in the application code and uses this-is-consumerapp-2-role-session as the role session name when assuming the Topic-B-Read-Role IAM role. The following command sets the consume quota for ConsumerApp-2 to 1024 bytes per second per broker:

kafka-configs.sh --bootstrap-server <MSK Cluster Bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "consumer_byte_rate=1024" \
--entity-type clients --entity-name this-is-me-consumerapp-2 \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/this-is-consumerapp-2-role-session

As a result of the preceding commands, the ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 client applications will be throttled by the MSK cluster using IAM access control if they exceed their assigned produce and consume quotas, respectively.

Implement the solution

Part 2 of this series showcases the step-by-step detailed implementation of Kafka quotas configuration with IAM access control along with the sample producer and consumer client applications.

Conclusion

Kafka quotas offer teams the ability to set limits for producer and consumer applications. With Amazon MSK, Kafka quotas serve two important purposes: eliminating guesswork and preventing issues caused by poorly designed producer or consumer applications by limiting their quota, and allocating operational costs of a central streaming data platform across different cost centers and tenants (application and product teams).

In this post, we learned how to configure network bandwidth quotas within Amazon MSK while using IAM access control. We also covered some sample commands and code snippets to clarify how the client ID and authenticated principal are used when configuring quotas. Although we only demonstrated Kafka quotas using IAM access control, you can also configure them using other Amazon MSK-supported authentication mechanisms.

In Part 2 of this series, we demonstrate how to configure network bandwidth quotas with IAM access control in Amazon MSK and provide you with example producer and consumer applications so that you can see them in action.

Check out the following resources to learn more:


About the Author

Vikas Bajaj is a Senior Manager, Solutions Architects, Financial Services at Amazon Web Services. Having worked with financial services organizations and digital native customers, he advises financial services customers in Australia on technology decisions, architectures, and product roadmaps.

Multi-tenancy Apache Kafka clusters in Amazon MSK with IAM access control and Kafka quotas – Part 2

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/multi-tenancy-apache-kafka-clusters-in-amazon-msk-with-iam-access-control-and-kafka-quotas-part-2/

Kafka quotas are integral to multi-tenant Kafka clusters. They prevent Kafka cluster performance from being negatively affected by poorly behaved applications overconsuming cluster resources. Furthermore, they enable the central streaming data platform to be operated as a multi-tenant platform and used by downstream and upstream applications across multiple business lines. Kafka supports two types of quotas: network bandwidth quotas and request rate quotas. Network bandwidth quotas define byte-rate thresholds such as how much data client applications can produce to and consume from each individual broker in a Kafka cluster measured in bytes per second. Request rate quotas limit the percentage of time each individual broker spends processing client applications requests. Depending on your configuration, Kafka quotas can be set for specific users, specific client IDs, or both.

In Part 1 of this two-part series, we discussed the concepts of how to enforce Kafka quotas in Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters while using AWS Identity and Access Management (IAM) access control.

In this post, we walk you through the step-by-step implementation of setting up Kafka quotas in an MSK cluster while using IAM access control and testing them through sample client applications.

Solution overview

The following figure, which we first introduced in Part 1, illustrates how Kafka client applications (ProducerApp-1, ConsumerApp-1, and ConsumerApp-2) access Topic-B in the MSK cluster by assuming write and read IAM roles. Each producer and consumer client application has a quota that determines how much data they can produce or consume in bytes/second. The ProducerApp-1 quota allows it to produce up to 1024 bytes/second per broker. Similarly, the ConsumerApp-1 and ConsumerApp-2 quotas allow them to consume 5120 and 1024 bytes/second per broker, respectively. The following is a brief explanation of the flow shown in the architecture diagram:

  • P1ProducerApp-1 (via its ProducerApp-1-Role IAM role) assumes the Topic-B-Write-Role IAM role to send messages to Topic-B
  • P2 – With the Topic-B-Write-Role IAM role assumed, ProducerApp-1 begins sending messages to Topic-B
  • C1ConsumerApp-1 (via its ConsumerApp-1-Role IAM role) and ConsumerApp-2 (via its ConsumerApp-2-Role IAM role) assume the Topic-B-Read-Role IAM role to read messages from Topic-B
  • C2 – With the Topic-B-Read-Role IAM role assumed, ConsumerApp-1 and ConsumerApp-2 start consuming messages from Topic-B

Note that this post uses the AWS Command Line Interface (AWS CLI), AWS CloudFormation templates, and the AWS Management Console for provisioning and modifying AWS resources, and resources provisioned will be billed to your AWS account.

The high-level steps are as follows:

  1. Provision an MSK cluster with IAM access control and Amazon Elastic Compute Cloud (Amazon EC2) instances for client applications.
  2. Create Topic-B on the MSK cluster.
  3. Create IAM roles for the client applications to access Topic-B.
  4. Run the producer and consumer applications without setting quotas.
  5. Configure the produce and consume quotas for the client applications.
  6. Rerun the applications after setting the quotas.

Prerequisites

It is recommended that you read Part 1 of this series before continuing. In order to get started, you need the following:

  • An AWS account that will be referred to as the demo account in this post, assuming that its account ID is 1111 1111 1111
  • Permissions to create, delete, and modify AWS resources in the demo account

Provision an MSK cluster with IAM access control and EC2 instances

This step involves provisioning an MSK cluster with IAM access control in a VPC in the demo account. Additionally, we create four EC2 instances to make configuration changes to the MSK cluster and host producer and consumer client applications.

Deploy CloudFormation stack

  1. Clone the GitHub repository to download the CloudFormation template files and sample client applications:
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git
  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Choose Create stack.
  3. For Prepare template, select Template is ready.
  4. For Template source, select Upload a template file.
  5. Upload the cfn-msk-stack-1.yaml file from amazon-msk-kafka-quotas/cfn-templates directory, then choose Next.
  6. For Stack name, enter MSKStack.
  7. Leave the parameters as default and choose Next.
  8. Scroll to the bottom of the Configure stack options page and choose Next to continue.
  9. Scroll to the bottom of the Review page, select the check box I acknowledge that CloudFormation may create IAM resources, and choose Submit.

It will take approximately 30 minutes for the stack to complete. After the stack has been successfully created, the following resources will be created:

  • A VPC with three private subnets and one public subnet
  • An MSK cluster with three brokers with IAM access control enabled
  • An EC2 instance called MSKAdminInstance for modifying MSK cluster settings as well as creating and modifying AWS resources
  • EC2 instances for ProducerApp-1, ConsumerApp-1, and ConsumerApp-2, one for each client application
  • A separate IAM role for each EC2 instance that hosts the client application, as shown in the architecture diagram
  1. From the stack’s Outputs tab, note the MSKClusterArn value.

Create a topic on the MSK cluster

To create Topic-B on the MSK cluster, complete the following steps:

  1. On the Amazon EC2 console, navigate to your list of running EC2 instances.
  2. Select the MSKAdminInstance EC2 instance and choose Connect.
  3. On the Session Manager tab, choose Connect.
  4. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Add Kafka binaries to the path
sed -i 's|HOME/bin|HOME/bin:~/kafka/bin|' .bash_profile

# Set your AWS region
aws configure set region <AWS Region>
  1. Set the environment variable to point to the MSK Cluster brokers IAM endpoint:
MSK_CLUSTER_ARN=<Use the value of MSKClusterArn that you noted earlier>
echo "export BOOTSTRAP_BROKERS_IAM=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerStringSaslIam)" >> .bash_profile
source .bash_profile
echo $BOOTSTRAP_BROKERS_IAM
  1. Take note of the value of BOOTSTRAP_BROKERS_IAM.
  2. Run the following Kafka CLI command to create Topic-B on the MSK cluster:
kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--create --topic Topic-B \
--partitions 3 --replication-factor 3 \
--command-config config_iam.properties

Because the MSK cluster is provisioned with IAM access control, the option --command-config points to config_iam.properties, which contains the properties required for IAM access control, created by the MSKStack CloudFormation stack.

The following warnings may appear when you run the Kafka CLI commands, but you may ignore them:

The configuration 'sasl.jaas.config' was supplied but isn't a known config. 
The configuration 'sasl.client.callback.handler.class' was supplied but isn't a known config.
  1. To verify that Topic-B has been created, list all the topics:
kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties --list

Create IAM roles for client applications to access Topic-B

This step involves creating Topic-B-Write-Role and Topic-B-Read-Role as shown in the architecture diagram. Topic-B-Write-Role enables write operations on Topic-B, and can be assumed by the ProducerApp-1 . In a similar way, the ConsumerApp-1 and ConsumerApp-2 can assume Topic-B-Read-Role to perform read operations on Topic-B. To perform read operations on Topic-B, ConsumerApp-1 and ConsumerApp-2 must also belong to the consumer groups specified during the MSKStack stack update in the subsequent step.

Create the roles with the following steps:

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Select MSKStack and choose Update.
  3. For Prepare template, select Replace current template.
  4. For Template source, select Upload a template file.
  5. Upload the cfn-msk-stack-2.yaml file from amazon-msk-kafka-quotas/cfn-templates directory, then choose Next.
  6. Provide the following additional stack parameters:
    • For Topic B ARN, enter the Topic-B ARN.

The ARN must be formatted as arn:aws:kafka:region:account-id:topic/msk-cluster-name/msk-cluster-uuid/Topic-B. Use the cluster name and cluster UUID from the MSK cluster ARN you noted earlier and provide your AWS Region. For more information, refer to the IAM access control for Amazon MSK.

    • For ConsumerApp-1 Consumer Group name, enter ConsumerApp-1 consumer group ARN.

It must be formatted as arn:aws:kafka:region:account-id:group/msk-cluster-name/msk-cluster-uuid/consumer-group-name

    • For ConsumerApp-2 Consumer Group name, enter ConsumerApp-2 consumer group ARN.

Use a similar format as the previous ARN.

  1. Choose Next to continue.
  2. Scroll to the bottom of the Configure stack options page and choose Next to continue.
  3. Scroll to the bottom of the Review page, select the check box I acknowledge that CloudFormation may create IAM resources, and choose Update stack.

It will take approximately 3 minutes for the stack to update. After the stack has been successfully updated, the following resources will be created:

  • Topic-B-Write-Role – An IAM role with permission to perform write operations on Topic-B. Its trust policy allows the ProducerApp-1-Role IAM role to assume it.
  • Topic-B-Read-Role – An IAM role with permission to perform read operations on Topic-B. Its trust policy allows the ConsumerApp-1-Role and ConsumerApp-2-Role IAM roles to assume it. Furthermore, ConsumerApp-1 and ConsumerApp-2 must also belong to the consumer groups you specified when updating the stack to perform read operations on Topic-B.
  1. From the stack’s Outputs tab, note the TopicBReadRoleARN and TopicBWriteRoleARN values.

Run the producer and consumer applications without setting quotas

Here, we run ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 without setting their quotas. From the previous steps, you will need BOOTSTRAP_BROKERS_IAM value, Topic-B-Write-Role ARN, and Topic-B-Read-Role ARN. The source code of client applications and their packaged versions are available in the GitHub repository.

Run the ConsumerApp-1 application

To run the ConsumerApp-1 application, complete the following steps:

  1. On the Amazon EC2 console, select the ConsumerApp-1 EC2 instance and choose Connect.
  2. On the Session Manager tab, choose Connect.
  3. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  1. Run the ConsumerApp-1 application to start consuming messages from Topic-B:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Read-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--consumer-group <ConsumerApp-1 consumer group name> \
--role-session-name <role session name for ConsumerApp-1 to use during STS assume role call> \
--client-id <ConsumerApp-1 client.id> \
--print-consumer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

You can find the source code on GitHub for your reference. The command line parameter details are as follows:

  • –bootstrap-servers – MSK cluster bootstrap brokers IAM endpoint.
  • –assume-role-arnTopic-B-Read-Role IAM role ARN. Assuming this role, ConsumerApp-1 will read messages from the topic.
  • –region – Region you’re using.
  • –topic-name – Topic name from which ConsumerApp-1 will read messages. The default is Topic-B.
  • –consumer-group – Consumer group name for ConsumerApp-1, as specified during the stack update.
  • –role-session-name ConsumerApp-1 assumes the Topic-B-Read-Role using the AWS Security Token Service (AWS STS) SDK. ConsumerApp-1 will use this role session name when calling the assumeRole function.
  • –client-id – Client ID for ConsumerApp-1 .
  • –print-consumer-quota-metrics – Flag indicating whether client metrics should be printed on the terminal by ConsumerApp-1.
  • –cw-dimension-nameAmazon CloudWatch dimension name that will be used to publish client throttling metrics from ConsumerApp-1.
  • –cw-dimension-value – CloudWatch dimension value that will be used to publish client throttling metrics from ConsumerApp-1.
  • –cw-namespace – Namespace where ConsumerApp-1 will publish CloudWatch metrics in order to monitor throttling.
  1. If you’re satisfied with the rest of parameters, use the following command and change --assume-role-arn and --region as per your environment:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBReadRole-xxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--consumer-group consumerapp-1-cg \
--role-session-name consumerapp-1-role-session \
--client-id consumerapp-1-client-id \
--print-consumer-quota-metrics Y \
--cw-dimension-name ConsumerApp \
--cw-dimension-value ConsumerApp-1 \
--cw-namespace ConsumerApps

The fetch-throttle-time-avg and fetch-throttle-time-max client metrics should display 0.0, indicating no throttling is occurring for ConsumerApp-1. Remember that we haven’t set the consume quota for ConsumerApp-1 yet. Let it run for a while.

Run the ConsumerApp-2 application

To run the ConsumerApp-2 application, complete the following steps:

  1. On the Amazon EC2 console, select the ConsumerApp-2 EC2 instance and choose Connect.
  2. On the Session Manager tab, choose Connect.
  3. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  1. Run the ConsumerApp-2 application to start consuming messages from Topic-B:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Read-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--consumer-group <ConsumerApp-2 consumer group name> \
--role-session-name <role session name for ConsumerApp-2 to use during STS assume role call> \
--client-id <ConsumerApp-2 client.id> \
--print-consumer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

The code has similar command line parameters details as ConsumerApp-1 discussed previously, except for the following:

  • –consumer-group – Consumer group name for ConsumerApp-2, as specified during the stack update.
  • –role-session-name ConsumerApp-2 assumes the Topic-B-Read-Role using the AWS STS SDK. ConsumerApp-2 will use this role session name when calling the assumeRole function.
  • –client-id – Client ID for ConsumerApp-2 .
  1. If you’re satisfied with the rest of parameters, use the following command and change --assume-role-arn and --region as per your environment:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBReadRole-xxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--consumer-group consumerapp-2-cg \
--role-session-name consumerapp-2-role-session \
--client-id consumerapp-2-client-id \
--print-consumer-quota-metrics Y \
--cw-dimension-name ConsumerApp \
--cw-dimension-value ConsumerApp-2 \
--cw-namespace ConsumerApps

The fetch-throttle-time-avg and fetch-throttle-time-max client metrics should display 0.0, indicating no throttling is occurring for ConsumerApp-2. Remember that we haven’t set the consume quota for ConsumerApp-2 yet. Let it run for a while.

Run the ProducerApp-1 application

To run the ProducerApp-1 application, complete the following steps:

  1. On the Amazon EC2 console, select the ProducerApp-1 EC2 instance and choose Connect.
  2. On the Session Manager tab, choose Connect.
  3. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  1. Run the ProducerApp-1 application to start sending messages to Topic-B:
java -jar kafka-producer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Write-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--num-messages <Number of events> \
--role-session-name <role session name for ProducerApp-1 to use during STS assume role call> \
--client-id <ProducerApp-1 client.id> \
--producer-type <Producer Type, options are sync or async> \
--print-producer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

You can find the source code on GitHub for your reference. The command line parameter details are as follows:

  • –bootstrap-servers – MSK cluster bootstrap brokers IAM endpoint.
  • –assume-role-arnTopic-B-Write-Role IAM role ARN. Assuming this role, ProducerApp-1 will write messages to the topic.
  • –topic-nameProducerApp-1 will send messages to this topic. The default is Topic-B.
  • –region – AWS Region you’re using.
  • –num-messages – Number of messages the ProducerApp-1 application will send to the topic.
  • –role-session-name ProducerApp-1 assumes the Topic-B-Write-Role using the AWS STS SDK. ProducerApp-1 will use this role session name when calling the assumeRole function.
  • –client-id – Client ID of ProducerApp-1 .
  • –producer-typeProducerApp-1can be run either synchronously or asynchronously. Options are sync or async.
  • –print-producer-quota-metrics – Flag indicating whether the client metrics should be printed on the terminal by ProducerApp-1.
  • –cw-dimension-name – CloudWatch dimension name that will be used to publish client throttling metrics from ProducerApp-1.
  • –cw-dimension-value – CloudWatch dimension value that will be used to publish client throttling metrics from ProducerApp-1.
  • –cw-namespace – The namespace where ProducerApp-1 will publish CloudWatch metrics in order to monitor throttling.
  1. If you’re satisfied with the rest of parameters, use the following command and change --assume-role-arn and --region as per your environment. To run a synchronous Kafka producer, it uses the option --producer-type sync:
java -jar kafka-producer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBWriteRole-xxxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--num-messages 10000000 \
--role-session-name producerapp-1-role-session \
--client-id producerapp-1-client-id \
--producer-type sync \
--print-producer-quota-metrics Y \
--cw-dimension-name ProducerApp \
--cw-dimension-value ProducerApp-1 \
--cw-namespace ProducerApps

Alternatively, use --producer-type async to run an asynchronous producer. For more details, refer to Asynchronous send.

The produce-throttle-time-avg and produce-throttle-time-max client metrics should display 0.0, indicating no throttling is occurring for ProducerApp-1. Remember that we haven’t set the produce quota for ProducerApp-1 yet. Check that ConsumerApp-1 and ConsumerApp-2 can consume messages and notice they are not throttled. Stop the consumer and producer client applications by pressing Ctrl+C in their respective browser tabs.

Set produce and consume quotas for client applications

Now that we have run the producer and consumer applications without quotas, we set their quotas and rerun them.

Open the Sessions Manager terminal for the MSKAdminInstance EC2 instance as described earlier and run the following commands to find the default configuration of one of the brokers in the MSK cluster. MSK clusters are provisioned with the default Kafka quotas configuration.

# Describe Broker-1 default configurations
kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--entity-type brokers \
--entity-name 1 \
--all --describe > broker1_default_configurations.txt
cat broker1_default_configurations.txt | grep quota.consumer.default
cat broker1_default_configurations.txt | grep quota.producer.default

The following screenshot shows the Broker-1 default values for quota.consumer.default and quota.producer.default.

ProducerApp-1 quota configuration

Replace placeholders in all the commands in this section with values that correspond to your account.

According to the architecture diagram discussed earlier, set the ProducerApp-1 produce quota to 1024 bytes/second. For <ProducerApp-1 Client Id> and <ProducerApp-1 Role Session>, make sure you use the same values that you used while running ProducerApp-1 earlier (producerapp-1-client-id and producerapp-1-role-session, respectively):

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'producer_byte_rate=1024' \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

Verify the ProducerApp-1 produce quota using the following command:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

You can remove the ProducerApp-1 produce quota by using the following command, but don’t run the command as we’ll test the quotas next.

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --delete-config producer_byte_rate \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

ConsumerApp-1 quota configuration

Replace placeholders in all the commands in this section with values that correspond to your account.

Let’s set a consume quota of 5120 bytes/second for ConsumerApp-1. For <ConsumerApp-1 Client Id> and <ConsumerApp-1 Role Session>, make sure you use the same values that you used while running ConsumerApp-1 earlier (consumerapp-1-client-id and consumerapp-1-role-session, respectively):

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'consumer_byte_rate=5120' \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

Verify the ConsumerApp-1 consume quota using the following command:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

You can remove the ConsumerApp-1 consume quota, by using the following command, but don’t run the command as we’ll test the quotas next.

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --delete-config consumer_byte_rate \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

ConsumerApp-2 quota configuration

Replace placeholders in all the commands in this section with values that correspond to your account.

Let’s set a consume quota of 1024 bytes/second for ConsumerApp-2. For <ConsumerApp-2 Client Id> and <ConsumerApp-2 Role Session>, make sure you use the same values that you used while running ConsumerApp-2 earlier (consumerapp-2-client-id and consumerapp-2-role-session, respectively):

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'consumer_byte_rate=1024' \
--entity-type clients --entity-name <ConsumerApp-2 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-2 Role Session>

Verify the ConsumerApp-2 consume quota using the following command:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ConsumerApp-2 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-2 Role Session>

As with ConsumerApp-1, you can remove the ConsumerApp-2 consume quota using the same command with ConsumerApp-2 client and user details.

Rerun the producer and consumer applications after setting quotas

Let’s rerun the applications to verify the effect of the quotas.

Rerun ProducerApp-1

Rerun ProducerApp-1 in synchronous mode with the same command that you used earlier. The following screenshot illustrates that when ProducerApp-1 reaches its quota on any of the brokers, the produce-throttle-time-avg and produce-throttle-time-max client metrics value will be above 0.0. A value above 0.0 indicates that ProducerApp-1 is throttled. Allow ProducerApp-1 to run for a few seconds and then stop it by using Ctrl+C.

You can also test the effect of the produce quota by rerunning ProducerApp-1 again in asynchronous mode (--producer-type async). Similar to a synchronous run, the following screenshot illustrates that when ProducerApp-1 reaches its quota on any of the brokers, the produce-throttle-time-avg and produce-throttle-time-max client metrics value will be above 0.0. A value above 0.0 indicates that ProducerApp-1 is throttled. Allow asynchronous ProducerApp-1 to run for a while.

You will eventually see a TimeoutException stating org.apache.kafka.common.errors.TimeoutException: Expiring xxxxx record(s) for Topic-B-2:xxxxxxx ms has passed since batch creation

When using an asynchronous producer and sending messages at a rate greater than the broker can accept due to the quota, the messages will be queued in the client application memory first. The client will eventually run out of buffer space if the rate of sending messages continues to exceed the rate of accepting messages, causing the next Producer.send() call to be blocked. Producer.send() will eventually throw a TimeoutException if the timeout delay is not sufficient to allow the broker to catch up to the producer application. Stop ProducerApp-1 by using Ctrl+C.

Rerun ConsumerApp-1

Rerun ConsumerApp-1 with the same command that you used earlier. The following screenshot illustrates that when ConsumerApp-1 reaches its quota, the fetch-throttle-time-avg and fetch-throttle-time-max client metrics value will be above 0.0. A value above 0.0 indicates that ConsumerApp-1 is throttled.

Allow ConsumerApp-1 to run for a few seconds and then stop it by using Ctrl+C.

Rerun ConsumerApp-2

Rerun ConsumerApp-2 with the same command that you used earlier. Similarly, when ConsumerApp-2 reaches its quota, the fetch-throttle-time-avg and fetch-throttle-time-max client metrics value will be above 0.0. A value above 0.0 indicates that ConsumerApp-2 is throttled. Allow ConsumerApp-2 to run for a few seconds and then stop it by pressing Ctrl+C.

Client quota metrics in Amazon CloudWatch

In Part 1, we explained that client metrics are metrics exposed by clients connecting to Kafka clusters. Let’s examine the client metrics in CloudWatch.

  1. On the CloudWatch console, choose All metrics.
  2. Under Custom Namespaces, choose the namespace you provided while running the client applications.
  3. Choose the dimension name and select produce-throttle-time-max, produce-throttle-time-avg, fetch-throttle-time-max, and fetch-throttle-time-avg metrics for all the applications.

These metrics indicate throttling behavior for ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 applications tested with the quota configurations in the previous section. The following screenshots indicate the throttling of ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 based on network bandwidth quotas. ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 applications feed their respective client metrics to CloudWatch. You can find the source code on GitHub for your reference.

Secure client ID and role session name

We discussed how to configure Kafka quotas using an application’s client ID and authenticated user principal. When a client application assumes an IAM role to access Kafka topics on a MSK cluster with IAM authentication enabled, its authenticated user principal is represented in the following format (for more information, refer to IAM identifiers):

arn:aws:sts::111111111111:assumed-role/Topic-B-Write-Role/producerapp-1-role-session

It contains the role session name (in this case, producerapp-1-role-session) used in the client application while assuming an IAM role through the AWS STS SDK. The client application source code is available for your reference. The client ID is a logical name string (for example, producerapp-1-client-id) that is configured in the application code by the application team. Therefore, an application can impersonate another application if it obtains the client ID and role session name of the other application, and if it has permission to assume the same IAM role.

As shown in the architecture diagram, ConsumerApp-1 and ConsumerApp-2 are two separate client applications with their respective quota allocations. Because both have permission to assume the same IAM role (Topic-B-Read-Role) in the demo account, they are allowed to consume messages from Topic-B. Thus, MSK cluster brokers distinguish them based on their client IDs and users (which contain their respective role session name values). If ConsumerApp-2 somehow obtains the ConsumerApp-1 role session name and client ID, it can impersonate ConsumerApp-1 by specifying the ConsumerApp-1 role session name and client ID in the application code.

Let’s assume ConsumerApp-1 uses consumerapp-1-client-id and consumerapp-1-role-session as its client ID and role session name, respectively. Therefore, ConsumerApp-1's authenticated user principal will appear as follows when it assumes the Topic-B-Read-Role IAM role:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-1-role-session

Similarly, ConsumerApp-2 uses consumerapp-2-client-id and consumerapp-2-role-session as its client ID and role session name, respectively. Therefore, ConsumerApp-2's authenticated user principal will appear as follows when it assumes the Topic-B-Read-Role IAM role:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-2-role-session

If ConsumerApp-2 obtains ConsumerApp-1's client ID and role session name and specifies them in its application code, MSK cluster brokers will treat it as ConsumerApp-1 and view its client ID as consumerapp-1-client-id, and the authenticated user principal as follows:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-1-role-session

This allows ConsumerApp-2 to consume data from the MSK cluster at a maximum rate of 5120 bytes per second rather than 1024 bytes per second as per its original quota allocation. Consequently, ConsumerApp-1's throughput will be negatively impacted if ConsumerApp-2 runs concurrently.

Enhanced architecture

You can introduce AWS Secrets Manager and AWS Key Management Service (AWS KMS) in the architecture to secure applications’ client IDs and role session names. To provide stronger governance, the applications’ client ID and role session name must be stored as encrypted secrets in the Secrets Manager. The IAM resource policies associated with encrypted secrets and a KMS customer managed key (CMK) will allow applications to access and decrypt only their respective client ID and role session name. In this way, applications will not be able to access each other’s client ID and role session name and impersonate one another. The following image shows the enhanced architecture.

The updated flow has the following stages:

  • P1ProducerApp-1 retrieves its client-id and role-session-name secrets from Secrets Manager
  • P2ProducerApp-1 configures the secret client-id as CLIENT_ID_CONFIG in the application code, and assumes Topic-B-Write-Role (via its ProducerApp-1-Role IAM role) by passing the secret role-session-name to the AWS STS SDK assumeRole function call
  • P3 – With the Topic-B-Write-Role IAM role assumed, ProducerApp-1 begins sending messages to Topic-B
  • C1 ConsumerApp-1 and ConsumerApp-2 retrieve their respective client-id and role-session-name secrets from Secrets Manager
  • C2ConsumerApp-1 and ConsumerApp-2 configure their respective secret client-id as CLIENT_ID_CONFIG in their application code, and assume Topic-B-Write-Role (via ConsumerApp-1-Role and ConsumerApp-2-Role IAM roles, respectively) by passing their secret role-session-name in the AWS STS SDK assumeRole function call
  • C3 – With the Topic-B-Read-Role IAM role assumed, ConsumerApp-1 and ConsumerApp-2 start consuming messages from Topic-B

Refer to the documentation for AWS Secrets Manager and AWS KMS to get a better understanding of how they fit into the architecture.

Clean up resources

Navigate to the CloudFormation console and delete the MSKStack stack. All resources created during this post will be deleted.

Conclusion

In this post, we covered detailed steps to configure Amazon MSK quotas and demonstrated their effect through sample client applications. In addition, we discussed how you can use client metrics to determine if a client application is throttled. We also highlighted a potential issue with plaintext client IDs and role session names. We recommend implementing Kafka quotas with Amazon MSK using Secrets Manager and AWS KMS as per the revised architecture diagram to ensure a zero-trust architecture.

If you have feedback or questions about this post, including the revised architecture, we’d be happy to hear from you. We hope you enjoyed reading this post.


About the Author

Vikas Bajaj is a Senior Manager, Solutions Architects, Financial Services at Amazon Web Services. With over two decades of experience in financial services and working with digital-native businesses, he advises customers on product design, technology roadmaps, and application architectures.

Best practices for running production workloads using Amazon MSK tiered storage

Post Syndicated from Nagarjuna Koduru original https://aws.amazon.com/blogs/big-data/best-practices-for-running-production-workloads-using-amazon-msk-tiered-storage/

In the second post of the series, we discussed some core concepts of the Amazon Managed Streaming for Apache Kafka (Amazon MSK) tiered storage feature and explained how read and write operations work in a tiered storage enabled cluster.

This post focuses on how to properly size your MSK tiered storage cluster, which metrics to monitor, and the best practices to consider when running a production workload.

Sizing a tiered storage cluster

Sizing and capacity planning are critical aspects of designing and operating a distributed system. It involves estimating the resources required to handle the expected workload and ensure the system can scale and perform efficiently. In the context of a distributed system like Kafka, sizing involves determining the number of brokers, the number of partitions, and the amount of storage and memory required for each broker. Capacity planning involves estimating the expected workload, including the number of producers, consumers, and the throughput requirements.

Let’s assume a scenario where the producers are evenly balancing the load between brokers, brokers host the same number of partitions, there are enough partitions to ingest the throughput, and consumers consume directly from the tip of the stream. The brokers are receiving the same load and doing the same work. We therefore just focus on Broker1 in the following diagram of a data flow within a cluster.

Theoretical sustained throughput with tiered storage

We derive the following formula for the theoretical sustained throughput limit tcluster given the infrastructure characteristics of a specific cluster with tiered storage enabled on all topics:

max(tcluster) <= min {

max(tstorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups),
max(tNetworkAttachedStorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups),
max(tEC2network) * #brokers/(#tip_consumer_groups + r + #remote_consumer_groups)

}

This formula contains the following values:

  • tCluster – Total ingress produce throughput sent to the cluster
  • tStorage – Storage volume throughput supported
  • tNetworkAttachedStorage – Network attached storage to the Amazon Elastic Compute Cloud (Amazon EC2) instance network throughput
  • tEC2network – EC2 instance network bandwidth
  • non_tip_local_consumer_groups – Number of consumer groups reading from network attached storage at ingress rate
  • tip_consumer_groups – Number of consumer groups reading from page cache at ingress rate
  • remote_consumer_groups – Number of consumer groups reading from remote tier at ingress rate
  • r – Replication factor of the Kafka topic

Note that in the first post , we didn’t differentiate between different types of consumer groups. With tiered storage, some consumer groups might be consuming from remote. These remote consumers might ultimately catch up and start reading from local storage and finally catch up to the tip. Therefore, we model these three different consumer groups in the equation to account for their impact on infrastructure usage. In the following sections, we provide derivations of this equation.

Derivation of throughput limit from network attached storage bottleneck

Because Amazon MSK uses network attached storage for local storage, both network attached storage throughput and bandwidth should be accounted for. Total throughput bandwidth requirement is a combination of ingress and egress from the network attached storage backend. The ingress throughput of the storage backend depends on the data that producers are sending directly to the broker plus the replication traffic the broker is receiving from its peers. With tiered storage, Amazon MSK also uses network attached storage to read and upload rolled segments to the remote tier. This doesn’t come from the page cache and needs to be accounted for at the rate of ingress. Any non-tip consumers at ingress rate also consume network attached storage throughput and are accounted for in the equation. Therefore, max throughput is bounded by network attached storage based on the following equation:

max(tcluster) <= min {

max(tstorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups),
max(tNetworkAttachedStorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups)

}

Derivation of throughput limit from EC2 network bottleneck

Unlike network attached storage, the network is full duplex, meaning that if the EC2 instance supports X MB/s network, it supports X MB/s in and X MB/s out. The network throughput requirement depends on the data that producers are sending directly to the broker plus the replication traffic the broker is receiving from its peers. It also includes the replication traffic out and consumers traffic out from this broker. With tiered storage, we need to reserve additional ingress rate for uploads to the remote tier and support reads from the remote tier for consumer groups reading from remote offset. Both of these add to the network out requirements, which is bounded by the following equation:

max(tcluster) <= min {

max(tEC2network) * #brokers/(#tip_consumer_groups + r + #remote_consumer_groups)

}

Combining the second and third equations provides the first formula, which determines the max throughput bound based on broker infrastructure limits.

How to apply this formula to size your cluster

With this formula, you can calculate the upper bound for throughput you can achieve for your workloads. In practice, the workloads may be bottlenecked by other broker resources like CPU, memory, and disk, so it’s important to do load tests. To simplify your sizing estimate, you can use the MSK Sizing and Pricing spreadsheet (for more information, refer to Best Practices).

Let’s consider a workload where your ingress and egress rates are 20MB/s, with a replication factor of 3, and you want to retain data in your Kafka cluster for 7 days. This workload requires 6x m5.large brokers, with 34.6 TB local storage, which will cost $6,034.00 monthly (estimated). But if you use tiered storage for the same workload with local retention of 4 hours and overall data retention of 7 days, it requires 3x m5.large brokers, with 0.8 TB local storage and 12 TB of tiered storage, which will cost $1,958.00 monthly(estimated). If you want to read all the historic data one time, it will cost $17.00 ($0.0015 per GB retrieval cost). In this example with tiered storage, you save around 67.6% of your overall cost.

We recommend planning for Availability Zone redundancy in production workloads considering the broker safety factor in the calculation, which is 1 in this example. We also recommend running performance tests to ensure CPU is less than 70% on your brokers at the target throughput derived based on this formula or Excel calculation. In addition, you should also use the per-broker partition limit in your calculation to account for other bottlenecks based on the partition count.

The following figure shows an example of Amazon MSK sizing.

Monitoring and continuous optimization for a tiered storage enabled cluster

In previous sections, we emphasized the importance of determining the correct initial cluster size. However, it’s essential to recognize that sizing efforts shouldn’t cease after the initial setup. Continual monitoring and evaluation of your workload are necessary to ensure that the broker size remains appropriate. Amazon MSK offers metric monitoring and alarm capabilities to provide visibility into cluster performance. In the post Best practices for right-sizing your Apache Kafka clusters to optimize performance and cost, we discussed key metrics to focus on. In this post, we delve deeper into additional metrics related to tiered storage and other optimization considerations for a tiered storage enabled cluster:

  • TotalTierBytesLag indicates the total number of bytes of the data that is eligible for tiering on the broker and hasn’t been transferred to the tiered storage yet. This metric shows the efficiency of upstream data transfer. As the lag increases, the amount of data that hasn’t yet persisted in the tiered storage increases. The impact is the network attached storage disk may fill up, which you should monitor. You should also monitor this metric and generate an alarm if the lag is continuously growing and you see increased network attached storage usage. If the tiering lag is too high, you can reduce ingress traffic to allow the tiered storage to catch up.
  • Although tiered storage provides on-demand, virtually unlimited storage capacity without provisioning any additional resources, you should still do proper capacity planning for your local storage, configure alerts for KafkaDataLogsDiskUsed metrics, and have a buffer on network attached storage capacity planning. Monitor this metric and generate an alarm if the metric reaches or exceeds 60%. For a tiered storage enabled topic, configure local retention accordingly to reduce network attached storage usage.
  • The theoretical max ingress we can achieve on an MSK cluster with tiered storage is 20–25% lower than a non-tiered storage enabled cluster due to additional network attached storage bandwidth required to transparently move data from the local to the remote tier. Plan for the capacity (brokers, storage, gp2 vs. gp3) using the formula we discussed to derive max ingress for your cluster based on the number of consumer groups and load test your workloads to identify the sustained throughput limit. Exercising excess ingress to the cluster or egress from the remote tier above the planned capacity can impact your tip produce or consume traffic.
  • The gp3 volume type offers SSD-performance at a 20% lower cost per GB than gp2 volumes. Furthermore, by decoupling storage performance from capacity, you can easily provision higher IOPS and throughput without the need to provision additional block storage capacity. Therefore, we recommend using gp3 for a tiered storage enabled cluster by specifying provisioned throughput for larger instance types.
  • If you specified a custom cluster configuration, check the num.replica.fetchers, num.io.threads, and num.network.threads configuration parameters on your cluster. We recommend leaving it as the default Amazon MSK configuration unless you have specific use case.

This is only the most relevant guidance related to tiered storage. For further guidance on monitoring and best practices of your cluster, refer to Best practices.

Conclusion

You should now have a solid understanding of how Amazon MSK tiered storage works and the best practices to consider for your production workload when utilizing this cost-effective storage tier. With tiered storage, we remove the compute and storage coupling, which can benefit workloads that need larger disk capacity and are underutilizing compute just to provision storage.

We are eager to learn about your current approach in building real-time data streaming applications. If you’re starting your journey with Amazon MSK tiered storage, we suggest following the comprehensive Getting Started guide available in Tiered storage. This guide provides detailed instructions and practical steps to help you gain hands-on experience and effectively take advantage of the benefits of tiered storage for your streaming applications.

If you have any questions or feedback, please leave them in the comments section.


About the authors

Nagarjuna Koduru is a Principal Engineer in AWS, currently working for AWS Managed Streaming For Kafka (MSK). He led the teams that built MSK Serverless and MSK Tiered storage products. He previously led the team in Amazon JustWalkOut (JWO) that is responsible for realtime tracking of shopper locations in the store. He played pivotal role in scaling the stateful stream processing infrastructure to support larger store formats and reducing the overall cost of the system. He has keen interest in stream processing, messaging and distributed storage infrastructure.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

AWS Glue streaming application to process Amazon MSK data using AWS Glue Schema Registry

Post Syndicated from Vivekanand Tiwari original https://aws.amazon.com/blogs/big-data/aws-glue-streaming-application-to-process-amazon-msk-data-using-aws-glue-schema-registry/

Organizations across the world are increasingly relying on streaming data, and there is a growing need for real-time data analytics, considering the growing velocity and volume of data being collected. This data can come from a diverse range of sources, including Internet of Things (IoT) devices, user applications, and logging and telemetry information from applications, to name a few. By harnessing the power of streaming data, organizations are able to stay ahead of real-time events and make quick, informed decisions. With the ability to monitor and respond to real-time events, organizations are better equipped to capitalize on opportunities and mitigate risks as they arise.

One notable trend in the streaming solutions market is the widespread use of Apache Kafka for data ingestion and Apache Spark for streaming processing across industries. Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data.

However, as data volume and velocity grow, organizations may need to enrich their data with additional information from multiple sources, leading to a constantly evolving schema. The AWS Glue Schema Registry addresses this complexity by providing a centralized platform for discovering, managing, and evolving schemas from diverse streaming data sources. Acting as a bridge between producer and consumer apps, it enforces the schema, reduces the data footprint in transit, and safeguards against malformed data.

To process data effectively, we turn to AWS Glue, a serverless data integration service that provides an Apache Spark-based engine and offers seamless integration with numerous data sources. AWS Glue is an ideal solution for running stream consumer applications, discovering, extracting, transforming, loading, and integrating data from multiple sources.

This post explores how to use a combination of Amazon MSK, the AWS Glue Schema Registry, AWS Glue streaming ETL jobs, and Amazon Simple Storage Service (Amazon S3) to create a robust and reliable real-time data processing platform.

Overview of solution

In this streaming architecture, the initial phase involves registering a schema with the AWS Glue Schema Registry. This schema defines the data being streamed while providing essential details like columns and data types, and a table is created in the AWS Glue Data Catalog based on this schema. This schema serves as a single source of truth for producer and consumer and you can leverage the schema evolution feature of AWS Glue Schema Registry to keep it consistent as the data changes over time. Refer appendix section for more information on this feature. The producer application is able to retrieve the schema from the Schema Registry, and uses it to serialize the records into the Avro format and ingest the data into an MSK cluster. This serialization ensures that the records are properly structured and ready for processing.

Next, an AWS Glue streaming ETL (extract, transform, and load) job is set up to process the incoming data. This job extracts data from the Kafka topics, deserializes it using the schema information from the Data Catalog table, and loads it into Amazon S3. It’s important to note that the schema in the Data Catalog table serves as the source of truth for the AWS Glue streaming job. Therefore, it’s crucial to keep the schema definition in the Schema Registry and the Data Catalog table in sync. Failure to do so may result in the AWS Glue job being unable to properly deserialize records, leading to null values. To avoid this, it’s recommended to use a data quality check mechanism to identify such anomalies and take appropriate action in case of unexpected behavior. The ETL job continuously consumes data from the Kafka topics, so it’s always up to date with the latest streaming data. Additionally, the job employs checkpointing, which keeps track of the processed records and allows it to resume processing from where it left off in the event of a restart. For more information about checkpointing, see the appendix at the end of this post.

After the processed data is stored in Amazon S3, we create an AWS Glue crawler to create a Data Catalog table that acts as a metadata layer for the data. The table can be queried using Amazon Athena, a serverless, interactive query service that enables running SQL-like queries on data stored in Amazon S3.

The following diagram illustrates our solution architecture.

architecture diagram

For this post, we are creating the solution resources in the us-east-1 region using AWS CloudFormation templates. In the following sections, we will show you how to configure your resources and implement the solution.

Prerequisites

Create and download a valid key to SSH into an Amazon Elastic Compute Cloud (Amazon EC2) instance from your local machine. For instructions, see Create a key pair using Amazon EC2.

Configure resources with AWS CloudFormation

To create your solution resources, complete the following steps:

  1. Launch the stack vpc-subnet-and-mskclient using the CloudFormation template vpc-subnet-and-mskclient.template. This stack creates an Amazon VPC, private and public subnets, security groups, interface endpoints, an S3 bucket, an AWS Secrets Manager secret, and an EC2 instance.
    launch stack 1
  2. Provide parameter values as listed in the following table.

    Parameters Description
    EnvironmentName Environment name that is prefixed to resource names.
    VpcCIDR IP range (CIDR notation) for this VPC.
    PublicSubnet1CIDR IP range (CIDR notation) for the public subnet in the first Availability Zone.
    PublicSubnet2CIDR IP range (CIDR notation) for the public subnet in the second Availability Zone.
    PrivateSubnet1CIDR IP range (CIDR notation) for the private subnet in the first Availability Zone.
    PrivateSubnet2CIDR IP range (CIDR notation) for the private subnet in the second Availability Zone.
    KeyName Key pair name used to log in to the EC2 instance.
    SshAllowedCidr CIDR block for allowing SSH connection to the instance. Check your public IP using http://checkip.amazonaws.com/ and add /32 at the end of the IP address.
    InstanceType Instance type for the EC2 instance.
  3. When stack creation is complete, retrieve the EC2 instance PublicDNS and S3 bucket name (for key BucketNameForScript) from the stack’s Outputs tab.Cloudformation stack 1 - output
  4. Log in to the EC2 instance using the key pair you created as a prerequisite.
  5. Clone the GitHub repository, and upload the ETL script from the glue_job_script folder to the S3 bucket created by the CloudFormation template:
    $ git clone https://github.com/aws-samples/aws-glue-msk-with-schema-registry.git 
    $ cd aws-glue-msk-with-schema-registry 
    $ aws s3 cp glue_job_script/mskprocessing.py s3://{BucketNameForScript}/

  6. Launch another stack amazon-msk-and-glue using template amazon-msk-and-glue.template. This stack creates an MSK cluster, schema registry, schema definition, database, table, AWS Glue crawler, and AWS Glue streaming job.
    launch stack 1
  7. Provide parameter values as listed in the following table.

    Parameters Description Sample value
    EnvironmentName Environment name that is prefixed to resource names. amazon-msk-and-glue
    VpcId ID of the VPC for security group. Use the VPC ID created with the first stack. Refer to the first stack’s output.
    PrivateSubnet1 Subnet used for creating the MSK cluster and AWS Glue connection. Refer to the first stack’s output.
    PrivateSubnet2 Second subnet for the MSK cluster. Refer to the first stack’s output.
    SecretArn Secrets Manager secret ARN for Amazon MSK SASL/SCRAM authentication. Refer to the first stack’s output.
    SecurityGroupForGlueConnection Security group used by the AWS Glue connection. Refer to the first stack’s output.
    AvailabilityZoneOfPrivateSubnet1 Availability Zone for the first private subnet used for the AWS Glue connection.
    SchemaRegistryName Name of the AWS Glue schema registry. test-schema-registry
    MSKSchemaName Name of the schema. test_payload_schema
    GlueDataBaseName Name of the AWS Glue Data Catalog database. test_glue_database
    GlueTableName Name of the AWS Glue Data Catalog table. output
    ScriptPath AWS Glue ETL script absolute S3 path. For example, s3://bucket-name/mskprocessing.py. Use the target S3 path from the previous steps.
    GlueWorkerType Worker type for AWS Glue job. For example, Standard, G.1X, G.2X, G.025X. G.1X
    NumberOfWorkers Number of workers in the AWS Glue job. 5
    S3BucketForOutput Bucket name for writing data from the AWS Glue job. aws-glue-msk-output-{accId}-{region}
    TopicName MSK topic name that needs to be processed. test

    The stack creation process can take around 15–20 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

    Cloudformation stack 2 output

    The following table summarizes the resources that are created as a part of this post.

    Logical ID Type
    VpcEndoint AWS::EC2::VPCEndpoint
    VpcEndoint AWS::EC2::VPCEndpoint
    DefaultPublicRoute AWS::EC2::Route
    EC2InstanceProfile AWS::IAM::InstanceProfile
    EC2Role AWS::IAM::Role
    InternetGateway AWS::EC2::InternetGateway
    InternetGatewayAttachment AWS::EC2::VPCGatewayAttachment
    KafkaClientEC2Instance AWS::EC2::Instance
    KeyAlias AWS::KMS::Alias
    KMSKey AWS::KMS::Key
    KmsVpcEndoint AWS::EC2::VPCEndpoint
    MSKClientMachineSG AWS::EC2::SecurityGroup
    MySecretA AWS::SecretsManager::Secret
    PrivateRouteTable1 AWS::EC2::RouteTable
    PrivateSubnet1 AWS::EC2::Subnet
    PrivateSubnet1RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    PrivateSubnet2 AWS::EC2::Subnet
    PrivateSubnet2RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    PublicRouteTable AWS::EC2::RouteTable
    PublicSubnet1 AWS::EC2::Subnet
    PublicSubnet1RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    PublicSubnet2 AWS::EC2::Subnet
    PublicSubnet2RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    S3Bucket AWS::S3::Bucket
    S3VpcEndoint AWS::EC2::VPCEndpoint
    SecretManagerVpcEndoint AWS::EC2::VPCEndpoint
    SecurityGroup AWS::EC2::SecurityGroup
    SecurityGroupIngress AWS::EC2::SecurityGroupIngress
    VPC AWS::EC2::VPC
    BootstrapBrokersFunctionLogs AWS::Logs::LogGroup
    GlueCrawler AWS::Glue::Crawler
    GlueDataBase AWS::Glue::Database
    GlueIamRole AWS::IAM::Role
    GlueSchemaRegistry AWS::Glue::Registry
    MSKCluster AWS::MSK::Cluster
    MSKConfiguration AWS::MSK::Configuration
    MSKPayloadSchema AWS::Glue::Schema
    MSKSecurityGroup AWS::EC2::SecurityGroup
    S3BucketForOutput AWS::S3::Bucket
    CleanupResourcesOnDeletion AWS::Lambda::Function
    BootstrapBrokersFunction AWS::Lambda::Function

Build and run the producer application

After successfully creating the CloudFormation stack, you can now proceed with building and running the producer application to publish records on MSK topics from the EC2 instance, as shown in the following code. Detailed instructions including supported arguments and their usage are outlined in the README.md page in the GitHub repository.

$ cd amazon_msk_producer 
$ mvn clean package 
$ BROKERS={OUTPUT_VAL_OF_MSKBootstrapServers – Ref. Step 6}
$ REGISTRY_NAME={VAL_OF_GlueSchemaRegistryName - Ref. Step 6}
$ SCHEMA_NAME={VAL_OF_SchemaName– Ref. Step 6}
$ TOPIC_NAME="test"
$ SECRET_ARN={OUTPUT_VAL_OF_SecretArn – Ref. Step 3}
$ java -jar target/amazon_msk_producer-1.0-SNAPSHOT-jar-with-dependencies.jar -brokers $BROKERS -secretArn $SECRET_ARN -region us-east-1 -registryName $REGISTRY_NAME -schema $SCHEMA_NAME -topic $TOPIC_NAME -numRecords 10

If the records are successfully ingested into the Kafka topics, you may see a log similar to the following screenshot.

kafka log

Grant permissions

Confirm if your AWS Glue Data Catalog is being managed by AWS Lake Formation and grant necessary permissions. To check if Lake Formation is managing the permissions for the newly created tables, we can navigate to the Settings page on the Lake Formation console, or we can use the Lake Formation CLI command get-data-lake-settings.

If the check boxes on the Lake Formation Data Catalog settings page are unselected (see the following screenshot), that means that the Data Catalog permissions are being managed by LakeFormation.

Lakeformation status

If using the Lake Formation CLI, check if the values of CreateDatabaseDefaultPermissions and CreateTableDefaultPermissions are NULL in the output. If so, this confirms that the Data Catalog permissions are being managed by AWS Lake Formation.

If we can confirm that the Data Catalog permissions are being managed by AWS Lake Formation, we have to grant DESCRIBE and CREATE TABLE permissions for the database, and SELECT, ALTER, DESCRIBE and INSERT permissions for the table to the AWS Identity and Access Management role (IAM role) used by AWS Glue streaming ETL job before starting the job. Similarly, we have to grant DESCRIBE permissions for the database and DESCRIBE AND SELECT permissions for the table to the IAM principals using Amazon Athena to query the data. We can get the AWS Glue service IAM role, database, table, streaming job name, and crawler names from the Outputs tab of the CloudFormation stack amazon-msk-and-glue. For instructions on granting permissions via AWS Lake Formation, refer to Granting Data Catalog permissions using the named resource method.

Run the AWS Glue streaming job

To process the data from the MSK topic, complete the following steps:

  1. Retrieve the name of the AWS Glue streaming job from the amazon-msk-and-glue stack output.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Choose the job name to open its details page.
  4. Choose Run job to start the job.

Because this is a streaming job, it will continue to run indefinitely until manually stopped.

Run the AWS Glue crawler

Once AWS Glue streaming job starts processing the data, you can use the following steps to check the processed data, and create a table using AWS Glue Crawler to query it

  1. Retrieve the name of the output bucket S3BucketForOutput from the stack output and validate if output folder has been created and contains data.
  2. Retrieve the name of the Crawler from the stack output.
  3. Navigate to the AWS Glue Console.
  4. In the left pane, select Crawlers.
  5. Run the crawler.

In this post, we run the crawler one time to create the target table for demo purposes. In a typical scenario, you would run the crawler periodically or create or manage the target table another way. For example, you could use the saveAsTable() method in Spark to create the table as part of the ETL job itself, or you could use enableUpdateCatalog=True in the AWS Glue ETL job to enable Data Catalog updates. For more information about this AWS Glue ETL feature, refer to Creating tables, updating the schema, and adding new partitions in the Data Catalog from AWS Glue ETL jobs.

Validate the data in Athena

After the AWS Glue crawler has successfully created the table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database and table that the crawler created.
  4. Enter a SQL query to validate the data.
  5. Run the query.

The following screenshot shows the output of our example query.

Athena output

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack amazon-msk-and-glue.
  2. Delete the CloudFormation stack vpc-subnet-and-mskclient.

Conclusion

This post provided a solution for building a robust streaming data processing platform using a combination of Amazon MSK, the AWS Glue Schema Registry, an AWS Glue streaming job, and Amazon S3. By following the steps outlined in this post, you can create and control your schema in the Schema Registry, integrate it with a data producer to ingest data into an MSK cluster, set up an AWS Glue streaming job to extract and process data from the cluster using the Schema Registry, store processed data in Amazon S3, and query it using Athena.

Let’s start using AWS Glue Schema Registry to manage schema evolution for streaming data ETL with AWS Glue. If you have any feedback related to this post, please feel free to leave them in the comments section below.

Appendix

This appendix section provides more information about Apache Spark Structured Streaming Checkpointing feature and a brief summary on how schema evolution can be handled using AWS Glue Schema Registry.

Checkpointing

Checkpointing is a mechanism in Spark streaming applications to persist enough information in a durable storage to make the application resilient and fault-tolerant. The items stored in checkpoint locations are mainly the metadata for application configurations and the state of processed offsets. Spark uses synchronous checkpointing, meaning it ensures that the checkpoint state is updated after every micro-batch run. It stores the end offset value of each partition under the offsets folder for the corresponding micro-batch run before processing, and logs the record of processed batches under the commits folder. In the event of a restart, the application can recover from the last successful checkpoint, provided the offset hasn’t expired in the source Kafka topic. If the offset has expired, we have to set the property failOnDataLoss to false so that the streaming query doesn’t fail as a result of this.

Schema evolution

As the schema of data evolves over time, it needs to be incorporated into producer and consumer applications to avert application failure due to data encoding issues. The AWS Glue Schema Registry offers a rich set of options for schema compatibility such as backward, forward, and full to update the schema in the Schema Registry. Refer to Schema versioning and compatibility for the full list.

The default option is backward compatibility, which satisfies the majority of use cases. This option allows you to delete any existing fields and add optional fields. Steps to implement schema evolution using the default compatibility are as follows:

  1. Register the new schema version to update the schema definition in the Schema Registry.
  2. Upon success, update the AWS Glue Data Catalog table using the updated schema.
  3. Restart the AWS Glue streaming job to incorporate the changes in the schema for data processing.
  4. Update the producer application code base to build and publish the records using the new schema, and restart it.

About the Authors

Author Headshot - Vivekanand TiwariVivekanand Tiwari is a Cloud Architect at AWS. He finds joy in assisting customers on their cloud journey, especially in designing and building scalable, secure, and optimized data and analytics workloads on AWS. During his leisure time, he prioritizes spending time with his family.

Author Headshot - Subramanya VajirayaSubramanya Vajiraya is a Sr. Cloud Engineer (ETL) at AWS Sydney specialized in AWS Glue. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. Outside of work, he enjoys going on bike rides and taking long walks with his dog Ollie, a 2-year-old Corgi.

Author Headshot - Akash DeepAkash Deep is a Cloud Engineer (ETL) at AWS with a specialization in AWS Glue. He is dedicated to assisting customers in resolving issues related to their ETL workloads and creating scalable data processing and analytics pipelines on AWS. In his free time, he prioritizes spending quality time with his family.

Deep dive on Amazon MSK tiered storage

Post Syndicated from Nagarjuna Koduru original https://aws.amazon.com/blogs/big-data/deep-dive-on-amazon-msk-tiered-storage/

In the first post of the series, we described some core concepts of Apache Kafka cluster sizing, the best practices for optimizing the performance, and the cost of your Kafka workload.

This post explains how the underlying infrastructure affects Kafka performance when you use Amazon Managed Streaming for Apache Kafka (Amazon MSK) tiered storage. We delve deep into the core components of Amazon MSK tiered storage and address questions such as: How does read and write work in a tiered storage-enabled cluster?

In the subsequent post, we’ll discuss the latency impact, recommended metrics to monitor, and conclude with guidance on key considerations in a production tiered storage-enabled cluster.

How Amazon MSK tiered storage works

To understand the internal architecture of Amazon MSK tiered storage, let’s first discuss some fundamentals of Kafka topics, partitions, and how read and write works.

A logical stream of data in Kafka is referred to as a topic. A topic is broken down into partitions, which are physical entities used to distribute load across multiple server instances (brokers) that serve reads and writes.

A partition––also designated as topic-partition as it’s relative to a given topic––can be replicated, which means there are several copies of the data in the group of brokers forming a cluster. Each copy is called a replica or a log. One of these replicas, called the leader, serves as the reference. It’s where the ingress traffic is accepted for the topic-partition.

A log is an append-only sequence of log segments. Log segments contain Kafka data records, which are added to the end of the log or the active segment.

Log segments are stored as regular files. On the file system, Kafka identifies the file of a log segment by putting the offset of the first data record it contains in its file name. The offset of a record is simply a monotonic index assigned to a record by Kafka when it’s appended to the log. The segment files of a log are stored in a directory dedicated to the associated topic-partition.

When Kafka reads data from an arbitrary offset, it first looks up the segment that contains that offset from the segment file name, then the specific record location inside that file using an offset index. Offset indexes are materialized on a dedicated file stored with segment files in the topic-partition directory. There is also timeindex to seek by timestamp.

For every partition, Kafka also stores a journal of leadership changes in a file called leader-epoch-checkpoint. This file contains mapping of leader epoch to startOffset of the epoch. Whenever a new leader is elected for a partition by the Kafka controller, this data is updated and propagated to all brokers. A leader epoch is a 32-bit, monotonically increasing number representing continuous period of leadership of a single partition. It’s marked on all the Kafka records. The following code is the local storage layout of topic cars and partition 0 containing two segments (0, 35):

$ ls /kafka-cluster/broker-1/data/cars-0/

00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000000035.log
00000000000000000035.index
00000000000000000035.timeindex
leader-epoch-checkpoint

Kafka manages the lifecycle of these segment files. It creates a new one when a new segment needs to be created, for instance, if the current segment reaches its configured max size. It deletes one when the target retention period of the data it contains is reached, or the total maximal size of the log is reached. Data is deleted from the tail of the logs and corresponds to the oldest data of the append-only log of the topic-partition.

KIP-405 or tiered storage in Apache Kafka

The ability to tier data, in other words, transfer data (log, index, timeindex, and leader-epoch-checkpoint) from a local file system to another storage system based on time and size-based retention policies, is a feature built in Apache Kafka as part of KIP-405.

The KIP-405 isn’t in official Kafka version yet. Amazon MSK internally implemented tiered storage functionality on top of official Kafka version 2.8.2. Amazon MSK exposes this functionality on AWS specific 2.8.2.tiered Kafka version. With this feature, you can separate retention settings for local and remote retention. Data in the local tier is retained until the data gets copied to the remote tier even after the local retention expires. Data in the remote tier is retained until the remote retention expires. KIP-405 proposes a pluggable architecture allowing you to plugin custom remote storage and metadata storage backends. The following diagram illustrates the broker three key components.

The components are as follows:

  • RemoteLogManager (RLM) – A new component corresponding to LogManager for the local tier. It delegates copy, fetch, and delete of completed and non-active partition segments to a pluggable RemoteStorageManager implementation and maintains respective remote log segment metadata through pluggable RemoteLogMetadataManager implementation.
  • RemoteStorageManager (RSM) – A pluggable interface that provides the lifecycle of remote log segments.
  • RemoteLogMetadataManager (RLMM) – A pluggable interface that provides the lifecycle of metadata about remote log segments.

How data is moved to the remote tier for a tiered storage-enabled topic

In a tiered storage-enabled topic, each completed segment for a topic-partition triggers the leader of the partition to copy the data to the remote storage tier. The completed log segment is removed from the local disks when Amazon MSK finishes moving that log segment to the remote tier and after it meets the local retention policy. This frees up local storage space.

Let’s consider a hypothetical scenario: you have a topic with one partition. Prior to enabling tiered storage for this topic, there are three log segments. One of the segments is active and receiving data, and the other two segments are complete.

After you enable tiered storage for this topic with two days of local retention and five days of overall retention, Amazon MSK copies log segment 1 and 2 to tiered storage. Amazon MSK also retains the primary storage copy of segments 1 and 2. The active segment 3 isn’t eligible to copy over to tiered storage yet. In this timeline, none of the retention settings are applied yet for any of the messages in segment 1 and segment 2.

After 2 days, the primary retention settings take effect for the segment 1 and segment 2 that Amazon MSK copied to the tiered storage. Segments 1 and 2 now expire from the local storage. Active segment 3 is neither eligible for expiration nor eligible to copy over to tiered storage yet because it’s an active segment.

After 5 days, overall retention settings take effect, and Amazon MSK clears log segments 1 and 2 from tiered storage. Segment 3 is neither eligible for expiration nor eligible to copy over to tiered storage yet because it’s active.

That’s how the data lifecycle works on a tiered storage-enabled cluster.

Amazon MSK immediately starts moving data to tiered storage as soon as a segment is closed. The local disks are freed up when Amazon MSK finishes moving that log segment to remote tier and after it meets the local retention policy.

How read works in a tiered storage-enabled topic

For any read request, ReplicaManager tries to process the request by sending it to ReadFromLocalLog. And if the process returns offset out of range exception, it delegates the read call to RemoteLogManager to read from tiered storage. On the read path, the RemoteStorageManager starts fetching the data in chunks from remote storage, which means that for the first few bytes, your consumer experiences higher latency, but as the system starts buffering the segment locally, your consumer experiences latency similar to reading from local storage. One of the advantages of this approach is that the data is served instantly from the local buffer if there are multiple consumers reading from the same segment.

If your consumer is configured to read from the closest replica, there might be a possibility that the consumer from a different consumer group reads the same remote segment using a different broker. In that case, they experience the same latency behavior we described previously.

Conclusion

In this post, we discussed the core components of the Amazon MSK tiered storage feature and explained how the data lifecycle works in a cluster enabled with tiered storage. Stay tuned for our upcoming post, in which we delve into the best practices for sizing and running a tiered storage-enabled cluster in production.

We would love to hear how you’re building your real-time data streaming applications today. If you’re just getting started with Amazon MSK tiered storage, we recommend getting hands-on with the guidelines available in the tiered storage documentation.

If you have any questions or feedback, please leave them in the comments section.


About the authors

Nagarjuna Koduru is a Principal Engineer in AWS, currently working for AWS Managed Streaming For Kafka (MSK). He led the teams that built MSK Serverless and MSK Tiered storage products. He previously led the team in Amazon JustWalkOut (JWO) that is responsible for realtime tracking of shopper locations in the store. He played pivotal role in scaling the stateful stream processing infrastructure to support larger store formats and reducing the overall cost of the system. He has keen interest in stream processing, messaging and distributed storage infrastructure.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Stream data with Amazon MSK Connect using an open-source JDBC connector

Post Syndicated from Manish Virwani original https://aws.amazon.com/blogs/big-data/stream-data-with-amazon-msk-connect-using-an-open-source-jdbc-connector/

Customers are adopting Amazon Managed Service for Apache Kafka (Amazon MSK) as a fast and reliable streaming platform to build their enterprise data hub. In addition to streaming capabilities, setting up Amazon MSK enables organizations to use a pub/sub model for data distribution with loosely coupled and independent components.

To publish and distribute the data between Apache Kafka clusters and other external systems including search indexes, databases, and file systems, you’re required to set up Apache Kafka Connect, which is the open-source component of Apache Kafka framework, to host and run connectors for moving data between various systems. As the number of upstream and downstream applications grow, so does the complexity to manage, scale, and administer the Apache Kafka Connect clusters. To address these scalability and manageability concerns, Amazon MSK Connect provides the functionality to deploy fully managed connectors built for Apache Kafka Connect, with the capability to automatically scale to adjust with the workload changes and pay only for the resources consumed.

In this post, we walk through a solution to stream data from Amazon Relation Database Service (Amazon RDS) for MySQL to an MSK cluster in real time by configuring and deploying a connector using Amazon MSK Connect.

Solution overview

For our use case, an enterprise wants to build a centralized data repository that has multiple producer and consumer applications. To support streaming data from applications with different tools and technologies, Amazon MSK is selected as the streaming platform. One of the primary applications that currently writes data to Amazon RDS for MySQL would require major design changes to publish data to MSK topics and write to the database at the same time. Therefore, to minimize the design changes, this application would continue writing the data to Amazon RDS for MySQL with the additional requirement to synchronize this data with the centralized streaming platform Amazon MSK to enable real-time analytics for multiple downstream consumers.

To solve this use case, we propose the following architecture that uses Amazon MSK Connect, a feature of Amazon MSK, to set up a fully managed Apache Kafka Connect connector for moving data from Amazon RDS for MySQL to an MSK cluster using the open-source JDBC connector from Confluent.

Set up the AWS environment

To set up this solution, you need to create a few AWS resources. The AWS CloudFormation template provided in this post creates all the AWS resources required as prerequisites:

The following table lists the parameters you must provide for the template.

Parameter Name Description Keep Default Value
Stack name Name of CloudFormation stack. No
DBInstanceID Name of RDS for MySQL instance. No
DBName Database name to store sample data for streaming. Yes
DBInstanceClass Instance type for RDS for MySQL instance. No
DBAllocatedStorage Allocated size for DB instance (GiB). No
DBUsername Database user for MySQL database access. No
DBPassword Password for MySQL database access. No
JDBCConnectorPluginBukcetName Bucket for storing MSK Connect connector JAR files and plugin. No
ClientIPCIDR IP address of client machine to connect to EC2 instance. No
EC2KeyPair Key pair to be used in your EC2 instance. This EC2 instance will be used as a proxy to connect from your local machine to the EC2 client instance. No
EC2ClientImageId Latest AMI ID of Amazon Linux 2. You can keep the default value for this post. Yes
VpcCIDR IP range (CIDR notation) for this VPC. No
PrivateSubnetOneCIDR IP range (CIDR notation) for the private subnet in the first Availability Zone. No
PrivateSubnetTwoCIDR IP range (CIDR notation) for the private subnet in the second Availability Zone. No
PrivateSubnetThreeCIDR IP range (CIDR notation) for the private subnet in the third Availability Zone. No
PublicSubnetCIDR IP range (CIDR notation) for the public subnet. No

To launch the CloudFormation stack, choose Launch Stack:

After the CloudFormation template is complete and the resources are created, the Outputs tab shows the resource details.

Validate sample data in the RDS for MySQL instance

To prepare the sample data for this use case, complete the following steps:

  1. SSH into the EC2 client instance MSKEC2Client using the following command from your local terminal:
    ssh -i <keypair> <user>@<hostname>

  2. Run the following commands to validate the data has been loaded successfully:
    $ mysql -h <rds_instance_endpoint_name> -u <user_name> -p
    
    MySQL [(none)]> use dms_sample;
    
    MySQL [dms_sample]> select mlb_id, mlb_name, mlb_pos, mlb_team_long, bats, throws from mlb_data limit 5;

Synchronize all tables’ data from Amazon RDS to Amazon MSK

To sync all tables from Amazon RDS to Amazon MSK, create an Amazon MSK Connect managed connector with the following steps:

  1. On the Amazon MSK console, choose Custom plugins in the navigation pane under MSK Connect.
  2. Choose Create custom plugin.
  3. For S3 URI – Custom plugin object, browse to the ZIP file named confluentinc-kafka-connect-jdbc-plugin.zip (created by the CloudFormation template) for the JDBC connector in the S3 bucket bkt-msk-connect-plugins-<aws_account_id>.
  4. For Custom plugin name, enter msk-confluent-jdbc-plugin-v1.
  5. Enter an optional description.
  6. Choose Create custom plugin.

After the custom plugin has been successfully created, it will be available in Active status

  1. Choose Connectors in the navigation pane under MSK Connect.
  2. Choose Create connector.
  3. Select Use existing custom plugin and under Custom plugins, select the plugin msk-confluent-jdbc-plugin-v1 that you created earlier.
  4. Choose Next.
  5. For Connector name, enter msk-jdbc-connector-rds-to-msk.
  6. Enter an optional description.
  7. For Cluster type, select MSK cluster.
  8. For MSK clusters, select the cluster you created earlier.
  9. For Authentication, choose IAM.
  10. Under Connector configurations, enter the following settings:
    ### CONNECTOR SPECIFIC SETTINGS
    
    ### Provide the configuration properties to connect to source and destination endpoints including authentication
    
    ### mechanism, credentials and task details such as polling interval, source and destination object names, data
    
    ### transfer mode, parallelism
    
    ### Many of these properties are connector and end-point specific, so please review the connector documentation ### for more details
    
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    
    connection.user=admin
    
    connection.url=jdbc:mysql://<rds_instance_endpoint_name>:3306/dms_sample
    
    connection.password=XXXXX
    
    tasks.max=1
    
    poll.interval.ms=300000
    
    topic.prefix=rds-to-msk-
    
    mode=bulk
    
    connection.attempts=1
    
    ### CONVERTING KAFKA MESSAGE BYTES TO JSON
    
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    key.converter=org.apache.kafka.connect.storage.StringConverter
    
    value.converter.schemas.enable=false
    
    key.converter.schemas.enable=false
    
    ###GENERIC AUTHENTICATION SETTINGS FOR KAFKA CONNECT
    
    security.protocol=SASL_SSL
    
    sasl.mechanism=AWS_MSK_IAM
    
    ssl.truststore.location=~/kafka.truststore.jks
    
    ssl.keystore.location=~/kafka.client.keystore.jks
    
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

The following table provides a brief summary of all the preceding configuration options.

Configuration Options Description
connector.class JAVA class for the connector
connection.user User name to authenticate with the MySQL endpoint
connection.url JDBC URL identifying the hostname and port number for the MySQL endpoint
connection.password Password to authenticate with the MySQL endpoint
tasks.max Maximum number of tasks to be launched for this connector
poll.interval.ms Time interval in milliseconds between subsequent polls for each table to pull new data
topic.prefix Custom prefix value to append with each table name when creating topics in the MSK cluster
mode The operation mode for each poll, such as bulk, timestamp, incrementing, or timestamp+incrementing
connection.attempts Maximum number of retries for JDBC connection
security.protocol Sets up TLS for encryption
sasl.mechanism Identifies the SASL mechanism to use
ssl.truststore.location Location for storing trusted certificates
ssl.keystore.location Location for storing private keys
sasl.client.callback.handler.class Encapsulates constructing a SigV4 signature based on extracted credentials
sasl.jaas.config Binds the SASL client implementation

  1. In the Connector capacity section, select Autoscaled for Capacity type and keep the default value of 1 for MCU count per worker.
  2. Set 4 for Maximum number of workers and keep all other default values for Workers and Autoscaling utilization thresholds.
  3. For Worker configuration, select Use the MSK default configuration.
  4. Under Access permissions, choose the custom IAM role msk-connect-rds-jdbc-MSKConnectServiceIAMRole-* created earlier.
  5. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  6. For Log group, choose the log group msk-jdbc-source-connector created earlier.
  7. Choose Next.
  8. Under Review and Create, validate all the settings and choose Create connector.

After the connector has transitioned to RUNNING status, the data should start flowing from the RDS instance to the MSK cluster.

Validate the data

To validate and compare the data, complete the following steps:

  1. SSH into the EC2 client instance MSKEC2Client using the following command from your local terminal:
    ssh -i <keypair> <user>@<hostname>

  2. To connect to the MSK cluster with IAM authentication, enter the latest version of the aws-msk-iam-auth JAR file in the class path:
    $ export CLASSPATH=/home/ec2-user/aws-msk-iam-auth-1.1.0-all.jar

  3. On the Amazon MSK console, choose Clusters in the navigation pane and choose the cluster MSKConnect-msk-connect-rds-jdbc.
  4. On the Cluster summary page, choose View client information.
  5. In the View client information section, under Bootstrap servers, copy the private endpoint for Authentication type IAM.

  1. Set up additional environment variables for working with the latest version of Apache Kafka installation and connecting to Amazon MSK bootstrap servers, where <bootstrap servers> is the list of bootstrap servers that allow connecting to the MSK cluster with IAM authentication:
    $ export PATH=~/kafka/bin:$PATH
    
    $ cp ~/aws-msk-iam-auth-1.1.0-all.jar ~/kafka/libs/.
    
    $ export BOOTSTRAP_SERVERS=<bootstrap servers>

  2. Set up a config file named client/properties to be used for authentication:
    $ cd /home/ec2-user/kafka/config/
    
    $ vi client.properties
    
    # Sets up TLS for encryption and SASL for authN.
    
    security.protocol = SASL_SSL
    
    # Identifies the SASL mechanism to use.
    
    sasl.mechanism = AWS_MSK_IAM
    
    # Binds SASL client implementation.
    
    sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
    
    # Encapsulates constructing a SigV4 signature based on extracted credentials.
    
    # The SASL client bound by "sasl.jaas.config" invokes this class.
    
    sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

  3. Validate the list of topics created in the MSK cluster:
    $ cd /home/ec2-user/kafka/
    
    $ bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVERS --command-config /home/ec2-user/kafka/config/client.properties

  1. Validate that data has been loaded to the topics in the MSK cluster:
    $ bin/kafka-console-consumer.sh --topic rds-to-msk-seat --from-beginning --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties

Synchronize data using a query to Amazon RDS and write to Amazon MSK

To synchronize the results of a query that flattens data by joining multiple tables in Amazon RDS for MySQL, create an Amazon MSK Connect managed connector with the following steps:

  1. On Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Choose Create connector.
  3. Select Use existing custom plugin and under Custom plugins, select the pluginmsk-confluent-jdbc-plugin-v1.
  4. For Connector name, enter msk-jdbc-connector-rds-to-msk-query.
  5. Enter an optional description.
  6. For Cluster type, select MSK cluster.
  7. For MSK clusters, select the cluster you created earlier.
  8. For Authentication, choose IAM.
  9. Under Connector configurations, enter the following settings:
    ### CONNECTOR SPECIFIC SETTINGS
    
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    
    connection.user=admin
    
    connection.url=jdbc:mysql://<rds_instance_endpoint_name>:3306/dms_sample
    
    connection.password=XXXXX
    
    tasks.max=1
    
    poll.interval.ms=300000
    
    topic.prefix=rds-to-msk-query-topic
    
    mode=bulk
    
    connection.attempts=1
    
    query=select last_name, name as team_name, sport_type_name, sport_league_short_name, sport_division_short_name from dms_sample.sport_team join dms_sample.player on player.sport_team_id = sport_team.id;
    
    ### CONVERTING KAFKA MESSAGE BYTES TO JSON
    
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    key.converter=org.apache.kafka.connect.storage.StringConverter
    
    value.converter.schemas.enable=false
    
    key.converter.schemas.enable=false
    
    ###GENERIC AUTHENTICATION SETTINGS FOR KAFKA CONNECT
    
    security.protocol=SASL_SSL
    
    sasl.mechanism=AWS_MSK_IAM
    
    ssl.truststore.location=~/kafka.truststore.jks
    
    ssl.keystore.location=~/kafka.client.keystore.jks
    
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

  10. In the Connector capacity section, select Autoscaled for Capacity type and keep the default value of 1 for MCU count per worker.
  11. Set 4 for Maximum number of workers and keep all other default values for Workers and Autoscaling utilization thresholds.
  12. For Worker configuration, select Use the MSK default configuration.
  13. Under Access permissions, choose the custom IAM role role_msk_connect_serivce_exec_custom.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  15. For Log group, choose the log group created earlier.
  16. Choose Next.
  17. Under Review and Create, validate all the settings and choose Create connector.

Once the connector has transitioned to RUNNING status, the data should start flowing from the RDS instance to the MSK cluster.

  1. For data validation, SSH into the EC2 client instance MSKEC2Client and run the following command to see the data in the topic:
    $ bin/kafka-console-consumer.sh --topic rds-to-msk-query-topic --from-beginning --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties

Clean up

To clean up your resources and avoid ongoing charges, complete the following the steps:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Select the connectors you created and choose Delete.
  3. On the Amazon S3 console, choose Buckets in the navigation pane.
  4. Search for the bucket with the naming convention bkt-msk-connect-plugins-<aws_account_id>.
  5. Delete all the folders and objects in this bucket.
  6. Delete the bucket after all contents have been removed.
  7. To delete all other resources created using the CloudFormation stack, delete the stack via the AWS CloudFormation console.

Conclusion

Amazon MSK Connect is a fully managed service that provisions the required resources, monitors the health and delivery state of connectors, maintains the underlying hardware, and auto scales connectors to balance the workloads. In this post, we saw how to set up the open-source JDBC connector from Confluent to stream data between an RDS for MySQL instance and an MSK cluster. We also explored different options to synchronize all the tables as well as use the query-based approach to stream denormalized data into the MSK topics.

For more information about Amazon MSK Connect, see Getting started using MSK Connect.


About the Authors

Manish Virwani is a Sr. Solutions Architect at AWS. He has more than a decade of experience designing and implementing large-scale big data and analytics solutions. He provides technical guidance, design advice, and thought leadership to some of the key AWS customers and partners.

Indira Balakrishnan is a Principal Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.

How Zoom implemented streaming log ingestion and efficient GDPR deletes using Apache Hudi on Amazon EMR

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/how-zoom-implemented-streaming-log-ingestion-and-efficient-gdpr-deletes-using-apache-hudi-on-amazon-emr/

In today’s digital age, logging is a critical aspect of application development and management, but efficiently managing logs while complying with data protection regulations can be a significant challenge. Zoom, in collaboration with the AWS Data Lab team, developed an innovative architecture to overcome these challenges and streamline their logging and record deletion processes. In this post, we explore the architecture and the benefits it provides for Zoom and its users.

Application log challenges: Data management and compliance

Application logs are an essential component of any application; they provide valuable information about the usage and performance of the system. These logs are used for a variety of purposes, such as debugging, auditing, performance monitoring, business intelligence, system maintenance, and security. However, although these application logs are necessary for maintaining and improving the application, they also pose an interesting challenge. These application logs may contain personally identifiable data, such as user names, email addresses, IP addresses, and browsing history, which creates a data privacy concern.

Laws such as the General Data Protection Regulation (GDPR) and the California Consumer Privacy Act (CCPA) require organizations to retain application logs for a specific period of time. The exact length of time required for data storage varies depending on the specific regulation and the type of data being stored. The reason for these data retention periods is to ensure that companies aren’t keeping personal data longer than necessary, which could increase the risk of data breaches and other security incidents. This also helps ensure that companies aren’t using personal data for purposes other than those for which it was collected, which could be a violation of privacy laws. These laws also give individuals the right to request the deletion of their personal data, also known as the “right to be forgotten.” Individuals have the right to have their personal data erased, without undue delay.

So, on one hand, organizations need to collect application log data to ensure the proper functioning of their services, and keep the data for a specific period of time. But on the other hand, they may receive requests from individuals to delete their personal data from the logs. This creates a balancing act for organizations because they must comply with both data retention and data deletion requirements.

This issue becomes increasingly challenging for larger organizations that operate in multiple countries and states, because each country and state may have their own rules and regulations regarding data retention and deletion. For example, the Personal Information Protection and Electronic Documents Act (PIPEDA) in Canada and the Australian Privacy Act in Australia are similar laws to GDPR, but they may have different retention periods or different exceptions. Therefore, organizations big or small must navigate this complex landscape of data retention and deletion requirements, while also ensuring that they are in compliance with all applicable laws and regulations.

Zoom’s initial architecture

During the COVID-19 pandemic, the use of Zoom skyrocketed as more and more people were asked to work and attend classes from home. The company had to rapidly scale its services to accommodate the surge and worked with AWS to deploy capacity across most Regions globally. With a sudden increase in the large number of application endpoints, they had to rapidly evolve their log analytics architecture and worked with the AWS Data Lab team to quickly prototype and deploy an architecture for their compliance use case.

At Zoom, the data ingestion throughput and performance needs are very stringent. Data had to be ingested from several thousand application endpoints that produced over 30 million messages every minute, resulting in over 100 TB of log data per day. The existing ingestion pipeline consisted of writing the data to Apache Hadoop HDFS storage through Apache Kafka first and then running daily jobs to move the data to persistent storage. This took several hours while also slowing the ingestion and creating the potential for data loss. Scaling the architecture was also an issue because HDFS data would have to be moved around whenever nodes were added or removed. Furthermore, transactional semantics on billions of records were necessary to help meet compliance-related data delete requests, and the existing architecture of daily batch jobs was operationally inefficient.

It was at this time, through conversations with the AWS account team, that the AWS Data Lab team got involved to assist in building a solution for Zoom’s hyper-scale.

Solution overview

The AWS Data Lab offers accelerated, joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data, analytics, artificial intelligence (AI), machine learning (ML), serverless, and container modernization initiatives. The Data Lab has three offerings: the Build Lab, the Design Lab, and Resident Architect. During the Build and Design Labs, AWS Data Lab Solutions Architects and AWS experts supported Zoom specifically by providing prescriptive architectural guidance, sharing best practices, building a working prototype, and removing technical roadblocks to help meet their production needs.

Zoom and the AWS team (collectively referred to as “the team” going forward) identified two major workflows for data ingestion and deletion.

Data ingestion workflow

The following diagram illustrates the data ingestion workflow.

Data Ingestion Workflow

The team needed to quickly populate millions of Kafka messages in the dev/test environment to achieve this. To expedite the process, we (the team) opted to use Amazon Managed Streaming for Apache Kafka (Amazon MSK), which makes it simple to ingest and process streaming data in real time, and we were up and running in under a day.

To generate test data that resembled production data, the AWS Data Lab team created a custom Python script that evenly populated over 1.2 billion messages across several Kafka partitions. To match the production setup in the development account, we had to increase the cloud quota limit via a support ticket.

We used Amazon MSK and the Spark Structured Streaming capability in Amazon EMR to ingest and process the incoming Kafka messages with high throughput and low latency. Specifically, we inserted the data from the source into EMR clusters at a maximum incoming rate of 150 million Kafka messages every 5 minutes, with each Kafka message holding 7–25 log data records.

To store the data, we chose to use Apache Hudi as the table format. We opted for Hudi because it’s an open-source data management framework that provides record-level insert, update, and delete capabilities on top of an immutable storage layer like Amazon Simple Storage Service (Amazon S3). Additionally, Hudi is optimized for handling large datasets and works well with Spark Structured Streaming, which was already being used at Zoom.

After 150 million messages were buffered, we processed the messages using Spark Structured Streaming on Amazon EMR and wrote the data into Amazon S3 in Apache Hudi-compatible format every 5 minutes. We first flattened the message array, creating a single record from the nested array of messages. Then we added a unique key, known as the Hudi record key, to each message. This key allows Hudi to perform record-level insert, update, and delete operations on the data. We also extracted the field values, including the Hudi partition keys, from incoming messages.

This architecture allowed end-users to query the data stored in Amazon S3 using Amazon Athena with the AWS Glue Data Catalog or using Apache Hive and Presto.

Data deletion workflow

The following diagram illustrates the data deletion workflow.

Data Deletion Workflow

Our architecture allowed for efficient data deletions. To help comply with the customer-initiated data retention policy for GDPR deletes, scheduled jobs ran daily to identify the data to be deleted in batch mode.

We then spun up a transient EMR cluster to run the GDPR upsert job to delete the records. The data was stored in Amazon S3 in Hudi format, and Hudi’s built-in index allowed us to efficiently delete records using bloom filters and file ranges. Because only those files that contained the record keys needed to be read and rewritten, it only took about 1–2 minutes to delete 1,000 records out of the 1 billion records, which had previously taken hours to complete as entire partitions were read.

Overall, our solution enabled efficient deletion of data, which provided an additional layer of data security that was critical for Zoom, in light of its GDPR requirements.

Architecting to optimize scale, performance, and cost

In this section, we share the following strategies Zoom took to optimize scale, performance, and cost:

  • Optimizing ingestion
  • Optimizing throughput and Amazon EMR utilization
  • Decoupling ingestion and GDPR deletion using EMRFS
  • Efficient deletes with Apache Hudi
  • Optimizing for low-latency reads with Apache Hudi
  • Monitoring

Optimizing ingestion

To keep the storage in Kafka lean and optimal, as well as to get a real-time view of data, we created a Spark job to read incoming Kafka messages in batches of 150 million messages and wrote to Amazon S3 in Hudi-compatible format every 5 minutes. Even during the initial stages of the iteration, when we hadn’t started scaling and tuning yet, we were able to successfully load all Kafka messages consistently under 2.5 minutes using the Amazon EMR runtime for Apache Spark.

Optimizing throughput and Amazon EMR utilization

We launched a cost-optimized EMR cluster and switched from uniform instance groups to using EMR instance fleets. We chose instance fleets because we needed the flexibility to use Spot Instances for task nodes and wanted to diversify the risk of running out of capacity for a specific instance type in our Availability Zone.

We started experimenting with test runs by first changing the number of Kafka partitions from 400 to 1,000, and then changing the number of task nodes and instance types. Based on the results of the run, the AWS team came up with the recommendation to use Amazon EMR with three core nodes (r5.16xlarge (64 vCPUs each)) and 18 task nodes using Spot fleet instances (a combination of r5.16xlarge (64 vCPUs), r5.12xlarge (48 vCPUs), r5.8xlarge (32 vCPUs)). These recommendations helped Zoom to reduce their Amazon EMR costs by more than 80% while meeting their desired performance goals of ingesting 150 million Kafka messages under 5 minutes.

Decoupling ingestion and GDPR deletion using EMRFS

A well-known benefit of separation of storage and compute is that you can scale the two independently. But a not-so-obvious advantage is that you can decouple continuous workloads from sporadic workloads. Previously data was stored in HDFS. Resource-intensive GDPR delete jobs and data movement jobs would compete for resources with the stream ingestion, causing a backlog of more than 5 hours in upstream Kafka clusters, which was close to filling up the Kafka storage (which only had 6 hours of data retention) and potentially causing data loss. Offloading data from HDFS to Amazon S3 allowed us the freedom to launch independent transient EMR clusters on demand to perform data deletion, helping to ensure that the ongoing data ingestion from Kafka into Amazon EMR is not starved for resources. This enabled the system to ingest data every 5 minutes and complete each Spark Streaming read in 2–3 minutes. Another side effect of using EMRFS is a cost-optimized cluster, because we removed reliance on Amazon Elastic Block Store (Amazon EBS) volumes for over 300 TB storage that was used for three copies (including two replicas) of HDFS data. We now pay for only one copy of the data in Amazon S3, which provides 11 9s of durability and is relatively inexpensive storage.

Efficient deletes with Apache Hudi

What about the conflict between ingest writes and GDPR deletes when running concurrently? This is where the power of Apache Hudi stands out.

Apache Hudi provides a table format for data lakes with transactional semantics that enables the separation of ingestion workloads and updates when run concurrently. The system was able to consistently delete 1,000 records in less than a minute. There were some limitations in concurrent writes in Apache Hudi 0.7.0, but the Amazon EMR team quickly addressed this by back-porting Apache Hudi 0.8.0, which supports optimistic concurrency control, to the current (at the time of the AWS Data Lab collaboration) Amazon EMR 6.4 release. This saved time in testing and allowed for a quick transition to the new version with minimal testing. This enabled us to query the data directly using Athena quickly without having to spin up a cluster to run ad hoc queries, as well as to query the data using Presto, Trino, and Hive. The decoupling of the storage and compute layers provided the flexibility to not only query data across different EMR clusters, but also delete data using a completely independent transient cluster.

Optimizing for low-latency reads with Apache Hudi

To optimize for low-latency reads with Apache Hudi, we needed to address the issue of too many small files being created within Amazon S3 due to the continuous streaming of data into the data lake.

We utilized Apache Hudi’s features to tune file sizes for optimal querying. Specifically, we reduced the degree of parallelism in Hudi from the default value of 1,500 to a lower number. Parallelism refers to the number of threads used to write data to Hudi; by reducing it, we were able to create larger files that were more optimal for querying.

Because we needed to optimize for high-volume streaming ingestion, we chose to implement the merge on read table type (instead of copy on write) for our workload. This table type allowed us to quickly ingest the incoming data into delta files in row format (Avro) and asynchronously compact the delta files into columnar Parquet files for fast reads. To do this, we ran the Hudi compaction job in the background. Compaction is the process of merging row-based delta files to produce new versions of columnar files. Because the compaction job would use additional compute resources, we adjusted the degree of parallelism for insertion to a lower value of 1,000 to account for the additional resource usage. This adjustment allowed us to create larger files without sacrificing performance throughput.

Overall, our approach to optimizing for low-latency reads with Apache Hudi allowed us to better manage file sizes and improve the overall performance of our data lake.

Monitoring

The team monitored MSK clusters with Prometheus (an open-source monitoring tool). Additionally, we showcased how to monitor Spark streaming jobs using Amazon CloudWatch metrics. For more information, refer to Monitor Spark streaming applications on Amazon EMR.

Outcomes

The collaboration between Zoom and the AWS Data Lab demonstrated significant improvements in data ingestion, processing, storage, and deletion using an architecture with Amazon EMR and Apache Hudi. One key benefit of the architecture was a reduction in infrastructure costs, which was achieved through the use of cloud-native technologies and the efficient management of data storage. Another benefit was an improvement in data management capabilities.

We showed that the costs of EMR clusters can be reduced by about 82% while bringing the storage costs down by about 90% compared to the prior HDFS-based architecture. All of this while making the data available in the data lake within 5 minutes of ingestion from the source. We also demonstrated that data deletions from a data lake containing multiple petabytes of data can be performed much more efficiently. With our optimized approach, we were able to delete approximately 1,000 records in just 1–2 minutes, as compared to the previously required 3 hours or more.

Conclusion

In conclusion, the log analytics process, which involves collecting, processing, storing, analyzing, and deleting log data from various sources such as servers, applications, and devices, is critical to aid organizations in working to meet their service resiliency, security, performance monitoring, troubleshooting, and compliance needs, such as GDPR.

This post shared what Zoom and the AWS Data Lab team have accomplished together to solve critical data pipeline challenges, and Zoom has extended the solution further to optimize extract, transform, and load (ETL) jobs and resource efficiency. However, you can also use the architecture patterns presented here to quickly build cost-effective and scalable solutions for other use cases. Please reach out to your AWS team for more information or contact Sales.


About the Authors

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions modernizing their architecture and generating insights from their data. In his spare time he likes to work on non-profit projects focused on underprivileged Children’s education.

Chandra DhandapaniChandra Dhandapani is a Senior Solutions Architect at AWS, where he specializes in creating solutions for customers in Analytics, AI/ML, and Databases. He has a lot of experience in building and scaling applications across different industries including Healthcare and Fintech. Outside of work, he is an avid traveler and enjoys sports, reading, and entertainment.

Amit Kumar Agrawal is a Senior Solutions Architect at AWS, based out of San Francisco Bay Area. He works with large strategic ISV customers to architect cloud solutions that address their business challenges. During his free time he enjoys exploring the outdoors with his family.

Viral Shah is a Analytics Sales Specialist working with AWS for 5 years helping customers to be successful in their data journey. He has over 20+ years of experience working with enterprise customers and startups, primarily in the data and database space. He loves to travel and spend quality time with his family.

How SOCAR handles large IoT data with Amazon MSK and Amazon ElastiCache for Redis

Post Syndicated from Younggu Yun original https://aws.amazon.com/blogs/big-data/how-socar-handles-large-iot-data-with-amazon-msk-and-amazon-elasticache-for-redis/

This is a guest blog post co-written with SangSu Park and JaeHong Ahn from SOCAR. 

As companies continue to expand their digital footprint, the importance of real-time data processing and analysis cannot be overstated. The ability to quickly measure and draw insights from data is critical in today’s business landscape, where rapid decision-making is key. With this capability, businesses can stay ahead of the curve and develop new initiatives that drive success.

This post is a continuation of How SOCAR built a streaming data pipeline to process IoT data for real-time analytics and control. In this post, we provide a detailed overview of streaming messages with Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon ElastiCache for Redis, covering technical aspects and design considerations that are essential for achieving optimal results.

SOCAR is the leading Korean mobility company with strong competitiveness in car-sharing. SOCAR wanted to design and build a solution for a new Fleet Management System (FMS). This system involves the collection, processing, storage, and analysis of Internet of Things (IoT) streaming data from various vehicle devices, as well as historical operational data such as location, speed, fuel level, and component status.

This post demonstrates a solution for SOCAR’s production application that allows them to load streaming data from Amazon MSK into ElastiCache for Redis, optimizing the speed and efficiency of their data processing pipeline. We also discuss the key features, considerations, and design of the solution.

Background

SOCAR operates about 20,000 cars and is planning to include other large vehicle types such as commercial vehicles and courier trucks. SOCAR has deployed in-car devices that capture data using AWS IoT Core. This data was then stored in Amazon Relational Database Service (Amazon RDS). The challenge with this approach included inefficient performance and high resource usage. Therefore, SOCAR looked for purpose-built databases tailored to the needs of their application and usage patterns while meeting the future requirements of SOCAR’s business and technical requirements. The key requirements for SOCAR included achieving maximum performance for real-time data analytics, which required storing data in an in-memory data store.

After careful consideration, ElastiCache for Redis was selected as the optimal solution due to its ability to handle complex data aggregation rules with ease. One of the challenges faced was loading data from Amazon MSK into the database, because there was no built-in Kafka connector and consumer available for this task. This post focuses on the development of a Kafka consumer application that was designed to tackle this challenge by enabling performant data loading from Amazon MSK to Redis.

Solution overview

Extracting valuable insights from streaming data can be a challenge for businesses with diverse use cases and workloads. That’s why SOCAR built a solution to seamlessly bring data from Amazon MSK into multiple purpose-built databases, while also empowering users to transform data as needed. With fully managed Apache Kafka, Amazon MSK provides a reliable and efficient platform for ingesting and processing real-time data.

The following figure shows an example of the data flow at SOCAR.

solution overview

This architecture consists of three components:

  • Streaming data – Amazon MSK serves as a scalable and reliable platform for streaming data, capable of receiving and storing messages from a variety of sources, including AWS IoT Core, with messages organized into multiple topics and partitions
  • Consumer application – With a consumer application, users can seamlessly bring data from Amazon MSK into a target database or data storage while also defining data transformation rules as needed
  • Target databases – With the consumer application, the SOCAR team was able to load data from Amazon MSK into two separate databases, each serving a specific workload

Although this post focuses on a specific use case with ElastiCache for Redis as the target database and a single topic called gps, the consumer application we describe can handle additional topics and messages, as well as different streaming sources and target databases such as Amazon DynamoDB. Our post covers the most important aspects of the consumer application, including its features and components, design considerations, and a detailed guide to the code implementation.

Components of the consumer application

The consumer application comprises three main parts that work together to consume, transform, and load messages from Amazon MSK into a target database. The following diagram shows an example of data transformations in the handler component.

consumer-application

The details of each component are as follows:

  • Consumer – This consumes messages from Amazon MSK and then forwards the messages to a downstream handler.
  • Loader – This is where users specify a target database. For example, SOCAR’s target databases include ElastiCache for Redis and DynamoDB.
  • Handler – This is where users can apply data transformation rules to the incoming messages before loading them into a target database.

Features of the consumer application

This connection has three features:

  • Scalability – This solution is designed to be scalable, ensuring that the consumer application can handle an increasing volume of data and accommodate additional applications in the future. For instance, SOCAR sought to develop a solution capable of handling not only the current data from approximately 20,000 vehicles but also a larger volume of messages as the business and data continue to grow rapidly.
  • Performance – With this consumer application, users can achieve consistent performance, even as the volume of source messages and target databases increases. The application supports multithreading, allowing for concurrent data processing, and can handle unexpected spikes in data volume by easily increasing compute resources.
  • Flexibility – This consumer application can be reused for any new topics without having to build the entire consumer application again. The consumer application can be used to ingest new messages with different configuration values in the handler. SOCAR deployed multiple handlers to ingest many different messages. Also, this consumer application allows users to add additional target locations. For example, SOCAR initially developed a solution for ElastiCache for Redis and then replicated the consumer application for DynamoDB.

Design considerations of the consumer application

Note the following design considerations for the consumer application:

  • Scale out – A key design principle of this solution is scalability. To achieve this, the consumer application runs with Amazon Elastic Kubernetes Service (Amazon EKS) because it can allow users to increase and replicate consumer applications easily.
  • Consumption patterns – To receive, store, and consume data efficiently, it’s important to design Kafka topics depending on messages and consumption patterns. Depending on messages consumed at the end, messages can be received into multiple topics of different schemas. For example, SOCAR has many different topics that are consumed by different workloads.
  • Purpose-built database – The consumer application supports loading data into multiple target options based on the specific use case. For example, SOCAR stored real-time IoT data in ElastiCache for Redis to power real-time dashboard and web applications, while storing recent trip information in DynamoDB that didn’t require real-time processing.

Walkthrough overview

The producer of this solution is AWS IoT Core, which sends out messages into a topic called gps. The target database of this solution is ElastiCache for Redis. ElastiCache for Redis a fast in-memory data store that provides sub-millisecond latency to power internet-scale, real-time applications. Built on open-source Redis and compatible with the Redis APIs, ElastiCache for Redis combines the speed, simplicity, and versatility of open-source Redis with the manageability, security, and scalability from Amazon to power the most demanding real-time applications.

The target location can be either another database or storage depending on the use case and workload. SOCAR uses Amazon EKS to operate the containerized solution to achieve scalability, performance, and flexibility. Amazon EKS is a managed Kubernetes service to run Kubernetes in the AWS Cloud. Amazon EKS automatically manages the availability and scalability of the Kubernetes control plane nodes responsible for scheduling containers, managing application availability, storing cluster data, and other key tasks.

For the programming language, the SOCAR team decided to use the Go Programming language, utilizing both the AWS SDK for Go and a Goroutine, a lightweight logical or virtual thread managed by the Go runtime, which makes it easy to manage multiple threads. The AWS SDK for Go simplifies the use of AWS services by providing a set of libraries that are consistent and familiar for Go developers.

In the following sections, we walk through the steps to implement the solution:

  1. Create a consumer.
  2. Create a loader.
  3. Create a handler.
  4. Build a consumer application with the consumer, loader, and handler.
  5. Deploy the consumer application.

Prerequisites

For this walkthrough, you should have the following:

Create a consumer

In this example, we use a topic called gps, and the consumer includes a Kafka client that receives messages from the topic. SOCAR created a struct and built a consumer (called NewConsumer in the code) to make it extendable. With this approach, any additional parameters and rules can be added easily.

To authenticate with Amazon MSK, SOCAR uses IAM. Because SOCAR already uses IAM to authenticate other resources, such as Amazon EKS, it uses the same IAM role (aws_msk_iam_v2) to authenticate clients for both Amazon MSK and Apache Kafka actions.

The following code creates the consumer:

type Consumer struct {
	logger      *zerolog.Logger
	kafkaReader *kafka.Reader
}

func NewConsumer(logger *zerolog.Logger, awsCfg aws.Config, brokers []string, consumerGroupID, topic string) *Consumer {
	return &Consumer{
		logger: logger,
		kafkaReader: kafka.NewReader(kafka.ReaderConfig{
			Dialer: &kafka.Dialer{
				TLS:           &tls.Config{MinVersion: tls.VersionTLS12},
				Timeout:       10 * time.Second,
				DualStack:     true,
				SASLMechanism: aws_msk_iam_v2.NewMechanism(awsCfg),
			},
			Brokers:     brokers, //
			GroupID:     consumerGroupID, //
			Topic:       topic, //
			StartOffset: kafka.LastOffset, //
		}),
	}
}

func (consumer *Consumer) Close() error {
	var err error = nil
	if consumer.kafkaReader != nil {
		err = consumer.kafkaReader.Close()
		consumer.logger.Info().Msg("closed kafka reader")
	}
	return err
}

func (consumer *Consumer) Consume(ctx context.Context) (kafka.message, error) {
	return consumer.kafkaReader.Readmessage(ctx)
}

Create a loader

The loader function, represented by the Loader struct, is responsible for loading messages to the target location, which in this case is ElastiCache for Redis. The NewLoader function initializes a new instance of the Loader struct with a logger and a Redis cluster client, which is used to communicate with the ElastiCache cluster. The redis.NewClusterClient object is initialized using the NewRedisClient function, which uses IAM to authenticate the client for Redis actions. This ensures secure and authorized access to the ElastiCache cluster. The Loader struct also contains the Close method to close the Kafka reader and free up resources.

The following code creates a loader:

type Loader struct {
	logger      *zerolog.Logger
	redisClient *redis.ClusterClient
}

func NewLoader(logger *zerolog.Logger, redisClient *redis.ClusterClient) *Loader {
	return &Loader{
		logger:      logger,
		redisClient: redisClient,
	}
}

func (consumer *Consumer) Close() error {
	var err error = nil
	if consumer.kafkaReader != nil {
		err = consumer.kafkaReader.Close()
		consumer.logger.Info().Msg("closed kafka reader")
	}
	return err
}

func (consumer *Consumer) Consume(ctx context.Context) (kafka.Message, error) {
	return consumer.kafkaReader.ReadMessage(ctx)
}

func NewRedisClient(ctx context.Context, awsCfg aws.Config, addrs []string, replicationGroupID, username string) (*redis.ClusterClient, error) {
	redisClient := redis.NewClusterClient(&redis.ClusterOptions{
		NewClient: func(opt *redis.Options) *redis.Client {
			return redis.NewClient(&redis.Options{
				Addr: opt.Addr,
				CredentialsProvider: func() (username string, password string) {
					token, err := BuildRedisIAMAuthToken(ctx, awsCfg, replicationGroupID, opt.Username)
					if err != nil {
						panic(err)
					}
					return opt.Username, token
				},
				PoolSize:    opt.PoolSize,
				PoolTimeout: opt.PoolTimeout,
				TLSConfig:   &tls.Config{InsecureSkipVerify: true},
			})
		},
		Addrs:       addrs,
		Username:    username,
		PoolSize:    100,
		PoolTimeout: 1 * time.Minute,
	})
	pong, err := redisClient.Ping(ctx).Result()
	if err != nil {
		return nil, err
	}
	if pong != "PONG" {
		return nil, fmt.Errorf("failed to verify connection to redis server")
	}
	return redisClient, nil
}

Create a handler

A handler is used to include business rules and data transformation logic that prepares data before loading it into the target location. It acts as a bridge between a consumer and a loader. In this example, the topic name is cars.gps.json, and the message includes two keys, lng and lat, with data type Float64. The business logic can be defined in a function like handlerFuncGpsToRedis and then applied as follows:

type (
	handlerFunc    func(ctx context.Context, loader *Loader, key, value []byte) error
	handlerFuncMap map[string]handlerFunc
)

var HandlerRedis = handlerFuncMap{
	"cars.gps.json":   handlerFuncGpsToRedis
}

func GetHandlerFunc(funcMap handlerFuncMap, topic string) (handlerFunc, error) {
	handlerFunc, exist := funcMap[topic]
	if !exist {
		return nil, fmt.Errorf("failed to find handler func for '%s'", topic)
	}
	return handlerFunc, nil
}

func handlerFuncGpsToRedis(ctx context.Context, loader *Loader, key, value []byte) error {
	// unmarshal raw data to map
	data := map[string]interface{}{}
	err := json.Unmarshal(value, &data)
	if err != nil {
		return err
	}

	// prepare things to load on redis as geolocation
	name := string(key)
	lng, err := getFloat64ValueFromMap(data, "lng")
	if err != nil {
		return err
	}
	lat, err := getFloat64ValueFromMap(data, "lat")
	if err != nil {
		return err
	}

	// add geolocation to redis
	return loader.RedisGeoAdd(ctx, "cars#gps", name, lng, lat)
}

Build a consumer application with the consumer, loader, and handler

Now you have created the consumer, loader, and handler. The next step is to build a consumer application using them. In a consumer application, you read messages from your stream with a consumer, transform them using a handler, and then load transformed messages into a target location with a loader. These three components are parameterized in a consumer application function such as the one shown in the following code:

type Connector struct {
	ctx    context.Context
	logger *zerolog.Logger

	consumer *Consumer
	handler  handlerFuncMap
	loader   *Loader
}

func NewConnector(ctx context.Context, logger *zerolog.Logger, consumer *Consumer, handler handlerFuncMap, loader *Loader) *Connector {
	return &Connector{
		ctx:    ctx,
		logger: logger,

		consumer: consumer,
		handler:  handler,
		loader:   loader,
	}
}

func (connector *Connector) Close() error {
	var err error = nil
	if connector.consumer != nil {
		err = connector.consumer.Close()
	}
	if connector.loader != nil {
		err = connector.loader.Close()
	}
	return err
}

func (connector *Connector) Run() error {
	wg := sync.WaitGroup{}
	defer wg.Wait()
	handlerFunc, err := GetHandlerFunc(connector.handler, connector.consumer.kafkaReader.Config().Topic)
	if err != nil {
		return err
	}
	for {
		msg, err := connector.consumer.Consume(connector.ctx)
		if err != nil {
			if errors.Is(context.Canceled, err) {
				break
			}
		}

		wg.Add(1)
		go func(key, value []byte) {
			defer wg.Done()
			err = handlerFunc(connector.ctx, connector.loader, key, value)
			if err != nil {
				connector.logger.Err(err).Msg("")
			}
		}(msg.Key, msg.Value)
	}
	return nil
}

Deploy the consumer application

To achieve maximum parallelism, SOCAR containerizes the consumer application and deploys it into multiple pods on Amazon EKS. Each consumer application contains a unique consumer, loader, and handler. For example, if you need to receive messages from a single topic with five partitions, you can deploy five identical consumer applications, each running in its own pod. Similarly, if you have two topics with three partitions each, you should deploy two consumer applications, resulting in a total of six pods. It’s a best practice to run one consumer application per topic, and the number of pods should match the number of partitions to enable concurrent message processing. The pod number can be specified in the Kubernetes deployment configuration

There are two stages in the Dockerfile. The first stage is the builder, which installs build tools and dependencies, and builds the application. The second stage is the runner, which uses a smaller base image (Alpine) and copies only the necessary files from the builder stage. It also sets the appropriate user permissions and runs the application. It’s also worth noting that the builder stage uses a specific version of the Golang image, while the runner stage uses a specific version of the Alpine image, both of which are considered to be lightweight and secure images.

The following code is an example of the Dockerfile:

# builder
FROM golang:1.18.2-alpine3.16 AS builder
RUN apk add build-base
WORKDIR /usr/src/app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o connector .

# runner
FROM alpine:3.16.0 AS runner
WORKDIR /usr/bin/app
RUN apk add --no-cache tzdata
RUN addgroup --system app && adduser --system --shell /bin/false --ingroup app app
COPY --from=builder /usr/src/app/connector .
RUN chown -R app:app /usr/bin/app
USER app
ENTRYPOINT ["/usr/bin/app/connector"]

Conclusion

In this post, we discussed SOCAR’s approach to building a consumer application that enables IoT real-time streaming from Amazon MSK to target locations such as ElastiCache for Redis. We hope you found this post informative and useful. Thank you for reading!


About the Authors

SangSu Park is the Head of Operation Group at SOCAR. His passion is to keep learning, embrace challenges, and strive for mutual growth through communication. He loves to travel in search of new cities and places.

jaehongJaeHong Ahn is a DevOps Engineer in SOCAR’s cloud infrastructure team. He is dedicated to promoting collaboration between developers and operators. He enjoys creating DevOps tools and is committed to using his coding abilities to help build a better world. He loves to cook delicious meals as a private chef for his wife.

bdb-2857-youngguYounggu Yun works at AWS Data Lab in Korea. His role involves helping customers across the APAC region meet their business objectives and overcome technical challenges by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together.