Tag Archives: Amazon Managed Streaming for Apache Kafka (Amazon MSK)

Converting Apache Kafka events from Avro to JSON using EventBridge Pipes

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/converting-apache-kafka-events-from-avro-to-json-using-eventbridge-pipes/

This post is written by Pascal Vogel, Solutions Architect, and Philipp Klose, Global Solutions Architect.

Event streaming with Apache Kafka has become an important element of modern data-oriented and event-driven architectures (EDAs), unlocking use cases such as real-time analytics of user behavior, anomaly and fraud detection, and Internet of Things event processing. Stream producers and consumers in Kafka often use schema registries to ensure that all components follow agreed-upon event structures when sending (serializing) and processing (deserializing) events to avoid application bugs and crashes.

A common schema format in Kafka is Apache Avro, which supports rich data structures in a compact binary format. To integrate Kafka with other AWS and third-party services more easily, AWS offers Amazon EventBridge Pipes, a serverless point-to-point integration service. However, many downstream services expect JSON-encoded events, requiring custom and repetitive schema validation and conversion logic from Avro to JSON in each downstream service.

This blog post shows how to reliably consume, validate, convert, and send Avro events from Kafka to AWS and third-party services using EventBridge Pipes, allowing you to reduce custom deserialization logic in downstream services. You can also use EventBridge event buses as targets in Pipes to filter and distribute events from Pipes to multiple targets, including cross-account and cross-Region delivery.

This blog describes two scenarios:

  1. Using Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS Glue Schema Registry.
  2. Using Confluent Cloud and the Confluent Schema Registry.

See the associated GitHub repositories for Glue Schema Registry or Confluent Schema Registry for full source code and detailed deployment instructions.

Kafka event streaming and schema validation on AWS

To build event streaming applications with Kafka on AWS, you can use Amazon MSK, offerings such as Confluent Cloud, or self-hosted Kafka on Amazon Elastic Compute Cloud (Amazon EC2) instances.

To avoid common issues in event streaming and event-driven architectures, such as data inconsistencies and incompatibilities, it is a recommended practice to define and share event schemas between event producers and consumers. In Kafka, schema registries are used to manage, evolve, and enforce schemas for event producers and consumers. The AWS Glue Schema Registry provides a central location to discover, manage, and evolve schemas. In the case of Confluent Cloud, the Confluent Schema Registry serves the same role. Both the Glue Schema Registry and the Confluent Schema Registry support common schema formats such as Avro, Protobuf, and JSON.

To integrate Kafka with AWS services, third-party services, and your own applications, you can use EventBridge Pipes. EventBridge Pipes helps you create point-to-point integrations between event sources and targets with optional filtering, transformation, and enrichment. EventBridge Pipes reduces the amount of integration code that you have to write and maintain when building EDAs.

Many AWS and third-party services expect JSON-encoded payloads (events) as input, meaning they cannot directly consume Avro or Protobuf payloads. To replace repetitive Avro-to-JSON validation and conversion logic in each consumer, you can use the EventBridge Pipes enrichment step. This solution uses an AWS Lambda function in the enrichment step to deserialize and validate Kafka events with a schema registry, including error handling with dead-letter queues, and convert events to JSON before passing them to downstream services.

Solution overview

Architecture overview of the solution

The solution presented in this blog post consists of the following key elements:

  1. The source of the pipe is a Kafka cluster deployed using MSK or Confluent Cloud. EventBridge Pipes reads events from the Kafka stream in batches and sends them to the enrichment function (see here for an example event).
  2. The enrichment step (Lambda function) deserializes and validates the events against the configured schema registry (Glue or Confluent), converts events from Avro to JSON with integrated error handling, and returns them to the pipe.
  3. The target of this example solution is an EventBridge custom event bus that is invoked by EventBridge Pipes with JSON-encoded events returned by the enrichment Lambda function. EventBridge Pipes supports a variety of other targets, including Lambda, AWS Step Functions, Amazon API Gateway, API destinations, and more, enabling you to build EDAs without writing integration code.
  4. In this sample solution, the event bus sends all events to Amazon CloudWatch Logs via an EventBridge rule. You can extend the example to invoke additional EventBridge targets.

Optionally, you can add OpenAPI 3 or JSON Schema Draft 4 schemas for your events in the EventBridge schema registry by either manually generating them from the Avro schema or using EventBridge schema discovery. This allows you to download code bindings for the JSON-converted events for various programming languages, such as JavaScript, Python, and Java, to correctly use them in your EventBridge targets.

The remainder of this blog post describes this solution for the Glue and Confluent schema registries with code examples.

EventBridge Pipes with the Glue Schema Registry

This section describes how to implement event schema validation and conversion from Avro to JSON using EventBridge Pipes and the Glue Schema Registry. You can find the source code and detailed deployment instructions on GitHub.

Prerequisites

You need an Amazon MSK Serverless cluster running and the Glue Schema Registry configured. This example includes an Avro schema and a Glue Schema Registry. See the following AWS blog post for an introduction to schema validation with the Glue Schema Registry: Validate, evolve, and control schemas in Amazon MSK and Amazon Kinesis Data Streams with AWS Glue Schema Registry.

EventBridge Pipes configuration

Use the AWS Cloud Development Kit (AWS CDK) template provided in the GitHub repository to deploy:

  1. An EventBridge pipe that connects to your existing Amazon MSK Serverless Kafka topic as the source via AWS Identity and Access Management (IAM) authentication.
  2. EventBridge Pipes reads events from your Kafka topic using the Amazon MSK source type.
  3. An enrichment Lambda function in Java to perform event deserialization, validation, and conversion from Avro to JSON.
  4. An Amazon Simple Queue Service (Amazon SQS) dead letter queue to hold events for which deserialization failed.
  5. An EventBridge custom event bus as the pipe target. An EventBridge rule sends all incoming events into a CloudWatch Logs log group.

For MSK-based sources, EventBridge supports configuration parameters, such as batch window, batch size, and starting position, which you can set using the parameters of the CfnPipe class in the example CDK stack.

The example EventBridge pipe consumes events from Kafka in batches of 10 because it is targeting an EventBridge event bus, which has a max batch size of 10. See batching and concurrency in the EventBridge Pipes User Guide to choose an optimal configuration for other targets.
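As a rough illustration of these settings, the following Python sketch builds a source-parameters fragment in the shape used by the CloudFormation AWS::Pipes::Pipe resource, which the CfnPipe class mirrors. The helper name msk_source_parameters, the topic name, and the checks performed are assumptions made for illustration and are not taken from the example stack; consult the EventBridge Pipes documentation for the actual limits.

```python
import json

# Starting positions assumed to be accepted for Kafka sources.
VALID_STARTING_POSITIONS = {"TRIM_HORIZON", "LATEST"}


def msk_source_parameters(topic_name, batch_size=10,
                          batching_window_seconds=0,
                          starting_position="LATEST"):
    """Build a SourceParameters fragment for an MSK-backed pipe (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if batching_window_seconds < 0:
        raise ValueError("batching_window_seconds must not be negative")
    if starting_position not in VALID_STARTING_POSITIONS:
        raise ValueError(f"unsupported starting position: {starting_position}")
    return {
        "ManagedStreamingKafkaParameters": {
            "TopicName": topic_name,
            "BatchSize": batch_size,
            "MaximumBatchingWindowInSeconds": batching_window_seconds,
            "StartingPosition": starting_position,
        }
    }


if __name__ == "__main__":
    # Batch size 10 matches the event bus target used in this example.
    print(json.dumps(msk_source_parameters("example-topic"), indent=2))
```

Keeping the batch size as an explicit argument makes it easy to raise it when the pipe targets a service that accepts larger batches.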

EventBridge Pipes with the Confluent Schema Registry

This section describes how to implement event schema validation and conversion from Avro to JSON using EventBridge Pipes and the Confluent Schema Registry. You can find the source code and detailed deployment instructions on GitHub.

Prerequisites

To set up this solution, you need a Kafka stream running on Confluent Cloud as well as the Confluent Schema Registry set up. See the corresponding Schema Registry tutorial for Confluent Cloud to set up a schema registry for your Confluent Kafka stream.

To connect to your Confluent Cloud Kafka cluster, you need an API key for Confluent Cloud and Confluent Schema Registry. AWS Secrets Manager is used to securely store your Confluent secrets.

EventBridge Pipes configuration

Use the AWS CDK template provided in the GitHub repository to deploy:

  1. An EventBridge pipe that connects to your existing Confluent Kafka topic as the source via an API secret stored in Secrets Manager.
  2. EventBridge Pipes reads events from your Confluent Kafka topic using the self-managed Apache Kafka stream source type, which includes all non-MSK Kafka clusters.
  3. An enrichment Lambda function in Python to perform event deserialization, validation, and conversion from Avro to JSON.
  4. An SQS dead letter queue to hold events for which deserialization failed.
  5. An EventBridge custom event bus as the pipe target. An EventBridge rule writes all incoming events into a CloudWatch Logs log group.

For self-managed Kafka sources, EventBridge supports configuration parameters, such as batch window, batch size, and starting position, which you can set using the parameters of the CfnPipe class in the example CDK stack.

The example EventBridge pipe consumes events from Kafka in batches of 10 because it is targeting an EventBridge event bus, which has a max batch size of 10. See batching and concurrency in the EventBridge Pipes User Guide to choose an optimal configuration for other targets.

Enrichment Lambda functions

Both of the solutions described previously include an enrichment Lambda function for schema validation and conversion from Avro to JSON.

The Java Lambda function integrates with the Glue Schema Registry using the AWS Glue Schema Registry Library. The Python Lambda function integrates with the Confluent Schema Registry using the confluent-kafka library and uses Powertools for AWS Lambda (Python) to implement serverless best practices such as logging and tracing.

The enrichment Lambda functions perform the following tasks:

  1. In the events polled from the Kafka stream by the EventBridge pipe, the key and value of the event are base64 encoded. Therefore, for each event in the batch passed to the function, the key and the value are decoded.
  2. The event key is assumed to be serialized by the producer as a string type.
  3. The event value is deserialized using the Glue Schema Registry Serde (Java) or the confluent-kafka AvroDeserializer (Python).
  4. The function then returns the successfully converted JSON events to the EventBridge pipe, which then invokes the target for each of them.
  5. Events for which Avro deserialization failed are sent to the SQS dead letter queue.
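The steps above can be sketched in Python as follows. This is a simplified illustration, not the code from the sample repositories: the function name convert_batch, the deserialize_value callback (which stands in for an Avro deserializer bound to a schema registry), and the on_failure callback (which stands in for sending a record to the SQS dead letter queue) are all hypothetical. The record fields used are assumed to match the Kafka event shape that EventBridge Pipes passes to the enrichment step.

```python
import base64
import json


def convert_batch(events, deserialize_value, on_failure):
    """Decode each Kafka record in a batch and return JSON-ready dictionaries."""
    converted = []
    for record in events:
        try:
            # Key and value arrive base64 encoded.
            raw_key = record.get("key")
            # The key is assumed to be a string serialized by the producer.
            key = base64.b64decode(raw_key).decode("utf-8") if raw_key else None
            value = deserialize_value(base64.b64decode(record["value"]))
            converted.append({
                "key": key,
                "value": value,
                "topic": record.get("topic"),
                "partition": record.get("partition"),
                "offset": record.get("offset"),
            })
        except Exception as error:  # broad on purpose: any failure takes the DLQ path
            on_failure(record, error)
    return converted


if __name__ == "__main__":
    def encode(text):
        return base64.b64encode(text.encode("utf-8")).decode("ascii")

    batch = [
        {"topic": "t", "partition": 0, "offset": 1,
         "key": encode("user-1"), "value": encode('{"id": 1}')},
        {"topic": "t", "partition": 0, "offset": 2,
         "key": encode("user-2"), "value": encode("not valid")},
    ]
    failed = []
    # json.loads is only a stand-in for a real Avro deserializer.
    print(convert_batch(batch, json.loads, lambda rec, err: failed.append(rec)))
    print(f"{len(failed)} record(s) would go to the dead letter queue")
```

Passing the deserializer and the failure handler as arguments keeps the decoding loop independent of the schema registry in use.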

Conclusion

This blog post shows how to implement event consumption, Avro schema validation, and conversion to JSON using Amazon EventBridge Pipes, Glue Schema Registry, and Confluent Schema Registry.

The source code for the presented example is available in the AWS Samples GitHub repository for Glue Schema Registry and Confluent Schema Registry. For more patterns, visit the Serverless Patterns Collection.

For more serverless learning resources, visit Serverless Land.

Amazon MSK Serverless now supports Kafka clients written in all programming languages

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/amazon-msk-serverless-now-supports-kafka-clients-written-in-all-programming-languages/

Amazon MSK Serverless is a cluster type for Amazon Managed Streaming for Apache Kafka (Amazon MSK) that is the most straightforward way to run Apache Kafka clusters without having to manage compute and storage capacity. With MSK Serverless, you can run your applications without having to provision, configure, or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring and moving them to even load across a cluster.

With today’s launch, MSK Serverless now supports writes and reads from Kafka clients written in all programming languages. Administrators can simplify and standardize access control to Kafka resources using AWS Identity and Access Management (IAM). This support for IAM in Amazon MSK is based on SASL/OAUTHBEARER, an open standard for authorization and authentication.

In this post, we show how you can connect your applications to MSK Serverless with minimal code changes using the open-sourced client helper libraries and code samples for popular languages, including Java, Python, Go, JavaScript, and .NET. Using IAM authentication and authorization is the preferred choice of many customers because you can secure Kafka resources just like you do with all other AWS services. Additionally, you get all the other benefits of IAM, such as temporary role-based credentials, precisely scoped permission policies, and more. Now you can use MSK Serverless with IAM-based authentication more broadly with the support for multiple languages.

Solution overview

You can get started by using IAM principals as identities for your Apache Kafka clients and define identity policies to provide them precisely scoped access permissions. For example, you can create an IAM user and a policy that allows the user to write to a specific Kafka topic but restricts access to other resources without worrying about managing Kafka ACLs. After you provide the identity policies with the necessary permissions, you can configure client applications to use the IAM authentication with minimal code changes.

The code changes allow your clients to use SASL/OAUTHBEARER, a Kafka supported token-based access mechanism, to pass the credentials required for IAM authentication. With OAUTHBEARER support, you can build clients that can work across both Amazon MSK and other Kafka environments. In this post, we show how you can make these code changes by using the provided code libraries and examples.

With this launch, Amazon MSK provides new code libraries in the AWS GitHub repo for popular programming languages, including Java, Python, Go, JavaScript, and .NET.

The following diagram shows the conceptual process flow of using SASL/OAUTHBEARER with IAM access control for non-Java clients.

The workflow contains the following steps:

  1. The client generates an OAUTHBEARER token with the help of the provided library. The token contains a signed base64 encoded transformation of your IAM identity credentials.
  2. The client sends this to Amazon MSK using the bootstrap address along with its request to access Apache Kafka resources.
  3. The MSK Serverless cluster decodes the OAUTHBEARER token, validates the credentials, and checks if the client is authorized to perform the requested action according to the policy attached to the IAM identity.
  4. When the token expires, the client Kafka library automatically refreshes the token by making another call to the specified token provider.
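Step 4 implies that the token provider can be called repeatedly over the lifetime of a connection. A provider can sign a fresh token on every call or, as in the hedged sketch below, reuse a token until shortly before it expires. The class name CachingTokenProvider, the refresh margin, and the injected generate callable are hypothetical; the callable is assumed to return a (token, expiry in epoch milliseconds) pair, which is the shape the signer library's generate_auth_token function is understood to return.

```python
import time


class CachingTokenProvider:
    """Return a cached token until shortly before it expires (hypothetical helper)."""

    def __init__(self, generate, refresh_margin_ms=60_000, clock=time.time):
        self._generate = generate                  # callable returning (token, expiry_ms)
        self._refresh_margin_ms = refresh_margin_ms
        self._clock = clock                        # injectable so the logic can be tested
        self._token = None
        self._expiry_ms = 0

    def token(self):
        now_ms = int(self._clock() * 1000)
        # Generate a new token on first use or once inside the refresh margin.
        if self._token is None or now_ms >= self._expiry_ms - self._refresh_margin_ms:
            self._token, self._expiry_ms = self._generate()
        return self._token


if __name__ == "__main__":
    def fake_generate():
        # Stand-in for a real signer call; the token is valid for 15 minutes here.
        return "example-token", int(time.time() * 1000) + 15 * 60 * 1000

    provider = CachingTokenProvider(fake_generate)
    print(provider.token())
```

The object exposes a token() method, so it has the same shape as the token provider that the Kafka client is configured with.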

Create IAM identities and policies

IAM access control for non-Java clients is supported for MSK Serverless clusters with no additional cost. Before you start, you need to configure the IAM identities and policies that define the client’s permissions to access resources on the cluster. The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the action and resource elements, see Semantics of actions and resources.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}
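For a narrower policy, such as one that only lets a client write to a single topic, the policy document can be generated programmatically. The following Python sketch is an illustration under assumptions: the function name topic_writer_policy and the account ID, cluster, and topic values are placeholders, and the chosen action set (Connect, DescribeTopic, WriteData) is assumed to be the minimum a basic producer needs. Verify the required actions against the Amazon MSK IAM access control documentation before use.

```python
import json


def topic_writer_policy(region, account_id, cluster_name, cluster_uuid, topic):
    """Build an identity policy that allows producing to one topic (hypothetical helper)."""
    prefix = f"arn:aws:kafka:{region}:{account_id}"
    cluster_arn = f"{prefix}:cluster/{cluster_name}/{cluster_uuid}"
    topic_arn = f"{prefix}:topic/{cluster_name}/{cluster_uuid}/{topic}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["kafka-cluster:Connect"],
                "Resource": [cluster_arn],
            },
            {
                "Effect": "Allow",
                "Action": ["kafka-cluster:DescribeTopic", "kafka-cluster:WriteData"],
                "Resource": [topic_arn],
            },
        ],
    }


if __name__ == "__main__":
    policy = topic_writer_policy(
        "us-east-1", "111122223333", "MyTestCluster",
        "abcd1234-0123-abcd-5678-1234abcd-1", "orders",
    )
    print(json.dumps(policy, indent=4))
```

Because no read or group actions are granted, a principal with only this policy cannot consume from the topic.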

Configure the client

You should make code changes to your application that allow the clients to use SASL/OAUTHBEARER to pass the credentials required for IAM authentication. You also need to make sure the security group associated with your MSK Serverless cluster has an inbound rule allowing the traffic from the client applications in the associated VPCs to the TCP port number 9098.
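Before debugging authentication, it can help to confirm that the client can reach the cluster on port 9098 at all. The following Python sketch checks plain TCP reachability only; it does not perform the TLS handshake or IAM authentication. The function name can_connect and the host placeholder are assumptions for illustration.

```python
import socket


def can_connect(host, port=9098, timeout_seconds=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout_seconds):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


if __name__ == "__main__":
    # Replace the placeholder with the host name from your bootstrap string.
    host = "<Amazon MSK Serverless bootstrap host>"
    print("reachable" if can_connect(host) else "not reachable")
```

If the check fails from inside the VPC, the security group inbound rule is a likely place to look first.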

You must use a Kafka client library that provides support for SASL with OAUTHBEARER authentication.

For this post, we use the Python programming language. We also use https://github.com/dpkp/kafka-python as our Kafka client library.

Amazon MSK provides a new code library for each language that generates the OAUTHBEARER token.

  1. To get started working with the Amazon MSK IAM SASL signer for Python with your Kafka client library, run the following command:
    $ pip install aws-msk-iam-sasl-signer-python

  2. Import the installed Amazon MSK IAM SASL signer library in your code:
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import socket
    import time
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

  3. Next, your application code needs to define a token provider that wraps the function that generates new tokens:
    class MSKTokenProvider():
        def token(self):
            token, _ = MSKAuthTokenProvider.generate_auth_token('<your aws region>')
            return token

  4. Specify security_protocol as SASL_SSL and sasl_mechanism as OAUTHBEARER in your Python Kafka client properties, and pass the token provider in the configuration object:
    tp = MSKTokenProvider()
    
    producer = KafkaProducer(
        bootstrap_servers='<Amazon MSK Serverless bootstrap string>',
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=tp,
        client_id='my.kafka.client.unique.id',
    )

You are now finished with all the code changes. For more examples of generating auth tokens or for more troubleshooting tips, refer to the following GitHub repo.

Conclusion

MSK Serverless now supports writes and reads from Kafka clients written in all programming languages. You can run your applications without having to configure and manage the infrastructure or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring, and ensures an even balance of partition distribution across brokers in the cluster (auto-balancing).

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers on architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Amazon MSK IAM authentication now supports all programming languages

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/amazon-msk-iam-authentication-now-supports-all-programming-languages/

The AWS Identity and Access Management (IAM) authentication feature in Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports all programming languages. Administrators can simplify and standardize access control to Kafka resources using IAM. This support is based on SASL/OAUTHBEARER, an open standard for authorization and authentication. Both Amazon MSK provisioned and serverless cluster types support the new Amazon MSK IAM expansion to all programming languages.

In this post, we show how you can connect your applications to MSK clusters with minimal code changes using the open-sourced client helper libraries and code samples for popular languages, including Java, Python, Go, JavaScript, and .NET. You can also use standard IAM access controls such as temporary role-based credentials and precisely scoped permission policies more broadly with the multiple language support on Amazon MSK.

For clients that need to connect from other VPCs to an MSK cluster, whether in the same or a different AWS account, you can enable multi-VPC private connectivity and cluster policy support. IAM access control via cluster policy helps you manage all access to the cluster and topics in one place. For example, you can control which IAM principals have write access to certain topics, and which principals can only read from them. Users who are using IAM client authentication can also add permissions for required kafka-cluster actions in the cluster resource policy.

Solution overview

You can get started by using IAM principals as identities for your Apache Kafka clients and define identity policies to provide them precisely scoped access permissions. After IAM authentication is enabled for your cluster, you can configure client applications to use the IAM authentication with minimal code changes.

The code changes allow your clients to use SASL/OAUTHBEARER, a Kafka supported token-based access mechanism, to pass the credentials required for IAM authentication. In this post, we show how you can make these code changes by using the provided code libraries and examples.

With this launch, new code libraries are available in the AWS GitHub repo for popular programming languages, including Java, Python, Go, JavaScript, and .NET.

The following diagram shows the conceptual process flow of using SASL/OAUTHBEARER with IAM access control for non-Java clients.

The workflow contains the following steps:

  1. The client generates an OAUTHBEARER token with the help of the provided library. The token contains a signed base64 encoded transformation of your IAM identity credentials.
  2. The client sends this to Amazon MSK using the IAM bootstrap broker addresses along with its request to access Apache Kafka resources.
  3. The MSK broker decodes the OAUTHBEARER token, validates the credentials, and checks if the client is authorized to perform the requested action according to the policy attached to the IAM identity.
  4. When the token expires, the client Kafka library automatically refreshes the token by making another call to the specified token provider.

Create IAM identities and policies

IAM access control for non-Java clients is supported for MSK clusters with Kafka version 2.7.1 and above. Before you start, you need to configure the IAM identities and policies that define the client’s permissions to access resources on the cluster. The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the action and resource elements, see Semantics of actions and resources.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}

Set up the MSK cluster

You need to enable the IAM access control authentication scheme for your MSK provisioned cluster and wait until the cluster finishes updating and turns to the Active state. This is because SASL/OAUTHBEARER uses the same broker addresses for IAM authentication.

Configure the client

You should make code changes to your application that allow the clients to use SASL/OAUTHBEARER to pass the credentials required for IAM authentication. Next, update your application to use the bootstrap server addresses for IAM authentication. You also need to make sure the security group associated with your MSK cluster has an inbound rule allowing the traffic from the client applications in the same VPC as the cluster to the TCP port 9098.

You must use a Kafka client library that provides support for SASL with OAUTHBEARER authentication.

In this post, we use the JavaScript programming language. We also use https://github.com/tulios/kafkajs as our Kafka client library.

Amazon MSK provides a new code library for each language that generates the OAUTHBEARER token.

  1. To get started working with the Amazon MSK IAM SASL signer for JavaScript with your Kafka client library, run the following command:
    npm install https://github.com/aws/aws-msk-iam-sasl-signer-js

  2. You need to import the installed Amazon MSK IAM SASL signer library in your code:
    const { Kafka } = require('kafkajs')
    
    const { generateAuthToken } = require('aws-msk-iam-sasl-signer-js')

  3. Next, your application code needs to define a token provider that wraps the function that generates new tokens:
    async function oauthBearerTokenProvider(region) {
        // Uses AWS Default Credentials Provider Chain to fetch credentials
        const authTokenResponse = await generateAuthToken({ region });
        return {
            value: authTokenResponse.token
        }
    }

  4. Enable TLS with ssl: true and set the SASL mechanism to oauthbearer in your KafkaJS client configuration, and pass the token provider in the configuration object:
    const run = async () => {
        const kafka = new Kafka({
            clientId: 'my-app',
            brokers: ['<bootstrap server addresses for IAM>'],
            ssl: true,
            sasl: {
                mechanism: 'oauthbearer',
                oauthBearerProvider: () => oauthBearerTokenProvider('us-east-1')
            }
        })
    
        const producer = kafka.producer()
        const consumer = kafka.consumer({ groupId: 'test-group' })
    
        // Producing
        await producer.connect()
        await producer.send({
            topic: 'test-topic',
            messages: [
                { value: 'Hello KafkaJS user!' },
            ],
        })
    
        // Consuming
        await consumer.connect()
        await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
    
        await consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                console.log({
                    partition,
                    offset: message.offset,
                    value: message.value.toString(),
                })
            },
        })
    }
    
    run().catch(console.error)

You are now finished with all the code changes. For more examples of generating auth tokens or for more troubleshooting tips, refer to the following GitHub repo.

Conclusion

IAM access control for Amazon MSK enables you to handle both authentication and authorization for your MSK cluster. This eliminates the need to use one mechanism for authentication and another for authorization. For example, when a client tries to write to your cluster, Amazon MSK uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster.

With today’s launch, Amazon MSK IAM authentication now supports all programming languages. This means you can connect your applications in all languages without worrying about implementing separate authentication and authorization mechanisms. For workloads that require Amazon MSK multi-VPC private connectivity and cluster policy support, you can now simplify connectivity to your MSK brokers and manage all access to the cluster and topics in one place that is your cluster policy.

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers on architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Scaling improvements when processing Apache Kafka with AWS Lambda

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/scaling-improvements-when-processing-apache-kafka-with-aws-lambda/

AWS Lambda is improving the automatic scaling behavior when processing data from Apache Kafka event-sources. Lambda is increasing the default number of initial consumers, improving how quickly consumers scale up, and helping to ensure that consumers don’t scale down too quickly. There is no additional action that you must take, and there is no additional cost.

Running Kafka on AWS

Apache Kafka is a popular open-source platform for building real-time streaming data pipelines and applications. You can deploy and manage your own Kafka solution on-premises or in the cloud on Amazon EC2.

Amazon Managed Streaming for Apache Kafka (MSK) is a fully managed service that makes it easier to build and run applications that use Kafka to process streaming data. MSK Serverless is a cluster type for Amazon MSK that allows you to run Kafka without having to manage and scale cluster capacity. It automatically provisions and scales capacity while managing the partitions in your topic, so you can stream data without thinking about right-sizing or scaling clusters. MSK Serverless offers a throughput-based pricing model, so you pay only for what you use. For more information, see Using Kafka to build your streaming application.

Using Lambda to consume records from Kafka

Processing streaming data can be complex in traditional, server-based architectures, especially if you must react in real-time. Many organizations spend significant time and cost managing and scaling their streaming platforms. In order to react fast, they must provision for peak capacity, which adds complexity.

Lambda and serverless architectures remove the undifferentiated heavy lifting when processing Kafka streams. You don’t have to manage infrastructure, and you can reduce operational overhead, lower costs, and scale on demand. This helps you focus more on building streaming applications. You can write Lambda functions in a number of programming languages, which provides flexibility when processing streaming data.

Lambda event source mapping

Lambda can integrate natively with your Kafka environments as a consumer to process stream data as soon as it’s generated.

To consume streaming data from Kafka, you configure an event source mapping (ESM) on your Lambda functions. This is a resource managed by the Lambda service, which is separate from your function. It continually polls records from the topics in the Kafka cluster. The ESM optionally filters records and batches them into a payload. Then, it calls the Lambda Invoke API to deliver the payload to your Lambda function synchronously for processing.
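To make the payload concrete, here is a minimal Python sketch of a function invoked by a Kafka event source mapping. It assumes the documented event layout, in which records are grouped under keys of the form topic-partition and each record value is base64 encoded. The return value is illustrative only and is not something the event source mapping requires.

```python
import base64


def handler(event, context):
    """Decode the records delivered by a Kafka event source mapping."""
    items = []
    # Records arrive grouped under "<topic>-<partition>" keys.
    for topic_partition, records in event.get("records", {}).items():
        for record in records:
            value = base64.b64decode(record["value"]).decode("utf-8")
            items.append({
                "source": topic_partition,
                "offset": record["offset"],
                "value": value,
            })
    return {"processed": len(items), "items": items}


if __name__ == "__main__":
    sample_event = {
        "eventSource": "aws:kafka",
        "records": {
            "test-topic-0": [
                {"topic": "test-topic", "partition": 0, "offset": 15,
                 "value": base64.b64encode(b"Hello from Kafka").decode("ascii")},
            ]
        },
    }
    print(handler(sample_event, None))
```

Because the invocation is synchronous, an exception raised by the handler is reported back to the event source mapping as a failed batch.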

As Lambda manages the pollers, you don’t need to manage a fleet of consumers across multiple teams. Each team can create and configure their own ESM with Lambda handling the polling.

AWS Lambda event source mapping

For more information on using Lambda to process Kafka streams, see the learning guide.

Scaling and throughput

Kafka uses partitions to increase throughput and spread the load of records to all brokers in a cluster.

The Lambda event source mapping resource includes pollers and processors. Pollers have consumers that read records from Kafka partitions. The poller assigners send them to processors which batch the records and invoke your function.

When you create a Kafka event source mapping, Lambda allocates consumers to process all partitions in the Kafka topic. Previously, Lambda allocated a minimum of one processor for a consumer.

Lambda previous initial scaling

With these scaling improvements, Lambda allocates multiple processors to improve processing. This reduces the possibility of a single invoke slowing down the entire processing stream.

Lambda updated initial scaling

Each consumer sends records to multiple processors running in parallel to handle increased workloads. Records in each partition are only assigned to a single processor to maintain order.

Lambda automatically scales up or down the number of consumers and processors based on workload. Lambda samples the consumer offset lag of all the partitions in the topic every minute. If the lag is increasing, this means Lambda can’t keep up with processing the records from the partition.
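To make the idea of offset lag concrete, here is a minimal sketch that computes per-partition lag and checks whether sampled lag keeps growing. The function names and the sample numbers are assumptions for illustration; Lambda's actual scaling logic is internal to the service.

```python
def offset_lag(end_offsets, committed_offsets):
    """Return lag per partition: latest offset minus last committed offset."""
    return {
        partition: end_offsets[partition] - committed_offsets.get(partition, 0)
        for partition in end_offsets
    }

def lag_is_increasing(samples):
    """True if every sample (oldest first) is larger than the one before it."""
    return len(samples) >= 2 and all(b > a for a, b in zip(samples, samples[1:]))

if __name__ == "__main__":
    lag = offset_lag({0: 100, 1: 50}, {0: 90, 1: 50})
    print(lag)                              # {0: 10, 1: 0}
    print(lag_is_increasing([10, 20, 35]))  # True
```

A steadily growing series of samples is the signal that processing is falling behind the rate of new records.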

The scaling algorithm accounts for the current offset lag, and also the rate of new messages added to the topic. Lambda can reach the maximum number of consumers within three minutes to lower the offset lag as quickly as possible. Lambda also scales down less aggressively, so that records are processed more quickly and latency is reduced, particularly for bursty workloads.

Total processors for all pollers can only scale up to the total number of partitions in the topic.

After successful invokes, the poller periodically commits offsets to the respective brokers.

Lambda further scaling

You can monitor the throughput of your Kafka topic using consumer metrics consumer_lag and consumer_offset.

To check how many function invocations occur in parallel, you can also monitor the concurrency metrics for your function. The concurrency is approximately equal to the total number of processors across all pollers, depending on processor activity. For example, if three pollers each have five processors running for a given ESM, the function concurrency would be approximately 15 (5 + 5 + 5).
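The arithmetic above, combined with the rule that total processors cannot exceed the number of partitions, can be written as a short helper. The function name is hypothetical and the result is only an estimate, since actual concurrency depends on processor activity.

```python
def estimated_concurrency(processors_per_poller, partitions):
    """Approximate function concurrency: total processors, capped at partitions."""
    return min(sum(processors_per_poller), partitions)

if __name__ == "__main__":
    # Three pollers with five processors each, on a 1000-partition topic.
    print(estimated_concurrency([5, 5, 5], partitions=1000))  # 15
```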

Seeing the improved scaling in action

There are a number of Serverless Patterns that you can use to process Kafka streams using Lambda. To set up Amazon MSK Serverless, follow the instructions in the GitHub repo:

  1. Create an example Amazon MSK Serverless topic with 1000 partitions.
    ./kafka-topics.sh --create --bootstrap-server "{bootstrap-server}" --command-config client.properties --replication-factor 3 --partitions 1000 --topic msk-1000p
  2. Add records to the topic using a UUID as a key to distribute records evenly across partitions. This example adds 13 million records.
    for x in {1..13000000}; do echo $(uuidgen -r),message_$x; done | ./kafka-console-producer.sh --broker-list "{bootstrap-server}" --topic msk-1000p --producer.config client.properties --property parse.key=true --property key.separator=, --producer-property acks=all
  3. Create a Python function based on this pattern, which logs the processed records.
  4. Amend the function code to insert a delay of 0.1 seconds to simulate record processing.
    import json
    import base64
    import time
    
    def lambda_handler(event, context):
        # Define a variable to keep track of the number of the message in the batch of messages
        i=1
        # Looping through the map for each key (combination of topic and partition)
        for record in event['records']:
            for messages in event['records'][record]:
                print("********************")
                print("Record number: " + str(i))
                print("Topic: " + str(messages['topic']))
                print("Partition: " + str(messages['partition']))
                print("Offset: " + str(messages['offset']))
                print("Timestamp: " + str(messages['timestamp']))
                print("TimestampType: " + str(messages['timestampType']))
                # Keys and values arrive base64-encoded; either may be absent
                if messages.get('key') is not None:
                    b64decodedKey=base64.b64decode(messages['key'])
                    decodedKey=b64decodedKey.decode('ascii')
                else:
                    decodedKey="null"
                if messages.get('value') is not None:
                    b64decodedValue=base64.b64decode(messages['value'])
                    decodedValue=b64decodedValue.decode('ascii')
                else:
                    decodedValue="null"
                print("Key = " + str(decodedKey))
                print("Value = " + str(decodedValue))
                i=i+1
                # Simulate 0.1 seconds of processing per record
                time.sleep(0.1)
        return {
            'statusCode': 200,
        }
    
  5. Configure the ESM to point to the previously created cluster and topic.
  6. Use the default batch size of 100. Set the StartingPosition to TRIM_HORIZON to process from the beginning of the stream.
  7. Deploy the function, which also adds and configures the ESM.
  8. View the Amazon CloudWatch ConcurrentExecutions and OffsetLag metrics to view the processing.
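If you prefer to configure the ESM programmatically rather than through the pattern's template, the settings from the steps above map onto the parameters of the Lambda CreateEventSourceMapping API. The following is a minimal sketch that only builds the request. The function name and cluster ARN are placeholders, and the helper build_esm_request is hypothetical.

```python
def build_esm_request(function_name, cluster_arn, topic,
                      batch_size=100, starting_position="TRIM_HORIZON"):
    """Build keyword arguments for Lambda's create_event_source_mapping call."""
    return {
        "FunctionName": function_name,
        "EventSourceArn": cluster_arn,
        "Topics": [topic],
        "BatchSize": batch_size,
        "StartingPosition": starting_position,
    }

if __name__ == "__main__":
    request = build_esm_request(
        function_name="msk-consumer-example",  # placeholder function name
        cluster_arn="arn:aws:kafka:us-east-1:111122223333:cluster/example/abcd",  # placeholder ARN
        topic="msk-1000p",
    )
    print(request)
    # With boto3 installed and credentials configured, the request could be sent with:
    #   boto3.client("lambda").create_event_source_mapping(**request)
```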

With the scaling improvements, once the ESM is configured, the ESM and function scale up to handle the number of partitions.

Lambda automatic scaling improvement graph

Increasing data processing throughput

It is important that your function can keep pace with the rate of traffic. A growing offset lag means that the function processing cannot keep up. If the age of the oldest unprocessed record is high in relation to the stream’s retention period, you can lose data as records expire from the stream.

This value should generally not exceed 50% of the stream’s retention period. When the value reaches 100% of the stream retention period, data is lost. One temporary solution is to increase the retention time of the stream. This gives you more time to resolve the issue before losing data.
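The 50% guideline can be expressed as a simple check. This sketch assumes you already know the age of the oldest unprocessed record and the retention period, both in seconds; the function names and threshold default are illustrative.

```python
def retention_usage(oldest_unprocessed_age_s, retention_period_s):
    """Fraction of the retention period consumed by the oldest unprocessed record."""
    return oldest_unprocessed_age_s / retention_period_s

def at_risk(oldest_unprocessed_age_s, retention_period_s, threshold=0.5):
    """True when the age exceeds the given share of the retention period."""
    return retention_usage(oldest_unprocessed_age_s, retention_period_s) > threshold

if __name__ == "__main__":
    day = 24 * 3600
    print(retention_usage(12 * 3600, day))  # 0.5
    print(at_risk(18 * 3600, day))          # True
```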

There are several ways to improve processing throughput.

  1. Avoid processing unnecessary records by using content filtering to control which records Lambda sends to your function. This helps reduce traffic to your function, simplifies code, and reduces overall cost.
  2. Lambda allocates processors across all pollers based on the number of partitions up to a maximum of one concurrent Lambda function per partition. You can increase the number of processing Lambda functions by increasing the number of partitions.
  3. For compute intensive functions, you can increase the memory allocated to your function, which also increases the amount of virtual CPU available. This can help reduce the duration of a processing function.
  4. Lambda polls Kafka with a configurable batch size of records. You can increase the batch size to process more records in a single invocation. This can improve processing time and reduce costs, particularly if your function has an increased init time. A larger batch size increases the latency to process the first record in the batch, but potentially decreases the latency to process the last record in the batch. There is a tradeoff between cost and latency when optimizing a partition’s capacity and the decision depends on the needs of your workload.
  5. Ensure that your producers evenly distribute records across partitions using an effective partition key strategy. A workload is unbalanced when a single key dominates other keys, creating a hot partition, which impacts throughput.
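To see why key choice matters for the last point, the following sketch compares a dominant key with UUID keys. It uses an MD5-based hash purely as a stand-in for illustration (Kafka's default partitioner uses a murmur2 hash of the key), and all function names are hypothetical.

```python
import hashlib
import uuid
from collections import Counter

def partition_for(key, num_partitions):
    """Map a key to a partition with a stand-in hash (not Kafka's murmur2)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

def partition_counts(keys, num_partitions):
    """Count how many records land on each partition."""
    return Counter(partition_for(k, num_partitions) for k in keys)

def skew(counts, num_partitions):
    """Busiest partition's record count divided by the ideal even share."""
    total = sum(counts.values())
    return max(counts.values()) / (total / num_partitions)

if __name__ == "__main__":
    partitions = 10
    hot = partition_counts(["customer-1"] * 1000, partitions)
    even = partition_counts([str(uuid.uuid4()) for _ in range(10000)], partitions)
    print("single dominant key skew:", skew(hot, partitions))  # 10.0
    print("UUID keys skew:", round(skew(even, partitions), 2))
```

A skew close to 1 means records are spread evenly; a skew equal to the number of partitions means a single partition receives everything.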

See Increasing data processing throughput for some additional guidance.

Conclusion

Today, AWS Lambda is improving the automatic scaling behavior when processing data from Apache Kafka event-sources. Lambda is increasing the default number of initial consumers, improving how quickly they scale up, and ensuring they don’t scale down too quickly. There is no additional action that you must take, and there is no additional cost.

You can explore the scaling improvements with your existing workloads or deploy an Amazon MSK cluster and try one of the patterns to measure processing time.

To explore using Lambda to process Kafka streams, see the learning guide.

For more serverless learning resources, visit Serverless Land.

Let’s Architect! Designing systems for stream data processing

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-designing-systems-for-stream-data-processing/

From customer interactions on e-commerce platforms to social media trends and from sensor data in internet of things (IoT) devices to financial market updates, streaming data encompasses a vast array of information. This ability to handle real-time flow often distinguishes successful organizations from their competitors. Harnessing the potential of streaming data processing offers organizations an opportunity to stay at the forefront of their industries, make data-informed decisions with unprecedented agility, and gain invaluable insights into customer behavior and operational efficiency.

AWS provides a foundation for building robust and reliable data pipelines that efficiently transport streaming data, eliminating the intricacies of infrastructure management. This shift empowers engineers to focus their talents and energies on creating business value, rather than consuming their time for managing infrastructure.

Build Modern Data Streaming Architectures on AWS

In a world of exploding data, traditional on-premises analytics struggle to scale and become cost-prohibitive. Modern data architecture on AWS offers a solution. It lets organizations easily access, analyze, and break down data silos, all while ensuring data security. This empowers real-time insights and versatile applications, from live dashboards to data lakes and warehouses, transforming the way we harness data.

This whitepaper guides you through implementing this architecture, focusing on streaming technologies. It simplifies data collection, management, and analysis, offering three movement patterns to glean insights from near real-time data using AWS’s tailored analytics services. The future of data analytics has arrived.

Take me to this whitepaper!

A serverless streaming data pipeline using Amazon Kinesis and AWS Glue

Lab: Streaming Data Analytics

In this workshop, you’ll see how to process data in real-time, using streaming and micro-batching technologies in the context of anomaly detection. You will also learn how to integrate Apache Kafka on Amazon Managed Streaming for Apache Kafka (Amazon MSK) with an Apache Flink consumer to process and aggregate the events for reporting purposes.

Take me to this workshop

A cloud architecture used for ingestion and stream processing on AWS

Publishing real-time financial data feeds using Kafka

Streaming architectures built on Apache Kafka follow the publish/subscribe paradigm: producers publish events to topics via a write operation and the consumers read the events.

This video describes how to offer a real-time financial data feed as a service on AWS. By using Amazon MSK, you can work with Kafka to allow consumers to subscribe to message topics containing the data of interest. The session drills down into the best design practices for working with Kafka and the techniques for establishing hybrid connectivity for working at a global scale.

Take me to this video

The topics in Apache Kafka are partitioned for better scaling and replicated for resiliency

How Samsung modernized architecture for real-time analytics

The Samsung SmartThings story is a compelling case study in how businesses can modernize and optimize their streaming data analytics, relieve the burden of infrastructure management, and embrace a future of real-time insights. After Samsung migrated to Amazon Managed Service for Apache Flink, the development team’s focus shifted from the tedium of infrastructure upkeep to the realm of delivering tangible business value. This change enabled them to harness the full potential of a fully managed stream-processing platform.

Take me to this video

The architecture Samsung used in their real-time analytics system

See you next time!

Thanks for reading! Next time, we’ll talk about tools for developers. To find all the posts from this series, check the Let’s Architect! page.

AWS Weekly Roundup – CodeWhisperer, CodeCatalyst, RDS, Route53, and more – October 24, 2023

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-codewhisperer-codecatalyst-rds-route53-and-more-october-23-2023/

The entire AWS News Blog team is fully focused on writing posts to announce the new services and features during our annual customer conference in Las Vegas, AWS re:Invent! And while we prepare content for you to read, our services teams continue to innovate. Here is my summary of last week’s launches.

Last week’s launches
Here are some of the launches that captured my attention:

Amazon CodeCatalyst – You can now add a cron expression to trigger a CI/CD workflow, providing a way to start workflows at set times. CodeCatalyst is a unified development service that integrates a project’s collaboration tools, CI/CD pipelines, and development and deployment environments.

Amazon Route53 – You can now route your customer’s traffic to their closest AWS Local Zones to improve application performance for latency-sensitive workloads. Learn more about geoproximity routing in the Route53 documentation.

Amazon RDS – The root certificates we use to sign your databases’ TLS certificates will expire in 2024. You must generate new certificates for your databases before the expiration date. This blog post details the procedure step by step. The new root certificates we generated are valid for the next 40 years for RSA2048 and 100 years for RSA4096 and ECC384. It is likely this is the last time in your professional career that you are obliged to renew your database certificates for AWS.

Amazon MSK – Replicating Kafka clusters at scale is difficult and often involves managing the infrastructure and the replication solution by yourself. We launched Amazon MSK Replicator, a fully managed replication solution for your Kafka clusters, in the same or across multiple AWS Regions.

Amazon CodeWhisperer – We launched a preview for an upcoming capability of Amazon CodeWhisperer Professional. You can now train CodeWhisperer on your private code base. It allows you to give your organization’s developers more relevant suggestions to better assist them in their day-to-day coding against your organization’s private libraries and frameworks.

Amazon EC2 – The seventh generation of memory-optimized EC2 instances is available (R7i). These instances use the 4th Generation Intel Xeon Scalable Processors (Sapphire Rapids). This family of instances provides up to 192 vCPU and 1,536 GB of memory. They are well-suited for memory-intensive applications such as in-memory databases or caches.

X in Y – We launched existing services and instance types in additional Regions:

Other AWS news
Here are some other blog posts and news items that you might like:

The Community.AWS blog has new posts to teach you how to integrate Amazon Bedrock inside your Java and Go applications, and my colleague Brooke wrote a survival guide for re:Invent first-timers.

The Official AWS Podcast – Listen each week for updates on the latest AWS news and deep dives into exciting use cases. There are also official AWS podcasts in several languages. Check out the ones in French, German, Italian, and Spanish.

Some other great sources of AWS news include:

Upcoming AWS events
Check your calendars and sign up for these AWS events:

AWS Community Days – Join a community-led conference run by AWS user group leaders in your region: Jaipur (November 4), Vadodara (November 4), and Brasil (November 4).

AWS Innovate: Every Application Edition – Join our free online conference to explore cutting-edge ways to enhance security and reliability, optimize performance on a budget, speed up application development, and revolutionize your applications with generative AI. Register for AWS Innovate Online Asia Pacific & Japan on October 26.

AWS re:Invent (November 27 – December 1) – Join us to hear the latest from AWS, learn from experts, and connect with the global cloud community. Browse the session catalog and attendee guides and check out the re:Invent highlights for generative AI.

You can browse all upcoming in-person and virtual events.

And that’s all for me today. I’ll go back to writing my re:Invent blog posts.

Check back next Monday for another Weekly Roundup!

— seb

This post is part of our Weekly Roundup series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Resolve private DNS hostnames for Amazon MSK Connect

Post Syndicated from Amar Surjit original https://aws.amazon.com/blogs/big-data/resolve-private-dns-hostnames-for-amazon-msk-connect/

Amazon MSK Connect is a feature of Amazon Managed Streaming for Apache Kafka (Amazon MSK) that offers a fully managed Apache Kafka Connect environment on AWS. With MSK Connect, you can deploy fully managed connectors built for Kafka Connect that move data into or pull data from popular data stores like Amazon S3 and Amazon OpenSearch Service. With the introduction of the Private DNS support into MSK Connect, connectors are able to resolve private customer domain names, using their DNS servers configured in the customer VPC DHCP Options set. This post demonstrates a solution for resolving private DNS hostnames defined in a customer VPC for MSK Connect.

You may want to use private DNS hostname support for MSK Connect for multiple reasons. Before the private DNS resolution capability was available, MSK Connect used the service VPC DNS resolver for DNS resolution. It didn’t use the private DNS servers defined in the customer VPC DHCP option sets. Connectors could only reference hostnames in the connector configuration or plugin that were publicly resolvable, and couldn’t resolve private hostnames defined in a private hosted zone or use DNS servers in another customer network.

Many customers ensure that their internal DNS applications are not publicly resolvable. For example, you might have a MySQL or PostgreSQL database and may not want the DNS name for your database to be publicly resolvable or accessible. Amazon Relational Database Service (Amazon RDS) or Amazon Aurora servers have DNS names that are publicly resolvable but not accessible. You can have multiple internal applications such as databases, data warehouses, or other systems where DNS names are not publicly resolvable.

With the recent launch of MSK Connect private DNS support, you can configure connectors to reference public or private domain names. Connectors use the DNS servers configured in your VPC’s DHCP option set to resolve domain names. You can now use MSK Connect to privately connect with databases, data warehouses, and other resources in your VPC to comply with your security needs.

If you have a MySQL or PostgreSQL database with private DNS, you can configure it on a custom DNS server and configure the VPC-specific DHCP option set to do the DNS resolution using the custom DNS server local to the VPC instead of using the service DNS resolution.

Solution overview

A customer can have different architecture options to set up MSK Connect. For example, Amazon MSK and MSK Connect can be in the same VPC; the source system can be in VPC1 while Amazon MSK and MSK Connect are in VPC2; or the source system, Amazon MSK, and MSK Connect can all be in different VPCs.

The following setup uses two different VPCs, where the MySQL VPC hosts the MySQL database and the MSK VPC hosts Amazon MSK, MSK Connect, the DNS server, and various other components. You can extend this architecture to support other deployment topologies using appropriate AWS Identity and Access Management (IAM) permissions and connectivity options.

This post provides step-by-step instructions to set up MSK Connect where it will receive data from a source MySQL database with private DNS hostname in the MySQL VPC and send data to Amazon MSK using MSK Connect in another VPC. The following diagram illustrates the high-level architecture.

The setup instructions include the following key steps:

  1. Set up the VPCs, subnets, and other core infrastructure components.
  2. Install and configure the DNS server.
  3. Upload the data to the MySQL database.
  4. Deploy Amazon MSK and MSK Connect and consume the change data capture (CDC) records.

Prerequisites

To follow the tutorial in this post, you need the following:

Create the required infrastructure using AWS CloudFormation

Before configuring MSK Connect, we need to set up the VPCs, subnets, and other core infrastructure components. To set up resources in your AWS account, complete the following steps:

  1. Choose Launch Stack to launch the stack in a Region that supports Amazon MSK and MSK Connect.
  2. Specify the private key that you use to connect to the EC2 instances.
  3. Update the SSH location with your local IP address and keep the other values as default.
  4. Choose Next.
  5. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Create stack and wait for the required resources to get created.

The CloudFormation template creates the following key resources in your account:

  • VPCs:
    • MSK VPC
    • MySQL VPC
  • Subnets in the MSK VPC:
    • Three private subnets for Amazon MSK
    • Private subnet for DNS server
    • Private subnet for MSKClient
    • Public subnet for bastion host
  • Subnets in the MySQL VPC:
    • Private subnet for MySQL database
    • Public subnet for bastion host
  • Internet gateway attached to the MySQL VPC and MSK VPC
  • NAT gateways attached to MySQL public subnet and MSK public subnet
  • Route tables to support the traffic flow between different subnets in a VPC and across VPCs
  • Peering connection between the MySQL VPC and MSK VPC
  • MySQL database and configurations
  • DNS server
  • MSK client with respective libraries

Please note, if you’re using VPC peering or AWS Transit Gateway with MSK Connect, don’t configure your connector for reaching the peered VPC resources with IPs in the CIDR ranges. For more information, refer to Connecting from connectors.

Configure the DNS server

Complete the following steps to configure the DNS server:

  1. Connect to the DNS server. There are three configuration files available on the DNS server under the /home/ec2-user folder:
    • named.conf
    • mysql.internal.zone
    • kafka.us-east-1.amazonaws.com.zone
  2. Run the following commands to install and configure your DNS server:
    sudo yum install bind bind-utils -y
    cp /home/ec2-user/named.conf /etc/named.conf
    chmod 644 /etc/named.conf
    cp mysql.internal.zone /var/named/mysql.internal.zone
    cp kafka.region.amazonaws.com.zone /var/named/kafka.region.amazonaws.com.zone
    

  3. Update /etc/named.conf. For the allow-transfer attribute, add the DNS server internal IP address:
    allow-transfer { localhost; <DNS Server internal IP address>; };

You can find the DNS server IP address on the CloudFormation template Outputs tab.

Note that the MSK cluster is not set up yet at this stage. After setting up the MSK cluster later in this post, we need to update the Kafka broker DNS names and their respective internal IP addresses in the /var/named/kafka.region.amazonaws.com zone file. For instructions, refer to the section Update the /var/named/kafka.region.amazonaws.com zone file.

Also note that these settings configure the DNS server for this post. In your own environment, you can configure the DNS server as per your needs.

  4. Restart the DNS service:
    sudo su
    service named restart
    

You should see the following message:

Redirecting to /bin/systemctl restart named.service

Your custom DNS server is up and running now.

Upload the data to the MySQL database

Typically, we can use an Amazon RDS for MySQL database, but for this post, we use custom MySQL database servers. The Amazon RDS DNS is publicly accessible and MSK Connect supports it, but it was not able to support databases or applications with private DNS in the past. With the latest private DNS hostnames feature launch, it can support applications’ private DNS as well, so we use a MySQL database on the EC2 instance.

This installation provides information about setting up the MySQL database on a single-node EC2 instance. This should not be used for your production setup. You should follow appropriate guidance for setting up and configuring MySQL in your account.

The MySQL database is already set up using the CloudFormation template and is ready to use now. To upload the data, complete the following steps:

  1. SSH to the MySQL EC2 instance. For instructions, refer to Connect to your Linux instance. The data file salesdb.sql is already downloaded and available under the /home/ec2-user directory.
  2. Log in to mysqldb with the user name master.
  3. To access the password, navigate to the Parameter Store tab in AWS Systems Manager. Select /Database/Credentials/master, choose View Details, and copy the value for the key.
  4. Log in to MySQL using the following command:
    mysql -umaster -p<MySQLMasterUserPassword>
    

  5. Run the following commands to create the salesdb database and load the data to the table:
    use salesdb;
    source /home/ec2-user/salesdb.sql;

This inserts the records into the various tables in the salesdb database.

  6. Run show tables to see the following tables in the salesdb:
    mysql> show tables;
    +-----------------------+
    | Tables_in_salesdb     |
    +-----------------------+
    | CUSTOMER              |
    | CUSTOMER_SITE         |
    | PRODUCT               |
    | PRODUCT_CATEGORY      |
    | SALES_ORDER           |
    | SALES_ORDER_ALL       |
    | SALES_ORDER_DETAIL    |
    | SALES_ORDER_DETAIL_DS |
    | SALES_ORDER_V         |
    | SUPPLIER              |
    +-----------------------+

Create a DHCP option set

DHCP option sets give you control over the following aspects of routing in your virtual network:

  • You can control the DNS servers, domain names, or Network Time Protocol (NTP) servers used by the devices in your VPC.
  • You can disable DNS resolution completely in your VPC.

To support private DNS, you can use an Amazon Route 53 private zone or your own custom DNS server. If you use a Route 53 private zone, the setup will work automatically and there is no need to make any changes to the default DHCP option set for the MSK VPC. For a custom DNS server, complete the following steps to set up a custom DHCP configuration using Amazon Virtual Private Cloud (Amazon VPC) and attach it to the MSK VPC.

There will be a default DHCP option set in your VPC attached to the Amazon provided DNS server. At this stage, the requests will go to Amazon’s provided DNS server for resolution. However, we create a new DHCP option set because we’re using a custom DNS server.
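The console steps that follow could also be scripted. As a rough sketch, the request below mirrors the values entered in the console (the option set name and the mysql.internal domain) in the shape expected by the Amazon EC2 CreateDhcpOptions API. The helper name and the DNS server IP address are placeholders; use the address from the CloudFormation Outputs tab.

```python
def build_dhcp_options_request(name, domain_name, dns_server_ips):
    """Build keyword arguments for the EC2 create_dhcp_options call."""
    return {
        "DhcpConfigurations": [
            {"Key": "domain-name", "Values": [domain_name]},
            {"Key": "domain-name-servers", "Values": list(dns_server_ips)},
        ],
        "TagSpecifications": [
            {
                "ResourceType": "dhcp-options",
                "Tags": [{"Key": "Name", "Value": name}],
            }
        ],
    }

if __name__ == "__main__":
    request = build_dhcp_options_request(
        name="MSKConnect_Private_DHCP_OptionSet",
        domain_name="mysql.internal",
        dns_server_ips=["10.0.0.10"],  # placeholder: your DNS server internal IP
    )
    print(request)
    # With boto3 installed and credentials configured, this could be sent with:
    #   ec2 = boto3.client("ec2")
    #   created = ec2.create_dhcp_options(**request)
    # and then attached to the MSK VPC with ec2.associate_dhcp_options(...).
```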

  1. On the Amazon VPC console, choose DHCP option set in the navigation pane.
  2. Choose Create DHCP option set.
  3. For DHCP option set name, enter MSKConnect_Private_DHCP_OptionSet.
  4. For Domain name, enter mysql.internal.
  5. For Domain name server, enter the DNS server IP address.
  6. Choose Create DHCP option set.
  7. Navigate to the MSK VPC and on the Actions menu, choose Edit VPC settings.
  8. Select the newly created DHCP option set and save it.
    The following screenshot shows the example configurations.
  9. On the Amazon EC2 console, navigate to privateDNS_bastion_host.
  10. Choose Instance state and Reboot instance.
  11. Wait a few minutes and then run nslookup from the bastion host; it should resolve the hostname using your local DNS server instead of Route 53:
    nslookup local.mysql.internal
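If you want to run the same check from a script on the bastion host, a small Python sketch using the standard library resolver could look like the following. The resolve helper is hypothetical, and the lookup parameter exists only so the function can be exercised without a live DNS server.

```python
import socket

def resolve(hostname, lookup=socket.getaddrinfo):
    """Return the sorted, de-duplicated IP addresses for hostname, or [] on failure."""
    try:
        results = lookup(hostname, None)
    except socket.gaierror:
        return []
    # Each result is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP.
    return sorted({info[4][0] for info in results})

if __name__ == "__main__":
    addresses = resolve("local.mysql.internal")
    print(addresses if addresses else "hostname did not resolve")
```

An empty result after the reboot suggests the instance is not yet using the new DHCP option set.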

Now our base infrastructure setup is ready to move to the next stage. As part of our base infrastructure, we have set up the following key components successfully:

  • MSK and MySQL VPCs
  • Subnets
  • EC2 instances
  • VPC peering
  • Route tables
  • NAT gateways and internet gateways
  • DNS server and configuration
  • Appropriate security groups and NACLs
  • MySQL database with the required data

At this stage, the MySQL DB DNS name is resolvable using a custom DNS server instead of Route 53.

Set up the MSK cluster and MSK Connect

The next step is to deploy the MSK cluster and MSK Connect, which will fetch records from salesdb and send them to Amazon MSK. In this section, we provide a walkthrough of replicating the MySQL database (salesdb) to Amazon MSK using Debezium, an open-source connector. The connector will monitor for any changes to the database and capture any changes to the tables.

With MSK Connect, you can run fully managed Apache Kafka Connect workloads on AWS. MSK Connect provisions the required resources and sets up the cluster. It continuously monitors the health and delivery state of connectors, patches and manages the underlying hardware, and auto scales connectors to match changes in throughput. As a result, you can focus your resources on building applications rather than managing infrastructure.

MSK Connect will make use of the custom DNS server in the VPC and it won’t be dependent on Route 53.

Create an MSK cluster configuration

Complete the following steps to create an MSK cluster configuration:

  1. On the Amazon MSK console, choose Cluster configurations under MSK clusters in the navigation pane.
  2. Choose Create configuration.
  3. Name the configuration mskc-tutorial-cluster-configuration.
  4. Under Configuration properties, remove everything and add the line auto.create.topics.enable=true.
  5. Choose Create.

Create an MSK cluster and attach the configuration

In the next step, we attach this configuration to a cluster. Complete the following steps:

  1. On the Amazon MSK console, choose Clusters under MSK clusters in the navigation pane.
  2. Choose Create cluster and Custom create.
  3. For the cluster name, enter mkc-tutorial-cluster.
  4. Under General cluster properties, choose Provisioned for the cluster type and use the Apache Kafka default version 2.8.1.
  5. Use all the default options for the Brokers and Storage sections.
  6. Under Configurations, choose Custom configuration.
  7. Select mskc-tutorial-cluster-configuration with the appropriate revision and choose Next.
  8. Under Networking, choose the MSK VPC.
  9. Select the Availability Zones depending upon your Region, such as us-east-1a, us-east-1b, and us-east-1c, and the respective private subnets MSK-Private-1, MSK-Private-2, and MSK-Private-3 if you are in the us-east-1 Region. Public access to these brokers should be off.
  10. Copy the security group ID from Chosen security groups.
  11. Choose Next.
  12. Under Access control methods, select IAM role-based authentication.
  13. In the Encryption section, under Between clients and brokers, TLS encryption will be selected by default.
  14. For Encrypt data at rest, select Use AWS managed key.
  15. Use the default options for Monitoring and select Basic monitoring.
  16. Select Deliver to Amazon CloudWatch Logs.
  17. Under Log group, choose visit Amazon CloudWatch Logs console.
  18. Choose Create log group.
  19. Enter a log group name and choose Create.
  20. Return to the Monitoring and tags page and under Log groups, choose Choose log group.
  21. Choose Next.
  22. Review the configurations and choose Create cluster. You’re redirected to the details page of the cluster.
  23. Under Security groups applied, note the security group ID to use in a later step.

Cluster creation can typically take 25–30 minutes. Its status changes to Active when it’s created successfully.

Update the /var/named/kafka.region.amazonaws.com zone file

Before you create the MSK connector, update the DNS server configurations with the MSK cluster details.

  1. To get the list of bootstrap server DNS and respective IP addresses, navigate to the cluster and choose View client information.
  2. Copy the bootstrap server information with IAM authentication type.
  3. Identify the broker IP addresses by running nslookup from your local machine, which returns the private IP address of each broker. Your VPC currently points to the latest DHCP option set, so your DNS server can't yet resolve these DNS names from within the VPC.
    nslookup <broker 1 DNS name>

Now you can log in to the DNS server and update the records for different brokers and respective IP addresses in the /var/named/kafka.region.amazonaws.com file.

  1. Upload the msk-access.pem file to BastionHostInstance from your local machine:
    scp -i "<your pem file>" <your pem file> ec2-user@<BastionHostInstance IP address>:/home/ec2-user/

  2. Log in to the DNS server and open the /var/named/kafka.region.amazonaws.com file and update the following lines with the correct MSK broker DNS names and respective IP addresses:
    <b-1.<clustername>.******.c6> IN A <Internal IP Address - broker 1>
    <b-2.<clustername>.******.c6> IN A <Internal IP Address - broker 2>
    <b-3.<clustername>.******.c6> IN A <Internal IP Address - broker 3>
    

Note that you must enter each broker DNS name in the shortened form shown above: remove .kafka.<region id>.amazonaws.com from the broker DNS name.
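The shortening rule above can be sketched in a few lines of Python. This is a minimal illustration, not part of the original walkthrough: the function name zone_record, the broker name, and the IP address are hypothetical, and the record layout simply mirrors the lines shown above.

```python
def zone_record(broker_dns: str, ip_address: str) -> str:
    """Build one A record line for the zone file from a full broker DNS name."""
    # Drop an optional ":port" suffix copied from the bootstrap server string.
    host_part = broker_dns.split(":", 1)[0]
    # Keep only the part before ".kafka.<region id>.amazonaws.com",
    # because the zone file itself already covers that suffix.
    host, sep, _suffix = host_part.partition(".kafka.")
    if not sep:
        raise ValueError(f"not an MSK broker DNS name: {broker_dns}")
    return f"{host} IN A {ip_address}"


# Hypothetical broker name and private IP address, for illustration only.
print(zone_record("b-1.mycluster.abc123.c6.kafka.us-east-1.amazonaws.com:9098", "10.0.1.15"))
```

Generating the three records this way avoids copy-and-paste mistakes when the broker names are long.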

  3. Restart the DNS service:
    sudo su
    service named restart

You should see the following message:

Redirecting to /bin/systemctl restart named.service

Your custom DNS server is now up and running, and you should be able to resolve the broker DNS names using the internal DNS server.

Update the security group for connectivity between the MySQL database and MSK Connect

It’s important to have the appropriate connectivity in place between MSK Connect and the MySQL database. Complete the following steps:

  1. On the Amazon MSK console, navigate to the MSK cluster and under Network settings, copy the security group.
  2. On the Amazon EC2 console, choose Security groups in the navigation pane.
  3. Edit the security group MySQL_SG and choose Add rule.
  4. Add an inbound rule with MySQL/Aurora as the type and the MSK security group as the source.
  5. Choose Save rules.

Create the MSK connector

To create your MSK connector, complete the following steps:

  1. On the Amazon MSK console, choose Connectors under MSK Connect in the navigation pane.
  2. Choose Create connector.
  3. Select Create custom plugin.
  4. Download the MySQL connector plugin for the latest stable release from the Debezium site or download Debezium.zip.
  5. Upload the MySQL connector zip file to the S3 bucket.
  6. Copy the URL for the file, such as s3://<bucket name>/Debezium.zip.
  7. Return to the Choose custom plugin page and enter the S3 file path for S3 URI.
  8. For Custom plugin name, enter mysql-plugin.
  9. Choose Next.
  10. For Name, enter mysql-connector.
  11. For Description, enter a description of the connector.
  12. For Cluster type, choose MSK Cluster.
  13. Select the existing cluster from the list (for this post, mkc-tutorial-cluster).
  14. Specify the authentication type as IAM.
  15. Use the following values for Connector configuration:
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.history.producer.sasl.mechanism=AWS_MSK_IAM
    database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    database.allowPublicKeyRetrieval=true
    database.user=master
    database.server.id=123456
    tasks.max=1
    database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    database.history.producer.security.protocol=SASL_SSL
    database.history.kafka.topic=dbhistory.salesdb
    database.history.kafka.bootstrap.servers=b-3.xxxxx.yyyy.zz.kafka.us-east-2.amazonaws.com:9098,b-1.xxxxx.yyyy.zz.kafka.us-east-2.amazonaws.com:9098,b-2.xxxxx.yyyy.zz.kafka.us-east-2.amazonaws.com:9098
    database.server.name=salesdb-server
    database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    database.history.consumer.security.protocol=SASL_SSL
    database.port=3306
    include.schema.changes=true
    database.hostname=local.mysql.internal
    database.password=xxxxxx
    table.include.list=salesdb.SALES_ORDER,salesdb.SALES_ORDER_ALL,salesdb.CUSTOMER
    database.history.consumer.sasl.mechanism=AWS_MSK_IAM
    database.include.list=salesdb
    

  16. Update the following connector configuration:
    database.user=master
    database.hostname=local.mysql.internal
    database.password=<MySQLMasterUserPassword>
    

  17. For Capacity type, choose Provisioned.
  18. For MCU count per worker, enter 1.
  19. For Number of workers, enter 1.
  20. Select Use the MSK default configuration.
  21. In the Access Permissions section, on the Choose service role menu, choose MSK-Connect-PrivateDNS-MySQLConnector*, then choose Next.
  22. In the Security section, keep the default settings.
  23. In the Logs section, select Deliver to Amazon CloudWatch logs.
  24. Choose visit Amazon CloudWatch Logs console.
  25. Under Logs in the navigation pane, choose Log group.
  26. Choose Create log group.
  27. Enter the log group name, retention settings, and tags, then choose Create.
  28. Return to the connector creation page and choose Browse log group.
  29. Choose the AmazonMSKConnect log group, then choose Next.
  30. Review the configurations and choose Create connector.

Wait for the connector creation process to complete (about 10–15 minutes).
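Because connector creation takes several minutes, it can help to sanity-check the connector configuration before pasting it into the console. The following Python sketch is one possible pre-flight check, not an official validator: the function names and the list of keys treated as required are assumptions made for illustration, and port 9098 is checked because the cluster in this post uses IAM authentication.

```python
def parse_properties(text: str) -> dict:
    """Parse key=value lines, ignoring blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may contain "=" themselves.
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_connector_config(props: dict) -> list:
    """Return a list of readable problems; an empty list means none were found."""
    problems = []
    # Keys this sketch assumes every Debezium MySQL connector config needs.
    required = (
        "connector.class",
        "database.hostname",
        "database.user",
        "database.password",
        "database.history.kafka.bootstrap.servers",
    )
    for key in required:
        if not props.get(key):
            problems.append(f"missing {key}")
    servers = props.get("database.history.kafka.bootstrap.servers", "")
    # A stray space inside the comma-separated list is an easy copy-paste error.
    if any(ch.isspace() for ch in servers):
        problems.append("whitespace inside bootstrap servers list")
    for server in servers.split(","):
        if server and not server.strip().endswith(":9098"):
            problems.append(f"{server.strip()} does not use the IAM port 9098")
    return problems
```

Calling check_connector_config(parse_properties(config_text)) on the text you plan to paste returns an empty list when none of these particular issues are present.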

The MSK Connect connector is now up and running. You can log in to the MySQL database using your user ID and change a couple of records in the CUSTOMER table. MSK Connect receives the CDC records, and the database updates become available in the MSK <Customer> topic.

Consume messages from the MSK topic

To consume messages from the MSK topic, run the Kafka consumer on the MSK_Client EC2 instance available in the MSK VPC.

  1. SSH to the MSK_Client EC2 instance. The MSK_Client instance has the required Kafka client libraries, Amazon MSK IAM JAR file, client.properties file, and an instance profile attached to it, along with the appropriate IAM role using the CloudFormation template.
  2. Add the MSKClientSG security group as the source for the MSK security group with the following properties:
    • For Type, choose All Traffic.
    • For Source, choose Custom and MSK Security Group.

    Now you’re ready to consume data.

  3. To list the topics, run the following command:
    ./kafka-topics.sh --bootstrap-server <BootstrapServerString> --command-config client.properties --list

  4. To consume data from the salesdb-server.salesdb.CUSTOMER topic, use the following command:
    ./kafka-console-consumer.sh --bootstrap-server <BootstrapServerString> --consumer.config client.properties --topic salesdb-server.salesdb.CUSTOMER --from-beginning

Run the Kafka consumer on your EC2 instance and you will see messages similar to the following:

Struct{after=Struct{CUST_ID=1998.0,NAME=Customer Name 1998,MKTSEGMENT=Market Segment 3},source=Struct{version=1.9.5.Final,connector=mysql,name=salesdb-server,ts_ms=1678099992174,snapshot=true,db=salesdb,table=CUSTOMER,server_id=0,file=binlog.000001,pos=43298383,row=0},op=r,ts_ms=1678099992174}
Struct{after=Struct{CUST_ID=1999.0,NAME=Customer Name 1999,MKTSEGMENT=Market Segment 7},source=Struct{version=1.9.5.Final,connector=mysql,name=salesdb-server,ts_ms=1678099992174,snapshot=true,db=salesdb,table=CUSTOMER,server_id=0,file=binlog.000001,pos=43298383,row=0},op=r,ts_ms=1678099992174}
Struct{after=Struct{CUST_ID=2000.0,NAME=Customer Name 2000,MKTSEGMENT=Market Segment 9},source=Struct{version=1.9.5.Final,connector=mysql,name=salesdb-server,ts_ms=1678099992174,snapshot=last,db=salesdb,table=CUSTOMER,server_id=0,file=binlog.000001,pos=43298383,row=0},op=r,ts_ms=1678099992174}
Struct{before=Struct{CUST_ID=2000.0,NAME=Customer Name 2000,MKTSEGMENT=Market Segment 9},after=Struct{CUST_ID=2000.0,NAME=Customer Name 2000,MKTSEGMENT=Market Segment10},source=Struct{version=1.9.5.Final,connector=mysql,name=salesdb-server,ts_ms=1678100372000,db=salesdb,table=CUSTOMER,server_id=1,file=binlog.000001,pos=43298616,row=0,thread=67},op=u,ts_ms=1678100372612}

While testing the application, records with CUST_ID 1998, 1999, and 2000 were updated, and these records are available in the logs.
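To make the console output easier to scan, the lines can be summarized by their Debezium operation code (r for a snapshot read, c for an insert, u for an update, d for a delete). The following Python sketch is an illustration only: it relies on simple pattern matching against the Struct{...} text shown above rather than on a real deserializer, and the function name describe_event is hypothetical.

```python
import re

# Debezium operation codes as they appear in the "op" field of each event.
OPERATIONS = {"r": "snapshot read", "c": "insert", "u": "update", "d": "delete"}


def describe_event(line: str) -> str:
    """Return a short description of one console-consumer output line."""
    # Require a "," or "{" before the field name so that fields such as
    # "pos=" or "snapshot=" are never mistaken for "op=".
    op = re.search(r"[,{]op=(\w)", line)
    table = re.search(r"[,{]table=(\w+)", line)
    if not op or not table:
        return "unrecognized event"
    return f"{OPERATIONS.get(op.group(1), 'unknown')} on {table.group(1)}"
```

In the sample output above, the first three lines would be reported as snapshot reads and the last one as an update on CUSTOMER.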

Clean up

It’s always a good practice to clean up all the resources created as part of this post to avoid any additional cost. To clean up your resources, delete the MSK cluster, MSK Connect connector, EC2 instances, DNS server, bastion host, S3 bucket, VPC, subnets, and CloudWatch logs.

Additionally, clean up all other AWS resources that you created using AWS CloudFormation. You can delete these resources on the AWS CloudFormation console by deleting the stack.

Conclusion

In this post, we discussed the process of setting up MSK Connect using a private DNS. This feature allows you to configure connectors to reference public or private domain names.

We received the initial load and CDC records from a MySQL database hosted in a separate VPC whose DNS name is not accessible or resolvable externally. MSK Connect was able to connect to the MySQL database and consume the records using the MSK Connect private DNS feature. The custom DHCP option set attached to the VPC ensured that DNS resolution was performed using the local DNS server instead of Route 53.

With the MSK Connect private DNS support feature, you can keep the databases, data warehouses, and systems such as secrets managers that run in your own VPC inaccessible to the internet, still connect to them from MSK Connect, and comply with your corporate security posture.

To learn more and get started, refer to private DNS for MSK Connect.


About the author

Amar is a Senior Solutions Architect at Amazon AWS in the UK. He works across power, utilities, manufacturing and automotive customers on strategic implementations, specializing in using AWS Streaming and advanced data analytics solutions, to drive optimal business outcomes.

Introducing Amazon MSK Replicator – Fully Managed Replication across MSK Clusters in Same or Different AWS Regions

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/introducing-amazon-msk-replicator-fully-managed-replication-across-msk-clusters-in-same-or-different-aws-regions/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) provides a fully managed and highly available Apache Kafka service simplifying the way you process streaming data. When using Apache Kafka, a common architectural pattern is to replicate data from one cluster to another.

Cross-cluster replication is often used to implement business continuity and disaster recovery plans and increase application resilience across AWS Regions. Another use case, when building multi-Region applications, is to have copies of streaming data in multiple geographies stored closer to end consumers for lower latency access. You might also need to aggregate data from multiple clusters into one centralized cluster for analytics.

To address these needs, you would have to write custom code or install and manage open-source tools like MirrorMaker 2.0, available as part of Apache Kafka starting with version 2.4. However, these tools can be complex and time-consuming to set up for reliable replication, and require continuous monitoring and scaling.

Today, we’re introducing MSK Replicator, a new capability of Amazon MSK that makes it easier to reliably set up cross-Region and same-Region replication between MSK clusters, scaling automatically to handle your workload. You can use MSK Replicator with both provisioned and serverless MSK cluster types, including those using tiered storage.

With MSK Replicator, you can set up both active-passive and active-active cluster topologies to increase the resiliency of your Kafka application across Regions:

  • In an active-active setup, both MSK clusters are actively serving reads and writes.
  • In an active-passive setup, only one MSK cluster at a time is actively serving streaming data while the other cluster is on standby.

Let’s see how that works in practice.

Creating an MSK Replicator across AWS Regions
I have two MSK clusters deployed in different Regions. MSK Replicator requires that the clusters have IAM authentication enabled. I can continue to use other authentication methods such as mTLS or SASL for my other clients. The source cluster also needs to enable multi-VPC private connectivity.

MSK Replicator cross-Region architecture diagram.

From a network perspective, the security groups of the clusters allow traffic between the cluster and the security group used by the Replicator. For example, I can add self-referencing inbound and outbound rules that allow traffic from and to the same security group. For simplicity, I use the default VPC and its default security group for both clusters.

Before creating a replicator, I update the cluster policy of the source cluster to allow the MSK service (including replicators) to find and reach the cluster. In the Amazon MSK console, I select the source Region. I choose Clusters from the navigation pane and then the source cluster. First, I copy the source cluster ARN at the top. Then, in the Properties tab, I choose Edit cluster policy in the Security settings. There, I use the following JSON policy (replacing the source cluster ARN) and save the changes:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kafka.amazonaws.com"
            },
            "Action": [
                "kafka:CreateVpcConnection",
                "kafka:GetBootstrapBrokers",
                "kafka:DescribeClusterV2"
            ],
            "Resource": "<SOURCE_CLUSTER_ARN>"
        }
    ]
}
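If you manage several source clusters, you might prefer to generate this policy rather than edit the ARN by hand. The following Python sketch only builds the same JSON document shown above; the function name and the example ARN are hypothetical, and applying the policy to the cluster is left to the console step described in this post.

```python
import json


def replicator_cluster_policy(source_cluster_arn: str) -> str:
    """Return the cluster policy JSON shown above for the given source cluster ARN."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # Lets the MSK service (including replicators) find and reach the cluster.
                "Principal": {"Service": "kafka.amazonaws.com"},
                "Action": [
                    "kafka:CreateVpcConnection",
                    "kafka:GetBootstrapBrokers",
                    "kafka:DescribeClusterV2",
                ],
                "Resource": source_cluster_arn,
            }
        ],
    }
    return json.dumps(policy, indent=4)


# Hypothetical ARN, for illustration only.
print(replicator_cluster_policy("arn:aws:kafka:us-east-1:111122223333:cluster/example/abcd"))
```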

I select the target Region in the console. I choose Replicators from the navigation pane and then Create replicator. Here, I enter a name and a description for the replicator.

Console screenshot.

In the Source cluster section, I select the Region of the source MSK cluster. Then, I choose Browse to select the source MSK cluster from the list. Note that Replicators can be created only for clusters that have a cluster policy set.

Console screenshot.

I leave Subnets and Security groups as their default values to use my default VPC and its default security group. This network configuration may be used to place elastic network interfaces (ENIs) to facilitate communication with your cluster.

The Access control method for the source cluster is set to IAM role-based authentication. Optionally, I can turn on multiple authentication methods at the same time to continue to use clients that need other authentication methods like mTLS or SASL while the Replicator uses IAM. For cross-Region replication, the source cluster cannot have unauthenticated access enabled, because multi-VPC private connectivity is used to access the source cluster.

Console screenshot.

In the Target cluster section, the Cluster region is set to the Region where I’m using the console. I choose Browse to select the target MSK cluster from the list.

Console screenshot.

Similar to what I did for the source cluster, I leave Subnets and Security groups as their default values. This network configuration is used to place the ENIs required to communicate with the target cluster. The Access control method for the target cluster is also set to IAM role-based authentication.

Console screenshot.

In the Replicator settings section, I use the default Topic replication configuration, so that all topics are replicated. Optionally, I can specify a comma-separated list of regular expressions that indicate the names of the topics to replicate or to exclude from replication. In the Additional settings, I can choose to copy topic configurations and access control lists (ACLs), and to detect and copy new topics.
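Before entering include and exclude lists, it can be useful to try them against your topic names locally. The Python sketch below shows one plausible reading of a comma-separated list of regular expressions. It assumes that a pattern must match the whole topic name and that exclusions win over inclusions; the exact matching rules used by MSK Replicator are not described in this post, so treat the function select_topics and the sample topic names as illustrative only.

```python
import re


def select_topics(topics, include=".*", exclude=""):
    """Return the topics matched by any include pattern and by no exclude pattern."""
    include_patterns = [p.strip() for p in include.split(",") if p.strip()]
    exclude_patterns = [p.strip() for p in exclude.split(",") if p.strip()]
    selected = []
    for topic in topics:
        # Assumption: each pattern has to match the entire topic name.
        included = any(re.fullmatch(p, topic) for p in include_patterns)
        excluded = any(re.fullmatch(p, topic) for p in exclude_patterns)
        if included and not excluded:
            selected.append(topic)
    return selected


# Hypothetical topic names, for illustration only.
topics = ["orders", "orders-dlq", "payments", "audit"]
print(select_topics(topics, include="orders.*,payments", exclude=".*-dlq"))
```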

Console screenshot.

Consumer group replication allows me to specify if consumer group offsets should be replicated so that, after a switchover, consuming applications can resume processing near where they left off in the primary cluster. I can specify a comma-separated list of regular expressions that indicate the names of the consumer groups to replicate or to exclude from replication. I can also choose to detect and copy new consumer groups. I use the default settings that replicate all consumer groups.

Console screenshot.

In Compression, I select None from the list of available compression types for the data that is being replicated.

Console screenshot.

The Amazon MSK console can automatically create a service execution role with the necessary permissions required for the Replicator to work. The role is used by the MSK service to connect to the source and target clusters, to read from the source cluster, and to write to the target cluster. However, I can choose to create and provide my own role as well. In Access permissions, I choose Create or update IAM role.

Console screenshot.

Finally, I add tags to the replicator. I can use tags to search and filter my resources or to track my costs. In the Replicator tags section, I enter Environment as the key and AWS News Blog as the value. Then, I choose Create.

Console screenshot.

After a few minutes, the replicator is running. Let’s put it into use!

Testing an MSK Replicator across AWS Regions
To connect to the source and target clusters, I already set up two Amazon Elastic Compute Cloud (Amazon EC2) instances in the two Regions. I followed the instructions in the MSK documentation to install the Apache Kafka client tools. Because I am using IAM authentication, the two instances have an IAM role attached that allows them to connect, send, and receive data from the clusters. To simplify networking, I used the default security group for the EC2 instances and the MSK clusters.

First, I create a new topic in the source cluster and send a few messages. I use Amazon EC2 Instance Connect to log into the EC2 instance in the source Region. I change the directory to the path where the Kafka client executables have been installed (the path depends on the version you use):

cd /home/ec2-user/kafka_2.12-2.8.1/bin

To connect to the source cluster, I need to know its bootstrap servers. Using the MSK console in the source Region, I choose Clusters from the navigation pane and then the source cluster from the list. In the Cluster summary section, I choose View client information. There, I copy the list of Bootstrap servers. Because the EC2 instance is in the same VPC as the cluster, I copy the list in the Private endpoint (single-VPC) column.

Console screenshot.

Back on the EC2 instance, I put the list of bootstrap servers in the SOURCE_BOOTSTRAP_SERVERS environment variable.

export SOURCE_BOOTSTRAP_SERVERS=b-2.uscluster.esijym.c9.kafka.us-east-1.amazonaws.com:9098,b-3.uscluster.esijym.c9.kafka.us-east-1.amazonaws.com:9098,b-1.uscluster.esijym.c9.kafka.us-east-1.amazonaws.com:9098

Now, I create a topic on the source cluster.

./kafka-topics.sh --bootstrap-server $SOURCE_BOOTSTRAP_SERVERS --command-config client.properties --create --topic my-topic --partitions 6

Using the new topic, I send a few messages to the source cluster.

./kafka-console-producer.sh --broker-list $SOURCE_BOOTSTRAP_SERVERS --producer.config client.properties --topic my-topic
>Hello from the US
>These are my messages

Let’s see what happens in the target cluster. I connect to the EC2 instance in the target Region. Similar to what I did for the other instance, I get the list of bootstrap servers for the target cluster and put it into the TARGET_BOOTSTRAP_SERVERS environment variable.

On the target cluster, the source cluster alias is added as a prefix to the replicated topic names. To find the source cluster alias, I choose Replicators in the MSK console navigation pane. There, I choose the replicator I just created. In the Properties tab, I look up the Cluster alias in the Source cluster section.

Console screenshot.

I confirm the name of the replicated topic by looking at the list of topics in the target cluster (it’s the last one in the output list):

./kafka-topics.sh --list --bootstrap-server $TARGET_BOOTSTRAP_SERVERS --command-config client.properties
. . .
us-cluster-c78ec6d63588.my-topic

Now that I know the name of the replicated topic on the target cluster, I start a consumer to receive the messages originally sent to the source cluster:

./kafka-console-consumer.sh --bootstrap-server $TARGET_BOOTSTRAP_SERVERS --consumer.config client.properties --topic us-cluster-c78ec6d63588.my-topic --from-beginning
Hello from the US
These are my messages

Note that I can use a wildcard in the topic subscription (for example, .*my-topic) to automatically handle the prefix and have the same configuration in the source and target clusters.
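To see why the same subscription pattern works on both clusters, here is a small Python sketch of the naming rule and the pattern from this example. It assumes the pattern has to match the whole topic name; the helper replicated_topic_name is hypothetical and simply joins the source cluster alias and the topic name with a dot, as seen in the output above.

```python
import re


def replicated_topic_name(source_alias: str, topic: str) -> str:
    """Name of a replicated topic on the target cluster: alias, dot, original name."""
    return f"{source_alias}.{topic}"


# The subscription pattern from the example above.
subscription = re.compile(r".*my-topic")

source_topic = "my-topic"
target_topic = replicated_topic_name("us-cluster-c78ec6d63588", source_topic)

# The same pattern selects the topic on the source and on the target cluster.
for name in (source_topic, target_topic, "other-topic"):
    print(name, bool(subscription.fullmatch(name)))
```

Note that a leading .* is broad: it would also match a topic such as not-my-topic. If that matters in your environment, a stricter pattern that allows only an optional alias prefix, for example (.*\.)?my-topic, may be a better fit.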

As expected, all the messages I sent to the source cluster have been replicated and received by the consumer connected to the target cluster.

I can monitor the MSK Replicator latency, throughput, errors, and lag metrics using the Monitoring tab. Because this works through Amazon CloudWatch, I can easily create my own alarms and include these metrics in my dashboards.

To update the configuration to an active-active setup, I follow similar steps to create a replicator in the other Region and replicate streaming data between the clusters in the other direction. For details on how to manage failover and failback, see the MSK Replicator documentation.

Availability and Pricing
MSK Replicator is available today in: US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Europe (Frankfurt), and Europe (Ireland).

With MSK Replicator, you pay per GB of data replicated and an hourly rate for each Replicator. You also pay Amazon MSK’s usual charges for your source and target MSK clusters and standard AWS charges for cross-Region data transfer. For more information, see MSK pricing.

Using MSK replicators, you can quickly implement cross-Region and same-Region replication to improve the resiliency of your architecture and store data close to your partners and end users. You can also use this new capability to get better insights by replicating streaming data to a single, centralized cluster where it is easier to run your analytics.

Simplify your data streaming architectures using Amazon MSK Replicator.

Danilo

AWS Weekly Roundup – Amazon Bedrock Is Now Generally Available, Attend AWS Innovate Online, and More – Oct 2, 2023

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-amazon-bedrock-is-now-generally-available-attend-aws-innovate-online-and-more-oct-2-2023/

Last week I attended the AWS Summit Johannesburg. This was the first summit to be hosted in my own country and my own city since 2019 so it was very special to have the opportunity to attend. It was great to get to meet with so many of our customers and hear how they are building on AWS.

Now on to the AWS updates. I’ve compiled a few announcements and upcoming events you need to know about. Let’s get started!

Last Week’s Launches
Amazon Bedrock Is Now Generally Available – Amazon Bedrock was announced in preview in April of this year as part of a set of new tools for building with generative AI on AWS. Last week’s announcement of this service being generally available was received with a lot of excitement and customers have already been sharing what they are building with Amazon Bedrock. I quite enjoyed this lighthearted post from AWS Serverless Hero Jones Zachariah Noel about the “Bengaluru with traffic-filled roads” image he produced using Stability AI’s Stable Diffusion XL image generation model on Amazon Bedrock.

Amazon MSK Introduces Managed Data Delivery from Apache Kafka to Your Data Lake – Amazon MSK was released in 2019 to help our customers reduce the work needed to set up, scale, and manage Apache Kafka in production. Now you can continuously load data from an Apache Kafka cluster to Amazon Simple Storage Service (Amazon S3).

Other AWS News
A few more news items and blog posts you might have missed:

The Community.AWS Blog is where builders share and learn with the community of cloud enthusiasts. Contributors to this blog include AWS employees, AWS Heroes, AWS Community Builders, and other members of the AWS Community. Last week, AWS Hero Johannes Koch published this awesome post on how to build a simple website using Flutter that interacts with a serverless backend powered by AppSync-merged APIs.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Upcoming AWS Events
We have the following upcoming events:

AWS Cloud Days (October 10, 24) – Connect and collaborate with other like-minded folks while learning about AWS at the AWS Cloud Day in Athens and Prague.

AWS Innovate Online (October 19) – Register for AWS Innovate Online to learn how you can build, run, and scale next-generation applications on the most extensive cloud platform. There will be 80+ sessions delivered in five languages and you’ll receive a certificate of attendance to showcase all you’ve learned.

We’re focused on improving our content to provide a better customer experience, and we need your feedback to do so. Take this quick survey to share insights on your experience with the AWS Blog. Note that this survey is hosted by an external company, so the link doesn’t lead to our website. AWS handles your information as described in the AWS Privacy Notice.

Veliswa

Non-JSON ingestion using Amazon Kinesis Data Streams, Amazon MSK, and Amazon Redshift Streaming Ingestion

Post Syndicated from M Mehrtens original https://aws.amazon.com/blogs/big-data/non-json-ingestion-using-amazon-kinesis-data-streams-amazon-msk-and-amazon-redshift-streaming-ingestion/

Organizations are grappling with the ever-expanding spectrum of data formats in today’s data-driven landscape. From Avro’s binary serialization to the efficient and compact structure of Protobuf, the landscape of data formats has expanded far beyond the traditional realms of CSV and JSON. As organizations strive to derive insights from these diverse data streams, the challenge lies in seamlessly integrating them into a scalable solution.

In this post, we dive into Amazon Redshift Streaming Ingestion to ingest, process, and analyze non-JSON data formats. Amazon Redshift Streaming Ingestion allows you to connect to Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK) directly through materialized views, in real time and without the complexity associated with staging the data in Amazon Simple Storage Service (Amazon S3) and loading it into the cluster. These materialized views not only provide a landing zone for streaming data, but also offer the flexibility of incorporating SQL transforms and blending into your extract, load, and transform (ELT) pipeline for enhanced processing. For a deeper exploration on configuring and using streaming ingestion in Amazon Redshift, refer to Real-time analytics with Amazon Redshift streaming ingestion.

JSON data in Amazon Redshift

Amazon Redshift enables storage, processing, and analytics on JSON data through the SUPER data type, PartiQL language, materialized views, and data lake queries. The base construct to access streaming data in Amazon Redshift provides metadata from the source stream (attributes like stream timestamp, sequence numbers, refresh timestamp, and more) and the raw binary data from the stream itself. For streams that contain the raw binary data encoded in JSON format, Amazon Redshift provides a variety of tools for parsing and managing the data. For more information about the metadata of each stream format, refer to Getting started with streaming ingestion from Amazon Kinesis Data Streams and Getting started with streaming ingestion from Amazon Managed Streaming for Apache Kafka.

At the most basic level, Amazon Redshift allows parsing the raw data into distinct columns. The JSON_EXTRACT_PATH_TEXT and JSON_EXTRACT_ARRAY_ELEMENT_TEXT functions enable the extraction of specific details from JSON objects and arrays, transforming them into separate columns for analysis. When the structure of the JSON documents and specific reporting requirements are defined, these methods allow for pre-computing a materialized view with the exact structure needed for reporting, with improved compression and sorting for analytics.

In addition to this approach, the Amazon Redshift JSON functions allow storing and analyzing the JSON data in its original state using the adaptable SUPER data type. The function JSON_PARSE allows you to extract the binary data in the stream and convert it into the SUPER data type. With the SUPER data type and PartiQL language, Amazon Redshift extends its capabilities for semi-structured data analysis. It uses the SUPER data type for JSON data storage, offering schema flexibility within a column. For more information on using the SUPER data type, refer to Ingesting and querying semistructured data in Amazon Redshift. This dynamic capability simplifies data ingestion, storage, transformation, and analysis of semi-structured data, enriching insights from diverse sources within the Redshift environment.

Streaming data formats

Organizations using alternative serialization formats must explore different deserialization methods. In this section, we take a closer look at the diverse formats and strategies organizations use to effectively manage their data. This understanding is key in determining the data parsing approach in Amazon Redshift. In the next section, we dive into the optimal approach for deserialization.

Many organizations use a format other than JSON for their streaming use cases. JSON is a self-describing serialization format, where the schema of the data is stored alongside the actual data itself. This makes JSON flexible for applications, but this approach can lead to increased data transmission between applications due to the additional data contained in the JSON keys and syntax. Organizations seeking to optimize their serialization and deserialization performance, and their network communication between applications, may opt to use a format like Avro, Protobuf, or even a custom proprietary format to serialize application data into binary format in an optimized way. This provides the advantage of an efficient serialization where only the message values are packed into a binary message. However, this requires the consumer of the data to know what schema and protocol was used to serialize the data to deserialize the message. There are several ways that organizations can solve this problem, as illustrated in the following figure.

Visualization of different binary message serialization approaches
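The size difference between a self-describing format and a packed binary format can be seen with a small Python sketch. This is an illustration under stated assumptions, not Avro or Protobuf: the event fields and the fixed binary layout (a 32-bit ID, a length-prefixed name, and an 8-bit segment) are made up for the example.

```python
import json
import struct

# Hypothetical event, for illustration only.
event = {"cust_id": 2000, "name": "Customer Name 2000", "segment": 9}

# Self-describing: keys and syntax travel with every message.
json_bytes = json.dumps(event).encode("utf-8")

# Packed binary: only the values travel; the layout must be known out of band.
# ">" = big-endian without padding, I = uint32, H = uint16 length, s = bytes, B = uint8.
name = event["name"].encode("utf-8")
binary_bytes = struct.pack(
    f">IH{len(name)}sB", event["cust_id"], len(name), name, event["segment"]
)

print("JSON bytes:", len(json_bytes))
print("binary bytes:", len(binary_bytes))
```

The binary message is smaller because the field names are absent, which is exactly why the consumer needs the schema from somewhere else to read it.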

Embedded schema

In an embedded schema approach, the data format itself contains the schema information alongside the actual data. This means that when a message is serialized, it includes both the schema definition and the data values. This allows anyone receiving the message to directly interpret and understand its structure without needing to refer to an external source for schema information. Formats like JSON, MessagePack, and YAML are examples of embedded schema formats. When you receive a message in this format, you can immediately parse it and access the data with no additional steps.

Assumed schema

In an assumed schema approach, the message serialization contains only the data values, and there is no schema information included. To interpret the data correctly, the receiving application needs to have prior knowledge of the schema that was used to serialize the message. This is typically achieved by associating the schema with some identifier or context, like a stream name. When the receiving application reads a message, it uses this context to retrieve the corresponding schema and then decodes the binary data accordingly. This approach requires an additional step of schema retrieval and decoding based on context. This generally requires setting up a mapping in-code or in an external database so that consumers can dynamically retrieve the schemas based on stream metadata (such as the AWS Glue Schema Registry).

One drawback of this approach is in tracking schema versions. Although consumers can identify the relevant schema from the stream name, they can’t identify the particular version of the schema that was used. Producers need to ensure that they are making backward-compatible changes to schemas to ensure consumers aren’t disrupted when using a different schema version.

Embedded schema ID

In this case, the producer continues to serialize the data in binary format (like Avro or Protobuf), similar to the assumed schema approach. However, an additional step is involved: the producer adds a schema ID at the beginning of the message header. When a consumer processes the message, it starts by extracting the schema ID from the header. With this schema ID, the consumer then fetches the corresponding schema from a registry. Using the retrieved schema, the consumer can effectively parse the rest of the message. For example, the AWS Glue Schema Registry provides Java SDK SerDe libraries, which can natively serialize and deserialize messages in a stream using embedded schema IDs. Refer to How the schema registry works for more information about using the registry.
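
To make the header-first parsing described above concrete, the following minimal sketch splits a message into its schema ID and payload. It assumes the Confluent Schema Registry wire format (one zero "magic" byte followed by a 4-byte big-endian schema ID). The AWS Glue Schema Registry uses a different header layout, so treat the framing as an assumption. The function name and sample bytes are hypothetical.

```python
import struct

MAGIC_BYTE = 0  # Confluent wire-format marker; other registries use other headers


def split_schema_id(message: bytes) -> tuple[int, bytes]:
    """Return (schema_id, payload) for a message in the assumed framing."""
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("message is not in the assumed wire format")
    # Bytes 1-4 hold the schema ID as a big-endian 32-bit integer.
    (schema_id,) = struct.unpack_from(">I", message, 1)
    return schema_id, message[5:]


# Example: schema ID 7 followed by two payload bytes.
schema_id, payload = split_schema_id(b"\x00\x00\x00\x00\x07\x54\x06")
print(schema_id, payload)
```

With the ID in hand, a consumer would fetch the matching schema from the registry (ideally through a cache) and use it to decode the remaining payload bytes.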

The usage of an external schema registry is common in streaming applications because it provides a number of benefits to consumers and developers. This registry contains all the message schemas for the applications and associates them with a unique identifier to facilitate schema retrieval. In addition, the registry may provide other functionalities like schema version change handling and documentation to facilitate application development.

The embedded schema ID in the message payload can contain version information, ensuring publishers and consumers always use the same schema version to manage data. When schema version information isn’t available, schema registries can help enforce that producers make only backward-compatible changes, so that consumers aren’t disrupted. This helps decouple producers and consumers, provides schema validation at both the publisher and consumer stage, and gives more flexibility in stream usage to meet a variety of application requirements. Messages can be published with one schema per stream, or with multiple schemas inside a single stream, allowing consumers to dynamically interpret messages as they arrive.

For a deeper dive into the benefits of a schema registry, refer to Validate streaming data over Amazon MSK using schemas in cross-account AWS Glue Schema Registry.

Schema in file

For batch processing use cases, applications may embed the schema used to serialize the data into the data file itself to facilitate data consumption. This is an extension of the embedded schema approach but is less costly because the data file is generally larger, so the schema accounts for a proportionally smaller amount of the overall data. In this case, the consumers can process the data directly without additional logic. Amazon Redshift supports loading Avro data that has been serialized in this manner using the COPY command.

Convert non-JSON data to JSON

Organizations aiming to use non-JSON serialization formats need to develop an external method for parsing their messages outside of Amazon Redshift. We recommend using an AWS Lambda-based external user-defined function (UDF) for this process. Using an external Lambda UDF allows organizations to define arbitrary deserialization logic to support any message format, including embedded schema, assumed schema, and embedded schema ID approaches. Although Amazon Redshift supports defining Python UDFs natively, which may be a viable alternative for some use cases, we demonstrate the Lambda UDF approach in this post to cover more complex scenarios. For examples of Amazon Redshift UDFs, refer to AWS Samples on GitHub.

The basic architecture for this solution is as follows.

See the following code:

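-- Step 1: Register the Lambda function as a scalar UDF.
-- It takes two arguments: the stream name and the hex-encoded payload.
-- '{iam-role-arn}' is a placeholder for a role allowed to invoke the function.
CREATE OR REPLACE EXTERNAL FUNCTION fn_lambda_decode_avro_binary(varchar, varchar)
RETURNS varchar IMMUTABLE LAMBDA 'redshift-avro-udf'
IAM_ROLE '{iam-role-arn}';

-- Step 2: Map the Kinesis data streams into an external schema.
CREATE EXTERNAL SCHEMA kds FROM KINESIS
IAM_ROLE '{iam-role-arn}';

-- Step 3: Create the raw materialized view.
CREATE MATERIALIZED VIEW {name} AUTO REFRESH YES AS
SELECT
   -- Step 4: Convert the VARBYTE payload to hexadecimal VARCHAR.
   t.kinesis_data AS binary_avro,
   to_hex(binary_avro) AS hex_avro,
   -- Step 5: Decode the Avro payload to a JSON string in the Lambda UDF.
   fn_lambda_decode_avro_binary('{stream-name}', hex_avro) AS json_string,
   -- Step 6: Parse the JSON string into the SUPER data type.
   JSON_PARSE(json_string) AS super_data,
   t.sequence_number,
   t.refresh_time,
   t.approximate_arrival_timestamp,
   t.shard_id
FROM kds."{stream-name}" AS t;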

Let’s explore each step in more detail.

Create the Lambda UDF

The overall goal is to develop a method that can accept the raw data as input and produce JSON-encoded data as an output. This aligns with the Amazon Redshift ability to natively process JSON into the SUPER data type. The specifics of the function depend on the serialization and streaming approach. For example, using the assumed schema approach with Avro format, your Lambda function may complete the following steps:

  1. Take in the stream name and hexadecimal-encoded data as inputs.
  2. Use the stream name to perform a lookup to identify the schema for the given stream name.
  3. Decode the hexadecimal data into binary format.
  4. Use the schema to deserialize the binary data into readable format.
  5. Re-serialize the data into JSON format.

The f_glue_schema_registry_avro_to_json AWS samples example illustrates how to decode Avro with the assumed schema approach, using the AWS Glue Schema Registry in a Lambda UDF to retrieve Avro schemas by stream name. For other approaches (such as embedded schema ID), you should author your Lambda function to handle deserialization as defined by your serialization process and schema registry implementation. If your application depends on an external schema registry or table lookup to process the message schema, we recommend that you implement caching for schema lookups to help reduce the load on the external systems and reduce the average Lambda function invocation duration.

When creating the Lambda function, make sure you accommodate the Amazon Redshift input event format and ensure compliance with the expected Amazon Redshift event output format. For details, refer to Creating a scalar Lambda UDF.
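
The five steps listed earlier can be sketched as follows. This is a minimal illustration under stated assumptions, not the AWS samples implementation. The schema registry is a hypothetical in-memory dictionary (SCHEMAS), the stream name and fields are invented, and the hand-written decoder handles only flat Avro records with int, long, string, double, and boolean fields. A real function would typically use an Avro library and an external registry. The handler reads the batch from the event's arguments list and returns a JSON string with success and results fields, following the scalar Lambda UDF interface.

```python
import json
import struct
from functools import lru_cache

# Hypothetical in-memory registry: stream name -> Avro record schema.
# In practice this lookup would call an external registry or table.
SCHEMAS = {
    "orders-stream": {
        "type": "record",
        "name": "Order",
        "fields": [
            {"name": "order_id", "type": "long"},
            {"name": "customer", "type": "string"},
            {"name": "total", "type": "double"},
        ],
    }
}


@lru_cache(maxsize=128)
def lookup_schema(stream_name):
    """Step 2: resolve the schema by stream name, cached across warm invocations."""
    return SCHEMAS[stream_name]


def read_long(buf, pos):
    """Decode one zigzag, variable-length Avro int/long starting at pos."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def decode_record(schema, buf):
    """Step 4: decode a flat record of primitive fields (no unions or nesting)."""
    pos = 0
    out = {}
    for field in schema["fields"]:
        ftype = field["type"]
        if ftype in ("int", "long"):
            value, pos = read_long(buf, pos)
        elif ftype == "string":
            length, pos = read_long(buf, pos)
            value = buf[pos:pos + length].decode("utf-8")
            pos += length
        elif ftype == "double":
            (value,) = struct.unpack_from("<d", buf, pos)
            pos += 8
        elif ftype == "boolean":
            value = buf[pos] != 0
            pos += 1
        else:
            raise ValueError(f"unsupported Avro type in this sketch: {ftype}")
        out[field["name"]] = value
    return out


def lambda_handler(event, context):
    """Step 1: each row in the batch is [stream_name, hex_payload]."""
    try:
        results = []
        for stream_name, hex_payload in event["arguments"]:
            schema = lookup_schema(stream_name)      # step 2
            binary = bytes.fromhex(hex_payload)      # step 3
            record = decode_record(schema, binary)   # step 4
            results.append(json.dumps(record))       # step 5
        return json.dumps(
            {"success": True, "num_records": len(results), "results": results}
        )
    except Exception as exc:
        return json.dumps({"success": False, "error_msg": str(exc)})
```

Because Amazon Redshift sends rows in batches, the handler loops over every row and returns one result per input row in the same order. The lru_cache decorator is one simple way to apply the caching recommendation for schema lookups.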

After you create and test the Lambda function, you can define it as a UDF in Amazon Redshift. For effective integration within Amazon Redshift, designate this Lambda function UDF as IMMUTABLE. This classification supports incremental materialized view updates. This treats the Lambda function as idempotent and minimizes the Lambda function costs for the solution, because a message doesn’t need to be processed if it has been processed before.

Configure the baseline Kinesis data stream

Regardless of your messaging format or approach (embedded schema, assumed schema, or embedded schema ID), you begin by setting up the external schema for streaming ingestion from your messaging source into Amazon Redshift. For more information, refer to Streaming ingestion.

CREATE EXTERNAL SCHEMA kds FROM KINESIS
IAM_ROLE 'arn:aws:iam::0123456789:role/redshift-streaming-role';

Create the raw materialized view

Next, you define your raw materialized view. This view contains the raw message data from the streaming source in Amazon Redshift VARBYTE format.

Convert the VARBYTE data to VARCHAR format

External Lambda function UDFs don’t support VARBYTE as an input data type. Therefore, you must convert the raw VARBYTE data from the stream into VARCHAR format to pass to the Lambda function. The best way to do this in Amazon Redshift is using the TO_HEX built-in method. This converts the binary data into hexadecimal-encoded character data, which can be sent to the Lambda UDF.

Invoke the Lambda function to retrieve JSON data

After the UDF has been defined, we can invoke the UDF to convert our hexadecimal-encoded data into JSON-encoded VARCHAR data.

Use the JSON_PARSE method to convert the JSON data to SUPER data type

Finally, we can use the Amazon Redshift native JSON parsing methods like JSON_PARSE, JSON_EXTRACT_PATH_TEXT, and more to parse the JSON data into a format that we can use for analytics.

Considerations

Consider the following when using this strategy:

  • Cost – Amazon Redshift invokes the Lambda function in batches to improve scalability and reduce the overall number of Lambda invocations. The cost of this solution depends on the number of messages in your stream, the frequency of the refresh, and the invocation time required to process the messages in a batch from Amazon Redshift. Using the IMMUTABLE UDF type in Amazon Redshift can also help minimize costs by utilizing the incremental refresh strategy for the materialized view.
  • Permissions and network access – The AWS Identity and Access Management (IAM) role used for the Amazon Redshift UDF must have permissions to invoke the Lambda function, and you must deploy the Lambda function such that it has access to invoke its external dependencies (for example, you may need to deploy it in a VPC to access private resources like a schema registry).
  • Monitoring – Use Lambda function logging and metrics to identify errors in deserialization, connection to the schema registry, and data processing. For details on monitoring the UDF Lambda function, refer to Embedding metrics within logs and Monitoring and troubleshooting Lambda functions.
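
As an illustration of the monitoring point, the following sketch builds one log line in the CloudWatch embedded metric format, which the Lambda function can print to turn a log entry into a metric. The namespace, metric name, and helper name are hypothetical choices for this example.

```python
import json
import time


def emf_log_line(metric_name, value, function_name, namespace="RedshiftAvroUdf"):
    """Build one embedded-metric-format log line for a count metric."""
    return json.dumps({
        "_aws": {
            "Timestamp": int(time.time() * 1000),  # milliseconds since epoch
            "CloudWatchMetrics": [{
                "Namespace": namespace,
                "Dimensions": [["FunctionName"]],
                "Metrics": [{"Name": metric_name, "Unit": "Count"}],
            }],
        },
        # Dimension and metric values live at the top level of the document.
        "FunctionName": function_name,
        metric_name: value,
    })


# Inside a Lambda function, printing the line sends it to CloudWatch Logs.
print(emf_log_line("DeserializationErrors", 3, "redshift-avro-udf"))
```

Emitting a count of failed deserializations per batch this way makes it possible to alarm on error spikes without a separate metrics API call in the function.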

Conclusion

In this post, we dove into different data formats and ingestion methods for a streaming use case. By exploring strategies for handling non-JSON data formats, we examined the use of Amazon Redshift streaming to seamlessly ingest, process, and analyze these formats in near-real time using materialized views.

Furthermore, we navigated through schema-per-stream, embedded schema, assumed schema, and embedded schema ID approaches, highlighting their merits and considerations. To bridge the gap between non-JSON formats and Amazon Redshift, we explored the creation of Lambda UDFs for data parsing and conversion. This approach offers a comprehensive means to integrate diverse data streams into Amazon Redshift for subsequent analysis.

As you navigate the ever-evolving landscape of data formats and analytics, we hope this exploration provides valuable guidance to derive meaningful insights from your data streams. We welcome any thoughts or questions in the comments section.


About the Authors

M Mehrtens has been working in distributed systems engineering throughout their career, working as a Software Engineer, Architect, and Data Engineer. In the past, M has supported and built systems to process terabytes of streaming data at low latency, run enterprise Machine Learning pipelines, and created systems to share data across teams seamlessly with varying data toolsets and software stacks. At AWS, they are a Sr. Solutions Architect supporting US Federal Financial customers.

Sindhu Achuthan is a Sr. Solutions Architect with Federal Financials at AWS. She works with customers to provide architectural guidance on analytics solutions using AWS Glue, Amazon EMR, Amazon Kinesis, and other services. Outside of work, she loves DIY projects, long trail hikes, and yoga.

Build event-driven architectures with Amazon MSK and Amazon EventBridge

Post Syndicated from Florian Mair original https://aws.amazon.com/blogs/big-data/build-event-driven-architectures-with-amazon-msk-and-amazon-eventbridge/

Based on immutable facts (events), event-driven architectures (EDAs) allow businesses to gain deeper insights into their customers’ behavior, unlocking more accurate and faster decision-making processes that lead to better customer experiences. In EDAs, modern event brokers, such as Amazon EventBridge and Apache Kafka, play a key role to publish and subscribe to events. EventBridge is a serverless event bus that ingests data from your own apps, software as a service (SaaS) apps, and AWS services and routes that data to targets. Although there is overlap in their role as the backbone in EDAs, both solutions emerged from different problem statements and provide unique features to solve specific use cases. With a solid understanding of both technologies and their primary use cases, developers can create easy-to-use, maintainable, and evolvable EDAs.

If the use case is well defined and directly maps to one event bus, such as event streaming and analytics with streaming events (Kafka) or application integration with simplified and consistent event filtering, transformation, and routing on discrete events (EventBridge), the decision for a particular broker technology is straightforward. However, organizations and business requirements are often more complex and beyond the capabilities of one broker technology. In almost any case, choosing an event broker should not be a binary decision. Combining complementary broker technologies and embracing their unique strengths is a solid approach to build easy-to-use, maintainable, and evolvable EDAs. To make the integration between Kafka and EventBridge even smoother, AWS open-sourced the EventBridge Connector based on Apache Kafka. This allows you to consume from on-premises Kafka deployments and avoid point-to-point communication, while using the existing knowledge and toolset of Kafka Connect.

Streaming applications enable stateful computations over unbounded datasets. This allows real-time use cases such as anomaly detection, event-time computations, and much more. These applications can be built using frameworks such as Apache Flink, Apache Spark, or Kafka Streams. Although some of those frameworks support sending events to downstream systems other than Apache Kafka, there is no standardized way of doing so across frameworks, so each application owner would have to build their own logic to send events downstream. In an EDA, the preferred way to handle such a scenario is to publish events to an event bus and then send them downstream.

There are two ways to send events from Apache Kafka to EventBridge: the preferred method using Amazon EventBridge Pipes or the EventBridge sink connector for Kafka Connect. In this post, we explore when to use which option and how to build an EDA using the EventBridge sink connector and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

EventBridge sink connector vs. EventBridge Pipes

EventBridge Pipes connects sources to targets with a point-to-point integration, supporting event filtering, transformations, enrichment, and event delivery to over 14 AWS services and external HTTPS-based targets using API Destinations. This is the preferred and easiest method to send events from Kafka to EventBridge because it simplifies setup and operations.

Alternatively, under the following circumstances you might want to choose the EventBridge sink connector to send events from Kafka directly to EventBridge Event Buses:

  • You have already invested in processes and tooling around the Kafka Connect framework as the platform of your choice to integrate with other systems and services
  • You need to integrate with a Kafka-compatible schema registry e.g., the AWS Glue Schema Registry, supporting Avro and Protobuf data formats for event serialization and deserialization
  • You want to send events from on-premises Kafka environments directly to EventBridge Event Buses

Overview of solution

In this post, we show you how to use Kafka Connect and the EventBridge sink connector to send Avro-serialized events from Amazon Managed Streaming for Apache Kafka (Amazon MSK) to EventBridge. This enables a seamless and consistent data flow from Apache Kafka to dozens of supported EventBridge AWS and partner targets, such as Amazon CloudWatch, Amazon SQS, AWS Lambda, and HTTPS targets like Salesforce, Datadog, and Snowflake using EventBridge API destinations. The following diagram illustrates the event-driven architecture used in this blog post based on Amazon MSK and EventBridge.

Architecture Diagram

The workflow consists of the following steps:

  1. The demo application generates credit card transactions, which are sent to Amazon MSK using the Avro format.
  2. An analytics application running on Amazon Elastic Container Service (Amazon ECS) consumes the transactions and analyzes them for anomalies.
  3. If an anomaly is detected, the application emits a fraud detection event back to the MSK notification topic.
  4. The EventBridge connector consumes the fraud detection events from Amazon MSK in Avro format.
  5. The connector converts the events to JSON and sends them to EventBridge.
  6. In EventBridge, we use JSON filtering rules to filter our events and send them to other services or another Event Bus. In this example, fraud detection events are sent to Amazon CloudWatch Logs for auditing and introspection, and to a third-party SaaS provider to showcase how easy it is to integrate with third-party APIs, such as Salesforce.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Deploy the AWS CDK stack

This walkthrough requires you to deploy an AWS CDK stack to your account. You can deploy the full stack end to end or just the required resources to follow along with this post.

  1. In your terminal, run the following command:
    git clone https://github.com/awslabs/eventbridge-kafka-connector/

  2. Navigate to the cdk directory:
    cd eventbridge-kafka-connector/cdk

  3. Deploy the AWS CDK stack based on your preferences:
    • If you want to see the complete setup explained in this post, run the following command:
      cdk deploy --context deployment=FULL

    • If you want to deploy the connector yourself but have the required resources created for you, including the MSK cluster, AWS Identity and Access Management (IAM) roles, security groups, data generator, and so on, run the following command:
      cdk deploy --context deployment=PREREQ

Deploy the EventBridge sink connector on Amazon MSK Connect

If you deployed the CDK stack in FULL mode, you can skip this section and move on to Create EventBridge rules.

The connector needs an IAM role that allows reading the data from the MSK cluster and sending records downstream to EventBridge.

Upload connector code to Amazon S3

Complete the following steps to upload the connector code to Amazon Simple Storage Service (Amazon S3):

  1. Navigate to the GitHub repo.
  2. Download the release 1.0.0 with the AWS Glue Schema Registry dependencies included.
  3. On the Amazon S3 console, choose Buckets in the navigation pane.
  4. Choose Create bucket.
  5. For Bucket name, enter eventbridgeconnector-bucket-${AWS_ACCOUNT_ID}.

Because S3 bucket names must be globally unique, replace ${AWS_ACCOUNT_ID} with your actual AWS account ID. For example, eventbridgeconnector-bucket-123456789012.

  6. Open the bucket and choose Upload.
  7. Select the .jar file that you downloaded from the GitHub repository and choose Upload.

S3 File Upload Console

Create a custom plugin

We now have our application code in Amazon S3. As a next step, we create a custom plugin in Amazon MSK Connect. Complete the following steps:

  1. On the Amazon MSK console, choose Custom plugins in the navigation pane under MSK Connect.
  2. Choose Create custom plugin.
  3. For S3 URI – Custom plugin object, browse to the file named kafka-eventbridge-sink-with-gsr-dependencies.jar in the S3 bucket eventbridgeconnector-bucket-${AWS_ACCOUNT_ID} for the EventBridge connector.
  4. For Custom plugin name, enter msk-eventBridge-sink-plugin-v1.
  5. Enter an optional description.
  6. Choose Create custom plugin.

MSK Connect Plugin Screen

  7. Wait for the plugin to transition to the status Active.

Create a connector

Complete the following steps to create a connector in MSK Connect:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Choose Create connector.
  3. Select Use existing custom plugin and under Custom plugins, select the plugin msk-eventBridge-sink-plugin-v1 that you created earlier.
  4. Choose Next.
  5. For Connector name, enter msk-eventBridge-sink-connector.
  6. Enter an optional description.
  7. For Cluster type, select MSK cluster.
  8. For MSK clusters, select the cluster you created earlier.
  9. For Authentication, choose IAM.
  10. Under Connector configurations, enter the following settings (for more details on the configuration, see the GitHub repository):
    auto.offset.reset=earliest
    connector.class=software.amazon.event.kafkaconnector.EventBridgeSinkConnector
    topics=notifications
    aws.eventbridge.connector.id=avro-test-connector
    aws.eventbridge.eventbus.arn=arn:aws:events:us-east-1:123456789012:event-bus/eventbridge-sink-eventbus
    aws.eventbridge.region=us-east-1
    tasks.max=1
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    value.converter.region=us-east-1
    value.converter.registry.name=default-registry
    value.converter.avroRecordType=GENERIC_RECORD

  11. Make sure to replace aws.eventbridge.eventbus.arn, aws.eventbridge.region, and value.converter.region with the values from the prerequisite stack.
  12. In the Connector capacity section, select Autoscaled for Capacity type.
  13. Leave the default value of 1 for MCU count per worker.
  14. Keep all default values for Connector capacity.
  15. For Worker configuration, select Use the MSK default configuration.
  16. Under Access permissions, choose the custom IAM role KafkaEventBridgeSinkStack-connectorRole, which you created during the AWS CDK stack deployment.
  17. Choose Next.
  18. Choose Next again.
  19. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  20. For Log group, choose /aws/mskconnect/eventBridgeSinkConnector.
  21. Choose Next.
  22. Under Review and Create, validate all the settings and choose Create connector.

The connector is now in the Creating state. It can take several minutes for the connector to transition to the Running state.

Create EventBridge rules

Now that the connector is forwarding events to EventBridge, we can use EventBridge rules to filter and send events to other targets. Complete the following steps to create a rule:

  1. On the EventBridge console, choose Rules in the navigation pane.
  2. Choose Create rule.
  3. Enter eb-to-cloudwatch-logs-and-webhook for Name.
  4. Select eventbridge-sink-eventbus for Event bus.
  5. Choose Next.
  6. Select Custom pattern (JSON editor), choose Insert, and replace the event pattern with the following code:
    {
      "detail": {
        "value": {
          "eventType": ["suspiciousActivity"],
          "source": ["transactionAnalyzer"]
        }
      },
      "detail-type": [{
        "prefix": "kafka-connect-notification"
      }]
    }
    

  7. Choose Next.
  8. For Target 1, select CloudWatch log group and enter kafka-events for Log Group.
  9. Choose Add another target.
  10. (Optional: Create an API destination) For Target 2, select EventBridge API destination for Target types.
  11. Select Create a new API destination.
  12. Enter a descriptive name for Name.
  13. For API destination endpoint, enter the URL of your endpoint (for example, your Datadog or Salesforce endpoint).
  14. Select POST for HTTP method.
  15. Select Create a new connection for Connection.
  16. For Connection Name, enter a name.
  17. Select Other as Destination type and select API Key as Authorization Type.
  18. For API key name and Value, enter your keys.
  19. Choose Next.
  20. Validate your inputs and choose Create rule.

EventBridge Rule
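
To check which events the pattern above selects before creating the rule, you can reason about it locally. The following sketch is a simplified matcher that supports only the two features the pattern uses (lists of exact values and the prefix operator). It is not the EventBridge matching engine. The sample event is a hypothetical approximation of what the connector might emit for the notifications topic.

```python
def matches(pattern, event):
    """Check an event against a pattern (exact values and prefix only)."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested object: recurse into the same key of the event.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif not any(_matches_one(alt, actual) for alt in expected):
            # A list holds alternatives; at least one of them must match.
            return False
    return True


def _matches_one(alternative, actual):
    """Match a single alternative: a prefix operator or an exact value."""
    if isinstance(alternative, dict) and "prefix" in alternative:
        return isinstance(actual, str) and actual.startswith(alternative["prefix"])
    return alternative == actual


PATTERN = {
    "detail": {
        "value": {
            "eventType": ["suspiciousActivity"],
            "source": ["transactionAnalyzer"],
        }
    },
    "detail-type": [{"prefix": "kafka-connect-notification"}],
}

# Hypothetical event shape, for illustration only.
sample_event = {
    "detail-type": "kafka-connect-notifications",
    "detail": {
        "topic": "notifications",
        "value": {"eventType": "suspiciousActivity", "source": "transactionAnalyzer"},
    },
}

print(matches(PATTERN, sample_event))
```

Fields that the pattern does not mention (such as topic in the sample event) are ignored, which is why the rule keeps matching if additional fields appear in the event later.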

The following screenshot of the CloudWatch Logs console shows several events from EventBridge.

CloudWatch Logs

Run the connector in production

In this section, we dive deeper into the operational aspects of the connector. Specifically, we discuss how the connector scales and how to monitor it using CloudWatch.

Scale the connector

Kafka connectors scale through the number of tasks. The code design of the EventBridge sink connector doesn’t limit the number of tasks that it can run. MSK Connect provides the compute capacity to run the tasks, which can be of type Provisioned or Autoscaled. During the deployment of the connector, we chose the capacity type Autoscaled and 1 MCU per worker (which represents 1 vCPU and 4 GiB of memory). This means MSK Connect scales the infrastructure that runs the tasks but not the number of tasks. The number of tasks is defined by the connector. By default, the connector starts with the number of tasks defined in tasks.max in the connector configuration. If this value is higher than the partition count of the processed topic, the number of tasks is set to the number of partitions during the Kafka Connect rebalance.
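
The task-count rule described above amounts to taking the smaller of the two values. A minimal sketch, with a hypothetical function name:

```python
def effective_task_count(tasks_max: int, partition_count: int) -> int:
    """Number of tasks after the rebalance, per the rule described above."""
    return min(tasks_max, partition_count)


# tasks.max=1 as in the connector configuration, with a 6-partition topic.
print(effective_task_count(1, 6))
# A tasks.max higher than the partition count is capped at the partitions.
print(effective_task_count(10, 6))
```

In practice this means that raising tasks.max beyond the partition count of the topic adds no parallelism. To scale further, the topic needs more partitions.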

Monitor the connector

MSK Connect emits metrics to CloudWatch for monitoring by default. Besides MSK Connect metrics, you should also monitor the offset of the connector in production. Monitoring the offset shows whether the connector can keep up with the data produced in the Kafka cluster.
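
Whether the connector keeps up can be expressed as consumer lag: the difference between the latest offset in each partition and the offset the connector has committed. The following sketch computes it from two dictionaries. The offsets are hypothetical, and in practice you would read them from the cluster with your Kafka tooling of choice.

```python
def total_lag(end_offsets, committed_offsets):
    """Sum of (latest offset - committed offset) over all partitions."""
    lag = 0
    for partition, end in end_offsets.items():
        # A partition without a committed offset counts from offset 0.
        committed = committed_offsets.get(partition, 0)
        lag += max(end - committed, 0)
    return lag


# Hypothetical offsets for a three-partition topic.
end_offsets = {0: 1500, 1: 1480, 2: 1510}
committed_offsets = {0: 1500, 1: 1400, 2: 1490}
print(total_lag(end_offsets, committed_offsets))
```

A lag that grows steadily over time suggests the connector is falling behind and may need more tasks or capacity.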

Clean up

To clean up your resources and avoid ongoing charges, complete the following steps:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Select the connectors you created and choose Delete.
  3. Choose Clusters in the navigation pane.
  4. Select the cluster you created and choose Delete on the Actions menu.
  5. On the EventBridge console, choose Rules in the navigation pane.
  6. Choose the event bus eventbridge-sink-eventbus.
  7. Select all the rules you created and choose Delete.
  8. Confirm the removal by entering delete, then choose Delete.

If you deployed the AWS CDK stack with the context PREREQ, delete the .jar file for the connector.

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Navigate to the bucket where you uploaded your connector and delete the kafka-eventbridge-sink-with-gsr-dependencies.jar file.

Regardless of the chosen deployment mode, all other AWS resources can be deleted by using AWS CDK or AWS CloudFormation. Run cdk destroy from the repository directory to delete the CloudFormation stack.

Alternatively, on the AWS CloudFormation console, select the stack KafkaEventBridgeSinkStack and choose Delete.

Conclusion

In this post, we showed how you can use MSK Connect to run the AWS open-sourced Kafka connector for EventBridge, how to configure the connector to forward a Kafka topic to EventBridge, and how to use EventBridge rules to filter and forward events to CloudWatch Logs and a webhook.

To learn more about the Kafka connector for EventBridge, refer to Amazon EventBridge announces open-source connector for Kafka Connect, as well as the MSK Connect Developer Guide and the code for the connector on the GitHub repo.


About the Authors

Florian Mair is a Senior Solutions Architect and data streaming expert at AWS. He is a technologist who helps customers in Germany succeed and innovate by solving business challenges using AWS Cloud services. Besides working as a Solutions Architect, Florian is a passionate mountaineer and has climbed some of the highest mountains across Europe.

Benjamin Meyer is a Senior Solutions Architect at AWS, focused on Games businesses in Germany to solve business challenges by using AWS Cloud services. Benjamin has been an avid technologist for 7 years, and when he’s not helping customers, he can be found developing mobile apps, building electronics, or tending to his cacti.

Amazon MSK Introduces Managed Data Delivery from Apache Kafka to Your Data Lake

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/amazon-msk-introduces-managed-data-delivery-from-apache-kafka-to-your-data-lake/

I’m excited to announce today a new capability of Amazon Managed Streaming for Apache Kafka (Amazon MSK) that allows you to continuously load data from an Apache Kafka cluster to Amazon Simple Storage Service (Amazon S3). We use Amazon Kinesis Data Firehose—an extract, transform, and load (ETL) service—to read data from a Kafka topic, transform the records, and write them to an Amazon S3 destination. Kinesis Data Firehose is entirely managed and you can configure it with just a few clicks in the console. No code or infrastructure is needed.

Kafka is commonly used for building real-time data pipelines that reliably move massive amounts of data between systems or applications. It provides a highly scalable and fault-tolerant publish-subscribe messaging system. Many AWS customers have adopted Kafka to capture streaming data such as click-stream events, transactions, IoT events, and application and machine logs, and have applications that perform real-time analytics, run continuous transformations, and distribute this data to data lakes and databases in real time.

However, deploying Kafka clusters is not without challenges.

The first challenge is to deploy, configure, and maintain the Kafka cluster itself. This is why we released Amazon MSK in May 2019. MSK reduces the work needed to set up, scale, and manage Apache Kafka in production. We take care of the infrastructure, freeing you to focus on your data and applications. The second challenge is to write, deploy, and manage application code that consumes data from Kafka. It typically requires coding connectors using the Kafka Connect framework and then deploying, managing, and maintaining a scalable infrastructure to run the connectors. In addition to the infrastructure, you also must code the data transformation and compression logic, manage the eventual errors, and code the retry logic to ensure no data is lost during the transfer out of Kafka.

Today, we announce the availability of a fully managed solution to deliver data from Amazon MSK to Amazon S3 using Amazon Kinesis Data Firehose. The solution is serverless–there is no server infrastructure to manage–and requires no code. The data transformation and error-handling logic can be configured with a few clicks in the console.

The architecture of the solution is illustrated by the following diagram.

Amazon MSK to Amazon S3 architecture diagram

Amazon MSK is the data source, and Amazon S3 is the data destination while Amazon Kinesis Data Firehose manages the data transfer logic.

When using this new capability, you no longer need to develop code to read your data from Amazon MSK, transform it, and write the resulting records to Amazon S3. Kinesis Data Firehose manages the reading, the transformation and compression, and the write operations to Amazon S3. It also handles the error and retry logic in case something goes wrong. The system delivers the records that cannot be processed to the S3 bucket of your choice for manual inspection. The system also manages the infrastructure required to handle the data stream. It will scale out and scale in automatically to adjust to the volume of data to transfer. There are no provisioning or maintenance operations required on your side.

Kinesis Data Firehose delivery streams support both public and private Amazon MSK provisioned or serverless clusters. They also support cross-account connections to read from an MSK cluster and to write to S3 buckets in different AWS accounts. The Data Firehose delivery stream reads data from your MSK cluster, buffers the data for a configurable threshold size and time, and then writes the buffered data to Amazon S3 as a single file. MSK and Data Firehose must be in the same AWS Region, but Data Firehose can deliver data to Amazon S3 buckets in other Regions.
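To make the "threshold size and time" buffering behavior concrete, here is a minimal Python sketch of a buffer that flushes when either limit is reached. It only illustrates the idea and is not how Data Firehose is implemented; the class name SizeOrTimeBuffer, its parameters, and the write_object callback are hypothetical.

```python
import time
from typing import Callable, List


class SizeOrTimeBuffer:
    """Collects records and writes them out as one object when either
    threshold is reached. Illustrative only, not Firehose internals."""

    def __init__(self, max_bytes: int, max_seconds: float,
                 write_object: Callable[[bytes], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self.write_object = write_object  # hypothetical sink, e.g. an S3 upload
        self.clock = clock
        self._records: List[bytes] = []
        self._size = 0
        self._started = clock()

    def add(self, record: bytes) -> None:
        if not self._records:
            # The timer starts with the first record of a new batch.
            self._started = self.clock()
        self._records.append(record)
        self._size += len(record)
        self.flush_if_due()

    def flush_if_due(self) -> None:
        if not self._records:
            return
        too_big = self._size >= self.max_bytes
        too_old = self.clock() - self._started >= self.max_seconds
        if too_big or too_old:
            # All buffered records become a single output object.
            self.write_object(b"".join(self._records))
            self._records, self._size = [], 0
```

A larger size threshold produces fewer, bigger objects in Amazon S3, while a shorter time threshold reduces the delay before data becomes visible. In this sketch, the caller is expected to invoke flush_if_due periodically so that a quiet stream still flushes on time.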

Kinesis Data Firehose delivery streams can also convert data types. They have built-in transformations to convert JSON to the Apache Parquet and Apache ORC formats. These are columnar data formats that save space and enable faster queries on Amazon S3. For non-JSON data, you can use AWS Lambda to transform input formats such as CSV, XML, or structured text into JSON before converting the data to Apache Parquet/ORC. Additionally, you can specify data compression formats from Data Firehose, such as GZIP, ZIP, and SNAPPY, before delivering the data to Amazon S3, or you can deliver the data to Amazon S3 in its raw form.
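As an illustration of the Lambda option for non-JSON data, the following Python sketch converts CSV lines to JSON. It follows the general shape of a Firehose transformation function (a list of records in, a list of results with the same recordId out), but several details are assumptions: the column names are hypothetical, and the payload field name for MSK-sourced records (kafkaRecordValue here, with data as a fallback) should be checked against the current Firehose documentation.

```python
import base64
import csv
import io
import json

# Hypothetical column names for the incoming CSV lines.
COLUMNS = ["order_id", "customer", "amount"]


def csv_line_to_json(line: str) -> bytes:
    """Turn one CSV line into a newline-terminated JSON document."""
    values = next(csv.reader(io.StringIO(line)))
    if len(values) != len(COLUMNS):
        raise ValueError("unexpected number of columns")
    return (json.dumps(dict(zip(COLUMNS, values))) + "\n").encode("utf-8")


def lambda_handler(event, context):
    results = []
    for record in event["records"]:
        # Assumption: MSK-sourced records carry the base64 payload under
        # "kafkaRecordValue"; other sources use "data".
        key = "kafkaRecordValue" if "kafkaRecordValue" in record else "data"
        try:
            line = base64.b64decode(record[key]).decode("utf-8").strip()
            payload = base64.b64encode(csv_line_to_json(line)).decode("ascii")
            results.append({"recordId": record["recordId"],
                            "result": "Ok", key: payload})
        except (ValueError, StopIteration):
            # Return the original payload so the failed record can be inspected.
            results.append({"recordId": record["recordId"],
                            "result": "ProcessingFailed", key: record[key]})
    return {"records": results}
```

Records marked ProcessingFailed are the ones that would end up in the error location for manual inspection, as described earlier.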

Let’s See How It Works
To get started, I use an AWS account where there’s an Amazon MSK cluster already configured and some applications streaming data to it. If you don’t have a cluster yet, I encourage you to read the tutorial to create your first Amazon MSK cluster.

Amazon MSK - List of existing clusters

For this demo, I use the console to create and configure the data delivery stream. Alternatively, I can use the AWS Command Line Interface (AWS CLI), AWS SDKs, AWS CloudFormation, or Terraform.

I navigate to the Amazon Kinesis Data Firehose page of the AWS Management Console and then choose Create delivery stream.

Kinesis Data Firehose - Main console page

I select Amazon MSK as a data Source and Amazon S3 as a delivery Destination. For this demo, I want to connect to a private cluster, so I select Private bootstrap brokers under Amazon MSK cluster connectivity.

I need to enter the full ARN of my cluster. Like most people, I cannot remember the ARN, so I choose Browse and select my cluster from the list.

Finally, I enter the cluster Topic name I want this delivery stream to read from.

Configure the delivery stream

After the source is configured, I scroll down the page to configure the data transformation section.

In the Transform and convert records section, I can choose whether to provide my own Lambda function to transform records that aren’t in JSON, or to convert my source JSON records to one of the two available pre-built destination data formats: Apache Parquet or Apache ORC.

The Apache Parquet and ORC formats are more efficient than JSON for querying data in Amazon S3. You can select these destination data formats when your source records are in JSON format. You must also provide a data schema from a table in AWS Glue.

These built-in transformations optimize your Amazon S3 cost and reduce time-to-insights when downstream analytics queries are performed with Amazon Athena, Amazon Redshift Spectrum, or other systems.

Configure the data transformation in the delivery stream

Finally, I enter the name of the destination Amazon S3 bucket. Again, when I cannot remember it, I use the Browse button to let the console guide me through my list of buckets. Optionally, I enter an S3 bucket prefix for the file names. For this demo, I enter aws-news-blog. When I don’t enter a prefix name, Kinesis Data Firehose uses the date and time (in UTC) as the default value.

Under the Buffer hints, compression and encryption section, I can modify the default values for buffering, enable data compression, or select the KMS key to encrypt the data at rest on Amazon S3.

When ready, I choose Create delivery stream. After a few moments, the stream status changes to ✅  available.

Select the destination S3 bucket

Assuming there’s an application streaming data to the cluster I chose as a source, I can now navigate to my S3 bucket and see data appearing in the chosen destination format as Kinesis Data Firehose streams it.

S3 bucket browser shows the files streamed from MSK

As you see, no code is required to read, transform, and write the records from my Kafka cluster. I also don’t have to manage the underlying infrastructure to run the streaming and transformation logic.

Pricing and Availability
This new capability is available today in all AWS Regions where Amazon MSK and Kinesis Data Firehose are available.

You pay for the volume of data going out of Amazon MSK, measured in GB per month. The billing system takes into account the exact record size; there is no rounding. As usual, the pricing page has all the details.

I can’t wait to hear about the amount of infrastructure and code you’re going to retire after adopting this new capability. Now go and configure your first data stream between Amazon MSK and Amazon S3 today.

— seb

Stitch Fix seamless migration: Transitioning from self-managed Kafka to Amazon MSK

Post Syndicated from Karthik Kondamudi original https://aws.amazon.com/blogs/big-data/stitch-fix-seamless-migration-transitioning-from-self-managed-kafka-to-amazon-msk/

This post is co-written with Karthik Kondamudi and Jenny Thompson from Stitch Fix.

Stitch Fix is a personalized clothing styling service for men, women, and kids. Stitch Fix has been powered by data science since its founding, and we rely on many modern data lake and data processing technologies. In our infrastructure, Apache Kafka has emerged as a powerful tool for managing event streams and facilitating real-time data processing. We have used Kafka extensively as part of our data infrastructure to support various needs across the business for over six years. Kafka plays a central role in the Stitch Fix efforts to overhaul its event delivery infrastructure and build a self-service data integration platform.

If you’d like to know more background about how we use Kafka at Stitch Fix, please refer to our previously published blog post, Putting the Power of Kafka into the Hands of Data Scientists. This post includes much more information on business use cases, architecture diagrams, and technical infrastructure.

In this post, we will describe how and why we decided to migrate from self-managed Kafka to Amazon Managed Streaming for Apache Kafka (Amazon MSK). We’ll start with an overview of our self-managed Kafka, why we chose to migrate to Amazon MSK, and ultimately how we did it.

  1. Kafka clusters overview
  2. Why migrate to Amazon MSK
  3. How we migrated to Amazon MSK
  4. Navigating challenges and lessons learned
  5. Conclusion

Kafka Clusters Overview

At Stitch Fix, we rely on several different Kafka clusters dedicated to specific purposes. This allows us to scale these clusters independently and apply more stringent SLAs and message delivery guarantees per cluster. This also reduces overall risk by minimizing the impact of changes and upgrades and allows us to isolate and fix any issues that occur within a single cluster.

Our main Kafka cluster serves as the backbone of our data infrastructure. It handles a multitude of critical functions, including managing business events, facilitating microservice communication, supporting feature generation for machine learning workflows, and much more. The stability, reliability, and performance of this cluster are of utmost importance to our operations.

Our logging cluster plays a vital role in our data infrastructure. It serves as a centralized repository for various application logs, including web server logs and Nginx server logs. These logs provide valuable insights for monitoring and troubleshooting purposes. The logging cluster ensures smooth operations and efficient analysis of log data.

Why migrate to Amazon MSK

In the past six years, our data infrastructure team has diligently managed our Kafka clusters. While our team has acquired extensive knowledge in maintaining Kafka, we have also faced challenges such as rolling deployments for version upgrades, applying OS patches, and the overall operational overhead.

At Stitch Fix, our engineers thrive on creating new features and expanding our service offerings to delight our customers. However, we recognized that allocating significant resources to Kafka maintenance was taking away precious time from innovation. To overcome this challenge, we set out to find a managed service provider that could handle maintenance tasks like upgrades and patching while granting us complete control over cluster operations, including partition management and rebalancing. We also sought an effortless scaling solution for storage volumes, keeping our costs in check while being ready to accommodate future growth.

After thorough evaluation of multiple options, we found the perfect match in Amazon MSK because it allows us to offload cluster maintenance to the highly skilled Amazon engineers. With Amazon MSK in place, our teams can now focus their energy on developing innovative applications unique and valuable to Stitch Fix, instead of getting caught up in Kafka administration tasks.

Amazon MSK streamlines the process, eliminating the need for manual configurations, additional software installations, and worries about scaling. It simply works, enabling us to concentrate on delivering exceptional value to our cherished customers.

How we migrated to Amazon MSK

While planning our migration, we desired to switch specific services to Amazon MSK individually with no downtime, ensuring that only a specific subset of services would be migrated at a time. The overall infrastructure would run in a hybrid environment where some services connect to Amazon MSK and others to the existing Kafka infrastructure.

We decided to start the migration with our less critical logging cluster and then proceed to migrating the main cluster. Although the logs are essential for monitoring and troubleshooting purposes, they hold relatively less significance to the core business operations. Additionally, the number and types of consumers and producers for the logging cluster are smaller, making it an easier choice to start with. Then, we were able to apply our learnings from the logging cluster migration to the main cluster. This deliberate choice enabled us to execute the migration process in a controlled manner, minimizing any potential disruptions to our critical systems.

Over the years, our experienced data infrastructure team has employed Apache Kafka MirrorMaker 2 (MM2) to replicate data between different Kafka clusters. Currently, we rely on MM2 to replicate data from two different production Kafka clusters. Given its proven track record within our organization, we decided to use MM2 as the primary tool for our data migration process.

The general guidance for MM2 is as follows:

  1. Begin with less critical applications.
  2. Perform active migrations.
  3. Familiarize yourself with key best practices for MM2.
  4. Implement monitoring to validate the migration.
  5. Accumulate essential insights for migrating other applications.

MM2 offers flexible deployment options, allowing it to function as a standalone cluster or be embedded within an existing Kafka Connect cluster. For our migration project, we deployed a dedicated Kafka Connect cluster operating in distributed mode.

This setup provided the scalability we needed, allowing us to easily expand the dedicated Kafka Connect cluster if necessary. Depending on specific use cases such as geoproximity, high availability (HA), or migrations, MM2 can be configured for active-active replication, active-passive replication, or both. In our case, as we migrated from self-managed Kafka to Amazon MSK, we opted for an active-passive configuration, where MirrorMaker was used for migration purposes and subsequently taken offline upon completion.

MirrorMaker configuration and replication policy

By default, MirrorMaker renames replicated topics in the destination cluster by prefixing them with the name of the source Kafka cluster. For instance, if we replicate topic A from the source cluster “existing” to the new cluster “newkafka,” the replicated topic would be named “existing.A” in “newkafka.” However, this default behavior can be modified to maintain consistent topic names within the newly created MSK cluster.

To maintain consistent topic names in the newly created MSK cluster and avoid downstream issues, we utilized the CustomReplicationPolicy jar provided by AWS. This jar, included in our MirrorMaker setup, allowed us to replicate topics with identical names in the MSK cluster. Additionally, we utilized MirrorCheckpointConnector to synchronize consumer offsets from the source cluster to the target cluster and MirrorHeartbeatConnector to ensure connectivity between the clusters.
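The setup described above can be sketched as three Kafka Connect connector configurations that share the cluster aliases and the replication policy. The Python below only builds the configuration dictionaries. The connector names (mm2-source and so on) and the helper names are hypothetical, and the policy class is passed in as a parameter because the exact class name in the AWS-provided jar is not given in this post.

```python
from typing import Dict

MIRROR_PACKAGE = "org.apache.kafka.connect.mirror"


def replicated_topic_name(topic: str, source_alias: str,
                          keep_name: bool = False) -> str:
    """Default naming prefixes the source alias; an identity-style
    policy keeps the original topic name."""
    return topic if keep_name else f"{source_alias}.{topic}"


def mm2_connector_configs(source_alias: str, target_alias: str,
                          source_bootstrap: str, target_bootstrap: str,
                          replication_policy_class: str) -> Dict[str, Dict[str, str]]:
    """Build configs for the three MM2 connectors (names are hypothetical)."""
    common = {
        "source.cluster.alias": source_alias,
        "target.cluster.alias": target_alias,
        "source.cluster.bootstrap.servers": source_bootstrap,
        "target.cluster.bootstrap.servers": target_bootstrap,
        "replication.policy.class": replication_policy_class,
    }
    return {
        # Replicates topic data.
        "mm2-source": {**common,
                       "connector.class": f"{MIRROR_PACKAGE}.MirrorSourceConnector",
                       "topics": ".*"},
        # Emits checkpoints used to translate consumer group offsets.
        "mm2-checkpoint": {**common,
                           "connector.class": f"{MIRROR_PACKAGE}.MirrorCheckpointConnector",
                           "groups": ".*"},
        # Emits heartbeats to verify connectivity between clusters.
        "mm2-heartbeat": {**common,
                          "connector.class": f"{MIRROR_PACKAGE}.MirrorHeartbeatConnector"},
    }
```

For example, replicated_topic_name("A", "existing") gives "existing.A", while keep_name=True gives "A". On Kafka 3.0 and later, the built-in org.apache.kafka.connect.mirror.IdentityReplicationPolicy is one possible value for replication_policy_class; that is an alternative to the custom jar we used, not the same artifact.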

Monitoring and metrics

MirrorMaker comes equipped with built-in metrics to monitor replication lag and other essential parameters. We integrated these metrics into our MirrorMaker setup, exporting them to Grafana for visualization. Since we have been using Grafana to monitor other systems, we decided to use it during migration as well. This enabled us to closely monitor the replication status during the migration process. The specific metrics we monitored will be described in more detail below.

Additionally, we monitored the MirrorCheckpointConnector included with MirrorMaker, as it periodically emits checkpoints in the destination cluster. These checkpoints contained offsets for each consumer group in the source cluster, ensuring seamless synchronization between the clusters.

Network layout

At Stitch Fix, we use several virtual private clouds (VPCs) through Amazon Virtual Private Cloud (Amazon VPC) for environment isolation in each of our AWS accounts. We have been using separate production and staging VPCs since we initially started using AWS. When necessary, peering of VPCs across accounts is handled through AWS Transit Gateway. To maintain the strong isolation between environments we have been using all along, we created separate MSK clusters in their respective VPCs for production and staging environments.

Side note: It will be easier now to quickly connect Kafka clients hosted in different virtual private clouds with recently announced Amazon MSK multi-VPC private connectivity, which was not available at the time of our migration.

Migration steps: High-level overview

In this section, we outline the high-level sequence of events for the migration process.

Kafka Connect setup and MM2 deploy

First, we deployed a new Kafka Connect cluster on an Amazon Elastic Compute Cloud (Amazon EC2) cluster as an intermediary between the existing Kafka cluster and the new MSK cluster. Next, we deployed the 3 MirrorMaker connectors to this Kafka Connect cluster. Initially, this cluster was configured to mirror all the existing topics and their configurations into the destination MSK cluster. (We eventually changed this configuration to be more granular, as described in the “Navigating challenges and lessons learned” section below.)

Monitor replication progress with MM metrics

Take advantage of the JMX metrics offered by MirrorMaker to monitor the progress of data replication. Although MirrorMaker exposes a comprehensive set of metrics, we focused primarily on two: replication-latency-ms and checkpoint-latency-ms. These metrics provide insight into the replication status, including replication lag and checkpoint latency. By exporting these metrics to Grafana, you can visualize and closely track the progress of replication and confirm that MirrorMaker has reproduced both historical and new data.

Evaluate usage metrics and provisioning

Analyze the usage metrics of the new MSK cluster to ensure proper provisioning. Consider factors such as storage, throughput, and performance. If required, resize the cluster to meet the observed usage patterns. While resizing may introduce additional time to the migration process, it is a cost-effective measure in the long run.

Sync consumer offsets between source and target clusters

Ensure that consumer offsets are synchronized between the source in-house clusters and the target MSK clusters. Once the consumer offsets are in sync, redirect the consumers of the existing in-house clusters to consume data from the new MSK cluster. This step ensures a seamless transition for consumers and allows uninterrupted data flow during the migration.

Update producer applications

After confirming that all consumers are successfully consuming data from the new MSK cluster, update the producer applications to write data directly to the new cluster. This final step completes the migration process, ensuring that all data is now being written to the new MSK cluster and taking full advantage of its capabilities.

Navigating challenges and lessons learned

During our migration, we encountered three challenges that required careful attention: scalable storage, more granular replication configuration, and memory allocation.

Initially, we faced issues with auto scaling Amazon MSK storage. We learned storage auto scaling requires a 24-hour cool-off period before another scaling event can occur. We observed this when migrating the logging cluster, and we applied our learnings from this and factored in the cool-off period during production cluster migration.

Additionally, to optimize MirrorMaker replication speed, we updated the original configuration to divide the replication jobs into batches based on volume and allocated more tasks to high-volume topics.

During the initial phase, we initiated replication using a single connector to transfer all topics from the source to target clusters, encompassing a significant number of tasks. However, we encountered challenges such as increasing replication lag for high-volume topics and slower replication for specific topics. Upon careful examination of the metrics, we adopted an alternative approach by segregating high-volume topics into multiple connectors. In essence, we divided the topics into categories of high, medium, and low volumes, assigning them to respective connectors and adjusting the number of tasks based on replication latency. This strategic adjustment yielded positive outcomes, allowing us to achieve faster and more efficient data replication across the board.
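The tiering approach can be sketched as follows. This Python example groups topics by observed throughput and produces one MirrorSourceConnector configuration fragment per tier. The thresholds, task counts, and connector names are hypothetical placeholders rather than the values we used; in practice they would be tuned against the replication latency metrics.

```python
from typing import Dict, List

# Hypothetical tiers: (name, minimum messages per second, tasks.max).
TIERS = [("high", 1000.0, 8), ("medium", 100.0, 4), ("low", 0.0, 1)]


def plan_connectors(topic_rates: Dict[str, float]) -> Dict[str, Dict[str, str]]:
    """Assign each topic to the first tier whose minimum rate it meets,
    then emit one source connector config fragment per non-empty tier."""
    buckets: Dict[str, List[str]] = {name: [] for name, _, _ in TIERS}
    for topic, rate in sorted(topic_rates.items()):
        for name, minimum, _ in TIERS:
            if rate >= minimum:
                buckets[name].append(topic)
                break

    plans: Dict[str, Dict[str, str]] = {}
    for name, _, tasks in TIERS:
        if buckets[name]:
            plans[f"mm2-source-{name}"] = {
                "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
                "topics": ",".join(buckets[name]),
                "tasks.max": str(tasks),  # more tasks for high-volume topics
            }
    return plans
```

Separating the tiers this way means a lagging high-volume topic no longer competes for tasks with the many low-volume topics, and each connector's task count can be adjusted independently.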

Lastly, we encountered Java virtual machine heap memory exhaustion, resulting in missing metrics while running MirrorMaker replication. To address this, we increased memory allocation and restarted the MirrorMaker process.

Conclusion

Stitch Fix’s migration from self-managed Kafka to Amazon MSK has allowed us to shift our focus from maintenance tasks to delivering value for our customers. It has reduced our infrastructure costs by 40 percent and given us the confidence that we can easily scale the clusters in the future if needed. By strategically planning the migration and using Apache Kafka MirrorMaker, we achieved a seamless transition while ensuring high availability. The integration of monitoring and metrics provided valuable insights during the migration process, and Stitch Fix successfully navigated challenges along the way. The migration to Amazon MSK has empowered Stitch Fix to maximize the capabilities of Kafka while benefiting from the expertise of Amazon engineers, setting the stage for continued growth and innovation.

About the Authors

Karthik Kondamudi is an Engineering Manager in the Data and ML Platform Group at Stitch Fix. His interests lie in distributed systems and large-scale data processing. Beyond work, he enjoys spending time with family and hiking. A dog lover, he’s also passionate about sports, particularly cricket, tennis, and football.

Jenny Thompson is a Data Platform Engineer at Stitch Fix. She works on a variety of systems for Data Scientists, and enjoys making things clean, simple, and easy to use. She also likes making pancakes and Pavlova, browsing for furniture on Craigslist, and getting rained on during picnics.

Rahul Nammireddy is a Senior Solutions Architect at AWS who focuses on guiding digital native customers through their cloud native transformation. With a passion for AI/ML technologies, he works with customers in industries such as retail and telecom, helping them innovate at a rapid pace. Throughout his 23+ year career, Rahul has held key technical leadership roles in a diverse range of companies, from startups to publicly listed organizations, showcasing his expertise as a builder and driving innovation. In his spare time, he enjoys watching football and playing cricket.

Todd McGrath is a data streaming specialist at Amazon Web Services where he advises customers on their streaming strategies, integration, architecture, and solutions. On the personal side, he enjoys watching and supporting his 3 teenagers in their preferred activities as well as following his own pursuits such as fishing, pickleball, ice hockey, and happy hour with friends and family on pontoon boats. Connect with him on LinkedIn.

Externalize Amazon MSK Connect configurations with Terraform

Post Syndicated from Ramc Venkatasamy original https://aws.amazon.com/blogs/big-data/externalize-amazon-msk-connect-configurations-with-terraform/

Managing configurations for Amazon MSK Connect, a feature of Amazon Managed Streaming for Apache Kafka (Amazon MSK), can become challenging, especially as the number of topics and configurations grows. In this post, we address this complexity by using Terraform to optimize the configuration of the Kafka topic to Amazon S3 Sink connector. By adopting this strategic approach, you can establish a robust and automated mechanism for handling MSK Connect configurations, eliminating the need for manual intervention or connector restarts. This efficient solution will save time, reduce errors, and provide better control over your Kafka data streaming processes. Let’s explore how Terraform can simplify and enhance the management of MSK Connect configurations for seamless integration with your infrastructure.

Solution overview

At a well-known AWS customer, the management of their constantly growing MSK Connect S3 Sink connector topics has become a significant challenge. The challenges lie in the overhead of managing configurations, as well as dealing with patching and upgrades. Manually handling Kubernetes (K8s) configs and restarting connectors can be cumbersome and error-prone, making it difficult to keep track of changes and updates. At the time of writing this post, MSK Connect does not offer native mechanisms to easily externalize the Kafka topic to S3 Sink configuration.

To address these challenges, we introduce Terraform, an infrastructure as code (IaC) tool. Terraform’s declarative approach and extensive ecosystem make it an ideal choice for managing MSK Connect configurations.

By externalizing Kafka topic to S3 configurations, organizations can achieve the following:

  • Scalability – Effortlessly manage a growing number of topics, ensuring the system can handle increasing data volumes without difficulty
  • Flexibility – Seamlessly integrate MSK Connect configurations with other infrastructure components and services, enabling adaptability to changing business needs
  • Automation – Automate the deployment and management of MSK Connect configurations, reducing manual intervention and streamlining operational tasks
  • Centralized management – Achieve improved governance with centralized management, version control, auditing, and change tracking, ensuring better control and visibility over the configurations

In the following sections, we provide a detailed guide on setting up Terraform for MSK Connect configuration management, defining and decentralizing topic configurations, and deploying and updating configurations using Terraform.

Prerequisites

Before proceeding with the solution, ensure you have the following resources and access:

  • You need access to an AWS account with sufficient permissions to create and manage resources, including AWS Identity and Access Management (IAM) roles and MSK clusters.
  • To simplify the setup, use the provided AWS CloudFormation template. This template will create the necessary MSK cluster and required resources for this post.
  • For this post, we are using the latest Terraform version at the time of writing (1.5.6).

By ensuring you have these prerequisites in place, you will be ready to follow the instructions and streamline your MSK Connect configurations with Terraform. Let’s get started!

Setup

Setting up Terraform for MSK Connect configuration management includes the following:

  • Installation of Terraform and setting up the environment
  • Setting up the necessary authentication and permissions

Defining and decentralizing topic configurations using Terraform includes the following:

  • Understanding the structure of Terraform configuration files
  • Determining the required variables and resources
  • Utilizing Terraform’s modules and interpolation for flexibility

The decision to externalize the configuration was primarily driven by the customer’s business requirement. They anticipated the need to add topics periodically and wanted to avoid bringing down the connector and writing specific code each time. As of this writing, MSK Connect can handle up to 300 workers. For this proof of concept (POC), we opted for a configuration with 100 topics directed to a single Amazon Simple Storage Service (Amazon S3) bucket. To stay within the 300-worker limit, we set the MCU count to 1 and configured auto scaling with a maximum of 2 workers.

To make the configuration more flexible, we specify the variables that can be used in the code (variables.tf):

variable "aws_region" {
  description = "The AWS region to deploy resources in."
  type        = string
}

variable "s3_bucket_name" {
  description = "Name of the destination S3 bucket."
  type        = string
}

variable "topics" {
  description = "Comma-separated list of Kafka topics to sink to Amazon S3."
  type        = string
}

variable "msk_connect_name" {
  description = "Name of the MSK Connect instance."
  type        = string
}

variable "msk_connect_description" {
  description = "Description of the MSK Connect instance."
  type        = string
}

# Rest of the variables...

To set up the AWS MSK Connector for the S3 Sink, we need to provide various configurations. Let’s examine the connector_configuration block in the code snippet provided in the main.tf file in more detail:

connector_configuration = {
  "connector.class"                = "io.confluent.connect.s3.S3SinkConnector"
  "s3.region"                      = "us-east-1"
  "flush.size"                     = "5"
  "schema.compatibility"           = "NONE"
  "tasks.max"                      = "1"
  "topics"                         = var.topics
  "format.class"                   = "io.confluent.connect.s3.format.json.JsonFormat"
  "partitioner.class"              = "io.confluent.connect.storage.partitioner.DefaultPartitioner"
  "value.converter.schemas.enable" = "false"
  "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
  "storage.class"                  = "io.confluent.connect.s3.storage.S3Storage"
  "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
  "s3.bucket.name"                 = var.s3_bucket_name
  "topics.dir"                     = "cxdl-data/KairosTelemetry"
}

The kafka_cluster block in the code snippet defines the Kafka cluster details, including the bootstrap servers and VPC settings. You can reference the variables to specify the appropriate values:

kafka_cluster {
  apache_kafka_cluster {
    bootstrap_servers = var.bootstrap_servers

    vpc {
      security_groups = [var.security_groups]
      subnets         = [var.aws_subnet_example1_id, var.aws_subnet_example2_id, var.aws_subnet_example3_id]
    }
  }
}

To secure the connection between Kafka and the connector, the code snippet includes configurations for authentication and encryption:

  • The kafka_cluster_client_authentication block sets the authentication type to IAM, enabling the use of IAM for authentication
  • The kafka_cluster_encryption_in_transit block enables TLS encryption for data transfer between Kafka and the connector

kafka_cluster_client_authentication {
  authentication_type = "IAM"
}

kafka_cluster_encryption_in_transit {
  encryption_type = "TLS"
}

You can externalize the variables and provide dynamic values using a var.tfvars file. Let’s assume the content of the var.tfvars file is as follows:

aws_region = "us-east-1"
msk_connect_name = "confluentinc-MSK-connect-s3-2"
msk_connect_description = "My MSK Connect instance"
s3_bucket_name = "msk-lab-xxxxxxxxxxxx-target-bucket"
topics = "salesdb.salesdb.CUSTOMER,salesdb.salesdb.CUSTOMER_SITE,salesdb.salesdb.PRODUCT,salesdb.salesdb.PRODUCT_CATEGORY,salesdb.salesdb.SALES_ORDER,salesdb.salesdb.SALES_ORDER_ALL,salesdb.salesdb.SALES_ORDER_DETAIL,salesdb.salesdb.SALES_ORDER_DETAIL_DS,salesdb.salesdb.SUPPLIER"
bootstrap_servers = "b-2.mskclustermskconnectl.4xwlfx.c11.kafka.us-east-1.amazonaws.com:9098,b-3.mskclustermskconnectl.4xwlfx.c11.kafka.us-east-1.amazonaws.com:9098,b-1.mskclustermskconnectl.4xwlfx.c11.kafka.us-east-1.amazonaws.com:9098"
aws_subnet_example1_id = "subnet-016ef7bb5f5db5759"
aws_subnet_example2_id = "subnet-0114c390d379134fa"
aws_subnet_example3_id = "subnet-0f6352ad89a1454f2"
security_groups = "sg-07eb8f8e4559334e7"
aws_mskconnect_custom_plugin_example_arn = "arn:aws:kafkaconnect:us-east-1:xxxxxxxxxxxx:custom-plugin/confluentinc-kafka-connect-s3-10-0-3/e9aeb52e-d172-4dba-9de5-f5cf73f1cb9e-2"
aws_mskconnect_custom_plugin_example_latest_revision = "1"
aws_iam_role_example_arn = "arn:aws:iam::xxxxxxxxxxxx:role/msk-connect-lab-S3ConnectorIAMRole-3LBTU7YAV9CM"
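Because the topic list lives in a single comma-separated string, adding a topic amounts to a one-line change in var.tfvars. If you want to script that change, a small helper along these lines could do it. This is a sketch that assumes the file contains a single-line topics = "..." assignment like the one above; the function name add_topic is hypothetical, and the script only edits text without running Terraform.

```python
import re

# Matches a single-line assignment such as: topics = "a.b.C,a.b.D"
TOPICS_LINE = re.compile(r'^(topics\s*=\s*")([^"]*)(")\s*$', re.MULTILINE)


def add_topic(tfvars_text: str, new_topic: str) -> str:
    """Return the tfvars content with new_topic appended to the topics
    string. The text is returned unchanged if the topic is already listed."""
    match = TOPICS_LINE.search(tfvars_text)
    if match is None:
        raise ValueError('no topics = "..." assignment found')
    topics = [t for t in match.group(2).split(",") if t]
    if new_topic in topics:
        return tfvars_text
    topics.append(new_topic)
    # Replace only the quoted value and leave every other line untouched.
    return (tfvars_text[:match.start(2)]
            + ",".join(topics)
            + tfvars_text[match.end(2):])
```

After writing the updated content back to var.tfvars, the usual terraform plan and terraform apply run picks up the new topic.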

Deploy and update configurations using Terraform

Once you’ve defined your MSK Connect infrastructure using Terraform, applying these configurations is a straightforward process for creating or updating your infrastructure. This becomes particularly convenient when a new topic needs to be added. Thanks to the externalized configuration, incorporating this change is now a seamless task. The steps are as follows:

  1. Download and install Terraform from the official website (https://www.terraform.io/downloads.html) for your operating system.
  2. Confirm the installation by running the terraform version command on your command line interface.
  3. Ensure that you have configured your AWS credentials using the AWS Command Line Interface (AWS CLI) or by setting environment variables. You can use the aws configure command to configure your credentials if you’re using the AWS CLI.
  4. Place the main.tf, variables.tf, and var.tfvars files in the same Terraform directory.
  5. Open a command line interface, navigate to the directory containing the Terraform files, and run the command terraform init to initialize Terraform and download the required providers.
  6. Run the command terraform plan -var-file="var.tfvars" to review the run plan.

This command shows the changes that Terraform will make to the infrastructure based on the provided variables. This step is optional but is useful as a preview before applying.

  7. If the plan looks correct, run the command terraform apply -var-file="var.tfvars" to apply the configuration.

Terraform prompts you for confirmation and then creates the MSK Connect connector in your AWS account.

  8. After the terraform apply command is complete, verify the infrastructure has been created or updated on the console.
  9. For any changes or updates, modify your Terraform files (main.tf, variables.tf, var.tfvars) as needed, and then rerun the terraform plan and terraform apply commands.
  10. When you no longer need the infrastructure, you can use terraform destroy -var-file="var.tfvars" to remove all resources created by your Terraform files.

Be careful with this command because it will delete all the resources defined in your Terraform files.

Conclusion

In this post, we addressed the challenges faced by a customer in managing MSK Connect configurations and described a Terraform-based solution. By externalizing Kafka topic to Amazon S3 configurations, you can streamline your configuration management processes, achieve scalability, enhance flexibility, automate deployments, and centralize management. We encourage you to use Terraform to optimize your MSK Connect configurations and explore further possibilities in managing your streaming data pipelines efficiently.

To get started with externalizing MSK Connect configurations using Terraform, refer to the provided implementation steps and the Getting Started with Terraform guide, MSK Connect documentation, Terraform documentation, and example GitHub repository.

Using Terraform to externalize the Kafka topic to Amazon S3 Sink configuration in MSK Connect offers a powerful solution for managing and scaling your streaming data pipelines. By automating the deployment, updating, and central management of configurations, you can ensure efficiency, flexibility, and scalability in your data processing workflows.


About the Author

RamC Venkatasamy is a Solutions Architect based in Bloomington, Illinois. He helps AWS Strategic customers transform their businesses in the cloud. He has a fervent enthusiasm for Serverless, Event-Driven Architecture, and GenAI.

Securely process near-real-time data from Amazon MSK Serverless using an AWS Glue streaming ETL job with IAM authentication

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/securely-process-near-real-time-data-from-amazon-msk-serverless-using-an-aws-glue-streaming-etl-job-with-iam-authentication/

Streaming data has become an indispensable resource for organizations worldwide because it offers real-time insights that are crucial for data analytics. The escalating velocity and magnitude of collected data has created a demand for real-time analytics. This data originates from diverse sources, including social media, sensors, logs, and clickstreams, among others. With streaming data, organizations gain a competitive edge by promptly responding to real-time events and making well-informed decisions.

In streaming applications, a prevalent approach involves ingesting data through Apache Kafka and processing it with Apache Spark Structured Streaming. However, managing, integrating, and authenticating the processing framework (Apache Spark Structured Streaming) with the ingesting framework (Kafka) poses significant challenges, necessitating a managed and serverless framework. For example, integrating and authenticating a client like Spark streaming with Kafka brokers and zookeepers using a manual TLS method requires certificate and keystore management, which is not an easy task and requires a good knowledge of TLS setup.

To address these issues effectively, we propose using Amazon Managed Streaming for Apache Kafka (Amazon MSK), a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data. In this post, we use Amazon MSK Serverless, a cluster type for Amazon MSK that makes it possible for you to run Apache Kafka without having to manage and scale cluster capacity. To further enhance security and streamline authentication and authorization processes, MSK Serverless enables you to handle both authentication and authorization using AWS Identity and Access Management (IAM) in your cluster. This integration eliminates the need for separate mechanisms for authentication and authorization, simplifying and strengthening data protection. For example, when a client tries to write to your cluster, MSK Serverless uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster.

To process data effectively, we use AWS Glue, a serverless data integration service that uses the Spark Structured Streaming framework and enables near-real-time data processing. An AWS Glue streaming job can handle large volumes of incoming data from MSK Serverless with IAM authentication. This powerful combination ensures that data is processed securely and swiftly.

The post demonstrates how to build an end-to-end implementation to process data from MSK Serverless using an AWS Glue streaming extract, transform, and load (ETL) job with IAM authentication to connect MSK Serverless from the AWS Glue job and query the data using Amazon Athena.

Solution overview

The following diagram illustrates the architecture that you implement in this post.

The workflow consists of the following steps:

  1. Create an MSK Serverless cluster with IAM authentication and an EC2 Kafka client as the producer to ingest sample data into a Kafka topic. For this post, we use the kafka-console-producer.sh Kafka console producer client.
  2. Set up an AWS Glue streaming ETL job to process the incoming data. This job extracts data from the Kafka topic, loads it into Amazon Simple Storage Service (Amazon S3), and creates a table in the AWS Glue Data Catalog. By continuously consuming data from the Kafka topic, the ETL job ensures it remains synchronized with the latest streaming data. Moreover, the job incorporates the checkpointing functionality, which tracks the processed records, enabling it to resume processing seamlessly from the point of interruption in the event of a job run failure.
  3. Following the data processing, the streaming job stores data in Amazon S3 and generates a Data Catalog table. This table acts as a metadata layer for the data. To interact with the data stored in Amazon S3, you can use Athena, a serverless and interactive query service. Athena enables the run of SQL-like queries on the data, facilitating seamless exploration and analysis.

For this post, we create the solution resources in the us-east-1 Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configure resources with AWS CloudFormation

In this post, you use the following two CloudFormation templates. Using two templates decouples the creation of ingestion resources from the creation of processing resources, so you can deploy only the part that your use case requires.

  • vpc-mskserverless-client.yaml – This template sets up the data ingestion service resources such as a VPC, MSK Serverless cluster, and S3 bucket
  • gluejob-setup.yaml – This template sets up the data processing resources such as the AWS Glue table, database, connection, and streaming job

Create data ingestion resources

The vpc-mskserverless-client.yaml stack creates a VPC, private and public subnets, security groups, S3 VPC Endpoint, MSK Serverless cluster, EC2 instance with Kafka client, and S3 bucket. To create the solution resources for data ingestion, complete the following steps:

  1. Launch the stack vpc-mskserverless-client using the CloudFormation template:
  2. Provide the parameter values as listed in the following table.
Parameter | Description | Sample value
EnvironmentName | Environment name that is prefixed to resource names | .
PrivateSubnet1CIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone | .
PrivateSubnet2CIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone | .
PublicSubnet1CIDR | IP range (CIDR notation) for the public subnet in the first Availability Zone | .
PublicSubnet2CIDR | IP range (CIDR notation) for the public subnet in the second Availability Zone | .
VpcCIDR | IP range (CIDR notation) for this VPC | .
InstanceType | Instance type for the EC2 instance | t2.micro
LatestAmiId | AMI used for the EC2 instance | /aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2
  3. When the stack creation is complete, retrieve the EC2 instance PublicDNS from the vpc-mskserverless-client stack’s Outputs tab.

The stack creation process can take around 15 minutes to complete.

  4. On the Amazon EC2 console, access the EC2 instance that you created using the CloudFormation template.
  5. Choose the EC2 instance whose InstanceId is shown on the stack’s Outputs tab.

Next, you log in to the EC2 instance using Session Manager, a capability of AWS Systems Manager.

  6. On the Amazon EC2 console, select the instance ID, and on the Session Manager tab, choose Connect.


After you log in to the EC2 instance, you create a Kafka topic in the MSK Serverless cluster from the EC2 instance.

  7. In the following export command, provide the MSKBootstrapServers value from the vpc-mskserverless-client stack output for your endpoint:
    $ sudo su - ec2-user
    $ BS=<your-msk-serverless-endpoint (e.g.) boot-xxxxxx.yy.kafka-serverless.us-east-1.a>

  8. Run the following command on the EC2 instance to create a topic called msk-serverless-blog. The Kafka client is already installed in the ec2-user home directory (/home/ec2-user).
    $ /home/ec2-user/kafka_2.12-2.8.1/bin/kafka-topics.sh \
    --bootstrap-server $BS \
    --command-config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
    --create --topic msk-serverless-blog \
    --partitions 1
    
    Created topic msk-serverless-blog
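The post does not show the contents of the client.properties file passed to the Kafka tools. As a rough sketch, assuming it carries the standard settings of the Amazon MSK IAM authentication library, the file could be rendered as follows (the function name render_client_properties is hypothetical, and the file on the EC2 instance may contain additional settings):

```python
def render_client_properties() -> str:
    """Render a client.properties file for IAM authentication.

    Assumption: the pre-installed file contains these settings; this is an
    illustration, not a copy of the file on the instance.
    """
    settings = {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "AWS_MSK_IAM",
        "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }
    return "\n".join(f"{key}={value}" for key, value in settings.items()) + "\n"


if __name__ == "__main__":
    print(render_client_properties(), end="")
```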

After you confirm the topic creation, you can push the data to the MSK Serverless.

  9. Run the following command on the EC2 instance to create a console producer to produce records to the Kafka topic. (For source data, we use nycflights.csv, downloaded to the ec2-user home directory /home/ec2-user.)
$ /home/ec2-user/kafka_2.12-2.8.1/bin/kafka-console-producer.sh \
--broker-list $BS \
--producer.config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
--topic msk-serverless-blog < nycflights.csv

Next, you set up the data processing service resources, specifically AWS Glue components like the database, table, and streaming job to process the data.

Create data processing resources

The gluejob-setup.yaml CloudFormation template creates a database, table, AWS Glue connection, and AWS Glue streaming job. Retrieve the values for VpcId, GluePrivateSubnet, GlueconnectionSubnetAZ, SecurityGroup, S3BucketForOutput, and S3BucketForGlueScript from the vpc-mskserverless-client stack’s Outputs tab to use in this template. Complete the following steps:

  1. Launch the stack gluejob-setup:

  2. Provide parameter values as listed in the following table.
Parameter | Description | Sample value
EnvironmentName | Environment name that is prefixed to resource names. | Gluejob-setup
VpcId | ID of the VPC for security group. Use the VPC ID created with the first stack. | Refer to the first stack’s output.
GluePrivateSubnet | Private subnet used for creating the AWS Glue connection. | Refer to the first stack’s output.
SecurityGroupForGlueConnection | Security group used by the AWS Glue connection. | Refer to the first stack’s output.
GlueconnectionSubnetAZ | Availability Zone for the first private subnet used for the AWS Glue connection. | .
GlueDataBaseName | Name of the AWS Glue Data Catalog database. | glue_kafka_blog_db
GlueTableName | Name of the AWS Glue Data Catalog table. | blog_kafka_tbl
S3BucketNameForScript | Bucket name for the AWS Glue ETL script. | Use the S3 bucket name from the previous stack. For example, aws-gluescript-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
GlueWorkerType | Worker type for the AWS Glue job. For example, G.1X. | G.1X
NumberOfWorkers | Number of workers in the AWS Glue job. | 3
S3BucketNameForOutput | Bucket name for writing data from the AWS Glue job. | aws-glueoutput-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
TopicName | MSK topic name that needs to be processed. | msk-serverless-blog
MSKBootstrapServers | Kafka bootstrap server. | boot-30vvr5lg.c1.kafka-serverless.us-east-1.amazonaws.com:9098

The stack creation process can take around 1–2 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

In the gluejob-setup stack, we created a Kafka type AWS Glue connection, which consists of broker information like the MSK bootstrap server, topic name, and VPC in which the MSK Serverless cluster is created. Most importantly, it specifies the IAM authentication option, which helps AWS Glue authenticate and authorize using IAM authentication while consuming the data from the MSK topic. For further clarity, you can examine the AWS Glue connection and the associated AWS Glue table generated through AWS CloudFormation.
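To make the IAM authentication option more concrete, the following sketch shows the Kafka client settings it roughly corresponds to if you were to read the topic with plain Spark Structured Streaming instead of an AWS Glue connection. This is an assumption for illustration only: the function name kafka_iam_reader_options is hypothetical, and the AWS Glue job in this post obtains its settings from the connection rather than from code like this.

```python
def kafka_iam_reader_options(bootstrap_servers: str, topic: str) -> dict:
    """Build Spark Kafka source options for IAM authentication.

    Options prefixed with "kafka." are passed through to the Kafka consumer.
    """
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "earliest",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }


if __name__ == "__main__":
    options = kafka_iam_reader_options(
        "boot-xxxxxxxx.c2.kafka-serverless.us-east-1.amazonaws.com:9098",  # placeholder
        "msk-serverless-blog",
    )
    for key, value in options.items():
        print(f"{key} = {value}")
```

In a Spark session with the Kafka source and the Amazon MSK IAM library on the classpath, the dictionary could be applied with spark.readStream.format("kafka").options(**options).load().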

After successfully creating the CloudFormation stack, you can now proceed with processing data using the AWS Glue streaming job with IAM authentication.

Run the AWS Glue streaming job

To process the data from the MSK topic using the AWS Glue streaming job that you set up in the previous section, complete the following steps:

  1. On the CloudFormation console, choose the stack gluejob-setup.
  2. On the Outputs tab, retrieve the name of the AWS Glue streaming job from the GlueJobName row. In the following screenshot, the name is GlueStreamingJob-glue-streaming-job.

  3. On the AWS Glue console, choose ETL jobs in the navigation pane.
  4. Search for the AWS Glue streaming job named GlueStreamingJob-glue-streaming-job.
  5. Choose the job name to open its details page.
  6. Choose Run to start the job.
  7. On the Runs tab, confirm that the job is running without failure.

  8. Retrieve the OutputBucketName from the gluejob-setup template outputs.
  9. On the Amazon S3 console, navigate to the S3 bucket to verify the data.

  10. On the AWS Glue console, choose the AWS Glue streaming job you ran, then choose Stop job run.

Because this is a streaming job, it will continue to run indefinitely until manually stopped. After you verify the data is present in the S3 output bucket, you can stop the job to save cost.

Validate the data in Athena

After the AWS Glue streaming job has successfully created the table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database and table that the AWS Glue streaming job created.
  4. To validate the data, run the following query to find the flight number, origin, and destination that covered the highest distance in a year:
SELECT distinct(flight),distance,origin,dest,year from "glue_kafka_blog_db"."output" where distance= (select MAX(distance) from "glue_kafka_blog_db"."output")

The following screenshot shows the output of our example query.
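If you prefer to run the same validation query programmatically, a minimal sketch is to assemble the request parameters for the Athena StartQueryExecution API. The function name athena_query_request and the results location are hypothetical; the database and table names are the ones used in the query above.

```python
def athena_query_request(database: str, table: str, output_location: str) -> dict:
    """Build keyword arguments for Athena's start_query_execution call."""
    query = (
        "SELECT DISTINCT flight, distance, origin, dest, year "
        f'FROM "{database}"."{table}" '
        f'WHERE distance = (SELECT MAX(distance) FROM "{database}"."{table}")'
    )
    return {
        "QueryString": query,
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }


if __name__ == "__main__":
    request = athena_query_request(
        "glue_kafka_blog_db",
        "output",
        "s3://example-athena-results/",  # hypothetical results bucket
    )
    print(request["QueryString"])
```

With boto3 installed and credentials configured, the dictionary can be passed as boto3.client("athena").start_query_execution(**request).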

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack gluejob-setup.
  2. Delete the CloudFormation stack vpc-mskserverless-client.
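The deletion order matters because the second stack depends on resources from the first. As a sketch, the cleanup could be expressed as AWS CLI commands that delete each stack and wait for the deletion to finish before moving on (the function name cleanup_commands is hypothetical, and the commands are only printed here):

```python
from typing import List, Sequence


def cleanup_commands(
    stacks: Sequence[str] = ("gluejob-setup", "vpc-mskserverless-client"),
) -> List[List[str]]:
    """Return AWS CLI commands that delete the stacks one after another."""
    commands = []
    for stack in stacks:
        commands.append(
            ["aws", "cloudformation", "delete-stack", "--stack-name", stack]
        )
        # Block until the stack is gone before deleting the next one.
        commands.append(
            ["aws", "cloudformation", "wait", "stack-delete-complete",
             "--stack-name", stack]
        )
    return commands


if __name__ == "__main__":
    for command in cleanup_commands():
        print(" ".join(command))
```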

Conclusion

In this post, we demonstrated a use case for building a serverless ETL pipeline for streaming with IAM authentication, which allows you to focus on the outcomes of your analytics. You can also modify the AWS Glue streaming ETL code in this post with transformations and mappings to ensure that only valid data gets loaded to Amazon S3. This solution enables you to harness the prowess of AWS Glue streaming, seamlessly integrated with MSK Serverless through the IAM authentication method. It’s time to act and revolutionize your streaming processes.

Appendix

This section provides more information about how to create the AWS Glue connection on the AWS Glue console, which helps establish the connection to the MSK Serverless cluster and allow the AWS Glue streaming job to authenticate and authorize using IAM authentication while consuming the data from the MSK topic.

  1. On the AWS Glue console, in the navigation pane, under Data catalog, choose Connections.
  2. Choose Create connection.
  3. For Connection name, enter a unique name for your connection.
  4. For Connection type, choose Kafka.
  5. For Connection access, select Amazon managed streaming for Apache Kafka (MSK).
  6. For Kafka bootstrap server URLs, enter a comma-separated list of bootstrap server URLs. Include the port number. For example, boot-xxxxxxxx.c2.kafka-serverless.us-east-1.amazonaws.com:9098.

  7. For Authentication, choose IAM Authentication.
  8. Select Require SSL connection.
  9. For VPC, choose the VPC that contains your data source.
  10. For Subnet, choose the private subnet within your VPC.
  11. For Security groups, choose a security group to allow access to the data store in your VPC subnet.

Security groups are associated to the ENI attached to your subnet. You must choose at least one security group with a self-referencing inbound rule for all TCP ports.

  12. Choose Save changes.

After you create the AWS Glue connection, you can use the AWS Glue streaming job to consume data from the MSK topic using IAM authentication.


About the authors

Shubham Purwar is a Cloud Engineer (ETL) at AWS Bengaluru specialized in AWS Glue and Amazon Athena. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS with a specialization in AWS Glue. He is dedicated to assisting customers in resolving issues related to their ETL workloads and creating scalable data processing and analytics pipelines on AWS.

Build streaming data pipelines with Amazon MSK Serverless and IAM authentication

Post Syndicated from Marvin Gersho original https://aws.amazon.com/blogs/big-data/build-streaming-data-pipelines-with-amazon-msk-serverless-and-iam-authentication/

Currently, MSK Serverless only directly supports IAM for authentication using Java. This example shows how to use this mechanism. Additionally, it provides a pattern for creating a proxy that can easily be integrated into solutions built in languages other than Java.

The rising trend in today’s tech landscape is the use of streaming data and event-oriented structures. They are being applied in numerous ways, including monitoring website traffic, tracking industrial Internet of Things (IoT) devices, analyzing video game player behavior, and managing data for cutting-edge analytics systems.

Apache Kafka, a top-tier open-source tool, is making waves in this domain. It’s widely adopted by numerous users for building fast and efficient data pipelines, analyzing streaming data, merging data from different sources, and supporting essential applications.

Amazon’s serverless Apache Kafka offering, Amazon Managed Streaming for Apache Kafka (Amazon MSK) Serverless, is attracting a lot of interest. It’s appreciated for its user-friendly approach, ability to scale automatically, and cost-saving benefits over other Kafka solutions. However, a hurdle encountered by many users is the requirement of MSK Serverless to use AWS Identity and Access Management (IAM) access control. At the time of writing, the Amazon MSK library for IAM is exclusive to Kafka libraries in Java, creating a challenge for users of other programming languages. In this post, we aim to address this issue and present how you can use Amazon API Gateway and AWS Lambda to navigate around this obstacle.

SASL/SCRAM authentication vs. IAM authentication

Compared to the traditional authentication methods like Salted Challenge Response Authentication Mechanism (SCRAM), the IAM extension into Apache Kafka through MSK Serverless provides a lot of benefits. Before we delve into those, it’s important to understand what SASL/SCRAM authentication is. Essentially, it’s a traditional method used to confirm a user’s identity before giving them access to a system. This process requires users or clients to provide a user name and password, which the system then cross-checks against stored credentials (for example, via AWS Secrets Manager) to decide whether or not access should be granted.

Compared to this approach, IAM simplifies permission management across AWS environments, enables the creation and strict enforcement of detailed permissions and policies, and uses temporary credentials rather than the typical user name and password authentication. Another benefit of using IAM is that you can use IAM for both authentication and authorization. If you use SASL/SCRAM, you have to additionally manage ACLs via a separate mechanism. In IAM, you can use the IAM policy attached to the IAM principal to define the fine-grained access control for that IAM principal. All of these improvements make the IAM integration a more efficient and secure solution for most use cases.

However, for applications not built in Java, utilizing MSK Serverless becomes tricky. The standard SASL/SCRAM authentication isn’t available, and non-Java Kafka libraries don’t have a way to use IAM access control. This calls for an alternative approach to connect to MSK Serverless clusters.

Without having to rewrite your existing application in Java, you can employ API Gateway and Lambda as a proxy in front of a cluster. They can handle API requests and relay them to Kafka topics instantly. API Gateway takes in producer requests and channels them to a Lambda function, written in Java using the Amazon MSK IAM library. The function then communicates with the MSK Serverless Kafka topic using IAM access control. After the cluster receives the message, it can be further processed within the MSK Serverless setup.

You can also utilize Lambda on the consumer side of MSK Serverless topics, bypassing the Java requirement on the consumer side. You can do this by setting Amazon MSK as an event source for a Lambda function. When the Lambda function is triggered, the data sent to the function includes an array of records from the Kafka topic—no need for direct contact with Amazon MSK.
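As an illustration of the consumer side in a language other than Java, the following Python sketch decodes the records that Lambda delivers for an Amazon MSK event source. In that event, records are grouped under keys of the form topic-partition, and each record value is base64 encoded. The function names decode_records and handler are hypothetical, and the sample consumer in this post is written in Java, so treat this as a sketch rather than the project's implementation.

```python
import base64


def decode_records(event: dict) -> list:
    """Flatten an Amazon MSK event into a list of decoded messages."""
    messages = []
    # "records" maps "<topic>-<partition>" to a list of Kafka records.
    for records in event.get("records", {}).values():
        for record in records:
            messages.append({
                "topic": record["topic"],
                "partition": record["partition"],
                "offset": record["offset"],
                # Record values arrive base64 encoded; UTF-8 text is assumed.
                "value": base64.b64decode(record["value"]).decode("utf-8"),
            })
    return messages


def handler(event, context):
    """Lambda entry point: log every message and report how many were seen."""
    messages = decode_records(event)
    for message in messages:
        print(f"{message['topic']}[{message['partition']}]"
              f"@{message['offset']}: {message['value']}")
    return {"processed": len(messages)}
```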

Solution overview

This example walks you through how to build a serverless real-time stream producer application using API Gateway and Lambda.

For testing, this post includes a sample AWS Cloud Development Kit (AWS CDK) application. This creates a demo environment, including an MSK Serverless cluster, three Lambda functions (one that creates the topic during deployment, a producer, and a consumer), and an API Gateway endpoint that accepts the messages destined for the Kafka topic.

The following diagram shows the architecture of the resulting application including its data flows.

The data flow contains the following steps:

  1. The infrastructure is defined in an AWS CDK application. By running this application, a set of AWS CloudFormation templates is created.
  2. AWS CloudFormation creates all infrastructure components, including a Lambda function that runs during the deployment process to create a topic in the MSK Serverless cluster and to retrieve the authentication endpoint needed for the producer Lambda function. On destruction of the CloudFormation stack, the same Lambda function gets triggered again to delete the topic from the cluster.
  3. An external application calls an API Gateway endpoint.
  4. API Gateway forwards the request to a Lambda function.
  5. The Lambda function acts as a Kafka producer and pushes the message to a Kafka topic using IAM authentication.
  6. The Lambda event source mapping mechanism triggers the Lambda consumer function and forwards the message to it.
  7. The Lambda consumer function logs the data to Amazon CloudWatch.

Note that we don’t need to worry about Availability Zones. MSK Serverless automatically replicates the data across multiple Availability Zones to ensure high availability of the data.

The demo additionally shows how to use Lambda Powertools for Java to streamline logging and tracing and the IAM authenticator for the simple authentication process outlined in the introduction.

The following sections take you through the steps to deploy, test, and observe the example application.

Prerequisites

The example has the following prerequisites:

  • An AWS account. If you haven’t signed up, complete the following steps:
  • The following software installed on your development machine, or use an AWS Cloud9 environment, which comes with all requirements preinstalled:
  • Appropriate AWS credentials for interacting with resources in your AWS account.

Deploy the solution

Complete the following steps to deploy the solution:

  1. Clone the project GitHub repository and change the directory to subfolder serverless-kafka-iac:
git clone https://github.com/aws-samples/apigateway-lambda-msk-serverless-integration
cd apigateway-lambda-msk-serverless-integration/serverless-kafka-iac
  2. Configure environment variables:
export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query 'Account' --output text)
export CDK_DEFAULT_REGION=$(aws configure get region)
  3. Prepare the virtual Python environment:
python3 -m venv .venv
source .venv/bin/activate
pip3 install -r requirements.txt
  4. Bootstrap your account for AWS CDK usage:
cdk bootstrap aws://$CDK_DEFAULT_ACCOUNT/$CDK_DEFAULT_REGION
  5. Run cdk synth to build the code and test the requirements (ensure that the Docker daemon is running on your machine):
cdk synth
  6. Run cdk deploy to deploy the code to your AWS account:
cdk deploy --all

Test the solution

To test the solution, we generate messages for the Kafka topics by sending calls through the API Gateway from our development machine or AWS Cloud9 environment. We then go to the CloudWatch console to observe incoming messages in the log files of the Lambda consumer function.

  1. Open a terminal on your development machine to test the API with the Python script provided under /serverless_kafka_iac/test_api.py:
python3 test-api.py

  2. On the Lambda console, open the Lambda function named ServerlessKafkaConsumer.

  3. On the Monitor tab, choose View CloudWatch logs to access the logs of the Lambda function.

  4. Choose the latest log stream to access the log files of the last run.

You can review the log entry of the received Kafka messages in the log of the Lambda function.
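If you want to send a test message without the provided script, a minimal sketch using only the Python standard library could look like the following. The endpoint URL, the use of POST, and the payload shape are assumptions for illustration; take the real endpoint from the output of the deployed stack.

```python
import json
import urllib.request


def build_request(api_url: str, payload: dict) -> urllib.request.Request:
    """Prepare an HTTP POST that carries the message as a JSON body."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        api_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Hypothetical endpoint; replace it with your API Gateway URL.
    request = build_request(
        "https://example.execute-api.us-east-1.amazonaws.com/prod/messages",
        {"message": "hello from the test client"},
    )
    print(request.get_method(), request.full_url)
    # To actually send the request:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status, response.read().decode("utf-8"))
```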

Trace a request

All components integrate with AWS X-Ray. With AWS X-Ray, you can trace the entire application, which is useful to identify bottlenecks when load testing. You can also trace method runs at the Java method level.

Lambda Powertools for Java allows you to shortcut this process by adding the @Tracing annotation to a method to see traces on the method level in X-Ray.

To trace a request end to end, complete the following steps:

  1. On the CloudWatch console, choose Service map in the navigation pane.
  2. Select a component to investigate (for example, the Lambda function where you deployed the Kafka producer).
  3. Choose View traces.

  4. Choose a single Lambda method invocation and investigate further at the Java method level.

Implement a Kafka producer in Lambda

Kafka natively supports Java. To stay open, cloud native, and without third-party dependencies, the producer is written in that language. Currently, the IAM authenticator is only available to Java. In this example, the Lambda handler receives a message from an API Gateway source and pushes this message to an MSK topic called messages.

Typically, Kafka producers are long-living and pushing a message to a Kafka topic is an asynchronous process. Because Lambda is ephemeral, you must enforce a full flush of a submitted message before the Lambda function ends by calling producer.flush():

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
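package software.amazon.samples.kafka.lambda;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.lambda.powertools.logging.Logging;
import software.amazon.lambda.powertools.tracing.Tracing;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
 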
// This class is part of the AWS samples package and specifically deals with Kafka integration in a Lambda function.
// It serves as a simple API Gateway to Kafka Proxy, accepting requests and forwarding them to a Kafka topic.
public class SimpleApiGatewayKafkaProxy implements RequestHandler<APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent> {
 
    // Specifies the name of the Kafka topic where the messages will be sent
    public static final String TOPIC_NAME = "messages";
 
    // Logger instance for logging events of this class
    private static final Logger log = LogManager.getLogger(SimpleApiGatewayKafkaProxy.class);
    
    // Factory to create properties for Kafka Producer
    public KafkaProducerPropertiesFactory kafkaProducerProperties = new KafkaProducerPropertiesFactoryImpl();
    
    // Instance of KafkaProducer
    private KafkaProducer<String, String> producer;
 
    // Overridden method from the RequestHandler interface to handle incoming API Gateway proxy events
    @Override
    @Tracing
    @Logging(logEvent = true)
    public APIGatewayProxyResponseEvent handleRequest(APIGatewayProxyRequestEvent input, Context context) {
        
        // Creating a response object to send back 
        APIGatewayProxyResponseEvent response = createEmptyResponse();
        try {
            // Extracting the message from the request body
            String message = getMessageBody(input);
 
            // Create a Kafka producer
            KafkaProducer<String, String> producer = createProducer();
 
            // Creating a record with topic name, request ID as key and message as value 
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_NAME, context.getAwsRequestId(), message);
 
            // Sending the record to Kafka topic and getting the metadata of the record
            Future<RecordMetadata> send = producer.send(record);
            producer.flush();
 
            // Retrieve metadata about the sent record
            RecordMetadata metadata = send.get();
 
            // Logging the partition where the message was sent
            log.info(String.format("Message was sent to partition %s", metadata.partition()));
 
            // If the message was successfully sent, return a 200 status code
            return response.withStatusCode(200).withBody("Message successfully pushed to kafka");
        } catch (Exception e) {
            // In case of exception, log the error message and return a 500 status code
            log.error(e.getMessage(), e);
            return response.withBody(e.getMessage()).withStatusCode(500);
        }
    }
 
    // Creates a Kafka producer if it doesn't already exist
    @Tracing
    private KafkaProducer<String, String> createProducer() {
        if (producer == null) {
            log.info("Connecting to kafka cluster");
            producer = new KafkaProducer<String, String>(kafkaProducerProperties.getProducerProperties());
        }
        return producer;
    }
 
    // Extracts the message from the request body. If it's base64 encoded, it's decoded first.
    private String getMessageBody(APIGatewayProxyRequestEvent input) {
        String body = input.getBody();
 
        if (input.getIsBase64Encoded()) {
            body = new String(java.util.Base64.getDecoder().decode(body), java.nio.charset.StandardCharsets.UTF_8);
        }
        return body;
    }
 
    // Creates an empty API Gateway proxy response event with predefined headers.
    private APIGatewayProxyResponseEvent createEmptyResponse() {
        Map<String, String> headers = new HashMap<>();
        headers.put("Content-Type", "application/json");
        headers.put("X-Custom-Header", "application/json");
        APIGatewayProxyResponseEvent response = new APIGatewayProxyResponseEvent().withHeaders(headers);
        return response;
    }
}
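To illustrate why the explicit flush matters, the following toy model mimics a producer that only buffers messages on send and delivers them on flush. It is not a Kafka client: the class BufferingProducer and the function handle are hypothetical, and a real producer also sends in the background. The point is only that a handler returning without a flush cannot rely on delivery.

```python
class BufferingProducer:
    """Toy stand-in for an asynchronous producer (not a real Kafka client)."""

    def __init__(self):
        self._buffer = []    # messages accepted but not yet delivered
        self.delivered = []  # messages that reached the "cluster"

    def send(self, message: str) -> None:
        """Accept a message without delivering it yet."""
        self._buffer.append(message)

    def flush(self) -> None:
        """Deliver everything that is still buffered."""
        self.delivered.extend(self._buffer)
        self._buffer.clear()


def handle(producer: BufferingProducer, message: str, flush: bool = True) -> int:
    """Simulate one invocation; return how many messages were delivered."""
    producer.send(message)
    if flush:
        producer.flush()
    return len(producer.delivered)


if __name__ == "__main__":
    print("without flush:", handle(BufferingProducer(), "m1", flush=False))
    print("with flush:", handle(BufferingProducer(), "m1", flush=True))
```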

Connect to Amazon MSK using IAM authentication

This post uses IAM authentication to connect to the respective Kafka cluster. For information about how to configure the producer for connectivity, refer to IAM access control.

Because you configure the cluster via IAM, grant the producer the Connect permission on the cluster and the DescribeTopic and WriteData permissions on the topic so that it can push messages to Kafka:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect"
            ],
            "Resource": "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid"
        }
    ]
}
 
 
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:WriteData"
            ],
            "Resource": "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name"
        }
    ]
}

These policies show the Kafka-related excerpt of the IAM policy that must be applied to the Kafka producer. When using IAM authentication, be aware of the current limits of IAM Kafka authentication, which affect the number of concurrent connections and IAM requests for a producer. Refer to Amazon MSK quota and follow the recommendation for authentication backoff in the producer client:

        Map<String, String> configuration = Map.of(
                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "bootstrap.servers", getBootstrapServer(),
                "security.protocol", "SASL_SSL",
                "sasl.mechanism", "AWS_MSK_IAM",
                "sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;",
                "sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
                "connections.max.idle.ms", "60",
                "reconnect.backoff.ms", "1000"
        );

Additional considerations

Each MSK Serverless cluster can handle 100 requests per second. To reduce IAM authentication requests from the Kafka producer, create the producer outside of the handler. For frequent calls, Lambda is likely to reuse the previously created class instance and only rerun the handler, so the existing connection is reused.
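The reuse described above can be sketched without any Kafka dependency. In the following illustration, `FakeProducer` is a stand-in for `KafkaProducer`, and `ReusableClient` and `HandlerSketch` are hypothetical names. The point is only that a field initialized outside the handler is created once and then shared by later invocations in the same execution environment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Stand-in for KafkaProducer so the sketch runs without Kafka on the classpath.
final class FakeProducer {
    final List<String> sent = new ArrayList<>();

    void send(String message) {
        sent.add(message);
    }
}

// Creates the wrapped client on first use and returns the same instance afterwards.
final class ReusableClient<T> {
    private final Supplier<T> factory;
    private T instance;
    private int creations;

    ReusableClient(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T get() {
        if (instance == null) {
            instance = factory.get(); // the expensive connect/authenticate step happens here
            creations++;
        }
        return instance;
    }

    synchronized int creations() {
        return creations;
    }
}

final class HandlerSketch {
    // Static field: lives outside the handler method, so it survives between invocations.
    static final ReusableClient<FakeProducer> PRODUCER = new ReusableClient<>(FakeProducer::new);

    // Plays the role of the Lambda handler; it only reuses the producer.
    static String handleRequest(String message) {
        PRODUCER.get().send(message);
        return "sent";
    }
}
```

Calling `handleRequest` several times creates the producer only once, which is the behavior that keeps the number of IAM authentication requests low.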

For bursting workloads with a high number of concurrent API Gateway requests, this can lead to dropped messages. Although this might be tolerable for some workloads, for others this might not be the case.

In these cases, you can extend the architecture with a buffering technology like Amazon Simple Queue Service (Amazon SQS) or Amazon Kinesis Data Streams between API Gateway and Lambda.

To reduce latency, reduce cold start times for Java by changing the tiered compilation level to 1, as described in Optimizing AWS Lambda function performance for Java. Provisioned concurrency ensures that polling Lambda functions don’t need to warm up before requests arrive.

Conclusion

In this post, we showed how to create a serverless integration Lambda function between API Gateway and MSK Serverless as a way to do IAM authentication when your producer is not written in Java. You also learned about the native integration of Lambda and Amazon MSK on the consumer side. Additionally, we showed how to deploy such an integration with the AWS CDK.

The general pattern is suitable for many use cases where you want to use IAM authentication and your producers or consumers are not written in Java, but you still want to take advantage of the benefits of MSK Serverless, such as its ability to scale up and down with unpredictable or spiky workloads and the minimal operational overhead of running Apache Kafka.

You can also use MSK Serverless to reduce operational complexity by automating provisioning and the management of capacity needs, including the need to constantly monitor brokers and storage.

For more serverless learning resources, visit Serverless Land.

About the Authors

Philipp Klose is a Global Solutions Architect at AWS based in Munich. He works with enterprise FSI customers and helps them solve business problems by architecting serverless platforms. In his free time, Philipp spends time with his family and enjoys every geek hobby possible.

Daniel Wessendorf is a Global Solutions Architect at AWS based in Munich. He works with enterprise FSI customers and is primarily specialized in machine learning and data architectures. In his free time, he enjoys swimming, hiking, skiing, and spending quality time with his family.

Marvin Gersho is a Senior Solutions Architect at AWS based in New York City. He works with a wide range of startup customers. He previously worked for many years in engineering leadership and hands-on application development, and now focuses on helping customers architect secure and scalable workloads on AWS with a minimum of operational overhead. In his free time, Marvin enjoys cycling and strategy board games.

Nathan Lichtenstein is a Senior Solutions Architect at AWS based in New York City. Primarily working with startups, he ensures his customers build smart on AWS, delivering creative solutions to their complex technical challenges. Nathan has worked in cloud and network architecture in the media, financial services, and retail spaces. Outside of work, he can often be found at a Broadway theater.

Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion

Post Syndicated from Muthu Pitchaimani original https://aws.amazon.com/blogs/big-data/introducing-amazon-msk-as-a-source-for-amazon-opensearch-ingestion/

Ingesting a high volume of streaming data has been a defining characteristic of operational analytics workloads with Amazon OpenSearch Service. Many of these workloads involve either self-managed Apache Kafka or Amazon Managed Streaming for Apache Kafka (Amazon MSK) to satisfy their data streaming needs. Consuming data from Amazon MSK and writing to OpenSearch Service has been a challenge for customers. AWS Lambda, custom code, Kafka Connect, and Logstash have been used for ingesting this data. These methods involve tools that must be built and maintained. In this post, we introduce Amazon MSK as a source to Amazon OpenSearch Ingestion, a serverless, fully managed, real-time data collector for OpenSearch Service that makes this ingestion even easier.

Solution overview

The following diagram shows the flow from data sources to Amazon OpenSearch Service.

The flow contains the following steps:

  1. Data sources produce data and send that data to Amazon MSK.
  2. OpenSearch Ingestion consumes the data from Amazon MSK.
  3. OpenSearch Ingestion transforms, enriches, and writes the data into OpenSearch Service.
  4. Users search, explore, and analyze the data with OpenSearch Dashboards.

Prerequisites

You will need a provisioned MSK cluster created with appropriate data sources. The sources, as producers, write data into Amazon MSK. The cluster should be created with the appropriate Availability Zone, storage, compute, security and other configurations to suit your workload needs. To provision your MSK cluster and have your sources producing data, see Getting started using Amazon MSK.

As of this writing, OpenSearch Ingestion supports Amazon MSK provisioned clusters, but not Amazon MSK Serverless. The OpenSearch Ingestion pipeline can reside in the same account as the MSK cluster or in a different account. OpenSearch Ingestion uses AWS PrivateLink to read data, so you must turn on multi-VPC connectivity on your MSK cluster. For more information, see Amazon MSK multi-VPC private connectivity in a single Region. OpenSearch Ingestion can write data to Amazon Simple Storage Service (Amazon S3), provisioned OpenSearch Service domains, and Amazon OpenSearch Serverless. In this solution, we use a provisioned OpenSearch Service domain as a sink for OpenSearch Ingestion. Refer to Getting started with Amazon OpenSearch Service to create a provisioned OpenSearch Service domain. You will need appropriate permissions to read data from Amazon MSK and write data to OpenSearch Service. The following sections outline the required permissions.

Permissions required

To read from Amazon MSK and write to Amazon OpenSearch Service, you need to create an AWS Identity and Access Management (IAM) role used by Amazon OpenSearch Ingestion. In this post, we use a role called pipeline-Role for this purpose. To create this role, see Creating IAM roles.

Reading from Amazon MSK

OpenSearch Ingestion needs permission to create a PrivateLink connection and to perform other actions on your MSK cluster. Edit your MSK cluster policy to include the following snippet with appropriate permissions. If your OpenSearch Ingestion pipeline resides in an account different from your MSK cluster, you will need a second section to allow this pipeline. Use proper semantic conventions when providing the cluster, topic, and group permissions, and remove the comments from the policy before using it.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "osis-pipelines.aws.internal"
      },
      "Action": [
        "kafka:CreateVpcConnection",
        "kafka:GetBootstrapBrokers",
        "kafka:DescribeCluster"
      ],
      # Change this to your msk arn
      "Resource": "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:cluster/test-cluster/xxxxxxxx-xxxx-xx"
    },    
    ### Following permissions are required if msk cluster is in different account than osi pipeline
    {
      "Effect": "Allow",
      "Principal": {
        # Change this to your sts role arn used in the pipeline
        "AWS": "arn:aws:iam::XXXXXXXXXXXX:role/PipelineRole"
      },
      "Action": [
        "kafka-cluster:*",
        "kafka:*"
      ],
      "Resource": [
        # Change this to your msk arn
        "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:cluster/test-cluster/xxxxxxxx-xxxx-xx",
        # Change this as per your cluster name & kafka topic name
        "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:topic/test-cluster/xxxxxxxx-xxxx-xx/*",
        # Change this as per your cluster name
        "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:group/test-cluster/*"
      ]
    }
  ]
}

Edit the pipeline role’s inline policy to include the following permissions. Ensure that you have removed the comments before using the policy.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka:DescribeClusterV2",
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": [
                # Change this to your msk arn
                "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:cluster/test-cluster/xxxxxxxx-xxxx-xx"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                # Change this to your kafka topic and cluster name
                "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:topic/test-cluster/xxxxxxxx-xxxx-xx/topic-to-consume"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                # Change this as per your cluster name
                "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:group/test-cluster/*"
            ]
        }
    ]
}
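Because JSON does not allow comments, the `#` lines in the policy templates above have to be removed before the policy is saved. As a small convenience, the following sketch strips whole-line `#` comments from a template string. The class and method names are hypothetical, and the sketch assumes comments always occupy their own line, as they do in the templates above.

```java
import java.util.stream.Collectors;

// Hypothetical utility for cleaning the annotated policy templates.
final class PolicyComments {
    private PolicyComments() {}

    // Drops every line whose first non-blank character is '#'; other lines are kept unchanged.
    static String stripHashComments(String template) {
        return template.lines()
                .filter(line -> !line.stripLeading().startsWith("#"))
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        String template = "{\n  # Change this to your msk arn\n  \"Resource\": \"arn:aws:kafka:...\"\n}";
        System.out.println(stripHashComments(template));
    }
}
```

The sketch does not validate the result as JSON, so it is still worth pasting the cleaned policy into the IAM policy editor, which reports syntax errors.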

Writing to OpenSearch Service

In this section, you provide the pipeline role with the necessary permissions to write to OpenSearch Service. As a best practice, we recommend using fine-grained access control in OpenSearch Service. Use OpenSearch Dashboards to map the pipeline role to an appropriate backend role. For more information on mapping roles to users, see Managing permissions. For example, all_access is a built-in role that grants administrative permission to all OpenSearch functions. When deploying to a production environment, use a role with only the permissions needed to write to your OpenSearch domain.

Creating OpenSearch Ingestion pipelines

The pipeline role now has the correct set of permissions to read from Amazon MSK and write to OpenSearch Service. Navigate to the OpenSearch Service console, choose Pipelines, then choose Create pipeline.

Choose a suitable name for the pipeline and set the pipeline capacity with appropriate minimum and maximum OpenSearch Compute Units (OCUs). Then choose ‘AWS-MSKPipeline’ from the dropdown menu.

Use the provided template to fill in all the required fields. The snippet in the following section shows the fields that need to be filled in.

Configuring Amazon MSK source

The following sample configuration snippet shows every setting you need to get the pipeline running:

msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true                     # Default is false  
      topics: 
         - name: "<topic name>" 
           group_id: "<consumer group id>" 
           serde_format: json                   # Remove, if Schema Registry is used. (Other option is plaintext)  
 
           # Below defaults can be tuned as needed 
           # fetch_max_bytes: 52428800          Optional 
           # fetch_max_wait: 500                Optional (in msecs) 
           # fetch_min_bytes: 1                 Optional (in MB) 
           # max_partition_fetch_bytes: 1048576 Optional 
           # consumer_max_poll_records: 500     Optional                                
           # auto_offset_reset: "earliest"      Optional (other option is "latest")
           # key_mode: include_as_field         Optional (other options are include_as_metadata, discard)
 
      # Enable this configuration if Glue schema registry is used            
      # schema:                                 
      #   type: aws_glue 
 
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        # sts_role_arn: "arn:aws:iam::XXXXXXXXXXXX:role/Example-Role" 
        # Provide the region of the domain. 
        # region: "us-west-2" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:us-west-2:XXXXXXXXXXXX:cluster/msk-prov-1/id" 
 
  sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          # hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
          # sts_role_arn: "arn:aws:iam::XXXXXXXXXXXX:role/Example-Role" 
          # Provide the region of the domain. 
          # region: "us-east-1" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_${getMetadata(\"kafka_topic\")}-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          # distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket 
          # dlq: 
            # s3: 
            # Provide an S3 bucket 

We use the following parameters:

  • acknowledgments – Set to true for OpenSearch Ingestion to ensure that the data is delivered to the sinks before committing the offsets in Amazon MSK. The default value is false.
  • name – Specifies the topic that OpenSearch Ingestion reads from. You can read a maximum of four topics per pipeline.
  • group_id – Specifies the consumer group that the pipeline belongs to. With this setting, a single consumer group can be scaled to as many pipelines as needed for very high throughput.
  • serde_format – Specifies the deserialization method to use for the data read from Amazon MSK. The options are json and plaintext.
  • AWS sts_role_arn and OpenSearch sts_role_arn – Specifies the role OpenSearch Ingestion uses for reading and writing. Specify the ARN of the role you created in the previous section. OpenSearch Ingestion currently uses the same role for reading and writing.
  • MSK arn – Specifies the MSK cluster to consume data from.
  • OpenSearch hosts and index – Specifies the OpenSearch domain URL and the index to write to.
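To make the effect of the acknowledgments setting more concrete, the following conceptual sketch models "commit the offset only after the sink confirmed the write". It is not how OpenSearch Ingestion is implemented. `AckSketch`, `processBatch`, and the `Predicate`-based sink are hypothetical names used only to show the ordering.

```java
import java.util.List;
import java.util.function.Predicate;

// Conceptual model of end-to-end acknowledgments; not the service's actual implementation.
final class AckSketch {
    private AckSketch() {}

    // Writes records to the sink in order and returns the next offset that is safe to commit.
    // The sink returns true when a record was delivered successfully.
    static long processBatch(List<String> records, long startOffset, Predicate<String> sink) {
        long committed = startOffset;
        for (String record : records) {
            if (!sink.test(record)) {
                break; // delivery failed: do not advance past this record, it is read again later
            }
            committed++;
        }
        return committed;
    }
}
```

With this ordering, a failed delivery leaves the offset untouched, so the record is consumed again instead of being lost. The trade-off is that records can be delivered more than once.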

When you have configured the Kafka source, choose the network access type and log publishing options. Public pipelines do not involve PrivateLink and they will not incur a cost associated with PrivateLink. Choose Next and review all configurations. When you are satisfied, choose Create pipeline.

Log in to OpenSearch Dashboards to see your indexes and search the data.
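The index names you see depend on the `index` pattern in the sink configuration, which combines the Kafka topic name with a date suffix. As a rough illustration of the resulting names, the following sketch builds a name for a given topic and day. How the service chooses the date and time zone is not covered in this post, so treat that part as an assumption. `IndexNameSketch` and `indexFor` are hypothetical names.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Illustrates the shape of index names produced by the sample index pattern.
final class IndexNameSketch {
    // Same date pattern as in the sample configuration: yyyy.MM.dd
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    private IndexNameSketch() {}

    // Mirrors "index_<kafka_topic>-<yyyy.MM.dd>"; the day is passed in explicitly (assumption).
    static String indexFor(String kafkaTopic, LocalDate day) {
        return "index_" + kafkaTopic + "-" + DAY.format(day);
    }

    public static void main(String[] args) {
        System.out.println(indexFor("orders", LocalDate.of(2023, 10, 5)));
    }
}
```

An index pattern such as `index_orders-*` in OpenSearch Dashboards would then match the daily indexes for a topic named `orders`.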

Recommended compute units (OCUs) for the MSK pipeline

Each compute unit has one consumer per topic. Brokers balance partitions among these consumers for a given topic. However, when the number of partitions is greater than the number of consumers, Amazon MSK assigns multiple partitions to every consumer. OpenSearch Ingestion has built-in auto scaling to scale up or down based on CPU usage or the number of pending records in the pipeline. For optimal performance, partitions should be distributed across many compute units for parallel processing. If topics have a large number of partitions, for example, more than 96 (the maximum number of OCUs per pipeline), we recommend configuring a pipeline with 1–96 OCUs because it will auto scale as needed. If a topic has a low number of partitions, for example, fewer than 96, then set the maximum number of compute units to the number of partitions. When a pipeline has more than one topic, you can use the topic with the highest number of partitions as a reference to configure the maximum number of compute units. By adding another pipeline with a new set of OCUs to the same topic and consumer group, you can scale the throughput almost linearly.
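The sizing guidance above boils down to a small calculation: take the highest partition count among the pipeline's topics and cap it at the per-pipeline limit of 96 OCUs. The following sketch expresses that rule. `OcuSizing` and `recommendedMaxOcus` are hypothetical names, and the limit of 96 is taken from the text above and may change over time.

```java
import java.util.Collections;
import java.util.List;

// Encodes the sizing rule of thumb from this section.
final class OcuSizing {
    // Maximum OCUs per pipeline as stated in this post.
    static final int MAX_OCUS_PER_PIPELINE = 96;

    private OcuSizing() {}

    // Returns the suggested maximum OCU setting for a pipeline reading the given topics.
    static int recommendedMaxOcus(List<Integer> partitionsPerTopic) {
        if (partitionsPerTopic.isEmpty()) {
            throw new IllegalArgumentException("at least one topic is required");
        }
        int highestPartitionCount = Collections.max(partitionsPerTopic);
        return Math.min(highestPartitionCount, MAX_OCUS_PER_PIPELINE);
    }
}
```

For example, a pipeline reading two topics with 12 and 48 partitions would get a maximum of 48 OCUs, while a single topic with 200 partitions would be capped at 96.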

Clean up

To avoid future charges, clean up any unused resources from your AWS account.

Conclusion

In this post, you saw how to use Amazon MSK as a source for OpenSearch Ingestion. This not only addresses the ease of data consumption from Amazon MSK, but it also relieves you of the burden of self-managing and manually scaling consumers for varying and unpredictable high-speed, streaming operational analytics data. Refer to the ‘sources’ list under the ‘supported plugins’ section for an exhaustive list of sources from which you can ingest data.


About the authors

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.

Arjun Nambiar is a Product Manager with Amazon OpenSearch Service. He focusses on ingestion technologies that enable ingesting data from a wide variety of sources into Amazon OpenSearch Service at scale. Arjun is interested in large scale distributed systems and cloud-native technologies and is based out of Seattle, Washington.

Raj Sharma is a Sr. SDM with Amazon OpenSearch Service. He builds large-scale distributed applications and solutions. Raj is interested in the topics of analytics, databases, networking, and security, and is based out of Palo Alto, California.

Best practices for implementing event-driven architectures in your organization

Post Syndicated from Emanuele Levi original https://aws.amazon.com/blogs/architecture/best-practices-for-implementing-event-driven-architectures-in-your-organization/

Event-driven architectures (EDA) are made up of components that detect business actions and changes in state, and encode this information in event notifications. Event-driven patterns are becoming more widespread in modern architectures because:

  • they are the main invocation mechanism in serverless patterns.
  • they are the preferred pattern for decoupling microservices, where asynchronous communications and event persistence are paramount.
  • they are widely adopted as a loose-coupling mechanism between systems in different business domains, such as third-party or on-premises systems.

Event-driven patterns have the advantage of enabling team independence through the decoupling and decentralization of responsibilities. This decentralization trend, in turn, permits companies to move with unprecedented agility, enhancing feature development velocity.

In this blog, we’ll explore the crucial components and architectural decisions you should consider when adopting event-driven patterns, and provide some guidance on organizational structures.

Division of responsibilities

The communications flow in EDA (see What is EDA?) is initiated by the occurrence of an event. Most production-grade event-driven implementations have three main components, as shown in Figure 1: producers, message brokers, and consumers.

Figure 1. Three main components of an event-driven architecture

Producers, message brokers, and consumers typically assume the following roles:

Producers

Producers are responsible for publishing the events as they happen. They are the owners of the event schema (data structure) and semantics (meaning of the fields, such as the meaning of the value of an enum field). As this is the only contract (coupling) between producers and the downstream components of the system, the schema and its semantics are crucial in EDA. Producers are responsible for implementing a change management process, which covers both non-breaking and breaking changes. When a breaking change is introduced, consumers are able to negotiate the migration process with producers.
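As a simplified illustration of the difference between non-breaking and breaking changes, the following sketch treats a schema as a set of field names and flags a change as breaking when a field that consumers may rely on disappears. This rule is an assumption made for the example. Real change management also has to consider types, defaults, and semantics. `SchemaChangeCheck` and `isBreaking` are hypothetical names.

```java
import java.util.Set;

// Deliberately simplified compatibility rule for illustration only.
final class SchemaChangeCheck {
    private SchemaChangeCheck() {}

    // A change is treated as breaking if any currently published field is missing afterwards.
    // Adding new fields is treated as non-breaking.
    static boolean isBreaking(Set<String> currentFields, Set<String> proposedFields) {
        return !proposedFields.containsAll(currentFields);
    }
}
```

Under this rule, adding a `currency` field to an event with `orderId` and `amount` is non-breaking, whereas renaming `amount` to `total` is breaking and would trigger the migration negotiation with consumers.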

Producers are “consumer agnostic”, as their boundary of responsibility ends when an event is published.

Message brokers

Message brokers are responsible for the durability of the events, and will keep an event available for consumption until it is successfully processed. Message brokers ensure that producers are able to publish events for consumers to consume, and they regulate access and permissions to publish and consume messages.

Message brokers are largely “events agnostic”, and do not generally access or interpret the event content. However, some systems provide a routing mechanism based on the event payload or metadata.

Consumers

Consumers are responsible for consuming events, and own the semantics of the effect of events. Consumers are usually bound to one business context. This means the same event will have different effect semantics for different consumers. Crucial architectural choices when implementing a consumer involve the handling of unsuccessful message deliveries or duplicate messages. Depending on the business interpretation of the event, when recovering from failure a consumer might permit duplicate events, such as with an idempotent consumer pattern.
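An idempotent consumer can be sketched in a few lines. The example below assumes every event carries a unique ID and keeps the processed IDs in memory. A production consumer would typically persist them in a durable store. `IdempotentConsumer` and its account-balance logic are hypothetical and only serve as an example of a side effect that must not be applied twice.

```java
import java.util.HashSet;
import java.util.Set;

// Minimal in-memory idempotent consumer; event IDs are assumed to be unique per event.
final class IdempotentConsumer {
    private final Set<String> processedIds = new HashSet<>();
    private long balance;

    // Applies the event once; returns false if the event ID was already processed.
    boolean handle(String eventId, long amount) {
        if (!processedIds.add(eventId)) {
            return false; // duplicate delivery: ignore without side effects
        }
        balance += amount;
        return true;
    }

    long balance() {
        return balance;
    }
}
```

If the broker redelivers an event after a failure, the second call returns `false` and the balance stays unchanged, so duplicates become harmless.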

Crucially, consumers are “producer agnostic”, and their boundary of responsibility begins when an event is ready for consumption. This allows new consumers to onboard into the system without changing the producer contracts.

Team independence

In order to enforce the division of responsibilities, companies should organize their technical teams by ownership of producers, message brokers, and consumers. Although the ownership of producers and consumers is straightforward in an EDA implementation, the ownership of the message broker may not be. Different approaches can be taken to identify message broker ownership depending on your organizational structure.

Decentralized ownership

Figure 2. Ownership of the message broker in a decentralized ownership organizational structure

In a decentralized ownership organizational structure (see Figure 2), the teams producing events are responsible for managing their own message brokers and the durability and availability of the events for consumers.

Adopting topic fanout patterns based on Amazon Simple Queue Service (SQS) and Amazon Simple Notification Service (SNS) (see Figure 3) can help companies implement a decentralized ownership pattern. A bus-based pattern using Amazon EventBridge can be used similarly (see Figure 4).
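The idea behind topic fanout is that a producer publishes once and every subscribed consumer receives its own copy in its own queue. The following in-memory model illustrates that behavior. It does not use the AWS SDK, and `FanoutTopic` is a hypothetical name. In the real pattern, the topic would be an SNS topic and each queue an SQS queue owned by a consumer.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// In-memory model of the fanout pattern: one topic, one queue per consumer.
final class FanoutTopic {
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    // Registers a consumer and returns its dedicated queue.
    Queue<String> subscribe(String consumerName) {
        return queues.computeIfAbsent(consumerName, name -> new ArrayDeque<>());
    }

    // Delivers a copy of the event to every subscribed queue.
    void publish(String event) {
        for (Queue<String> queue : queues.values()) {
            queue.add(event);
        }
    }
}
```

Because each consumer reads from its own queue, one consumer falling behind or failing does not affect the others, and the producer does not need to know who is subscribed.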

Figure 3. Topic fanout pattern based on Amazon SQS and Amazon SNS

Figure 4. Events bus pattern based on Amazon EventBridge

The decentralized ownership approach has the advantage of promoting team independence, but it is not a fit for every organization. In order to be implemented effectively, a well-established DevOps culture is necessary. In this scenario, the producing teams are responsible for managing the message broker infrastructure and the non-functional requirements standards.

Centralized ownership

Figure 5. Ownership of the message broker in a centralized ownership organizational structure

In a centralized ownership organizational structure, a central team (we’ll call it the platform team) is responsible for the management of the message broker (see Figure 5). Having a specialized platform team offers the advantage of standardized implementation of non-functional requirements, such as reliability, availability, and security. One disadvantage is that the platform team is a single point of failure in both the development and deployment lifecycle. This could become a bottleneck and put team independence and operational efficiency at risk.

Figure 6. Streaming pattern based on Amazon MSK and Kinesis Data Streams

On top of the implementation patterns mentioned in the previous section, the presence of a dedicated team makes it easier to implement streaming patterns. In this case, a deeper understanding of how the data is partitioned and how the system scales is required. Streaming patterns can be implemented using services such as Amazon Managed Streaming for Apache Kafka (MSK) or Amazon Kinesis Data Streams (see Figure 6).
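To give a feel for why partitioning matters, the following sketch maps an event key to a partition by hashing it. This is a simplification. Streaming services use their own hash functions, so the concrete partition numbers here will not match a real cluster or stream. `PartitionSketch` and `partitionFor` are hypothetical names.

```java
// Simplified key-based partitioning; real services use different hash functions.
final class PartitionSketch {
    private PartitionSketch() {}

    // Maps a key to a partition index in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Events with the same key always land in the same partition, which preserves their relative order, while different keys spread across partitions and allow consumers to scale out. A key with very high traffic can therefore create a hot partition, which is one of the scaling aspects a platform team has to watch.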

Best practices for implementing event-driven architectures in your organization

The decentralized and centralized ownership organizational structures enhance team independence and standardization of non-functional requirements, respectively. However, they introduce possible limits to the growth of the engineering function in a company. Inspired by the two approaches, you can implement a set of best practices aimed at minimizing those limitations.

Figure 7. Best practices for implementing event-driven architectures

  1. Introduce a cloud center of excellence (CCoE). A CCoE standardizes non-functional implementation across engineering teams. In order to promote a strong DevOps culture, the CCoE should not take the form of an external independent team, but rather be a collection of individual members representing the various engineering teams.
  2. Decentralize team ownership. Decentralize ownership and maintenance of the message broker to producing teams. This will maximize team independence and agility. It empowers the team to use the right tool for the right job, as long as they conform to the CCoE guidelines.
  3. Centralize logging standards and observability strategies. Although it is a best practice to decentralize team ownership of the components of an event-driven architecture, logging standards and observability strategies should be centralized and standardized across the engineering function. This centralization provides for end-to-end tracing of requests and events, which are powerful diagnosis tools in case of any failure.

Conclusion

In this post, we have described the main architectural components of an event-driven architecture, and identified the ownership of the message broker as one of the most important architectural choices you can make. We have described a centralized and decentralized organizational approach, presenting the strengths of the two approaches, as well as the limits they impose on the growth of your engineering organization. We have provided some best practices you can implement in your organization to minimize these limitations.

Orca Security’s journey to a petabyte-scale data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/orca-securitys-journey-to-a-petabyte-scale-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Eliad Gat and Oded Lifshiz from Orca Security.

With data becoming the driving force behind many industries today, having a modern data architecture is pivotal for organizations to be successful. One key component that plays a central role in modern data architectures is the data lake, which allows organizations to store and analyze large amounts of data in a cost-effective manner and run advanced analytics and machine learning (ML) at scale.

Orca Security is an industry-leading Cloud Security Platform that identifies, prioritizes, and remediates security risks and compliance issues across your AWS Cloud estate. Orca connects to your environment in minutes with patented SideScanning technology to provide complete coverage across vulnerabilities, malware, misconfigurations, lateral movement risk, weak and leaked passwords, overly permissive identities, and more.

The Orca Platform is powered by a state-of-the-art anomaly detection system that uses cutting-edge ML algorithms and big data capabilities to detect potential security threats and alert customers in real time, ensuring maximum security for their cloud environment. At the core of Orca’s anomaly detection system is its transactional data lake, which enables the company’s data scientists, analysts, data engineers, and ML specialists to extract valuable insights from vast amounts of data and deliver innovative cloud security solutions to its customers.

In this post, we describe Orca’s journey building a transactional data lake using Amazon Simple Storage Service (Amazon S3), Apache Iceberg, and AWS Analytics. We explore why Orca chose to build a transactional data lake and examine the key considerations that guided the selection of Apache Iceberg as the preferred table format.

In addition, we describe the Orca Platform architecture and the technologies used. Lastly, we discuss the challenges encountered throughout the project, present the solutions used to address them, and share valuable lessons learned.

Why did Orca build a data lake?

Prior to the creation of the data lake, Orca’s data was distributed among various data silos, each owned by a different team with its own data pipelines and technology stack. This setup led to several issues, including scaling difficulties as the data size grew, maintaining data quality, ensuring consistent and reliable data access, high costs associated with storage and processing, and difficulties supporting streaming use cases. Moreover, running advanced analytics and ML on disparate data sources proved challenging. To overcome these issues, Orca decided to build a data lake.

A data lake is a centralized data repository that enables organizations to store and manage large volumes of structured and unstructured data, eliminating data silos and facilitating advanced analytics and ML on the entire data. By decoupling storage and compute, data lakes promote cost-effective storage and processing of big data.

Why did Orca choose Apache Iceberg?

Orca considered several table formats that have evolved in recent years to support its transactional data lake. Amongst the options, Apache Iceberg stood out as the ideal choice because it met all of Orca’s requirements.

First, Orca sought a transactional table format that ensures data consistency and fault tolerance. Apache Iceberg’s transactional and ACID guarantees, which allow concurrent read and write operations while ensuring data consistency and simplified fault handling, fulfill this requirement. Furthermore, Apache Iceberg’s support for time travel and rollback capabilities makes it highly suitable for addressing data quality issues by reverting to a previous state in a consistent manner.

Second, a key requirement was to adopt an open table format that integrates with various processing engines. This was to avoid vendor lock-in and allow teams to choose the processing engine that best suits their needs. Apache Iceberg’s engine-agnostic and open design meets this requirement by supporting all popular processing engines, including Apache Spark, Amazon Athena, Apache Flink, Trino, Presto, and more.

In addition, given the substantial data volumes handled by the system, an efficient table format was required that can support querying petabytes of data very fast. Apache Iceberg’s architecture addresses this need by efficiently filtering and reducing scanned data, resulting in accelerated query times.

An additional requirement was to allow seamless schema changes without impacting end-users. Apache Iceberg’s range of features, including schema evolution, hidden partitions, and partition evolution, addresses this requirement.

Lastly, it was important for Orca to choose a table format that is widely adopted. Apache Iceberg’s growing and active community aligned with the requirement for a popular and community-backed table format.

Solution overview

Orca’s data lake is based on open-source technologies that seamlessly integrate with Apache Iceberg. The system ingests data from various sources such as cloud resources, cloud activity logs, and API access logs, and processes billions of messages, resulting in terabytes of data daily. This data is sent to Apache Kafka, which is hosted on Amazon Managed Streaming for Apache Kafka (Amazon MSK). It is then processed using Apache Spark Structured Streaming running on Amazon EMR and stored in the data lake. Amazon EMR streamlines the process of loading all required Iceberg packages and dependencies, ensuring that the data is stored in Apache Iceberg format and ready for consumption as quickly as possible.

The data lake is built on top of Amazon S3 using Apache Iceberg table format with Apache Parquet as the underlying file format. In addition, the AWS Glue Data Catalog enables data discovery, and AWS Identity and Access Management (IAM) enforces secure access controls for the lake and its operations.

The data lake serves as the foundation for a variety of capabilities that are supported by different engines.

Data pipelines built on Apache Spark and Athena SQL analyze and process the data stored in the data lake. These data pipelines generate valuable insights and curated data that are stored in Apache Iceberg tables for downstream usage. This data is then used by various applications for streaming analytics, business intelligence, and reporting.

Amazon SageMaker is used to build, train, and deploy a range of ML models. Specifically, the system uses Amazon SageMaker Processing jobs to process the data stored in the data lake, employing the AWS SDK for Pandas (previously known as AWS Wrangler) for various data transformation operations, including cleaning, normalization, and feature engineering. This ensures that the data is suitable for training purposes. Additionally, SageMaker training jobs are employed for training the models. After the models are trained, they are deployed and used to identify anomalies and alert customers in real time to potential security threats. The following diagram illustrates the solution architecture.

Orca security Data Lake Architecture

Challenges and lessons learned

Orca faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing EMR streaming ingestion for high throughput
  • Taming the small files problem for fast reads
  • Maximizing performance with Athena version 3
  • Maintaining Apache Iceberg tables
  • Managing data retention
  • Monitoring the data lake infrastructure and operations
  • Mitigating data quality issues

In this section, we describe each of these challenges and the solutions implemented to address them.

Determining optimal table partitioning

Determining optimal partitioning for each table is very important in order to optimize query performance and minimize the impact on teams querying the tables when partitioning changes. Apache Iceberg’s hidden partitions combined with partition transformations proved to be valuable in achieving this goal because they allowed for transparent changes to partitioning without impacting end-users. Additionally, partition evolution enables experimentation with various partitioning strategies to optimize cost and performance without requiring a rewrite of the table’s data every time.

For example, with these features, Orca was able to easily change the partitioning of several of its tables from DAY to HOUR with no impact on user queries. Without this native Iceberg capability, they would have needed to coordinate the new schema with all the teams that query the tables and rewrite all of the data, which would have been a costly, time-consuming, and error-prone process.
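
To make the DAY to HOUR change more concrete, the following minimal Python sketch shows how day and hour partition values can be derived from an event timestamp as whole days or hours since the Unix epoch. The function names and the sample timestamp are illustrative assumptions, not Orca’s code or Iceberg’s API. The point is that queries filter on the timestamp column itself, while the partition value is computed behind the scenes.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def day_transform(ts: datetime) -> int:
    """Whole days elapsed since the Unix epoch (hypothetical helper)."""
    return (ts - EPOCH).days


def hour_transform(ts: datetime) -> int:
    """Whole hours elapsed since the Unix epoch (hypothetical helper)."""
    return int((ts - EPOCH).total_seconds() // 3600)


# Sample event timestamp (assumed for illustration).
event_time = datetime(2023, 6, 1, 13, 45, tzinfo=timezone.utc)
day_partition = day_transform(event_time)
hour_partition = hour_transform(event_time)

# Every hour partition maps back to exactly one day partition, so a filter
# on the timestamp column can be pruned the same way under either layout.
print(day_partition, hour_partition)
```

Because the partition value is a function of the timestamp column, a query that filters on the timestamp does not need to change when the partition granularity does.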

Optimizing EMR streaming ingestion for high throughput

As mentioned previously, the system ingests billions of messages daily, resulting in terabytes of data processed and stored each day. Therefore, optimizing the EMR clusters for this type of load while maintaining high throughput and low costs has been an ongoing challenge. Orca addressed this in several ways.

First, Orca chose to use instance fleets with its EMR clusters because they allow optimized resource allocation by combining different instance types and sizes. Instance fleets improve resilience by allowing multiple Availability Zones to be configured. As a result, the cluster will launch in an Availability Zone with all the required instance types, preventing capacity limitations. Additionally, instance fleets can use both Amazon Elastic Compute Cloud (Amazon EC2) On-Demand and Spot instances, resulting in cost savings.

The process of sizing the cluster for high throughput and lower costs involved adjusting the number of core and task nodes, selecting suitable instance types, and fine-tuning CPU and memory configurations. Ultimately, Orca was able to find an optimal configuration consisting of on-demand core nodes and spot task nodes of varying sizes, which provided high throughput but also ensured compliance with SLAs.
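
As a rough illustration of such a layout, the sketch below builds an instance fleet definition shaped like the `InstanceFleets` structure that the Amazon EMR `RunJobFlow` API accepts, with On-Demand core nodes and Spot task nodes of several sizes. The instance types, capacities, and timeout are assumptions chosen for the example; the post does not disclose Orca’s actual configuration.

```python
def build_instance_fleets(core_types, task_types, core_units, task_units):
    """Build an InstanceFleets list: On-Demand core nodes, Spot task nodes.

    core_types / task_types are lists of (instance_type, weighted_capacity).
    """
    def type_configs(instance_types):
        return [
            {"InstanceType": name, "WeightedCapacity": weight}
            for name, weight in instance_types
        ]

    return [
        {
            "Name": "primary",
            "InstanceFleetType": "MASTER",
            "TargetOnDemandCapacity": 1,
            "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}],  # assumed
        },
        {
            "Name": "core",
            "InstanceFleetType": "CORE",
            "TargetOnDemandCapacity": core_units,
            "InstanceTypeConfigs": type_configs(core_types),
        },
        {
            "Name": "task",
            "InstanceFleetType": "TASK",
            "TargetSpotCapacity": task_units,
            "InstanceTypeConfigs": type_configs(task_types),
            "LaunchSpecifications": {
                "SpotSpecification": {
                    # Fall back to On-Demand if Spot capacity is unavailable.
                    "TimeoutDurationMinutes": 10,
                    "TimeoutAction": "SWITCH_TO_ON_DEMAND",
                }
            },
        },
    ]


# Hypothetical sizing: all instance types and unit counts are assumptions.
fleets = build_instance_fleets(
    core_types=[("r5.2xlarge", 2), ("r5.4xlarge", 4)],
    task_types=[("m5.2xlarge", 2), ("m5.4xlarge", 4), ("r5.2xlarge", 2)],
    core_units=8,
    task_units=32,
)
```

A list like this would be passed under `Instances` when creating the cluster, together with several subnets so that EMR can pick an Availability Zone with enough capacity.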

Orca also found that using different Kafka Spark Structured Streaming properties, such as minOffsetsPerTrigger, maxOffsetsPerTrigger, and minPartitions, provided higher throughput and better control of the load. Using minPartitions, which enables better parallelism and distribution across a larger number of tasks, was particularly useful for consuming high lags quickly.
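
The following sketch shows one way these properties might be assembled for the Kafka source. The option names are the ones named above; the broker address, topic name, and numeric values are placeholders, and the helper function is hypothetical.

```python
def kafka_source_options(bootstrap_servers, topic, min_offsets,
                         max_offsets, min_partitions):
    """Return Kafka source options for Spark Structured Streaming."""
    if min_offsets > max_offsets:
        raise ValueError("minOffsetsPerTrigger must not exceed maxOffsetsPerTrigger")
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Wait until at least this many offsets are available per micro-batch.
        "minOffsetsPerTrigger": str(min_offsets),
        # Cap the number of offsets processed per micro-batch.
        "maxOffsetsPerTrigger": str(max_offsets),
        # Split Kafka partitions into at least this many Spark input partitions.
        "minPartitions": str(min_partitions),
    }


# Placeholder values, chosen only for illustration.
options = kafka_source_options(
    bootstrap_servers="broker-1.example.com:9092",
    topic="cloud-activity-logs",
    min_offsets=1_000_000,
    max_offsets=10_000_000,
    min_partitions=200,
)

# With a SparkSession available, the options would be applied like this:
# stream = spark.readStream.format("kafka").options(**options).load()
```

Raising `minPartitions` above the number of Kafka partitions lets Spark divide large offset ranges across more tasks, which is what helps when working through a large backlog.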

Lastly, when dealing with a high data ingestion rate, Amazon S3 may throttle the requests and return 503 errors. To address this scenario, Iceberg offers a table property called write.object-storage.enabled, which incorporates a hash prefix into the stored S3 object path. This approach effectively mitigates throttling problems.
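
To illustrate why a hash prefix helps, the sketch below places a short hash of the file name in front of it in the object key, so that files written at the same time land under many different S3 prefixes instead of one. This is only a conceptual model: the hash function, prefix length, and path layout are assumptions and are not Iceberg’s actual implementation.

```python
import hashlib


def with_hash_prefix(data_location, file_name, prefix_len=8):
    """Insert a short, deterministic hash between the base location and the file."""
    digest = hashlib.sha1(file_name.encode("utf-8")).hexdigest()[:prefix_len]
    return f"{data_location}/{digest}/{file_name}"


# Hypothetical bucket, table path, and file names.
base = "s3://example-bucket/warehouse/db/events/data"
keys = [with_hash_prefix(base, f"part-{i:05d}.parquet") for i in range(3)]
for key in keys:
    print(key)
```

Because S3 request limits apply per prefix, spreading objects over many prefixes raises the aggregate request rate that the table can sustain.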

Taming the small files problem for fast reads

A common challenge often encountered when ingesting streaming data into the data lake is the creation of many small files. This can have a negative impact on read performance when querying the data with Athena or Apache Spark. Having a high number of files leads to longer query planning and runtimes due to the need to process and read each file, resulting in overhead for file system operations and network communication. Additionally, this can result in higher costs due to the large number of S3 PUT and GET requests required.

To address this challenge, Apache Spark Structured Streaming provides the trigger mechanism, which can be used to tune the rate at which data is committed to Apache Iceberg tables. The commit rate has a direct impact on the number of files being produced. For instance, a higher commit rate, corresponding to a shorter time interval, results in lots of data files being produced.

In certain cases, launching the Spark cluster on an hourly basis and configuring the trigger to AvailableNow facilitated the processing of larger data batches and reduced the number of small files created. Although this approach led to cost savings, it did involve a trade-off of reduced data freshness. However, this trade-off was deemed acceptable for specific use cases.
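
A back-of-the-envelope calculation shows how strongly the trigger interval affects file counts. The sketch assumes, for simplicity, that every commit writes a fixed number of data files (for example, one per partition touched); real numbers depend on the workload.

```python
SECONDS_PER_DAY = 86_400


def estimated_files_per_day(trigger_interval_seconds, files_per_commit):
    """Estimate the data files produced per day for a given trigger interval."""
    commits_per_day = SECONDS_PER_DAY // trigger_interval_seconds
    return commits_per_day * files_per_commit


# Assumed workload: each commit writes 4 data files.
every_minute = estimated_files_per_day(60, files_per_commit=4)
every_hour = estimated_files_per_day(3_600, files_per_commit=4)
print(every_minute, every_hour)
```

Under these assumptions, moving from a one-minute trigger to an hourly run reduces the number of files by a factor of 60, at the cost of data that is up to an hour old.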

In addition, to address preexisting small files within the data lake, Apache Iceberg offers a data files compaction operation that combines these smaller files into larger ones. Running this operation on a schedule is highly recommended to optimize the number and size of the files. Compaction also proves valuable in handling late-arriving data and enables the integration of this data into consolidated files.
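
Conceptually, compaction groups small files into batches that are each rewritten as one larger file. The greedy grouping below is a simplified sketch of that idea with an assumed target size; it is not the planning algorithm Iceberg uses.

```python
def plan_compaction(file_sizes_mb, target_mb=512):
    """Greedily group files so that each group stays at or under target_mb."""
    groups, current, current_size = [], [], 0
    for size in sorted(file_sizes_mb, reverse=True):
        # Start a new group when adding this file would exceed the target.
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Ten 100 MB files are merged into two output files of 500 MB each.
plan = plan_compaction([100] * 10, target_mb=512)
print([sum(group) for group in plan])
```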

Maximizing performance with Athena version 3

Orca was an early adopter of Athena version 3, Amazon’s implementation of the Trino query engine, which provides extensive support for Apache Iceberg. Whenever possible, Orca preferred using Athena over Apache Spark for data processing. This preference was driven by the simplicity and serverless architecture of Athena, which led to reduced costs and easier usage, unlike Spark, which typically required provisioning and managing a dedicated cluster at higher costs.

In addition, Orca used Athena as part of its model training and as the primary engine for ad hoc exploratory queries conducted by data scientists, business analysts, and engineers. However, for maintaining Iceberg tables and updating table properties, Apache Spark remained the more scalable and feature-rich option.

Maintaining Apache Iceberg tables

Ensuring optimal query performance and minimizing storage overhead became a significant challenge as the data lake grew to a petabyte scale. To address this challenge, Apache Iceberg offers several maintenance procedures, such as the following:

  • Data files compaction – This operation, as mentioned earlier, involves combining smaller files into larger ones and reorganizing the data within them. This operation not only reduces the number of files but also enables data sorting based on different columns or clustering similar data using z-ordering. Using Apache Iceberg’s compaction results in significant performance improvements, especially for large tables, making a noticeable difference in query performance between compacted and uncompacted data.
  • Expiring old snapshots – This operation provides a way to remove outdated snapshots and their associated data files, enabling Orca to maintain low storage costs.

Running these maintenance procedures efficiently and cost-effectively using Apache Spark, particularly the compaction operation, which operates on terabytes of data daily, requires careful consideration. This entails appropriately sizing the Spark cluster running on EMR and adjusting various settings such as CPU and memory.
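
In Spark, both operations are exposed as Iceberg stored procedures (`rewrite_data_files` and `expire_snapshots`). The sketch below only assembles the SQL text for them; the catalog name, table name, z-order columns, and retention values are assumptions made for the example.

```python
def rewrite_data_files_sql(catalog, table, zorder_columns=None):
    """Build a CALL statement for Iceberg's rewrite_data_files procedure."""
    args = [f"table => '{table}'"]
    if zorder_columns:
        columns = ",".join(zorder_columns)
        args += ["strategy => 'sort'", f"sort_order => 'zorder({columns})'"]
    return f"CALL {catalog}.system.rewrite_data_files({', '.join(args)})"


def expire_snapshots_sql(catalog, table, older_than, retain_last):
    """Build a CALL statement for Iceberg's expire_snapshots procedure."""
    return (
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{table}', "
        f"older_than => TIMESTAMP '{older_than}', "
        f"retain_last => {retain_last})"
    )


# Hypothetical catalog, table, and columns.
compaction_sql = rewrite_data_files_sql(
    "glue_catalog", "db.events", zorder_columns=["org_id", "event_time"]
)
expiry_sql = expire_snapshots_sql(
    "glue_catalog", "db.events", "2023-06-01 00:00:00", retain_last=5
)

# With a SparkSession configured for Iceberg: spark.sql(compaction_sql)
print(compaction_sql)
print(expiry_sql)
```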

In addition, using Apache Iceberg’s metadata tables proved to be very helpful in identifying issues related to the physical layout of Iceberg’s tables, which can directly impact query performance. Metadata tables offer insights into the physical data storage layout of the tables and offer the convenience of querying them with Athena version 3. By accessing the metadata tables, crucial information about tables’ data files, manifests, history, partitions, snapshots, and more can be obtained, which aids in understanding and optimizing the table’s data layout.

For instance, the following queries can uncover valuable information about the underlying data:

  • The number of files and their average size per partition:
    SELECT partition, file_count, (total_size / file_count) AS avg_file_size FROM "db"."table$partitions"

  • The number of data files pointed to by each manifest:
    SELECT path, added_data_files_count + existing_data_files_count AS number_of_data_files FROM "db"."table$manifests"

  • Information about the data files:
    SELECT file_path, file_size_in_bytes FROM "db"."table$files"

  • Information related to data completeness:
    SELECT record_count, partition FROM "db"."table$partitions"
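
The results of queries like these can feed simple health checks. As a sketch, the function below takes rows shaped like the output of the partitions query and flags partitions whose average file size falls below a threshold. The threshold and the sample rows are made up for illustration.

```python
MB = 1024 * 1024


def partitions_needing_compaction(rows, min_avg_bytes=128 * MB, min_files=2):
    """Return partitions with several files whose average size is too small."""
    return [
        row["partition"]
        for row in rows
        if row["file_count"] >= min_files
        and row["total_size"] / row["file_count"] < min_avg_bytes
    ]


# Sample rows (fabricated for the example, not real measurements).
sample_rows = [
    {"partition": "2023-06-01-10", "file_count": 400, "total_size": 800 * MB},
    {"partition": "2023-06-01-11", "file_count": 4, "total_size": 2048 * MB},
    {"partition": "2023-06-01-12", "file_count": 1, "total_size": 5 * MB},
]
flagged = partitions_needing_compaction(sample_rows)
print(flagged)
```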

Managing data retention

Effective management of data retention in a petabyte-scale data lake is crucial to ensure low storage costs as well as to comply with GDPR. However, implementing such a process can be challenging when dealing with Iceberg data stored in S3 buckets, because deleting files based on simple S3 lifecycle policies could potentially cause table corruption. This is because Iceberg’s data files are referenced in manifest files, so any changes to data files must also be reflected in the manifests.

To address this challenge, certain considerations must be taken into account while handling data retention properly. Apache Iceberg provides two modes for handling deletes, namely copy-on-write (CoW), and merge-on-read (MoR). In CoW mode, Iceberg rewrites data files at the time of deletion and creates new data files, whereas in MoR mode, instead of rewriting the data files, a delete file is written that lists the position of deleted records in files. These files are then reconciled with the remaining data during read time.

In favor of faster read times, CoW mode is preferable and when used in conjunction with the expiring old snapshots operation, it allows for the hard deletion of data files that have exceeded the set retention period.
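
The difference between the two modes can be shown with a toy model in which a data file is a list of rows. This is a conceptual sketch with hypothetical function names, not Iceberg code: CoW pays the cost when deleting, MoR pays it on every read.

```python
def delete_copy_on_write(data_file, should_delete):
    """CoW: rewrite the data file without the deleted rows."""
    return [row for row in data_file if not should_delete(row)]


def delete_merge_on_read(data_file, should_delete):
    """MoR: keep the data file as is and record the deleted row positions."""
    return {pos for pos, row in enumerate(data_file) if should_delete(row)}


def read_merge_on_read(data_file, delete_positions):
    """MoR read path: filter out deleted positions while scanning."""
    return [row for pos, row in enumerate(data_file) if pos not in delete_positions]


data_file = [
    {"org": "a", "event": 1},
    {"org": "b", "event": 2},
    {"org": "a", "event": 3},
]


def is_org_a(row):
    return row["org"] == "a"


cow_file = delete_copy_on_write(data_file, is_org_a)       # new data file
delete_file = delete_merge_on_read(data_file, is_org_a)    # positions {0, 2}
mor_view = read_merge_on_read(data_file, delete_file)      # reconciled on read
```

Both paths return the same rows to the reader, but only CoW leaves the old file fully unreferenced by the new snapshot, so that expiring old snapshots can physically remove the deleted records.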

In addition, by storing the data sorted based on the field that will be utilized for deletion (for example, organizationID), it’s possible to reduce the number of files that require rewriting. This optimization significantly enhances the efficiency of the deletion process, resulting in improved deletion times.
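
A small simulation illustrates the effect of sorting. It splits the same rows into fixed-size files, once unsorted and once sorted by organization ID, and counts how many files contain the organization to be deleted. The data and file size are invented for the example.

```python
def split_into_files(rows, rows_per_file):
    """Chunk a list of rows into consecutive fixed-size 'files'."""
    return [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]


def files_to_rewrite(files, organization_id):
    """Count the files that contain at least one row for the organization."""
    return sum(1 for file_rows in files if organization_id in file_rows)


# 16 rows from four organizations, arriving interleaved.
org_ids = ["a", "b", "c", "d"] * 4

unsorted_files = split_into_files(org_ids, rows_per_file=4)
sorted_files = split_into_files(sorted(org_ids), rows_per_file=4)

print(files_to_rewrite(unsorted_files, "a"))  # every file is touched
print(files_to_rewrite(sorted_files, "a"))    # a single file is touched
```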

Monitoring the data lake infrastructure and operations

Managing a data lake infrastructure is challenging due to the various components it encompasses, including those responsible for data ingestion, storage, processing, and querying.

Effective monitoring of all these components involves tracking resource utilization, data ingestion rates, query runtimes, and various other performance-related metrics, and is essential for maintaining optimal performance and detecting issues as soon as possible.

Monitoring Amazon EMR was crucial because it played a vital role in the system for data ingestion, processing, and maintenance. Orca monitored the cluster status and resource usage of Amazon EMR by utilizing the available metrics through Amazon CloudWatch. Furthermore, it used JMX Exporter and Prometheus to scrape specific Apache Spark metrics and create custom metrics to further improve the pipelines’ observability.

Another challenge emerged when attempting to further monitor the ingestion progress through Kafka lag. Although Kafka lag tracking is the standard method for monitoring ingestion progress, it posed a challenge because Spark Structured Streaming manages its offsets internally and doesn’t commit them back to Kafka. To overcome this, Orca utilized the progress of the Spark Structured Streaming Query Listener (StreamingQueryListener) to monitor the processed offsets, which were then committed to a dedicated Kafka consumer group for lag monitoring.
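
The lag calculation itself is simple once the processed offsets are available. The sketch below assumes that the end offsets reported for the Kafka source arrive as a JSON string mapping each topic to its partitions and offsets, and that the latest broker offsets are fetched separately; the topic name and numbers are placeholders.

```python
import json


def compute_lag(end_offset_json, latest_offsets):
    """Return {(topic, partition): lag} from processed and latest offsets.

    end_offset_json: JSON such as '{"events": {"0": 90, "1": 100}}'
    latest_offsets:  {"events": {0: 100, 1: 100}} (log-end offsets per partition)
    """
    processed = json.loads(end_offset_json)
    lag = {}
    for topic, partitions in processed.items():
        for partition, offset in partitions.items():
            latest = latest_offsets[topic][int(partition)]
            lag[(topic, int(partition))] = max(latest - offset, 0)
    return lag


lag = compute_lag(
    '{"events": {"0": 90, "1": 100}}',
    {"events": {0: 100, 1: 100}},
)
print(lag)
```

In a listener, a function like this could be called from the progress callback. Alternatively, as described above, the processed offsets can be committed to a dedicated consumer group so that existing Kafka lag tooling reports the same figure.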

In addition, to ensure optimal query performance and identify potential performance issues, it was essential to monitor Athena queries. Orca addressed this by using key metrics from Athena and the AWS SDK for Pandas, specifically TotalExecutionTime and ProcessedBytes. These metrics helped identify any degradation in query performance and keep track of costs, which were based on the size of the data scanned.
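
As an example of how these two metrics can be used, the sketch below converts the bytes processed into an approximate cost and flags queries that run noticeably longer than a baseline. The price per terabyte and the tolerance factor are assumptions; check current Athena pricing for your Region.

```python
BYTES_PER_TB = 1024 ** 4


def estimate_scan_cost(processed_bytes, price_per_tb_usd=5.0):
    """Approximate query cost from bytes scanned (price is an assumption)."""
    return processed_bytes / BYTES_PER_TB * price_per_tb_usd


def is_regression(total_execution_ms, baseline_ms, tolerance=1.5):
    """Flag a query whose runtime exceeds the baseline by the tolerance factor."""
    return total_execution_ms > baseline_ms * tolerance


cost = estimate_scan_cost(processed_bytes=BYTES_PER_TB // 2)
slow = is_regression(total_execution_ms=4_000, baseline_ms=2_000)
print(cost, slow)
```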

Mitigating data quality issues

Apache Iceberg’s capabilities and overall architecture played a key role in mitigating data quality challenges.

One of the ways Apache Iceberg addresses these challenges is through its schema evolution capability, which enables users to modify or add columns to a table’s schema without rewriting the entire data. This feature prevents data quality issues that may arise due to schema changes, because the table’s schema is tracked as part of the table metadata, ensuring safe changes.

Furthermore, Apache Iceberg’s time travel feature provides the ability to review a table’s history and roll back to a previous snapshot. This functionality has proven to be extremely useful in identifying potential data quality issues and swiftly resolving them by reverting to a previous state with known data integrity.
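
Time travel boils down to picking the snapshot that was current at a given point in time. The sketch below models a table history as a list of timestamps and snapshot IDs and selects the right snapshot; the history entries are invented and the function is a conceptual illustration rather than an Iceberg API.

```python
from datetime import datetime


def snapshot_as_of(history, as_of):
    """Return the ID of the latest snapshot made current at or before as_of."""
    candidates = [(made_current_at, snapshot_id)
                  for made_current_at, snapshot_id in history
                  if made_current_at <= as_of]
    if not candidates:
        raise ValueError("no snapshot exists at or before the requested time")
    return max(candidates)[1]


# Hypothetical table history: (made_current_at, snapshot_id).
history = [
    (datetime(2023, 6, 1, 10, 0), 101),
    (datetime(2023, 6, 1, 11, 0), 102),
    (datetime(2023, 6, 1, 12, 0), 103),  # suppose this commit introduced bad data
]

# Find the last snapshot before the bad commit.
good_snapshot = snapshot_as_of(history, datetime(2023, 6, 1, 11, 30))
print(good_snapshot)
```

Once the last known-good snapshot is identified, the table can be queried as of that snapshot to inspect the data, or rolled back to it.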

These robust capabilities ensure that data within the data lake remains accurate, consistent, and reliable.

Conclusion

Data lakes are an essential part of a modern data architecture, and now it’s easier than ever to create a robust, transactional, cost-effective, and high-performance data lake by using Apache Iceberg, Amazon S3, and AWS Analytics services such as Amazon EMR and Athena.

Since building the data lake, Orca has observed significant improvements. The data lake infrastructure has allowed Orca’s platform to have seamless scalability while reducing the cost of running its data pipelines by over 50% utilizing Amazon EMR. Additionally, query costs were reduced by more than 50% using the efficient querying capabilities of Apache Iceberg and Athena version 3.

Most importantly, the data lake has made a profound impact on Orca’s platform and continues to play a key role in its success, supporting new use cases such as change data capture (CDC) and others, and enabling the development of cutting-edge cloud security solutions.

If Orca’s journey has sparked your interest and you are considering implementing a similar solution in your organization, here are some strategic steps to consider:

  • Start by thoroughly understanding your organization’s data needs and how this solution can address them.
  • Reach out to experts, who can provide you with guidance based on their own experiences. Consider engaging in seminars, workshops, or online forums that discuss these technologies.
  • An important part of this journey would be to implement a proof of concept. This hands-on experience will provide valuable insights into the complexities of a transactional data lake.

Embarking on a journey to a transactional data lake using Amazon S3, Apache Iceberg, and AWS Analytics can vastly improve your organization’s data infrastructure, enabling advanced analytics and machine learning, and unlocking insights that drive innovation.


About the Authors

Eliad Gat is a Big Data & AI/ML Architect at Orca Security. He has over 15 years of experience designing and building large-scale cloud-native distributed systems, specializing in big data, analytics, AI, and machine learning.

Oded Lifshiz is a Principal Software Engineer at Orca Security. He enjoys combining his passion for delivering innovative, data-driven solutions with his expertise in designing and building large-scale machine learning pipelines.

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan also leads the Apache Iceberg Israel community.

Carlos Rodrigues is a Big Data Specialist Solutions Architect at Amazon Web Services. He helps customers worldwide build transactional data lakes on AWS using open table formats like Apache Hudi and Apache Iceberg.

Sofia Zilberman is a Sr. Analytics Specialist Solutions Architect at Amazon Web Services. She has a track record of 15 years of creating large-scale, distributed processing systems. She remains passionate about big data technologies and architecture trends, and is constantly on the lookout for functional and technological innovations.

Let’s Architect! Open-source technologies on AWS

Post Syndicated from Vittorio Denti original https://aws.amazon.com/blogs/architecture/lets-architect-open-source-technologies-on-aws/

We brought you a Let’s Architect! blog post about open-source on AWS that covered some technologies with development led by AWS/Amazon, as well as well-known solutions available on managed AWS services. Today, we’re following the same approach to share more insights about the process of developing open-source software itself. That’s why the first topic we discuss in this post is a re:Invent talk from Heitor Lessa, Principal Solutions Architect at AWS, explaining some interesting approaches for developing and scaling successful open-source projects.

This edition of Let’s Architect! also touches on observability with OpenTelemetry, Apache Kafka on AWS, and Infrastructure as Code with a hands-on workshop on AWS Cloud Development Kit (AWS CDK).

Powertools for AWS Lambda: Lessons from the road to 10 million downloads

Powertools for AWS Lambda is an open-source library to help engineering teams implement serverless best practices. In two years, Powertools went from an initial prototype to a fast-growing project in the open-source world. Rapid growth along with support from a wide community led to several challenges: balancing new features with operational excellence, triaging bug reports and RFCs, and scaling and redesigning documentation.

In this session, you can learn about Powertools for AWS Lambda to understand what it is and the problems it solves. Moreover, there are many valuable lessons to learn how to create and scale a successful open-source project. From managing the trade-off between releasing new features and achieving operational stability to measuring the impact of the project, there are many challenges in open-source projects that require careful thought.

Take me to this video!

Heitor Lessa describing one of the key lessons: development and releasing new features should be as important as the other activities (governance, operational excellence, and more).

Observability the open-source way

The recent blog post Let’s Architect! Monitoring production systems at scale talks about the importance of monitoring. Setting up observability is critical to maintain application and infrastructure health, but instrumenting applications to collect monitoring signals such as metrics and logs can be challenging when using vendor-specific SDKs.

This video introduces you to OpenTelemetry, an open-source observability framework. OpenTelemetry provides a flexible, single vendor-agnostic SDK based on open-source specifications that developers can use to instrument and collect signals from applications. This resource explains how it works in practice and how to monitor microservice-based applications with the OpenTelemetry SDK.

Take me to this video!

With AWS Distro for OpenTelemetry, you can collect data from your AWS resources.

Best practices for right-sizing your Apache Kafka clusters to optimize performance and cost

Apache Kafka is an open-source streaming data store that decouples applications producing streaming data (producers) into its data store from applications consuming streaming data (consumers) from its data store. Amazon Managed Streaming for Apache Kafka (Amazon MSK) allows you to use the open-source version of Apache Kafka with the service managing infrastructure and operations for you.

This blog post explains how the underlying infrastructure configuration can affect Apache Kafka performance. You can learn strategies on how to size the clusters to meet the desired throughput, availability, and latency requirements. This resource helps you discover strategies to find the optimal sizing for your resources, and learn the mental models adopted to conduct the investigation and derive the conclusions.

Take me to this blog!

Comparisons of put latencies for three clusters with different broker sizes

AWS Cloud Development Kit workshop

AWS Cloud Development Kit (AWS CDK) is an open-source software development framework that allows you to provision cloud resources programmatically (Infrastructure as Code or IaC) by using familiar programming languages such as Python, TypeScript, JavaScript, Java, Go, and C#/.NET.

CDK allows you to create reusable templates and assets, test your infrastructure, make deployments repeatable, and make your cloud environment stable by removing manual (and error-prone) operations. This workshop introduces you to CDK, where you can learn how to provision an initial simple application as well as become familiar with more advanced concepts like CDK constructs.

Take me to this workshop!

This construct can be attached to any Lambda function that is used as an API Gateway backend. It counts how many requests were issued to each URL.

See you next time!

Thanks for joining our conversation! To find all the blogs from this series, you can check out the Let’s Architect! list of content on the AWS Architecture Blog.