All posts by Marvin Gersho

Build streaming data pipelines with Amazon MSK Serverless and IAM authentication

Post Syndicated from Marvin Gersho original https://aws.amazon.com/blogs/big-data/build-streaming-data-pipelines-with-amazon-msk-serverless-and-iam-authentication/

Currently, MSK Serverless directly supports IAM authentication only for Java clients. This example shows how to use this mechanism. Additionally, it provides a pattern for creating a proxy that can easily be integrated into solutions built in languages other than Java.

The rising trend in today’s tech landscape is the use of streaming data and event-oriented structures. They are being applied in numerous ways, including monitoring website traffic, tracking industrial Internet of Things (IoT) devices, analyzing video game player behavior, and managing data for cutting-edge analytics systems.

Apache Kafka, a top-tier open-source tool, is making waves in this domain. It’s widely adopted by numerous users for building fast and efficient data pipelines, analyzing streaming data, merging data from different sources, and supporting essential applications.

Amazon’s serverless Apache Kafka offering, Amazon Managed Streaming for Apache Kafka (Amazon MSK) Serverless, is attracting a lot of interest. It’s appreciated for its user-friendly approach, ability to scale automatically, and cost-saving benefits over other Kafka solutions. However, a hurdle encountered by many users is the requirement of MSK Serverless to use AWS Identity and Access Management (IAM) access control. At the time of writing, the Amazon MSK library for IAM is exclusive to Kafka libraries in Java, creating a challenge for users of other programming languages. In this post, we aim to address this issue and present how you can use Amazon API Gateway and AWS Lambda to navigate around this obstacle.

SASL/SCRAM authentication vs. IAM authentication

Compared to the traditional authentication methods like Salted Challenge Response Authentication Mechanism (SCRAM), the IAM extension into Apache Kafka through MSK Serverless provides a lot of benefits. Before we delve into those, it’s important to understand what SASL/SCRAM authentication is. Essentially, it’s a traditional method used to confirm a user’s identity before giving them access to a system. This process requires users or clients to provide a user name and password, which the system then cross-checks against stored credentials (for example, via AWS Secrets Manager) to decide whether or not access should be granted.

Compared to this approach, IAM simplifies permission management across AWS environments, enables the creation and strict enforcement of detailed permissions and policies, and uses temporary credentials rather than the typical user name and password authentication. Another benefit of using IAM is that you can use IAM for both authentication and authorization. If you use SASL/SCRAM, you have to additionally manage ACLs via a separate mechanism. In IAM, you can use the IAM policy attached to the IAM principal to define the fine-grained access control for that IAM principal. All of these improvements make the IAM integration a more efficient and secure solution for most use cases.

However, for applications not built in Java, utilizing MSK Serverless becomes tricky. The standard SASL/SCRAM authentication isn’t available, and non-Java Kafka libraries don’t have a way to use IAM access control. This calls for an alternative approach to connect to MSK Serverless clusters.

But there’s an alternative pattern. Without having to rewrite your existing application in Java, you can employ API Gateway and Lambda as a proxy in front of a cluster. They can handle API requests and relay them to Kafka topics instantly. API Gateway takes in producer requests and channels them to a Lambda function, written in Java using the Amazon MSK IAM library. It then communicates with the MSK Serverless Kafka topic using IAM access control. After the cluster receives the message, it can be further processed within the MSK Serverless setup.
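Because the proxy is a plain HTTPS endpoint, any language with an HTTP client can act as a producer. The following is a minimal sketch of such a client, written in Java only to stay consistent with the rest of this post. The endpoint URL and the JSON payload are placeholders, not values taken from the sample project.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class ProxyClientSketch {

    // Builds a POST request whose body is meant to become the Kafka message value.
    static HttpRequest buildRequest(String endpoint, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(endpoint))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            // The URL of the deployed API is an assumption the caller has to supply.
            System.out.println("usage: ProxyClientSketch <api-gateway-url>");
            return;
        }
        HttpRequest request = buildRequest(args[0], "{\"hello\":\"kafka\"}");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

The same request can be sent from Python, Go, JavaScript, or any other language; the client never needs a Kafka library or the IAM authenticator.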

You can also utilize Lambda on the consumer side of MSK Serverless topics, bypassing the Java requirement on the consumer side. You can do this by setting Amazon MSK as an event source for a Lambda function. When the Lambda function is triggered, the data sent to the function includes an array of records from the Kafka topic—no need for direct contact with Amazon MSK.
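To illustrate the shape of that payload: the event groups records under keys made of the topic name and partition, and each record's value arrives base64 encoded. The following sketch decodes the values using plain maps so that it runs without additional libraries. In a real function you would more likely use the typed KafkaEvent class from the aws-lambda-java-events library; the class and method names below are illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;

class MskEventSketch {

    // Collects the decoded values of all records in the "records" part of an MSK event.
    // Keys look like "topic-partition"; each record carries a base64-encoded "value".
    static List<String> decodeValues(Map<String, List<Map<String, Object>>> records) {
        List<String> values = new ArrayList<>();
        for (List<Map<String, Object>> partitionRecords : records.values()) {
            for (Map<String, Object> record : partitionRecords) {
                byte[] raw = Base64.getDecoder().decode((String) record.get("value"));
                values.add(new String(raw, StandardCharsets.UTF_8));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        String encoded = Base64.getEncoder()
                .encodeToString("hello".getBytes(StandardCharsets.UTF_8));
        Map<String, Object> record = Map.of("topic", "messages", "partition", 0, "value", encoded);
        Map<String, List<Map<String, Object>>> records = Map.of("messages-0", List.of(record));
        System.out.println(decodeValues(records));
    }
}
```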

Solution overview

This example walks you through how to build a serverless real-time stream producer application using API Gateway and Lambda.

For testing, this post includes a sample AWS Cloud Development Kit (AWS CDK) application. This creates a demo environment, including an MSK Serverless cluster, three Lambda functions, and an API Gateway endpoint that accepts the messages destined for the Kafka topic.

The following diagram shows the architecture of the resulting application including its data flows.

The data flow contains the following steps:

  1. The infrastructure is defined in an AWS CDK application. By running this application, a set of AWS CloudFormation templates is created.
  2. AWS CloudFormation creates all infrastructure components, including a Lambda function that runs during the deployment process to create a topic in the MSK Serverless cluster and to retrieve the authentication endpoint needed for the producer Lambda function. On destruction of the CloudFormation stack, the same Lambda function gets triggered again to delete the topic from the cluster.
  3. An external application calls an API Gateway endpoint.
  4. API Gateway forwards the request to a Lambda function.
  5. The Lambda function acts as a Kafka producer and pushes the message to a Kafka topic using IAM authentication.
  6. The Lambda event source mapping mechanism triggers the Lambda consumer function and forwards the message to it.
  7. The Lambda consumer function logs the data to Amazon CloudWatch.

Note that we don’t need to worry about Availability Zones. MSK Serverless automatically replicates the data across multiple Availability Zones to ensure high availability of the data.

The demo additionally shows how to use Lambda Powertools for Java to streamline logging and tracing and the IAM authenticator for the simple authentication process outlined in the introduction.

The following sections take you through the steps to deploy, test, and observe the example application.

Prerequisites

The example has the following prerequisites:

  • An AWS account. If you haven’t signed up, complete the following steps:
  • The following software installed on your development machine, or use an AWS Cloud9 environment, which comes with all requirements preinstalled:
  • Appropriate AWS credentials for interacting with resources in your AWS account.

Deploy the solution

Complete the following steps to deploy the solution:

  1. Clone the project GitHub repository and change the directory to subfolder serverless-kafka-iac:
git clone https://github.com/aws-samples/apigateway-lambda-msk-serverless-integration
cd apigateway-lambda-msk-serverless-integration/serverless-kafka-iac
  2. Configure environment variables:
export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query 'Account' --output text)
export CDK_DEFAULT_REGION=$(aws configure get region)
  3. Prepare the virtual Python environment:
python3 -m venv .venv
source .venv/bin/activate
pip3 install -r requirements.txt
  4. Bootstrap your account for AWS CDK usage:
cdk bootstrap aws://$CDK_DEFAULT_ACCOUNT/$CDK_DEFAULT_REGION
  5. Run cdk synth to build the code and test the requirements (ensure the Docker daemon is running on your machine):
cdk synth
  6. Run cdk deploy to deploy the code to your AWS account:
cdk deploy --all

Test the solution

To test the solution, we generate messages for the Kafka topics by sending calls through the API Gateway from our development machine or AWS Cloud9 environment. We then go to the CloudWatch console to observe incoming messages in the log files of the Lambda consumer function.

  1. Open a terminal on your development machine to test the API with the Python script provided under /serverless_kafka_iac/test_api.py:
python3 test_api.py
  2. On the Lambda console, open the Lambda function named ServerlessKafkaConsumer.
  3. On the Monitor tab, choose View CloudWatch logs to access the logs of the Lambda function.
  4. Choose the latest log stream to access the log files of the last run.

You can review the log entry of the received Kafka messages in the log of the Lambda function.

Trace a request

All components integrate with AWS X-Ray. With AWS X-Ray, you can trace the entire application, which is useful to identify bottlenecks when load testing. You can also trace method runs at the Java method level.

Lambda Powertools for Java allows you to shortcut this process by adding the @Tracing annotation to a method to see traces on the method level in X-Ray.

To trace a request end to end, complete the following steps:

  1. On the CloudWatch console, choose Service map in the navigation pane.
  2. Select a component to investigate (for example, the Lambda function where you deployed the Kafka producer).
  3. Choose View traces.

  4. Choose a single Lambda method invocation and investigate further at the Java method level.

Implement a Kafka producer in Lambda

Kafka natively supports Java. To stay open and cloud native and to avoid third-party dependencies, the producer is written in Java. Currently, the IAM authenticator is available only for Java. In this example, the Lambda handler receives a message from an API Gateway source and pushes this message to an MSK topic called messages.

Typically, Kafka producers are long-lived, and pushing a message to a Kafka topic is an asynchronous process. Because Lambda is ephemeral, you must force a full flush of each submitted message before the Lambda function ends by calling producer.flush():

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
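package software.amazon.samples.kafka.lambda;

// Imports required by this excerpt (KafkaProducerPropertiesFactory is assumed to live in the same package)
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.lambda.powertools.logging.Logging;
import software.amazon.lambda.powertools.tracing.Tracing;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;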
 
// This class is part of the AWS samples package and specifically deals with Kafka integration in a Lambda function.
// It serves as a simple API Gateway to Kafka Proxy, accepting requests and forwarding them to a Kafka topic.
public class SimpleApiGatewayKafkaProxy implements RequestHandler<APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent> {
 
    // Specifies the name of the Kafka topic where the messages will be sent
    public static final String TOPIC_NAME = "messages";
 
    // Logger instance for logging events of this class
    private static final Logger log = LogManager.getLogger(SimpleApiGatewayKafkaProxy.class);
    
    // Factory to create properties for Kafka Producer
    public KafkaProducerPropertiesFactory kafkaProducerProperties = new KafkaProducerPropertiesFactoryImpl();
    
    // Instance of KafkaProducer
    private KafkaProducer<String, String> producer;
 
    // Overridden method from the RequestHandler interface to handle incoming API Gateway proxy events
    @Override
    @Tracing
    @Logging(logEvent = true)
    public APIGatewayProxyResponseEvent handleRequest(APIGatewayProxyRequestEvent input, Context context) {
        
        // Creating a response object to send back 
        APIGatewayProxyResponseEvent response = createEmptyResponse();
        try {
            // Extracting the message from the request body
            String message = getMessageBody(input);
 
            // Create a Kafka producer
            KafkaProducer<String, String> producer = createProducer();
 
            // Creating a record with topic name, request ID as key and message as value 
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_NAME, context.getAwsRequestId(), message);
 
            // Sending the record to Kafka topic and getting the metadata of the record
            Future<RecordMetadata> send = producer.send(record);
            producer.flush();
 
            // Retrieve metadata about the sent record
            RecordMetadata metadata = send.get();
 
            // Logging the partition where the message was sent
            log.info(String.format("Message was sent to partition %s", metadata.partition()));
 
            // If the message was successfully sent, return a 200 status code
            return response.withStatusCode(200).withBody("Message successfully pushed to kafka");
        } catch (Exception e) {
            // In case of exception, log the error message and return a 500 status code
            log.error(e.getMessage(), e);
            return response.withBody(e.getMessage()).withStatusCode(500);
        }
    }
 
    // Creates a Kafka producer if it doesn't already exist
    @Tracing
    private KafkaProducer<String, String> createProducer() {
        if (producer == null) {
            log.info("Connecting to kafka cluster");
            producer = new KafkaProducer<String, String>(kafkaProducerProperties.getProducerProperties());
        }
        return producer;
    }
 
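    // Extracts the message from the request body. If it's base64 encoded, it's decoded first.
    private String getMessageBody(APIGatewayProxyRequestEvent input) {
        String body = input.getBody();
 
        // getIsBase64Encoded() returns a Boolean that can be null, so compare against TRUE
        if (Boolean.TRUE.equals(input.getIsBase64Encoded())) {
            body = decode(body);
        }
        return body;
    }
 
    // Decodes a base64 string to UTF-8 text (helper not shown in the original excerpt; assumed implementation)
    private String decode(String encoded) {
        return new String(java.util.Base64.getDecoder().decode(encoded), java.nio.charset.StandardCharsets.UTF_8);
    }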
 
    // Creates an empty API Gateway proxy response event with predefined headers.
    private APIGatewayProxyResponseEvent createEmptyResponse() {
        Map<String, String> headers = new HashMap<>();
        headers.put("Content-Type", "application/json");
        headers.put("X-Custom-Header", "application/json");
        APIGatewayProxyResponseEvent response = new APIGatewayProxyResponseEvent().withHeaders(headers);
        return response;
    }
}

Connect to Amazon MSK using IAM authentication

This post uses IAM authentication to connect to the respective Kafka cluster. For information about how to configure the producer for connectivity, refer to IAM access control.

Because you configure the cluster via IAM, grant Connect and WriteData permissions to the producer so that it can push messages to Kafka:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect"
            ],
            "Resource": "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid"
        }
    ]
}
 
 
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:WriteData"
            ],
            "Resource": "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name"
        }
    ]
}

This shows the Kafka excerpt of the IAM policy, which must be applied to the Kafka producer. When using IAM authentication, be aware of the current limits of IAM Kafka authentication, which affect the number of concurrent connections and IAM requests for a producer. Refer to Amazon MSK quota and follow the recommendation for authentication backoff in the producer client:

        Map<String, String> configuration = Map.of(
                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "bootstrap.servers", getBootstrapServer(),
                "security.protocol", "SASL_SSL",
                "sasl.mechanism", "AWS_MSK_IAM",
                "sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;",
                "sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
                "connections.max.idle.ms", "60",
                "reconnect.backoff.ms", "1000"
        );
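
The fragment above relies on a getBootstrapServer() helper that isn't shown. As a self-contained sketch, the same settings can be wrapped in a method that takes the bootstrap server as a parameter and returns a Properties object, which the KafkaProducer constructor accepts. The environment variable name used in main is hypothetical; the sample project may resolve the endpoint differently.

```java
import java.util.Map;
import java.util.Properties;

class ProducerPropertiesSketch {

    // Builds producer properties for IAM authentication against the given bootstrap server.
    static Properties producerProperties(String bootstrapServer) {
        Map<String, String> configuration = Map.of(
                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "bootstrap.servers", bootstrapServer,
                "security.protocol", "SASL_SSL",
                "sasl.mechanism", "AWS_MSK_IAM",
                "sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;",
                "sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
                "connections.max.idle.ms", "60",
                "reconnect.backoff.ms", "1000");
        Properties properties = new Properties();
        properties.putAll(configuration);
        return properties;
    }

    public static void main(String[] args) {
        // KAFKA_BOOTSTRAP_SERVER is a hypothetical variable name used for illustration.
        String bootstrap = System.getenv().getOrDefault("KAFKA_BOOTSTRAP_SERVER", "localhost:9098");
        producerProperties(bootstrap).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```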

Additional considerations

Each MSK Serverless cluster can handle 100 requests per second. To reduce IAM authentication requests from the Kafka producer, create the producer outside of the handler method so that it is not rebuilt on every invocation. For frequent calls, there is a chance that Lambda reuses the previously created class instance and only reruns the handler.
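
The reuse idea can be sketched independently of Kafka. The handler shown earlier keeps its producer in an instance field for this reason. In the following illustration, ReusableClient is a hypothetical helper (not part of the sample project) that creates an expensive client once and returns the cached instance on later calls, which is what happens across invocations on a warm Lambda instance.

```java
import java.util.function.Supplier;

class ReusableClient<T> {

    private final Supplier<T> factory;
    private T instance;
    private int creations;

    ReusableClient(Supplier<T> factory) {
        this.factory = factory;
    }

    // Creates the client on first use and returns the cached instance afterwards.
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
            creations++;
        }
        return instance;
    }

    // Number of times the factory was actually invoked.
    synchronized int creations() {
        return creations;
    }

    public static void main(String[] args) {
        ReusableClient<Object> holder = new ReusableClient<>(Object::new);
        // Simulates three handler invocations on the same warm instance.
        for (int i = 0; i < 3; i++) {
            holder.get();
        }
        System.out.println("clients created: " + holder.creations());
    }
}
```

With a Kafka producer as the client, only the first invocation on each execution environment pays for the connection and the IAM authentication round trip.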

For bursting workloads with a high number of concurrent API Gateway requests, this can lead to dropped messages. Although this might be tolerable for some workloads, for others this might not be the case.

In these cases, you can extend the architecture with a buffering technology like Amazon Simple Queue Service (Amazon SQS) or Amazon Kinesis Data Streams between API Gateway and Lambda.

To reduce latency, reduce cold start times for Java by changing the tiered compilation level to 1 (for example, by setting the JAVA_TOOL_OPTIONS environment variable to -XX:+TieredCompilation -XX:TieredStopAtLevel=1), as described in Optimizing AWS Lambda function performance for Java. Provisioned concurrency ensures that polling Lambda functions don’t need to warm up before requests arrive.

Conclusion

In this post, we showed how to create a serverless integration Lambda function between API Gateway and MSK Serverless as a way to do IAM authentication when your producer is not written in Java. You also learned about the native integration of Lambda and Amazon MSK on the consumer side. Additionally, we showed how to deploy such an integration with the AWS CDK.

The general pattern is suitable for many use cases where you want to use IAM authentication and your producers or consumers are not written in Java. You can still take advantage of the benefits of MSK Serverless, like its ability to scale up and down with unpredictable or spiky workloads and its little to no operational overhead of running Apache Kafka.

You can also use MSK Serverless to reduce operational complexity by automating provisioning and the management of capacity needs, including the need to constantly monitor brokers and storage.

For more serverless learning resources, visit Serverless Land.

About the Authors

Philipp Klose is a Global Solutions Architect at AWS based in Munich. He works with enterprise FSI customers and helps them solve business problems by architecting serverless platforms. In his free time, Philipp spends time with his family and enjoys every geek hobby possible.

Daniel Wessendorf is a Global Solutions Architect at AWS based in Munich. He works with enterprise FSI customers and is primarily specialized in machine learning and data architectures. In his free time, he enjoys swimming, hiking, skiing, and spending quality time with his family.

Marvin Gersho is a Senior Solutions Architect at AWS based in New York City. He works with a wide range of startup customers. He previously worked for many years in engineering leadership and hands-on application development, and now focuses on helping customers architect secure and scalable workloads on AWS with a minimum of operational overhead. In his free time, Marvin enjoys cycling and strategy board games.

Nathan Lichtenstein is a Senior Solutions Architect at AWS based in New York City. Primarily working with startups, he ensures his customers build smart on AWS, delivering creative solutions to their complex technical challenges. Nathan has worked in cloud and network architecture in the media, financial services, and retail spaces. Outside of work, he can often be found at a Broadway theater.

Microservice observability with Amazon OpenSearch Service part 2: Create an operational panel and incident report

Post Syndicated from Marvin Gersho original https://aws.amazon.com/blogs/big-data/microservice-observability-with-amazon-opensearch-service-part-2-create-an-operational-panel-and-incident-report/

In the first post in our series, we discussed setting up a microservice observability architecture and application troubleshooting steps using log and trace correlation with Amazon OpenSearch Service. In this post, we discuss using PPL to create visualizations in operational panels, and creating a simple incident report using notebooks.

To try out the solution yourself, start from part 1 of the series.

Microservice observability with Amazon OpenSearch Service

Piped Processing Language (PPL)

PPL is a new query language for OpenSearch. It’s simpler and more straightforward to use than query DSL (Domain Specific Language), and a better fit for DevOps than ODFE SQL. PPL handles semi-structured data and uses a sequence of commands delimited by pipes (|). For more information about PPL, refer to Using pipes to explore, discover and find data in Amazon OpenSearch Service with Piped Processing Language.

The following PPL query retrieves the same record as our search on the Discover page in our previous post. If you’re following along, use your trace ID in place of <Trace-ID>:

source = sample_app_logs | where stream = 'stderr' and locate('<Trace-ID>',`log`) > 0

The query has the following components:

  • | separates commands in the statement.
  • source = sample_app_logs means that we’re searching sample_app_logs.
  • In where stream = 'stderr', stream is a field in sample_app_logs. We’re matching its value to stderr.
  • The locate function allows us to search for a string in a field. For our query, we search for the trace_id in the log field. The locate function returns 0 if the string is not found, otherwise the character number where it is found. We’re testing that trace_id is in the log field. This lets us find the entry that has the payment trace_id with the error.
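
The return convention of locate described above (0 when the string is absent, otherwise the 1-based position of the match) can be mirrored in a few lines of Java. This is only an illustration of the semantics as stated in this post, not the OpenSearch implementation, and the sample log line is made up.

```java
class LocateSketch {

    // Returns the 1-based position of the first occurrence of substring in field, or 0 if absent.
    static int locate(String substring, String field) {
        return field.indexOf(substring) + 1;
    }

    public static void main(String[] args) {
        String log = "payment failed trace_id=abc123";
        // A result greater than 0 means the trace ID occurs in the log field.
        System.out.println(locate("abc123", log) > 0);
        System.out.println(locate("missing", log) > 0);
    }
}
```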

Note that log is a PPL keyword, but also a field in our log file. When a field name is also a keyword, we put backquotes around it to reference it in a PPL statement.

To start using PPL, complete the following steps:

  1. On OpenSearch Dashboards, choose Observability in the navigation pane.
  2. Choose Event analytics.
  3. Choose the calendar icon, then choose the time period you want for your query (for this post, Year to date).
  4. Enter your PPL statement.

Note that results are shown in table format by default, but you can also choose to view them in JSON format.

Monitor your services using visualizations

We can use the PPL on the Event analytics page to create real-time visualizations. We now use these visualizations to create a dashboard for real-time monitoring of our microservices on the Operational panels page.

Event analytics has two modes: events and visualizations. With events, we’re looking at the query results as a table or JSON. With visualizations, the results are shown as a graph. For this post, we create a PPL query that monitors a value over time, and see the results in a graph. We can then save the graph to use in our dashboard. See the following code:

source = sample_app_logs | where stream = 'stderr' and locate('payment',`log`) > 0 | stats count() by span(time, 5m)

This code is similar to the PPL we used earlier, with two key differences:

  • We specify the name of our service in the log field (for this post, payment).
  • We use the aggregation function stats count() by span(time, 5m). We take the count of matches in the log field and aggregate by 5-minute intervals.
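
To make the aggregation concrete, the following sketch reproduces the idea of counting events per fixed time interval in plain Java. It assumes that buckets are aligned to the epoch and keyed by their start time; how PPL aligns its spans internally is not covered in this post, so treat the alignment as an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SpanCountSketch {

    // Counts events per fixed-width time bucket, keyed by the start of each bucket.
    static Map<Instant, Long> countBySpan(List<Instant> times, Duration span) {
        long width = span.toMillis();
        Map<Instant, Long> counts = new TreeMap<>();
        for (Instant time : times) {
            // Round the timestamp down to the start of its bucket.
            long start = Math.floorDiv(time.toEpochMilli(), width) * width;
            counts.merge(Instant.ofEpochMilli(start), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Instant> errors = List.of(
                Instant.parse("2023-01-01T12:01:00Z"),
                Instant.parse("2023-01-01T12:04:00Z"),
                Instant.parse("2023-01-01T12:07:00Z"));
        countBySpan(errors, Duration.ofMinutes(5))
                .forEach((start, count) -> System.out.println(start + " " + count));
    }
}
```

Here the first two timestamps fall into the bucket starting at 12:00 and the third into the bucket starting at 12:05, which corresponds to one data point per 5-minute interval in the graph.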

The following screenshot shows the visualization.

OpenSearch Service offers a choice of several different visualizations, such as line, bar, and pie charts.

We now save the results as a visualization, giving it the name Payment Service Errors.

We want to create and save a visualization for each of the five services. To create a new visualization, choose Add new, then modify the query by changing the service name.

We save this one and repeat the process by choosing Add new again for each of the five microservices. Each microservice is now available on its own tab.

Create an operational panel

Operational panels in OpenSearch Dashboards are collections of visualizations created using PPL queries. Now that we have created the visualizations in the Event analytics dashboard, we can create a new operational panel.

  1. On the Operational panel page, choose Create panel.
  2. For Name, enter e-Commerce Error Monitoring.
  3. Open that panel and choose Add Visualization.
  4. Choose Payment Service Errors.

The following screenshot shows our visualization.

We now repeat the process for our other four services. However, the layout isn’t good. The graphs are too big, and laid out vertically, so they can’t all be seen at once.

We can choose Edit to adjust the size of each visualization and move them around. We end up with the layout in the following screenshot.

We can now monitor errors over time for all of our services. Notice that the y-axis of each service visualization adjusts based on the error count.

This will be a useful tool for monitoring our services in the future.

Next, we create an incident report on the error that we found.

Create an OpenSearch incident report

The e-Commerce Error Monitoring panel can help us monitor our application in the future. However, we want to send out an incident report to our developers about our current findings. We do this by using OpenSearch PPL and Notebooks features introduced in OpenSearch Service 1.3 to create an incident report. A notebook can be downloaded as a PDF. An incident report is useful to share our findings with others.

First, we need to create a new notebook.

  1. Under Observability in the navigation pane, choose Notebooks.
  2. Choose Create notebook.
  3. For Name, enter e-Commerce Error Report.
  4. Choose Create.

    The following screenshot shows our new notebook page.

    A notebook consists of code blocks (narrative text, PPL, and SQL) and visualizations created on the Event analytics page with PPL.
  5. Choose Add code block.
    We can now write a new code block.

    We can use %md, %sql, or %ppl to add code. In this first block, we just enter text.
  6. Use %md to add narrative text.
  7. Choose Run to see the output.

    The following screenshot shows our code block.

    Now we want to add our PPL query to show the error we found earlier.
  8. On the Add paragraph menu, choose Code block.
  9. Enter our PPL query, then choose Run.

    The following screenshot shows our output.

    Let’s drill down on the log field to get details of the error.
    We could have many narrative and code blocks, as well as visualizations of PPL queries. Let’s add a visualization.
  10. On the Add paragraph menu, choose Visualization.
  11. Choose Payment Service Errors to view the report we created earlier.

    This visualization shows a pattern of payment service errors this afternoon. Note that we chose a date range because we’re focusing on today’s errors to communicate with the development team.

    Notebook visualizations can be refreshed to provide updated information. The following screenshot shows our visualization an hour later.
    We’re now going to take our completed notebook and export it as a PDF report to share with other teams.
  12. Choose Output only to make the view cleaner to share.
  13. On the Reporting actions menu, choose Download PDF.

We can send this PDF report to the developers supporting the payment service.

Summary

In this post, we used OpenSearch Service v1.3 to create a dashboard to monitor errors in our microservices application. We then created a notebook to use a PPL query on a specific trace ID for a payment service error to provide details, and a graph of payment service errors to visualize the pattern of errors. Finally, we saved our notebook as a PDF to share with the payment service development team. If you would like to explore these features further, check out the latest Amazon OpenSearch Observability documentation or, for open source, the latest OpenSearch Observability documentation. You can also contact your AWS Solutions Architects, who can assist you on your innovation journey.


About the Authors

Marvin Gersho is a Senior Solutions Architect at AWS based in New York City. He works with a wide range of startup customers. He previously worked for many years in engineering leadership and hands-on application development, and now focuses on helping customers architect secure and scalable workloads on AWS with a minimum of operational overhead. In his free time, Marvin enjoys cycling and strategy board games.

Subham Rakshit is a Streaming Specialist Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build search and streaming data platforms that help them achieve their business objective. Outside of work, he enjoys spending time solving jigsaw puzzles with his daughter.

Rafael Gumiero is a Senior Analytics Specialist Solutions Architect at AWS. An open-source and distributed systems enthusiast, he provides guidance to customers who develop their solutions with AWS Analytics services, helping them optimize the value of their solutions.