Tag Archives: Amazon Managed Streaming for Apache Kafka (Amazon MSK)

Developer guidance on how to do local testing with Amazon MSK Serverless

Post Syndicated from Simon Peyer original https://aws.amazon.com/blogs/big-data/developer-guidance-on-how-to-do-local-testing-with-amazon-msk-serverless/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that makes it easy to build and run Kafka clusters on Amazon Web Services (AWS). When working with Amazon MSK, developers want to access the service locally so they can test their application against a Kafka cluster that has the same configuration and infrastructure as production, without needing to run Kafka on their own machine.

An Amazon MSK Serverless private DNS endpoint is only accessible from Amazon Virtual Private Cloud (Amazon VPC) connections that have been configured to connect. It isn’t directly resolvable from your local development environment. One option is to use AWS Direct Connect or AWS VPN to connect to Amazon MSK Serverless from your on-premises network. However, building such a solution may incur cost and complexity, and it needs to be set up by a platform team.

This post presents a practical approach to accessing your Amazon MSK environment for development purposes through a bastion host using a Secure Shell (SSH) tunnel (a commonly used secure connection method). Whether you’re working with Amazon MSK Serverless, where public access is unavailable, or with provisioned MSK clusters that are intentionally kept private, this post guides you through the steps to establish a secure connection and seamlessly integrate your local development environment with your MSK resources.

Solution overview

The solution allows you to directly connect to the Amazon MSK Serverless service from your local development environment without using Direct Connect or a VPN. The service is accessed with the bootstrap server DNS endpoint boot-<<xxxxxx>>.c<<x>>.kafka-serverless.<<region-name>>.amazonaws.com on port 9098; the traffic is then routed through an SSH tunnel to a bastion host, which connects to the MSK Serverless cluster. The following sections explore how to set up this connection.

The flow of the solution is as follows:

  1. The Kafka client sends a request to connect to the bootstrap server.
  2. The DNS query for your MSK Serverless endpoint is routed to a locally configured DNS server.
  3. The locally configured DNS server routes the DNS query to localhost.
  4. The SSH tunnel forwards all the traffic on port 9098 from the localhost to the MSK Serverless server through the Amazon Elastic Compute Cloud (Amazon EC2) bastion host.

The following image shows the architecture diagram.

Architecture Diagram for accessing Serverless MSK from local

Prerequisites

Before deploying the solution, you need to have the following resources deployed in your account:

  1. An MSK Serverless cluster configured with AWS Identity and Access Management (IAM) authentication.
  2. A bastion host instance with network access to the MSK Serverless cluster and SSH public key authentication.
  3. AWS CLI configured with an IAM user that can read and create topics on Amazon MSK. Use the IAM policy from Step 2: Create an IAM role in Getting started using MSK Serverless clusters.
  4. For Windows users, install Linux on Windows with Windows Subsystem for Linux 2 (WSL 2) using Ubuntu 24.04. For guidance, refer to How to install Linux on Windows with WSL.

This guide assumes an MSK Serverless deployment in us-east-1, but it can be used in any AWS Region where MSK Serverless is available. Furthermore, we use OS X as the operating system. In the following steps, replace msk-endpoint-url with your MSK Serverless endpoint URL with IAM authentication. The MSK endpoint URL has a format like boot-<<xxxxxx>>.c<<x>>.kafka-serverless.<<region-name>>.amazonaws.com.

Solution walkthrough

To access your Amazon MSK environment for development purposes, use the following walkthrough.

Configure local DNS server on OS X

Install Dnsmasq as a local DNS server and configure the resolver to resolve the Amazon MSK endpoint. The solution uses Dnsmasq because it can compare DNS requests against a database of patterns and use these to determine the correct response. This functionality matches any request that ends in kafka-serverless.us-east-1.amazonaws.com and sends 127.0.0.1 in response. Follow these steps to install Dnsmasq:

  1. Update brew and install Dnsmasq using brew
    brew up
    brew install dnsmasq

  2. Start the Dnsmasq service
    sudo brew services start dnsmasq

  3. Reroute all traffic for Serverless MSK (kafka-serverless.us-east-1.amazonaws.com) to 127.0.0.1
    echo address=/kafka-serverless.us-east-1.amazonaws.com/127.0.0.1 >> $(brew --prefix)/etc/dnsmasq.conf

  4. Reload Dnsmasq configuration and clear cache
    sudo launchctl unload /Library/LaunchDaemons/homebrew.mxcl.dnsmasq.plist
    sudo launchctl load /Library/LaunchDaemons/homebrew.mxcl.dnsmasq.plist
    dscacheutil -flushcache

Configure OS X resolver

Now that you have a working DNS server, you can configure your operating system to use it. Configure the system to send only .kafka-serverless.us-east-1.amazonaws.com queries to Dnsmasq. Most UNIX-like operating systems have a configuration file called /etc/resolv.conf that controls the way DNS queries are performed, including the default server to use for DNS queries. Use the following steps to configure the OS X resolver:

  1. OS X also allows you to configure additional resolvers by creating configuration files in the /etc/resolver/ directory. This directory probably won’t exist on your system, so your first step should be to create it:
    sudo mkdir -p /etc/resolver

  2. Create a new file with the same name as your new top-level domain (kafka-serverless.us-east-1.amazonaws.com) in the /etc/resolver/ directory and add 127.0.0.1 as a nameserver to it by entering the following command.
    sudo tee /etc/resolver/kafka-serverless.us-east-1.amazonaws.com >/dev/null <<EOF
    nameserver 127.0.0.1
    EOF

Configure local DNS server on Windows

In Windows Subsystem for Linux, first install Dnsmasq, then configure the resolver to resolve the Amazon MSK endpoint, and finally add localhost as the first nameserver.

  1. Update apt and install Dnsmasq using apt. Install the telnet utility for later tests:
    sudo apt update
    sudo apt install dnsmasq
    sudo apt install telnet

  2. Reroute all traffic for Serverless MSK (kafka-serverless.us-east-1.amazonaws.com) to 127.0.0.1.
    echo "address=/kafka-serverless.us-east-1.amazonaws.com/127.0.0.1" | sudo tee -a /etc/dnsmasq.conf

  3. Reload Dnsmasq configuration and clear cache.
    sudo /etc/init.d/dnsmasq restart

  4. Open /etc/resolv.conf and add the following as its first line.
    nameserver 127.0.0.1

    The output should look like the following code.

    #Some comments
    nameserver 127.0.0.1
    nameserver <<your_nameservers>>
    ..

Create SSH tunnel

The next step is to create the SSH tunnel, which will allow any connections made to localhost:9098 on your local machine to be forwarded over the SSH tunnel to the target Kafka broker. Use the following steps to create the SSH tunnel:

  1. Replace bastion-host-dns-endpoint with the public DNS endpoint of the bastion host, which comes in the style of <<xyz>>.compute-1.amazonaws.com, and replace ec2-key-pair.pem with the key pair of the bastion host. Then create the SSH tunnel by entering the following command.
    ssh -i "~/<<ec2-key-pair.pem>>" ec2-user@<<bastion-host-dns-endpoint>> -L 127.0.0.1:9098:<<msk-endpoint-url>>:9098

  2. Leave the SSH tunnel running and open a new terminal window.
  3. Test the connection to the Amazon MSK server by entering the following command.
    telnet <<msk-endpoint-url>> 9098

    The output should look like the following example.

    Trying 127.0.0.1...
    Connected to boot-<<xxxxxxxx>>.c<<x>>.kafka-serverless.us-east-1.amazonaws.com.
    Escape character is '^]'.
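Alternatively, you can run the same connectivity check from Python. The following is a minimal sketch; the endpoint shown is a placeholder that you replace with your msk-endpoint-url:

    import socket

    # Placeholder - replace with your msk-endpoint-url
    host = "boot-xxxxxx.c1.kafka-serverless.us-east-1.amazonaws.com"

    # Should print 127.0.0.1 if Dnsmasq resolves the domain locally
    print(socket.gethostbyname(host))

    # Succeeds only if the SSH tunnel is forwarding port 9098
    with socket.create_connection((host, 9098), timeout=5):
        print("Connected through the SSH tunnel")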

Testing

Now configure the Kafka client to use IAM authentication and then test the setup. You can find the latest Kafka distribution at the Apache Kafka download site. Unzip it and copy the contents of the Kafka folder into ~/kafka.

  1. Download the Amazon MSK IAM authentication library into the Kafka libs folder
    cd ~/kafka/libs
    wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar
    cd ~

  2. Configure Kafka properties to use IAM as the authentication mechanism
    cat <<EOF > ~/kafka/config/client-config.properties
    # Sets up TLS for encryption and SASL for authN.
    security.protocol = SASL_SSL
    # Identifies the SASL mechanism to use.
    sasl.mechanism = AWS_MSK_IAM
    # Binds SASL client implementation.
    sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
    # Encapsulates constructing a SigV4 signature based on extracted credentials.
    # The SASL client bound by "sasl.jaas.config" invokes this class.
    sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
    EOF

  3. Enter the following command in ~/kafka/bin to create an example topic. Make sure that the SSH tunnel created in the previous section is still open and running.
    ./kafka-topics.sh --bootstrap-server <<msk-endpoint-url>>:9098 --command-config ~/kafka/config/client-config.properties --create --topic ExampleTopic --partitions 10 --replication-factor 3 --config retention.ms=3600000
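If you prefer to test from Python instead of the Kafka CLI tools, the following sketch creates a topic over the same tunnel using the kafka-python and aws-msk-iam-sasl-signer-python packages (both installed with pip; using these packages is an assumption of this sketch, not part of the walkthrough above):

    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
    from kafka.admin import KafkaAdminClient, NewTopic

    class MSKTokenProvider:
        # kafka-python calls token() whenever it needs a fresh OAUTHBEARER token
        def token(self):
            token, _expiry_ms = MSKAuthTokenProvider.generate_auth_token("us-east-1")
            return token

    admin = KafkaAdminClient(
        bootstrap_servers="<<msk-endpoint-url>>:9098",  # resolves to 127.0.0.1, i.e. the SSH tunnel
        security_protocol="SASL_SSL",
        sasl_mechanism="OAUTHBEARER",
        sasl_oauth_token_provider=MSKTokenProvider(),
    )
    admin.create_topics([NewTopic(name="ExampleTopic2", num_partitions=10, replication_factor=3)])
    admin.close()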

Cleanup

To remove the solution, complete the following steps for Mac users:

  1. Delete the file /etc/resolver/kafka-serverless.us-east-1.amazonaws.com.
  2. Delete the entry address=/kafka-serverless.us-east-1.amazonaws.com/127.0.0.1 in the file $(brew --prefix)/etc/dnsmasq.conf.
  3. Stop the Dnsmasq service: sudo brew services stop dnsmasq.
  4. Uninstall Dnsmasq: sudo brew uninstall dnsmasq.

To remove the solution, complete the following steps for WSL users:

  1. Delete the file /etc/dnsmasq.conf.
  2. Delete the entry nameserver 127.0.0.1 from the file /etc/resolv.conf.
  3. Remove Dnsmasq: sudo apt remove dnsmasq.
  4. Remove the telnet utility: sudo apt remove telnet.

Conclusion

In this post, I presented guidance on how developers can connect to Amazon MSK Serverless from local environments. The connection is made to the Amazon MSK endpoint through an SSH tunnel and a bastion host. This enables developers to experiment and test locally, without needing to set up a separate Kafka cluster.


About the Author

Simon Peyer is a Solutions Architect at Amazon Web Services (AWS) based in Switzerland. He is a practical doer and passionate about connecting technology and people using AWS Cloud services. A special focus for him is data streaming and automations. Besides work, Simon enjoys his family, the outdoors, and hiking in the mountains.

Publish and enrich real-time financial data feeds using Amazon MSK and Amazon Managed Service for Apache Flink

Post Syndicated from Rana Dutt original https://aws.amazon.com/blogs/big-data/publish-and-enrich-real-time-financial-data-feeds-using-amazon-msk-and-amazon-managed-service-for-apache-flink/

Financial data feeds are real-time streams of stock quotes, commodity prices, options trades, or other real-time financial data. Companies involved with capital markets such as hedge funds, investment banks, and brokerages use these feeds to inform investment decisions.

Financial data feed providers are increasingly being asked by their customers to deliver the feed directly to them through the AWS Cloud. That’s because their customers already have infrastructure on AWS to store and process the data and want to consume it with minimal effort and latency. In addition, the AWS Cloud’s cost-effectiveness enables even small and mid-size companies to become financial data providers. They can deliver and monetize data feeds that they have enriched with their own valuable information.

An enriched data feed can combine data from multiple sources, including financial news feeds, to add information such as stock splits, corporate mergers, volume alerts, and moving average crossovers to a basic feed.

In this post, we demonstrate how you can publish an enriched real-time data feed on AWS using Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink. You can apply this architecture pattern to various use cases within the capital markets industry; we discuss some of those use cases in this post.

Apache Kafka is a high-throughput, low-latency distributed event streaming platform. Financial exchanges such as Nasdaq and NYSE are increasingly turning to Kafka to deliver their data feeds because of its exceptional capabilities in handling high-volume, high-velocity data streams.

Amazon MSK is a fully managed service that makes it easy for you to build and run applications on AWS that use Kafka to process streaming data.

Apache Flink is an open source distributed processing engine that offers powerful programming interfaces for both stream and batch processing, with first-class support for stateful processing, event time semantics, checkpointing, snapshots, and rollback. Apache Flink supports multiple programming languages (Java, Python, Scala, and SQL) and multiple APIs with different levels of abstraction, which can be used interchangeably in the same application.

Amazon Managed Service for Apache Flink is a fully managed, serverless experience for running Apache Flink applications. Customers can easily build real-time Flink applications using any of Flink’s languages and APIs.

In this post, we use a real-time stock quotes feed from financial data provider Alpaca and add an indicator when the price moves above or below a certain threshold. The code provided in the GitHub repo allows you to deploy the solution to your AWS account. This solution was built by AWS Partner NETSOL Technologies.

Solution overview

In this solution, we deploy an Apache Flink application that enriches the raw data feed, an MSK cluster that contains the message streams for both the raw and enriched feeds, and an Amazon OpenSearch Service cluster that acts as a persistent data store for querying the data. In a separate virtual private cloud (VPC) that acts as the customer’s VPC, we also deploy an Amazon EC2 instance running a Kafka client that consumes the enriched data feed. The following diagram illustrates this architecture.

Figure 1 – Solution architecture

The following is a step-by-step breakdown of the solution:

  1. The EC2 instance in your VPC is running a Python application that fetches stock quotes from your data provider through an API. In this case, we use Alpaca’s API.
  2. The application sends these quotes using the Kafka client library to a Kafka topic on the MSK cluster. This topic stores the raw quotes.
  3. The Apache Flink application takes the Kafka message stream and enriches it by adding an indicator whenever the stock price rises or declines 5% or more from the previous business day’s closing price (see the sketch after this list).
  4. The Apache Flink application then sends the enriched data to a separate Kafka topic on your MSK cluster.
  5. The Apache Flink application also sends the enriched data stream to Amazon OpenSearch Service using a Flink connector for OpenSearch. Amazon OpenSearch Service stores the data, and OpenSearch Dashboards allows applications to query the data at any point in the future.
  6. Your customer is running a Kafka consumer application on an EC2 instance in a separate VPC in their own AWS account. This application uses AWS PrivateLink to consume the enriched data feed securely, in real time.
  7. All Kafka user names and passwords are encrypted and stored in AWS Secrets Manager. The SASL/SCRAM authentication protocol used here makes sure all data to and from the MSK cluster is encrypted in transit. Amazon MSK encrypts all data at rest in the MSK cluster by default.
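To make the enrichment in step 3 concrete, the following Python sketch shows the gist of the indicator logic. It is a simplified illustration of what the Flink application computes, not the actual Flink code:

    def enrich(quote, prev_close):
        # Percentage change from the previous business day's closing price
        pct_change = (quote["close"] - prev_close) / prev_close * 100
        if pct_change >= 5:
            indicator = "Bullish"
        elif pct_change <= -5:
            indicator = "Bearish"
        else:
            indicator = "Neutral"
        return {**quote, "%change": pct_change, "indicator": indicator}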

The deployment process consists of the following high-level steps:

  1. Launch the Amazon MSK cluster, Apache Flink application, Amazon OpenSearch Service domain, and Kafka producer EC2 instance in the producer AWS account. This step usually completes within 45 minutes.
  2. Set up multi-VPC connectivity and SASL/SCRAM authentication for the MSK cluster. This step can take up to 30 minutes.
  3. Launch the VPC and Kafka consumer EC2 instance in the consumer account. This step takes about 10 minutes.

Prerequisites

To deploy this solution, complete the following prerequisite steps:

  1. Create an AWS account if you don’t already have one and log in. We refer to this as the producer account.
  2. Create an AWS Identity and Access Management (IAM) user with full admin permissions. For instructions, refer to Create an IAM user.
  3. Sign out and sign back in to the AWS Management Console as this IAM admin user.
  4. Create an EC2 key pair named my-ec2-keypair in the producer account. If you already have an EC2 key pair, you can skip this step.
  5. Follow the instructions in ALPACA_README to sign up for a free Basic account at Alpaca to get your Alpaca API key and secret key. Alpaca will provide the real-time stock quotes for our input data feed.
  6. Install the AWS Command Line Interface (AWS CLI) on your local development machine and create a profile for the admin user. For instructions, see Set up the AWS Command Line Interface (AWS CLI).
  7. Install the latest version of the AWS Cloud Development Kit (AWS CDK) globally:
 npm install -g aws-cdk@latest

Deploy the Amazon MSK cluster

These steps create a new provider VPC and launch the Amazon MSK cluster there. You also deploy the Apache Flink application and launch a new EC2 instance to run the application that fetches the raw stock quotes.

  1. On your development machine, clone the GitHub repo and install the Python packages:
    git clone https://github.com/aws-samples/msk-powered-financial-data-feed.git
    cd msk-powered-financial-data-feed
    pip install -r requirements.txt

  2. Set the following environment variables to specify your producer AWS account number and AWS Region:
    export CDK_DEFAULT_ACCOUNT={your_AWS_account_no}
    export CDK_DEFAULT_REGION=us-east-1

  3. Run the following commands to create your config.py file:
    echo "mskCrossAccountId = <Your producer AWS account ID>" > config.py
    echo "producerEc2KeyPairName = '' " >> config.py
    echo "consumerEc2KeyPairName = '' " >> config.py
    echo "mskConsumerPwdParamStoreValue= '' " >> config.py
    echo "mskClusterArn = '' " >> config.py

  4. Run the following commands to create your alpaca.conf file:
    echo [alpaca] > dataFeedMsk/alpaca.conf
    echo ALPACA_API_KEY=your_api_key >> dataFeedMsk/alpaca.conf
    echo ALPACA_SECRET_KEY=your_secret_key >> dataFeedMsk/alpaca.conf

  5. Edit the alpaca.conf file and replace your_api_key and your_secret_key with your Alpaca API key and secret key.
  6. Bootstrap the environment for the producer account:
    cdk bootstrap aws://{your_AWS_account_no}/{your_aws_region}

  7. Using your editor or integrated development environment (IDE), edit the config.py file:
    1. Update the mskCrossAccountId parameter with your AWS producer account number.
    2. If you have an existing EC2 key pair, update the producerEc2KeyPairName parameter with the name of your key pair.
  8. View the dataFeedMsk/parameters.py file:
    1. If you are deploying in a Region other than us-east-1, update the Availability Zone IDs az1 and az2 accordingly. For example, the Availability Zones for us-west-2 would be us-west-2a and us-west-2b.
    2. Make sure that the enableSaslScramClientAuth, enableClusterConfig, and enableClusterPolicy parameters in the parameters.py file are set to False.
  9. Make sure you are in the directory where the app1.py file is located. Then deploy as follows:
    cdk deploy --all --app "python app1.py" --profile {your_profile_name}

  10. Check that you now have an Amazon Simple Storage Service (Amazon S3) bucket whose name starts with awsblog-dev-artifacts containing a folder with some Python scripts and the Apache Flink application JAR file.

Deploy multi-VPC connectivity and SASL/SCRAM

Complete the following steps to deploy multi-VPC connectivity and SASL/SCRAM authentication for the MSK cluster:

  1. Set the enableSaslScramClientAuth, enableClusterConfig, and enableClusterPolicy parameters in the parameters.py file to True.
  2. Make sure you’re in the directory where the config.py file is located and deploy the multi-VPC connectivity and SASL/SCRAM authentication for the MSK cluster:

cdk deploy --all --app "python app1.py" --profile {your_profile_name}

This step can take up to 30 minutes.

  3. To check the results, navigate to your MSK cluster on the Amazon MSK console, and choose the Properties tab.

You should see PrivateLink turned on, and SASL/SCRAM as the authentication type.


  4. Copy the MSK cluster ARN.
  5. Edit your config.py file and enter the ARN as the value for the mskClusterArn parameter, then save the updated file.

Deploy the data feed consumer

Complete the steps in this section to create an EC2 instance in a new consumer account to run the Kafka consumer application. The application will connect to the MSK cluster through PrivateLink and SASL/SCRAM.

  1. Navigate to Parameter Store, a capability of AWS Systems Manager, in your producer account.
  2. Copy the value of the blogAws-dev-mskConsumerPwd-ssmParamStore parameter and update the mskConsumerPwdParamStoreValue parameter in the config.py file.
  3. Check the value of the parameter named blogAws-dev-getAzIdsParamStore and make a note of these two values.
  4. Create another AWS account for the Kafka consumer if you don’t already have one, and log in.
  5. Create an IAM user with admin permissions.
  6. Log out and log back in to the console using this IAM admin user.
  7. Make sure you are in the same Region as the Region you used in the producer account. Then create a new EC2 key pair named, for example, my-ec2-consumer-keypair, in this consumer account.
  8. Update the value of consumerEc2KeyPairName in your config.py file with the name of the key pair you just created.
  9. Open the AWS Resource Access Manager (AWS RAM) console in your consumer account.
  10. Compare the Availability Zone IDs from the Systems Manager parameter store with the Availability Zone IDs shown on the AWS RAM console.
  11. Identify the corresponding Availability Zone names for the matching Availability Zone IDs.
  12. Open the parameters.py file in the dataFeedMsk folder and insert these Availability Zone names into the variables crossAccountAz1 and crossAccountAz2. For example, in Parameter Store, if the values are “use1-az4” and “use1-az6”, then, when you switch to the consumer account’s AWS RAM console and compare, you may find that these values correspond to the Availability Zone names “us-east-1a” and “us-east-1b”. In that case, update the parameters.py file by setting crossAccountAz1 to “us-east-1a” and crossAccountAz2 to “us-east-1b”. (A small script can also automate this lookup; see the sketch after these steps.)
  13. Set the following environment variables, specifying your consumer AWS account ID:
export CDK_DEFAULT_ACCOUNT={your_aws_account_id}
export CDK_DEFAULT_REGION=us-east-1
  14. Bootstrap the consumer account environment. You need to add specific policies to the AWS CDK role in this case:
    cdk bootstrap aws://{your_aws_account_id}/{your_aws_region} --cloudformation-execution-policies "arn:aws:iam::aws:policy/AmazonMSKFullAccess,arn:aws:iam::aws:policy/AdministratorAccess" --profile <your-user-profile>
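To automate the Availability Zone lookup from step 12 instead of comparing values manually on the AWS RAM console, you can use a small boto3 helper like the following sketch (the zone IDs shown are examples):

    import boto3

    # Map AZ IDs (stable across accounts) to this account's AZ names
    ec2 = boto3.client("ec2", region_name="us-east-1")
    response = ec2.describe_availability_zones(ZoneIds=["use1-az4", "use1-az6"])
    for az in response["AvailabilityZones"]:
        print(az["ZoneId"], "->", az["ZoneName"])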

You now need to grant the consumer account access to the MSK cluster.

  1. On the console, copy the consumer AWS account number to your clipboard.
  2. Sign out and sign back in to your producer AWS account.
  3. On the Amazon MSK console, navigate to your MSK cluster.
  4. Choose Properties and scroll down to Security settings.
  5. Choose Edit cluster policy and add the consumer account root to the Principal section as follows, then save the changes:
    "Principal": {
        "AWS": ["arn:aws:iam::<producer-acct-no>:root", "arn:aws:iam::<consumer-acct-no>:root"]
    },
    

  6. Create the IAM role that needs to be attached to the EC2 consumer instance:
    aws iam create-role --role-name awsblog-dev-app-consumerEc2Role --assume-role-policy-document file://dataFeedMsk/ec2ConsumerPolicy.json --profile <your-user-profile>

  7. Deploy the consumer account infrastructure, including the VPC, consumer EC2 instance, security groups, and connectivity to the MSK cluster:
    cdk deploy --all --app "python app2.py" --profile {your_profile_name}

Run the applications and view the data

Now that we have the infrastructure up, we can produce a raw stock quotes feed from the producer EC2 instance to the MSK cluster, enrich it using the Apache Flink application, and consume the enriched feed from the consumer application through PrivateLink. For this post, we use the Flink DataStream Java API for the stock data feed processing and enrichment. We also use Flink aggregations and windowing capabilities to identify insights in a certain time window.

Run the managed Flink application

Complete the following steps to run the managed Flink application:

  1. In your producer account, open the Amazon Managed Service for Apache Flink console and navigate to your application.
  2. To run the application, choose Run, select Run with latest snapshot, and choose Run.
  3. When the application changes to the Running state, choose Open Apache Flink dashboard.

You should see your application under Running Jobs.


Run the Kafka producer application

Complete the following steps to run the Kafka producer application:

  1. On the Amazon EC2 console, locate the IP address of the producer EC2 instance named awsblog-dev-app-kafkaProducerEC2Instance.
  2. Connect to the instance using SSH and run the following commands:
    sudo su
    cd environment
    source alpaca-script/bin/activate
    python3 ec2-script-live.py AMZN NVDA

You need to start the script during market open hours. This will run the script that creates a connection to the Alpaca API. You should see lines of output showing that it is making the connection and subscribing to the given ticker symbols.

View the enriched data feed in OpenSearch Dashboards

Complete the following steps to create an index pattern to view the enriched data in your OpenSearch dashboard:

  1. To find the master user name for OpenSearch, open the config.py file and locate the value assigned to the openSearchMasterUsername parameter.
  2. Open Secrets Manager and choose the awsblog-dev-app-openSearchSecrets secret to retrieve the password for OpenSearch.
  3. Navigate to the OpenSearch Service console and find the URL to your OpenSearch dashboard by choosing the domain name for your OpenSearch cluster. Open the URL and sign in using your master user name and password.
  4. In the OpenSearch navigation bar on the left, select Dashboards Management under the Management section.
  5. Choose Index patterns, then choose Create index pattern.
  6. Enter amzn* in the Index pattern name field to match the AMZN ticker, then choose Next step.
  7. Select timestamp under Time field and choose Create index pattern.
  8. Choose Discover in the OpenSearch Dashboards navigation pane.
  9. With amzn* selected in the index pattern dropdown, select the fields to view the enriched quotes data.

The indicator field has been added to the raw data by Amazon Managed Service for Apache Flink to indicate whether the current price direction is neutral, bullish, or bearish.

Run the Kafka consumer application

To run the consumer application to consume the data feed, you first need to get the multi-VPC brokers URL for the MSK cluster in the producer account.

  1. On the Amazon MSK console, navigate to your MSK cluster and choose View client information.
  2. Copy the value of the Private endpoint (multi-VPC).
  3. SSH to your consumer EC2 instance and run the following commands:
    sudo su
    alias kafka-consumer=/kafka_2.13-3.5.1/bin/kafka-console-consumer.sh
    kafka-consumer --bootstrap-server $MULTI_VPC_BROKER_URL --topic amznenhanced --from-beginning --consumer.config ./customer_sasl.properties
    

You should then see lines of output for the enriched data feed like the following:

{"symbol":"AMZN","close":194.64,"open":194.58,"low":194.58,"high":194.64,"volume":255.0,"timestamp":"2024-07-11 19:49:00","%change":-0.8784661217630548,"indicator":"Neutral"}
{"symbol":"AMZN","close":194.77,"open":194.615,"low":194.59,"high":194.78,"volume":1362.0,"timestamp":"2024-07-11 19:50:00","%change":-0.8122628778040887,"indicator":"Neutral"}
{"symbol":"AMZN","close":194.82,"open":194.79,"low":194.77,"high":194.82,"volume":1143.0,"timestamp":"2024-07-11 19:51:00","%change":-0.7868000916660381,"indicator":"Neutral"}

In the preceding output, no significant changes are occurring in the stock prices, so the indicator shows "Neutral". The Flink application determines the appropriate sentiment based on the stock price movement.
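If you want to consume the enriched feed programmatically rather than with the console consumer, a minimal kafka-python sketch could look like the following (assuming the same SASL/SCRAM user name and password stored in Secrets Manager; the broker URL and credentials are placeholders):

    import json
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "amznenhanced",
        bootstrap_servers="<multi-vpc-broker-url>",  # private endpoint (multi-VPC) from the MSK console
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username="<kafka-username>",      # retrieved from Secrets Manager
        sasl_plain_password="<kafka-password>",
        auto_offset_reset="earliest",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    for message in consumer:
        print(message.value["symbol"], message.value["indicator"])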

Additional financial services use cases

In this post, we demonstrated how to build a solution that enriches a raw stock quotes feed and identifies stock movement patterns using Amazon MSK and Amazon Managed Service for Apache Flink. Amazon Managed Service for Apache Flink offers various features such as snapshots, checkpointing, and a recently launched Rollback API. These features allow you to build resilient real-time streaming applications.

You can apply this approach to a variety of other use cases in the capital markets domain. In this section, we discuss other cases in which you can use the same architectural patterns.

Real-time data visualization

Using real-time feeds to create charts of stocks is the most common use case for real-time market data in the cloud. You can ingest raw stock prices from data providers or exchanges into an MSK topic and use Amazon Managed Service for Apache Flink to compute the high price, low price, and volume over a period of time. These aggregates are the foundation for displaying candlestick charts. You can also use Flink to determine stock price ranges over time.
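As an illustration, the following sketch computes per-minute open/high/low/close/volume (OHLCV) bars from quote records shaped like the feed output shown earlier. It is a simplified stand-in for Flink’s windowed aggregations and assumes the quotes are sorted by time:

    from collections import defaultdict

    def ohlcv_per_minute(quotes):
        # Group quotes by the "yyyy-MM-dd HH:mm" prefix of their timestamp
        buckets = defaultdict(list)
        for q in quotes:
            buckets[q["timestamp"][:16]].append(q)
        return {
            minute: {
                "open": qs[0]["open"],
                "close": qs[-1]["close"],
                "high": max(q["high"] for q in qs),
                "low": min(q["low"] for q in qs),
                "volume": sum(q["volume"] for q in qs),
            }
            for minute, qs in buckets.items()
        }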


Stock implied volatility

Implied volatility (IV) is a measure of the market’s expectation of how much a stock’s price is likely to fluctuate in the future. IV is forward-looking and derived from the current market price of an option. It is also used to price new options contracts and is sometimes referred to as the stock market’s fear gauge because it tends to spike higher during market stress or uncertainty. With Amazon Managed Service for Apache Flink, you can consume data from a securities feed that will provide current stock prices and combine this with an options feed that provides contract values and strike prices to calculate the implied volatility.

Technical indicator engine

Technical indicators are used to analyze stock price and volume behavior, provide trading signals, and identify market opportunities, which can help in the decision-making process of trading. Although implied volatility is a technical indicator, there are many others. Simple indicators such as the simple moving average (SMA) measure the trend of a specific stock price based on the average price over a period of time. More complex indicators such as the Relative Strength Index (RSI) measure the momentum of a stock’s price movement; RSI is a mathematical formula that uses the exponential moving average of upward and downward movements.
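The following sketch shows simplified versions of both indicators. Note that it uses plain averages for RSI rather than the exponential smoothing mentioned above, which production implementations typically apply:

    def simple_moving_average(prices, n):
        # Average of the last n closing prices
        return sum(prices[-n:]) / n

    def rsi(prices, n=14):
        # Simplified RSI using plain averages of gains and losses
        gains = [max(b - a, 0) for a, b in zip(prices, prices[1:])]
        losses = [max(a - b, 0) for a, b in zip(prices, prices[1:])]
        avg_gain = sum(gains[-n:]) / n
        avg_loss = sum(losses[-n:]) / n
        if avg_loss == 0:
            return 100.0
        return 100 - 100 / (1 + avg_gain / avg_loss)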

Market alert engine

Graphs and technical indicators aren’t the only tools that you can use to make investment decisions. Alternative data sources are important, such as ticker symbol changes, stock splits, dividend payments, and others. Investors also act on recent news about the company, its competitors, employees, and other potential company-related information. You can use the compute capacity provided by Amazon Managed Service for Apache Flink to ingest, filter, transform, and correlate the different data sources to the stock prices and create an alert engine that can recommend investment actions based on these alternate data sources. Examples can range from invoking an action if dividend prices increase or decrease to using generative artificial intelligence (AI) to summarize several correlated news items from different sources into a single alert about an event.

Market surveillance

Market surveillance is the monitoring and investigation of unfair or illegal trading practices in the stock markets to maintain fair and orderly markets. Both private companies and government agencies conduct market surveillance to uphold rules and protect investors.

You can use Amazon Managed Service for Apache Flink streaming analytics as a powerful surveillance tool. Streaming analytics can detect even subtle instances of market manipulation in real time. By integrating market data feeds with external data sources, such as company merger announcements, news feeds, and social media, streaming analytics can quickly identify potential attempts at market manipulation. This allows regulators to be alerted in real time, enabling them to take prompt action even before the manipulation can fully unfold.

Markets risk management

In fast-paced capital markets, end-of-day risk measurement is insufficient. Firms need real-time risk monitoring to stay competitive. Financial institutions can use Amazon Managed Service for Apache Flink to compute intraday value-at-risk (VaR) in real time. By ingesting market data and portfolio changes, Amazon Managed Service for Apache Flink provides a low-latency, high-performance solution for continuous VaR calculations.

This allows financial institutions to proactively manage risk by quickly identifying and mitigating intraday exposures, rather than reacting to past events. The ability to stream risk analytics empowers firms to optimize portfolios and stay resilient in volatile markets.
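As a simple illustration of the calculation itself, the following sketch computes a historical-simulation VaR from a window of recent portfolio returns. This is one of several VaR methodologies; a streaming application would recompute it continuously as new market data and portfolio changes arrive:

    def historical_var(returns, confidence=0.99):
        # The loss that is exceeded only (1 - confidence) of the time,
        # estimated from the empirical distribution of past returns
        sorted_returns = sorted(returns)
        index = int((1 - confidence) * len(sorted_returns))
        return -sorted_returns[index]

    # Example: 99% VaR over a window of one-minute portfolio returns
    print(historical_var([0.001, -0.004, 0.002, -0.012, 0.003] * 100))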

Clean up

It’s always a good practice to clean up all the resources you created as part of this post to avoid any additional cost. To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stacks from the consumer account.
  2. Delete the CloudFormation stacks from the producer account.

Conclusion

In this post, we showed you how to provide a real-time financial data feed that can be consumed by your customers using Amazon MSK and Amazon Managed Service for Apache Flink. We used Amazon Managed Service for Apache Flink to enrich a raw data feed and deliver it to Amazon OpenSearch. Using this solution as a template, you can aggregate multiple source feeds, use Flink to calculate in real time any technical indicator, display data and volatility, or create an alert engine. You can add value for your customers by inserting additional financial information within your feed in real time.

We hope you found this post helpful and encourage you to try out this solution to solve interesting financial industry challenges.


About the Authors

Rana Dutt is a Principal Solutions Architect at Amazon Web Services. He has a background in architecting scalable software platforms for financial services, healthcare, and telecom companies, and is passionate about helping customers build on AWS.

Amar Surjit is a Senior Solutions Architect at Amazon Web Services (AWS), where he specializes in data analytics and streaming services. He advises AWS customers on architectural best practices, helping them design reliable, secure, efficient, and cost-effective real-time analytics data systems. Amar works closely with customers to create innovative cloud-based solutions that address their unique business challenges and accelerate their transformation journeys.

Diego Soares is a Principal Solutions Architect at AWS with over 20 years of experience in the IT industry. He has a background in infrastructure, security, and networking. Prior to joining AWS in 2021, Diego worked for Cisco, supporting financial services customers for over 15 years. He works with large financial institutions to help them achieve their business goals with AWS. Diego is passionate about how technology solves business challenges and provides beneficial outcomes by developing complex solution architectures.

AWS Glue mutual TLS authentication for Amazon MSK

Post Syndicated from Edward Ondari original https://aws.amazon.com/blogs/big-data/aws-glue-mutual-tls-authentication-for-amazon-msk/

In today’s landscape, data streams continuously from countless sources, from social media interactions to Internet of Things (IoT) device readings. This torrent of real-time information presents both a challenge and an opportunity for businesses. To harness the power of this data effectively, organizations need robust systems for ingesting, processing, and analyzing streaming data at scale. Enter Apache Kafka: a distributed streaming platform that has revolutionized how companies handle real-time data pipelines and build responsive, event-driven applications. AWS Glue is used to process and analyze large volumes of real-time data and perform complex transformations on the streaming data from Apache Kafka.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed Apache Kafka service. You can activate a combination of authentication modes on new or existing MSK clusters. The supported authentication modes are AWS Identity and Access Management (IAM) access control, mutual Transport Layer Security (TLS), and Simple Authentication and Security Layer/Salted Challenge Response Mechanism (SASL/SCRAM). For more information about using IAM authentication, refer to Securely process near-real-time data from Amazon MSK Serverless using an AWS Glue streaming ETL job with IAM authentication.

Mutual TLS authentication requires both the server and the client to present certificates to prove their identity. It’s ideal for hybrid applications that need a common authentication model. It’s also a commonly used authentication mechanism for business-to-business applications and is used in standards such as open banking, which enables secure open API integrations for financial institutions. For Amazon MSK, AWS Private Certificate Authority (AWS Private CA) is used to issue the X.509 certificates and for authenticating clients.

This post describes how to set up AWS Glue jobs to produce, consume, and process messages on an MSK cluster using mutual TLS authentication. AWS Glue will automatically infer the schema from the streaming data and store the metadata in the AWS Glue Data Catalog for analysis using analytics tools such as Amazon Athena.

Example use case

In our example use case, a hospital facility regularly monitors the body temperatures for patients admitted in the emergency ward using smart thermometers. Each device automatically records the patients’ temperature readings and posts the records to a central monitoring application API. Each posted record is a JSON formatted message that contains the deviceId that uniquely identifies the thermometer, a patientId to identify the patient, the patient’s temperature reading, and the eventTime when the temperature was recorded.
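A posted record might look like the following (the values are illustrative):

    {
        "deviceId": "3a7f2c9e-8b14-4d2a-9c61-0f5e8a7b3d10",
        "patientId": "PI00007",
        "temperature": 38.2,
        "eventTime": "2024-07-11 19:49:00.123456"
    }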

Record schema

The central monitoring application checks the hourly average temperature readings for each patient and notifies the hospital’s healthcare workers when a patient’s average temperature exceeds accepted thresholds (36.1–37.2°C). In our case, we use the Athena console to analyze the readings.

Overview of the solution

In this post, we use an AWS Glue Python shell job to simulate incoming data from the hospital thermometers. This job produces messages that are securely written to an MSK cluster using mutual TLS authentication.

To process the streaming data from the MSK cluster, we deploy an AWS Glue Streaming extract, transform, and load (ETL) job. This job automatically infers the schema from the incoming data, stores the schema metadata in the Data Catalog, and then stores the processed data as efficient Parquet files in Amazon Simple Storage Service (Amazon S3). We use Athena to query the output table in the Data Catalog and uncover insights.

The following diagram illustrates the architecture of the solution.

Solution architecture

The solution workflow consists of the following steps:

  1. Create a private certificate authority (CA) using AWS Certificate Manager (ACM).
  2. Set up an MSK cluster with mutual TLS authentication.
  3. Create a Java keystore (JKS) file and generate a client certificate and private key.
  4. Create a Kafka connection in AWS Glue.
  5. Create a Python shell job in AWS Glue to create a topic and push messages to Kafka.
  6. Create an AWS Glue Streaming job to consume and process the messages.
  7. Analyze the processed data in Athena.

Prerequisites

You should have the following prerequisites:

  • A CloudFormation stack deployed from the provided template, which creates the VPC and networking resources

This template creates two NAT gateways as shown in the following diagram. However, it’s possible to route the traffic to a single NAT gateway in one Availability Zone for test and development workloads. For redundancy in production workloads, it’s recommended that there is one NAT gateway available in each Availability Zone.

VPC setup

The stack also creates a security group with a self-referencing rule to allow communication between AWS Glue components.

Create a private CA using ACM

Complete the following steps to create a root CA. For more details, refer to Creating a private CA.

  1. On the AWS Private CA console, choose Create a private CA.
  2. For Mode options, select either General-purpose or Short-lived certificate for lower pricing.
  3. For CA type options, select Root.
  4. Provide certificate details by providing at least one distinguished name.

Create private CA

  5. Leave the remaining default options and select the acknowledge checkbox.
  6. Choose Create CA.
  7. On the Actions menu, choose Install CA certificate and choose Confirm and install.

Install certificate

Set up an MSK cluster with mutual TLS authentication

Before setting up the MSK cluster, make sure you have a VPC with at least two private subnets in different Availability Zones and a NAT gateway with a route to the internet. A CloudFormation template is provided in the prerequisites section.

Complete the following steps to set up your cluster:

  1. On the Amazon MSK console, choose Create cluster.
  2. For Creation method, select Custom create.
  3. For Cluster type, select Provisioned.
  4. For Broker size, you can choose kafka.t3.small for the purpose of this post.
  5. For Number of zones, choose 2.
  6. Choose Next.
  7. In the Networking section, select the VPC, private subnets, and security group you created in the prerequisites section.
  8. In the Security settings section, under Access control methods, select TLS client authentication through AWS Certificate Manager (ACM).
  9. For AWS Private CAs, choose the AWS private CA you created earlier.

The MSK cluster creation can take up to 30 minutes to complete.

Create a JKS file and generate a client certificate and private key

Using the root CA, you generate client certificates to use for authentication. The following instructions are for CloudShell, but can also be adapted for a client machine with Java and the AWS CLI installed.

  1. Open a new CloudShell session and run the following commands to create the certs directory and install Java:
mkdir certs
cd certs
sudo yum -y install java-11-amazon-corretto-headless
  2. Run the following command to create a keystore file with a private key in JKS format. Replace Distinguished-Name, Example-Alias, Your-Store-Pass, and Your-Key-Pass with strings of your choice:

keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass Your-Store-Pass -keypass Your-Key-Pass -dname "CN=Distinguished-Name" -alias Example-Alias -storetype pkcs12

  3. Generate a certificate signing request (CSR) with the private key created in the preceding step:

keytool -keystore kafka.client.keystore.jks -certreq -file csr.pem -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass

  4. Run the following command to remove the word NEW (and the single space that follows it) from the beginning and end of the file:

sed -i -E '1,$ s/NEW //' csr.pem

The file should start with -----BEGIN CERTIFICATE REQUEST----- and end with -----END CERTIFICATE REQUEST-----

  5. Using the CSR file, create a client certificate using the following command. Replace Private-CA-ARN with the ARN of the private CA you created.

aws acm-pca issue-certificate --certificate-authority-arn Private-CA-ARN --csr fileb://csr.pem --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS"

The command should print out the ARN of the issued certificate. Save the CertificateArn value for use in the next step.

{
"CertificateArn": "arn:aws:acm-pca:region:account:certificate-authority/CA_ID/certificate/certificate_ID"
}
  6. Use the Private-CA-ARN together with the CertificateArn (arn:aws:acm-pca:<region>:...) generated in the preceding step to retrieve the signed client certificate. This will create a client-cert.pem file.

aws acm-pca get-certificate --certificate-authority-arn Private-CA-ARN --certificate-arn Certificate-ARN | jq -r '.Certificate + "\n" + .CertificateChain' >> client-cert.pem

  7. Add the certificate into the Java keystore so you can present it when you talk to the MSK brokers:

keytool -keystore kafka.client.keystore.jks -import -file client-cert.pem -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass -noprompt

  8. Extract the private key from the JKS file. Provide the same destkeypass and deststorepass and enter the keystore password when prompted.

keytool -importkeystore -srckeystore kafka.client.keystore.jks -destkeystore keystore.p12 -srcalias Example-Alias -deststorepass Your-Store-Pass -destkeypass Your-Key-Pass -deststoretype PKCS12

  9. Convert the private key to PEM format. Enter the keystore password you provided in the previous step when prompted.

openssl pkcs12 -in keystore.p12 -nodes -nocerts -out private-key.pem

  10. Remove the lines that begin with Bag Attributes from the top of the file:

sed -i -ne '/-BEGIN PRIVATE KEY-/,/-END PRIVATE KEY-/p' private-key.pem

  11. Upload the client-cert.pem, kafka.client.keystore.jks, and private-key.pem files to Amazon S3. You can either create a new S3 bucket or use an existing bucket to store the following objects. Replace <s3://aws-glue-assets-11111111222222-us-east-1/certs/> with your S3 location.

aws s3 sync ~/certs s3://aws-glue-assets-11111111222222-us-east-1/certs/ --exclude '*' --include 'client-cert.pem' --include 'private-key.pem' --include 'kafka.client.keystore.jks'

Create a Kafka connection in AWS Glue

Complete the following steps to create a Kafka connection:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. Select Apache Kafka and choose Next.
  4. For Amazon Managed Streaming for Apache Kafka Cluster, choose the MSK cluster you created earlier.

Create Glue Kafka connection

  1. Choose TLS client authentication for Authentication method.
  2. Enter the S3 path to the keystore you created earlier and provide the keystore and client key passwords you used for the -storepass and -keypass parameters.

Add authentication method to connection

  1. Under Networking options, choose your VPC, a private subnet, and a security group. The security group should contain a self-referencing rule.
  2. On the next page, provide a name for the connection (for example, Kafka-connection) and choose Create connection.

Create a Python shell job in AWS Glue to create a topic and push messages to Kafka

In this section, you create a Python shell job to create a new Kafka topic and push JSON messages to the topic. Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs.
  2. In the Script section, for Engine, choose Python shell.
  3. Choose Create script.

Create Python shell job

  4. Enter the following script in the editor:
import sys
from awsglue.utils import getResolvedOptions
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
from kafka.errors import TopicAlreadyExistsError
from urllib.parse import urlparse

import json
import uuid
import datetime
import boto3
import time
import random

# Fetch job parameters
args = getResolvedOptions(sys.argv, ['connection-names', 'client-cert', 'private-key'])

# Download client certificate and private key files from S3
TOPIC = 'example_topic'
client_cert = urlparse(args['client_cert'])
private_key = urlparse(args['private_key'])

s3 = boto3.client('s3')
s3.download_file(client_cert.netloc, client_cert.path.lstrip('/'),  client_cert.path.split('/')[-1])
s3.download_file(private_key.netloc, private_key.path.lstrip('/'),  private_key.path.split('/')[-1])

# Fetch bootstrap servers from connection
args = getResolvedOptions(sys.argv, ['connection-names'])
if ',' in args['connection_names']:
    raise ValueError("Choose only one connection name in the job details tab!")
glue_client = boto3.client('glue')
response = glue_client.get_connection(Name=args['connection_names'], HidePassword=True)
bootstrapServers = response['Connection']['ConnectionProperties']['KAFKA_BOOTSTRAP_SERVERS']

# Create topic and push messages 
admin_client = KafkaAdminClient(bootstrap_servers= bootstrapServers, security_protocol= 'SSL', ssl_certfile= client_cert.path.split('/')[-1], ssl_keyfile= private_key.path.split('/')[-1])
try:
    admin_client.create_topics(new_topics=[NewTopic(name=TOPIC, num_partitions=1, replication_factor=1)], validate_only=False)
except TopicAlreadyExistsError:
    # Topic already exists
    pass
admin_client.close()

# Generate JSON messages for the new topic
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'), bootstrap_servers=bootstrapServers, security_protocol='SSL', 
                         ssl_check_hostname=True, ssl_certfile= client_cert.path.split('/')[-1], ssl_keyfile= private_key.path.split('/')[-1])
                         
for i in range(1200):
    _event = {
        "deviceId": str(uuid.uuid4()),
        "patientId": "PI" + str(random.randint(1,15)).rjust(5, '0'),
        "temperature": round(random.uniform(32.1, 40.9), 1),
        "eventTime": str(datetime.datetime.now())
    }
    producer.send(TOPIC, _event)
    time.sleep(3)
    
producer.close()
  5. On the Job details tab, provide a name for your job, such as Kafka-msk-producer.
  6. Choose an IAM role. If you don’t have one, create one following the instructions in Configuring IAM permissions for AWS Glue.
  7. Under Advanced properties, for Connections, choose the Kafka-connection connection you created.
  8. Under Job parameters, add the following parameters and values:
    1. Key: --additional-python-modules, value: kafka-python.
    2. Key: --client-cert, value: s3://aws-glue-assets-11111111222222-us-east-1/certs/client-cert.pem. Replace with your client-cert.pem Amazon S3 location from earlier.
    3. Key: --private-key, value: s3://aws-glue-assets-11111111222222-us-east-1/certs/private-key.pem. Replace with your private-key.pem Amazon S3 location from earlier.
      AWS Glue Job parameters
  9. Save and run the job.

You can confirm that the job run status is Running on the Runs tab.

At this point, we have successfully created a Python shell job to simulate the thermometers sending temperature readings to the monitoring application. The job will run for approximately 1 hour and push 1,200 records to Amazon MSK.

Alternatively, you can replace the Python shell job with a Scala ETL job to act as a producer to send messages to the MSK cluster. In this case, use the JKS file for authentication with ssl.keystore.type=JKS. The Kafka client libraries (version 2.4.1) installed in AWS Glue version 4 don’t yet support authentication through certificates in PEM format (as of this writing).

Create an AWS Glue Streaming job to consume and process the messages

You can now create an AWS Glue ETL job to consume and process the messages in the Kafka topic. AWS Glue will automatically infer the schema from the files. Complete the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. Choose Visual ETL to author a new job.
  3. For Sources, choose Apache Kafka.
  4. For Connection name, choose the node and connection name you created earlier.
  5. For Topic name, enter the topic name (example_topic) you created earlier.
  6. Leave the rest of the options as default.

Kafka data source

  7. Add a new target node called Amazon S3 to store the output Parquet files generated from the streaming data.
  8. Choose Parquet as the data format and provide an S3 output location for the generated files.
  9. Select the option to allow AWS Glue to create a table in the Data Catalog and provide the database and table names.

S3 Output node

  10. On the Job details tab, provide the following options:
    1. For the requested number of workers, enter 2.
    2. For IAM Role, choose an IAM role with permissions to read and write to the S3 output location.
    3. For Job timeout, enter 60 (for the job to stop after 60 minutes).
    4. Under Advanced properties, for Connections, choose the connection you created.
  11. Save and run the job.

You can confirm the S3 output location for new Parquet files created under the prefixes s3://<output-location>/ingest_year=XXXX/ingest_month=XX/ingest_day=XX/ingest_hour=XX/.

At this point, you have created a streaming job to process events from Amazon MSK and store the JSON formatted records as Parquet files in Amazon S3. AWS Glue streaming jobs are meant to run continuously to process streaming data. We have set the timeout to stop the job after 60 minutes. You can also stop the job manually after the records have been processed to Amazon S3.

Analyze the data in Athena

Going back to our example use case, you can run the following query in Athena to monitor and track the hourly average temperature readings for patients that exceed the normal thresholds (36.1–37.2°C):

SELECT
date_format(parse_datetime(eventTime, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), '%h %p') hour,
patientId,
round(avg(temperature), 1) average_temperature,
count(temperature) readings
FROM "default"."devices_data"
GROUP BY 1, 2
HAVING avg(temperature) > 37.2 or avg(temperature) < 36.1
ORDER BY 2, 1 DESC

Amazon Athena Console

Run the query multiple times and observe how the average_temperature and the number of readings changes with new incoming data from the AWS Glue Streaming job. In our example scenario, healthcare workers can use this information to identify patients who are experiencing consistent high or low body temperatures and give the required attention.

At this point, we have successfully created and ingested streaming data to our MSK cluster using mutual TLS authentication. We only needed the certificates generated by AWS Private CA to authenticate our AWS Glue clients to the MSK cluster and process the streaming data with an AWS Glue Streaming job. Finally, we used Athena to visualize the data and observed how the data changes in near real time.

Clean up

To clean up the resources created in this post, complete the following steps:

  1. Delete the private CA you created.
  2. Delete the MSK cluster you created.
  3. Delete the AWS Glue connection you created.
  4. Stop the jobs if they are still running and delete the jobs you created.
  5. If you used the CloudFormation stack provided in the prerequisites, delete the CloudFormation stack to delete the VPC and other networking components.

Conclusion

This post demonstrated how you can use AWS Glue to consume, process, and store streaming data from Amazon MSK using mutual TLS authentication. AWS Glue Streaming automatically infers the schema and creates a table in the Data Catalog. You can then query the table using other data analysis tools like Athena, Amazon Redshift, and Amazon QuickSight to provide insights into the streaming data.

Try out the solution for yourself, and let us know your questions and feedback in the comments section.


About the Authors

Edward Okemwa Ondari is a Big Data Cloud Support Engineer (ETL) at AWS Nairobi specializing in AWS Glue and Amazon Athena. He is dedicated to providing customers with technical guidance and resolving issues related to processing and analyzing large volumes of data. In his free time, he enjoys singing choral music and playing football.

Emmanuel Mashandudze is a Senior Big Data Cloud Engineer specializing in AWS Glue. He collaborates with product teams to help customers efficiently transform data in the cloud. He helps customers design and implement robust data pipelines. Outside of work, Emmanuel is an avid marathon runner and sports enthusiast, and enjoys creating memories with his family.

Improve Apache Kafka scalability and resiliency using Amazon MSK tiered storage

Post Syndicated from Sai Maddali original https://aws.amazon.com/blogs/big-data/improve-apache-kafka-scalability-and-resiliency-using-amazon-msk-tiered-storage/

Since the launch of tiered storage for Amazon Managed Streaming for Apache Kafka (Amazon MSK), customers have embraced this feature for its ability to optimize storage costs and improve performance. In previous posts, we explored the inner workings of Kafka, maximized the potential of Amazon MSK, and delved into the intricacies of Amazon MSK tiered storage. In this post, we deep dive into how tiered storage helps with faster broker recovery and quicker partition migrations, facilitating faster load balancing and broker scaling.

Apache Kafka availability

Apache Kafka is a distributed log service designed to provide high availability and fault tolerance. At its core, Kafka employs several mechanisms to provide reliable data delivery and resilience against failures:

  • Kafka replication – Kafka organizes data into topics, which are further divided into partitions. Each partition is replicated across multiple brokers, with one broker acting as the leader and the others as followers. If the leader broker fails, one of the follower brokers is automatically elected as the new leader, providing continuous data availability. The replication factor determines the number of replicas for each partition. Kafka maintains a list of in-sync replicas (ISRs) for each partition, which are the replicas that are up to date with the leader.
  • Producer acknowledgments – Kafka producers can specify the required acknowledgment level for write operations (for example, acks=all). This makes sure the data is durably persisted on the configured number of replicas before the producer receives an acknowledgment, reducing the risk of data loss. A brief CLI sketch of these settings follows this list.
  • Consumer group rebalancing – Kafka consumers are organized into consumer groups, where each consumer in the group is responsible for consuming a subset of the partitions. If a consumer fails, the partitions it was consuming are automatically reassigned to the remaining consumers in the group, providing continuous data consumption.
  • ZooKeeper or KRaft for cluster coordination – Kafka relies on Apache ZooKeeper or KRaft for cluster coordination and metadata management. It maintains information about brokers, topics, partitions, and consumer offsets, enabling Kafka to recover from failures and maintain a consistent state across the cluster.
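To make these mechanisms concrete, the following is a minimal sketch using the standard Apache Kafka CLI tools. The bootstrap address, topic name, partition count, and replication settings are placeholders for illustration, not values from this post:

# Create a topic with 3 replicas; with min.insync.replicas=2, writes sent
# with acks=all succeed only after at least 2 in-sync replicas have the record
bin/kafka-topics.sh --bootstrap-server <<bootstrap-servers>> --create \
  --topic example-topic --partitions 6 --replication-factor 3 \
  --config min.insync.replicas=2

# Produce with acks=all so the leader waits for all in-sync replicas
bin/kafka-console-producer.sh --bootstrap-server <<bootstrap-servers>> \
  --topic example-topic --producer-property acks=all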

Kafka’s storage architecture and its impact on availability and resiliency

Although Kafka provides robust fault-tolerance mechanisms, in the traditional Kafka architecture, brokers store data locally on their attached storage volumes. This tight coupling of storage and compute resources can lead to several issues, impacting availability and resiliency of the cluster:

  • Slow broker recovery – When a broker fails, the recovery process involves transferring data from the remaining replicas to the new broker. This data transfer can be slow, especially for large data volumes, leading to prolonged periods of reduced availability and increased recovery times.
  • Inefficient load balancing – Load balancing in Kafka involves moving partitions between brokers to distribute the load evenly. However, this process can be resource-intensive and time-consuming, because it requires transferring large amounts of data between brokers.
  • Scaling limitations – Scaling a Kafka cluster traditionally involves adding new brokers and rebalancing partitions across the expanded set of brokers. This process can be disruptive and time-consuming, especially for large clusters with high data volumes.

How Amazon MSK tiered storage improves availability and resiliency

Amazon MSK offers tiered storage, a feature that allows configuring local and remote tiers. This greatly decouples compute and storage resources and thereby addresses the aforementioned challenges, improving availability and resiliency of Kafka clusters. You can benefit from the following:

  • Faster broker recovery – With tiered storage, data automatically moves from the faster Amazon Elastic Block Store (Amazon EBS) volumes to the more cost-effective storage tier over time. New messages are initially written to Amazon EBS for fast performance. Based on your local data retention policy, Amazon MSK transparently transitions that data to tiered storage, freeing up space on the EBS volumes for new messages. When a broker fails and recovers, whether due to a node or volume failure, catch-up is faster because it only needs to fetch the data stored on the local tier from the leader.
  • Efficient load balancing – Load balancing in Amazon MSK with tiered storage is more efficient because there is less data to move when reassigning partitions. This process is faster and less resource-intensive, enabling more frequent and seamless load balancing operations.
  • Faster scaling – Scaling an MSK cluster with tiered storage is a seamless process. New brokers can be added to the cluster without large data transfers and lengthy partition rebalancing. The new brokers can start serving traffic much faster, because the catch-up process takes less time, improving overall cluster throughput and reducing downtime during scaling operations.

As shown in the following figure, MSK brokers and EBS volumes are tightly coupled. On a cluster deployed across three Availability Zones, when you create a topic with a replication factor of three, Amazon MSK spreads the three replicas across all three Availability Zones, and the EBS volumes attached to those brokers store all the topic data. If you need to move a partition from one broker to another, Amazon MSK needs to move all the segments (both active and closed) from the existing broker to the new broker, as illustrated in the figure.

However, when you enable tiered storage for that topic, Amazon MSK transparently moves all closed segments for a topic from EBS volumes to tiered storage. That storage provides the built-in capability for durability and high availability with virtually unlimited storage capacity. With closed segments moved to tiered storage and only active segments on the local volume, your local storage footprint remains minimal regardless of topic size. If you need to move the partition to a new broker, the data movement is very minimal across the brokers. The following figure illustrates this updated configuration.

Amazon MSK tiered storage addresses the challenges posed by Kafka’s traditional storage architecture, enabling faster broker recovery, efficient load balancing, and seamless scaling, thereby improving availability and resiliency of your cluster. To learn more about the core components of Amazon MSK tiered storage, refer to Deep dive on Amazon MSK tiered storage.

A real-world test

We hope that you now understand how Amazon MSK tiered storage can improve your Kafka resiliency and availability. To test it, we created a three-node cluster with the new m7g instance type. We created a topic with a replication factor of three and without using tiered storage. Using the Kafka performance tool, we ingested 300 GB of data into the topic. Next, we added three new brokers to the cluster. Because Amazon MSK doesn’t automatically move partitions to these three new brokers, they will remain idle until we rebalance the partitions across all six brokers.

Let’s consider a scenario where we need to move all the partitions from the existing three brokers to the three new brokers. We used the kafka-reassign-partitions tool to move the partitions from the existing three brokers to the newly added three brokers. During this partition movement operation, we observed that the CPU usage was high, even though we weren’t performing any other operations on the cluster. This indicates that the high CPU usage was due to the data replication to the new brokers. As shown in the following metrics, the partition movement operation from broker 1 to broker 2 took approximately 75 minutes to complete.
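For reference, the following is a hedged sketch of how such a reassignment is typically driven with the kafka-reassign-partitions tool. The topic name, broker IDs, and partition list are illustrative placeholders rather than our exact test configuration:

# reassignment.json maps each partition of the topic to the new broker IDs
cat > reassignment.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "perf-test-topic", "partition": 0, "replicas": [4, 5, 6] },
    { "topic": "perf-test-topic", "partition": 1, "replicas": [5, 6, 4] }
  ]
}
EOF

bin/kafka-reassign-partitions.sh --bootstrap-server <<bootstrap-servers>> \
  --reassignment-json-file reassignment.json --execute

# Rerun with --verify until every reassignment reports as completed
bin/kafka-reassign-partitions.sh --bootstrap-server <<bootstrap-servers>> \
  --reassignment-json-file reassignment.json --verify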

Additionally, during this period, CPU utilization was elevated.

After completing the test, we enabled tiered storage on the topic with local.retention.ms=3600000 (1 hour) and retention.ms=31536000000 (1 year). We continuously monitored the RemoteCopyBytesPerSec metric to determine when the data migration to tiered storage was complete. After 6 hours, we observed zero activity on the RemoteCopyBytesPerSec metric, indicating that all closed segments had been successfully moved to tiered storage. For instructions to enable tiered storage on an existing topic, refer to Enabling and disabling tiered storage on an existing topic.
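As a reference point, topic-level retention settings like these can be applied with the standard kafka-configs tool. The following is a sketch with placeholder topic and bootstrap values, assuming tiered storage is supported on the cluster:

bin/kafka-configs.sh --bootstrap-server <<bootstrap-servers>> --alter \
  --entity-type topics --entity-name perf-test-topic \
  --add-config 'remote.storage.enable=true,local.retention.ms=3600000,retention.ms=31536000000'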

We then performed the same test again, moving partitions to three empty brokers. This time, the partition movement operation was completed in just under 15 minutes, with no noticeable CPU usage, as shown in the following metrics. This is because, with tiered storage enabled, all the data has already been moved to the tiered storage, and we only have the active segment in the EBS volume. The partition movement operation is only moving the small active segment, which is why it takes less time and minimal CPU to complete the operation.

Conclusion

In this post, we explored how Amazon MSK tiered storage can significantly improve the scalability and resilience of Kafka. By automatically moving older data to the cost-effective tiered storage, Amazon MSK reduces the amount of data that needs to be managed on the local EBS volumes. This dramatically improves the speed and efficiency of critical Kafka operations like broker recovery, leader election, and partition reassignment. As demonstrated in the test scenario, enabling tiered storage reduced the time taken to move partitions between brokers from 75 minutes to just under 15 minutes, with minimal CPU impact. This enhanced the responsiveness and self-healing ability of the Kafka cluster, which is crucial for maintaining reliable, high-performance operations, even as data volumes continue to grow.

If you’re running Kafka and facing challenges with scalability or resilience, we highly recommend using Amazon MSK with the tiered storage feature. By taking advantage of this powerful capability, you can unlock the true scalability of Kafka and make sure your mission-critical applications can keep pace with ever-increasing data demands.

To get started, refer to Enabling and disabling tiered storage on an existing topic. Additionally, check out Automated deployment template of Cruise Control for Amazon MSK for effortlessly rebalancing your workload.


About the Authors

Sai Maddali is a Senior Manager, Product Management at AWS, where he leads the product team for Amazon MSK. He is passionate about understanding customer needs and using technology to deliver services that empower customers to build innovative applications. Besides work, he enjoys traveling, cooking, and running.

Nagarjuna Koduru is a Principal Engineer at AWS, currently working on Amazon Managed Streaming for Apache Kafka (Amazon MSK). He led the teams that built the MSK Serverless and MSK tiered storage products. He previously led the team in Amazon Just Walk Out (JWO) that is responsible for real-time tracking of shopper locations in the store. He played a pivotal role in scaling the stateful stream processing infrastructure to support larger store formats and reducing the overall cost of the system. He has a keen interest in stream processing, messaging, and distributed storage infrastructure.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Synchronize data lakes with CDC-based UPSERT using open table format, AWS Glue, and Amazon MSK

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/synchronize-data-lakes-with-cdc-based-upsert-using-open-table-format-aws-glue-and-amazon-msk/

In the current industry landscape, data lakes have become a cornerstone of modern data architecture, serving as repositories for vast amounts of structured and unstructured data. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in a downstream system. Capturing every change from transactions in a source database and moving them to the target keeps the systems synchronized, and helps with analytics use cases and zero-downtime database migrations.

However, efficiently managing and synchronizing data within these lakes presents a significant challenge. Maintaining data consistency and integrity across distributed data lakes is crucial for decision-making and analytics. Inaccurate or outdated data can lead to flawed insights and business decisions. Businesses require synchronized data to gain actionable insights and respond swiftly to changing market conditions. Scalability is a critical concern for data lakes, because they need to accommodate growing volumes of data without compromising performance or incurring exorbitant costs.

To address these issues effectively, we propose using Amazon Managed Streaming for Apache Kafka (Amazon MSK), a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data. We use MSK Connect, an AWS managed service to deploy and run Kafka Connect, to build an end-to-end CDC application that uses the Debezium MySQL connector to process insert, update, and delete records from MySQL, and the Confluent Amazon Simple Storage Service (Amazon S3) sink connector to write to Amazon S3 as raw data that can be consumed by other downstream applications for further use cases. To process batch data effectively, we use AWS Glue, a serverless data integration service that uses the Spark framework to process the data from S3 and copy the data to the open table format layer. An open table format manages large collections of files as tables and supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. We chose Delta Lake as an example open table format, but you can achieve the same results using Apache Iceberg or Apache Hudi.

The post illustrates the construction of a comprehensive CDC system, enabling the processing of CDC data sourced from Amazon Relational Database Service (Amazon RDS) for MySQL. Initially, we create a raw data lake of all modified records in the database in near real time using Amazon MSK, writing to Amazon S3 as raw data. This raw data can then be used to build a data warehouse or even a special type of data storage that's optimized for analytics, such as a Delta Lake on S3. Later, we use an AWS Glue extract, transform, and load (ETL) job for batch processing of CDC data from the S3 raw data lake. A key advantage of this setup is that you have complete control over the entire process, from capturing the changes in your database to transforming the data for your specific needs. This flexibility allows you to adapt the system to different use cases.

This is achieved through integration with MSK Connect using the Debezium MySQL connector, followed by writing data to Amazon S3 facilitated by the Confluent S3 Sink Connector. Subsequently, the data is processed from S3 using an AWS Glue ETL job, and then stored in the data lake layer. Finally, the Delta Lake table is queried using Amazon Athena.
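For orientation, the following is a trimmed, hypothetical sketch of the style of Debezium MySQL connector configuration used with MSK Connect. All hostnames, credentials, and names are placeholders, and property names can vary between Debezium versions, so treat your connector version's documentation as authoritative:

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "tasks.max": "1",
  "database.hostname": "<<rds-endpoint>>",
  "database.port": "3306",
  "database.user": "<<database-user>>",
  "database.password": "<<database-password>>",
  "database.server.id": "123456",
  "database.server.name": "salesdb-server",
  "database.include.list": "salesdb",
  "table.include.list": "salesdb.CUSTOMER",
  "database.history.kafka.bootstrap.servers": "<<msk-bootstrap-servers>>",
  "database.history.kafka.topic": "dbhistory.salesdb"
}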

Note: If you require real-time data processing of the CDC data, you can bypass the batch approach and use an AWS Glue streaming job instead. This job would directly connect to the Kafka topic in MSK, grabbing the data as soon as changes occur. It can then process and transform the data as needed, creating a Delta Lake on Amazon S3 that reflects the latest updates according to your business needs. This approach ensures you have the most up-to-date data available for real-time analytics.

Solution overview

The following diagram illustrates the architecture that you implement through this blog post. Each number represents a major component of the solution.

The workflow consists of the following:

  1. Near real-time data capture from MySQL and streaming to Amazon S3
    1. The process starts with data originating from Amazon RDS for MySQL.
    2. A Debezium connector is used to capture changes to the data in the RDS instance in near real time. Debezium is a distributed platform that converts information from your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases. Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors.
    3. The captured data changes are then streamed to an Amazon MSK topic. MSK is a managed service that simplifies running Apache Kafka on AWS.
    4. The processed data stream (topic) is streamed from MSK to Amazon S3 in JSON format. The Confluent S3 Sink Connector allows near real-time data transfer from an MSK cluster to an S3 bucket.
  2. Batch processing the CDC raw data and writing it into the data lake
    1. Set up an AWS Glue ETL job to process the raw CDC data.
    2. This job reads bookmarked data from the S3 raw bucket and writes it into the data lake in open file format (Delta); a simplified sketch of this UPSERT step follows this list. The job also creates the Delta Lake table in the AWS Glue Data Catalog.
    3. Delta Lake is an open-source storage layer built on top of existing data lakes. It adds functionalities like ACID transactions and versioning to improve data reliability and manageability.
  3. Analyze the data using serverless interactive query service
    1. Athena, a serverless interactive query service, can be used to query the Delta Lake table created in Glue Data Catalog. This allows for interactive data analysis without managing infrastructure.
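The following is a greatly simplified Spark SQL sketch of the UPSERT step referenced above. It assumes the raw Debezium records have already been flattened into a temporary view named customer_cdc_batch that carries the customer columns plus an op flag ('c' for create, 'u' for update, 'd' for delete); the path and names are illustrative placeholders, not the actual job code:

MERGE INTO delta.`s3://<<output-bucket>>/delta/customer/` AS t
USING customer_cdc_batch AS s
ON t.cust_id = s.cust_id
WHEN MATCHED AND s.op = 'd' THEN DELETE
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED AND s.op <> 'd' THEN INSERT *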

For this post, we create the solution resources in the us-east-1 AWS Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configure resources with AWS CloudFormation

In this post, you use the following two CloudFormation templates. The advantage of using two different templates is that you can decouple the resource creation of the CDC pipeline and the AWS Glue processing according to your use case, which is useful if you only need to create resources for a specific part of the process.

  1. vpc-msk-mskconnect-rds-client.yaml – This template sets up the CDC pipeline resources such as a virtual private cloud (VPC), subnet, security group, AWS Identity and Access Management (IAM) roles, NAT gateway, internet gateway, Amazon Elastic Compute Cloud (Amazon EC2) client, Amazon MSK, MSK Connect, Amazon RDS, and Amazon S3 resources.
  2. gluejob-setup.yaml – This template sets up the data processing resources such as the AWS Glue table, database, and ETL job.

Configure MSK and MSK connect

To start, you'll configure Amazon MSK and MSK Connect, using the Debezium connector to capture incremental changes in the table and write them into Amazon S3 using the S3 sink connector. The vpc-msk-mskconnect-rds-client.yaml stack creates a VPC, private and public subnets, security groups, S3 buckets, an Amazon MSK cluster, an EC2 instance with a Kafka client, an RDS database, and the MSK connectors with their worker configurations.

  1. Launch the stack vpc-msk-mskconnect-rds-client using the CloudFormation template.
  2. Provide the parameter values as follows:
    • EnvironmentName – An environment name that is prefixed to resource names. Sample value: msk-delta-cdc-pipeline
    • DatabasePassword – Database admin account password. Sample value: S3cretPwd99
    • InstanceType – MSK client EC2 instance type. Sample value: t2.micro
    • LatestAmiId – Latest AMI ID of Amazon Linux 2023 for the EC2 instance. You can use the default value: /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-6.1-x86_64
    • VpcCIDR – IP range (CIDR notation) for this VPC. Sample value: 10.192.0.0/16
    • PublicSubnet1CIDR – IP range (CIDR notation) for the public subnet in the first Availability Zone. Sample value: 10.192.10.0/24
    • PublicSubnet2CIDR – IP range (CIDR notation) for the public subnet in the second Availability Zone. Sample value: 10.192.11.0/24
    • PrivateSubnet1CIDR – IP range (CIDR notation) for the private subnet in the first Availability Zone. Sample value: 10.192.20.0/24
    • PrivateSubnet2CIDR – IP range (CIDR notation) for the private subnet in the second Availability Zone. Sample value: 10.192.21.0/24
    • PrivateSubnet3CIDR – IP range (CIDR notation) for the private subnet in the third Availability Zone. Sample value: 10.192.22.0/24
  3. The stack creation process can take approximately one hour to complete. Check the Outputs tab for the stack after the stack is created.

Next, you set up the AWS Glue data processing resources such as the AWS Glue database, table, and ETL job.

Implement UPSERT on an S3 data lake with Delta Lake using AWS Glue

The gluejob-setup.yaml CloudFormation template creates a database, an IAM role, and an AWS Glue ETL job. Retrieve the values for S3BucketNameForOutput and S3BucketNameForScript from the vpc-msk-mskconnect-rds-client stack's Outputs tab to use in this template. Complete the following steps:

  1. Launch the stack gluejob-setup using the CloudFormation template.
  2. Provide the parameter values as follows:
    • EnvironmentName – Environment name that is prefixed to resource names. Sample value: gluejob-setup
    • GlueDataBaseName – Name of the Data Catalog database. Sample value: glue_cdc_blog_db
    • GlueTableName – Name of the Data Catalog table. Sample value: blog_cdc_tbl
    • S3BucketForGlueScript – Bucket name for the AWS Glue ETL script. Use the S3 bucket name from the previous stack. Sample value: aws-gluescript-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
    • GlueWorkerType – Worker type for the AWS Glue job. Sample value: G.1X
    • NumberOfWorkers – Number of workers in the AWS Glue job. Sample value: 3
    • S3BucketForOutput – Bucket name for writing data from the AWS Glue job. Sample value: aws-glueoutput-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
    • S3ConnectorTargetBucketname – Bucket name where the Amazon MSK S3 sink connector writes the data from the Kafka topic. Sample value: msk-lab-${AWS::AccountId}-target-bucket
  3. The stack creation process can take approximately 2 minutes to complete. Check the Outputs tab for the stack after the stack is created.

In the gluejob-setup stack, we created an AWS Glue database and AWS Glue job. For further clarity, you can examine the AWS Glue database and job generated using the CloudFormation template.

After successfully creating the CloudFormation stack, you can proceed with processing data using the AWS Glue ETL job.

Run the AWS Glue ETL job

To process the data created in the S3 bucket from Amazon MSK using the AWS Glue ETL job that you set up in the previous section, complete the following steps:

  1. On the CloudFormation console, choose the stack gluejob-setup.
  2. On the Outputs tab, retrieve the name of the AWS Glue ETL job from the GlueJobName key. In the following screenshot, the name is GlueCDCJob-glue-delta-cdc.

  3. On the AWS Glue console, choose ETL jobs in the navigation pane.
  4. Search for the AWS Glue ETL job named GlueCDCJob-glue-delta-cdc.
  5. Choose the job name to open its details page.
  6. Choose Run to start the job. On the Runs tab, confirm the job ran without failure.

  7. Retrieve the OutputBucketName from the gluejob-setup template output.
  8. On the Amazon S3 console, navigate to the S3 bucket to verify the data.

Note: We have enabled the AWS Glue job bookmark, which makes sure the job processes only new data in each run.

Query the Delta Lake table using Athena

After the AWS Glue ETL job has successfully created the Delta Lake table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database glue_cdc_blog_db created using the gluejob-setup stack.
  4. To validate the data, run the following query to preview the data and find the total count.
SELECT * FROM "glue_cdc_blog_db"."blog_cdc_tbl" ORDER BY cust_id DESC LIMIT 40;
SELECT COUNT(*) FROM "glue_cdc_blog_db"."blog_cdc_tbl";

The following screenshot shows the output of our example query.

Upload incremental (CDC) data for further processing

After we process the initial full load, let's perform insert, update, and delete operations in MySQL, which will be processed by the Debezium MySQL connector and written to Amazon S3 using the Confluent S3 sink connector.

  1. On the Amazon EC2 console, go to the EC2 instance named KafkaClientInstance that you created using the CloudFormation template.

  2. Sign in to the EC2 instance using SSM. Select KafkaClientInstance and then choose Connect.

  3. Run the following commands to insert the data into the RDS table. Use the database password from the CloudFormation stack parameter tab.
sudo su - ec2-user
RDS_AURORA_ENDPOINT=`aws rds describe-db-instances --region us-east-1 | jq -r '.DBInstances[] | select(.DBName == "salesdb") | .Endpoint.Address'`
mysql -f -u master -h $RDS_AURORA_ENDPOINT  --password
  4. Now perform the insert into the CUSTOMER table.
use salesdb;
INSERT into CUSTOMER values(8887,'Customer Name 8887','Market segment 8887');
INSERT into CUSTOMER values(8888,'Customer Name 8888','Market segment 8888');
INSERT into CUSTOMER values(8889,'Customer Name 8889','Market segment 8889');

  5. Run the AWS Glue job again to update the Delta Lake table with new records.
  6. Use the Athena console to validate the data.
  7. Perform the insert, update, and delete in the CUSTOMER table.
    UPDATE CUSTOMER SET NAME='Customer Name update 8888',MKTSEGMENT='Market segment update 8888' where CUST_ID = 8888;
    UPDATE CUSTOMER SET NAME='Customer Name update 8889',MKTSEGMENT='Market segment update 8889' where CUST_ID = 8889;
    DELETE FROM CUSTOMER where CUST_ID = 8887;
    INSERT into CUSTOMER values(9000,'Customer Name 9000','Market segment 9000');
    

  8. Run the AWS Glue job again to update the Delta Lake table with the insert, update, and delete records.
  9. Use the Athena console to validate the data and verify the update and delete records in the Delta Lake table.

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack gluejob-setup.
  2. Delete the CloudFormation stack vpc-msk-mskconnect-rds-client.

Conclusion

Organizations continually seek high-performance, cost-effective, and scalable analytical solutions to extract value from their operational data sources in near real time. The analytical platform must be capable of receiving updates to operational data as they happen. Traditional data lake solutions often struggle with managing changes in source data, but the Delta Lake framework addresses this challenge. This post illustrates the process of constructing an end-to-end change data capture (CDC) application using Amazon MSK, MSK Connect, AWS Glue, and native Delta Lake tables, alongside guidance on querying Delta Lake tables from Amazon Athena. This architectural pattern can be adapted to other data sources employing various Kafka connectors, enabling the creation of data lakes supporting UPSERT operations using AWS Glue and native Delta Lake tables. For further insights, see the MSK Connect examples.


About the authors

Shubham Purwar is a Cloud Engineer (ETL) at AWS Bengaluru specializing in AWS Glue and Athena. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS, specializing in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.

Building a scalable streaming data platform that enables real-time and batch analytics of electric vehicles on AWS

Post Syndicated from Ayush Agrawal original https://aws.amazon.com/blogs/big-data/building-a-scalable-streaming-data-platform-that-enables-real-time-and-batch-analytics-of-electric-vehicles-on-aws/

The automobile industry has undergone a remarkable transformation because of the increasing adoption of electric vehicles (EVs). EVs, known for their sustainability and eco-friendliness, are paving the way for a new era in transportation. As environmental concerns and the push for greener technologies have gained momentum, the adoption of EVs has surged, promising to reshape our mobility landscape.

The surge in EVs brings with it a profound need for data acquisition and analysis to optimize their performance, reliability, and efficiency. In the rapidly evolving EV industry, the ability to harness, process, and derive insights from the massive volume of data generated by EVs has become essential for manufacturers, service providers, and researchers alike.

As the EV market is expanding with many new and incumbent players trying to capture the market, the major differentiating factor will be the performance of the vehicles.

Modern EVs are equipped with an array of sensors and systems that continuously monitor various aspects of their operation including parameters such as voltage, temperature, vibration, speed, and so on. From battery management to motor performance, these data-rich machines provide a wealth of information that, when effectively captured and analyzed, can revolutionize vehicle design, enhance safety, and optimize energy consumption. The data can be used to do predictive maintenance, device anomaly detection, real-time customer alerts, remote device management, and monitoring.

However, managing this deluge of data isn’t without its challenges. As the adoption of EVs accelerates, the need for robust data pipelines capable of collecting, storing, and processing data from an exponentially growing number of vehicles becomes more pronounced. Moreover, the granularity of data generated by each vehicle has increased significantly, making it essential to efficiently handle the ever-increasing number of data points. The challenges include not only the technical intricacies of data management but also concerns related to data security, privacy, and compliance with evolving regulations.

In this blog post, we delve into the intricacies of building a reliable data analytics pipeline that can scale to accommodate millions of vehicles, each generating hundreds of metrics every second using Amazon OpenSearch Ingestion. We also provide guidelines and sample configurations to help you implement a solution.

Of the prerequisites that follow, the AWS IoT topic rule and the Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster can be set up by following How to integrate AWS IoT Core with Amazon MSK. The steps to create an Amazon OpenSearch Service domain are available in Creating and managing Amazon OpenSearch Service domains.

Prerequisites

Before you begin implementing the solution, you need the following:

  • AWS IoT topic rule
  • Amazon MSK Simple Authentication and Security Layer/Salted Challenge Response Mechanism (SASL/SCRAM) cluster
  • Amazon OpenSearch Service domain

Solution overview

The following architecture diagram provides a scalable and fully managed modern data streaming platform. The architecture uses Amazon OpenSearch Ingestion to stream data into OpenSearch Service and Amazon Simple Storage Service (Amazon S3) to store the data. The data in OpenSearch powers real-time dashboards. The data can also be used to notify customers of any failures occurring on the vehicle (see Configuring alerts in Amazon OpenSearch Service). The data in Amazon S3 is used for business intelligence and long-term storage.

Architecture diagram

In the following sections, we focus on the following three critical pieces of the architecture in depth:

  1. Amazon MSK to OpenSearch Ingestion pipeline
  2. Amazon OpenSearch Ingestion pipeline to OpenSearch Service
  3. Amazon OpenSearch Ingestion to Amazon S3

Solution walkthrough

Step 1: MSK to Amazon OpenSearch Ingestion pipeline

Because each electric vehicle streams massive volumes of data to Amazon MSK clusters through AWS IoT Core, making sense of this data avalanche is critical. OpenSearch Ingestion provides a fully managed serverless integration to tap into these data streams.

The Amazon MSK source in OpenSearch Ingestion uses Kafka’s Consumer API to read records from one or more MSK topics. The MSK source in OpenSearch Ingestion seamlessly connects to MSK to ingest the streaming data into OpenSearch Ingestion’s processing pipeline.

The following snippet illustrates the pipeline configuration for an OpenSearch Ingestion pipeline used to ingest data from an MSK cluster.

While creating an OpenSearch Ingestion pipeline, add the following snippet in the Pipeline configuration section.

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true                  
      topics: 
         - name: "ev-device-topic"
           group_id: "opensearch-consumer" 
           serde_format: json                 
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam::<<account-id>>:role/opensearch-pipeline-Role"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:<<region>>:<<account-id>>:cluster/<<name>>/<<id>>" 

When configuring Amazon MSK and OpenSearch Ingestion, it’s essential to establish an optimal relationship between the number of partitions in your Kafka topics and the number of OpenSearch Compute Units (OCUs) allocated to your ingestion pipelines. This optimal configuration ensures efficient data processing and maximizes throughput. You can read more about it in Configure recommended compute units (OCUs) for the Amazon MSK pipeline.

Step 2: OpenSearch Ingestion pipeline to OpenSearch Service

OpenSearch Ingestion offers a direct method for streaming EV data into OpenSearch. The OpenSearch sink plugin channels data from multiple sources directly into the OpenSearch domain. Instead of manually provisioning the pipeline, you define the capacity for your pipeline using OCUs. Each OCU provides 6 GB of memory and two virtual CPUs. To use OpenSearch Ingestion auto scaling optimally, configure the maximum number of OCUs for a pipeline based on the number of partitions in the topics being ingested:

  • If a topic has a large number of partitions (for example, more than 96, which is the maximum number of OCUs per pipeline), configure the pipeline with a maximum of 1–96 OCUs. The pipeline can then automatically scale up or down within this range as needed.
  • If a topic has a low number of partitions (for example, fewer than 96), set the maximum number of OCUs equal to the number of partitions. This ensures each partition is processed by a dedicated OCU, enabling parallel processing and optimal performance.
  • If a pipeline ingests data from multiple topics, use the topic with the highest number of partitions as the reference for configuring the maximum OCUs.

Additionally, if higher throughput is required, you can create another pipeline with a new set of OCUs for the same topic and consumer group, enabling near-linear scalability.

OpenSearch Ingestion provides several pre-defined configuration blueprints that can help you quickly build your ingestion pipeline on AWS.

The following snippet illustrates the pipeline configuration for an OpenSearch Ingestion pipeline using OpenSearch as a sink, with a dead letter queue (DLQ) on Amazon S3. When a pipeline encounters write errors, it creates DLQ objects in the configured S3 bucket. DLQ objects exist within a JSON file as an array of failed events.

sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<domain-name>>.<<region>>.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"

Step 3: OpenSearch Ingestion to Amazon S3

OpenSearch Ingestion offers a built-in sink for loading streaming data directly into S3. The service can compress, partition, and optimize the data for cost-effective storage and analytics in Amazon S3. Data loaded into S3 can be partitioned for easier query isolation and lifecycle management. Partitions can be based on vehicle ID, date, geographic region, or other dimensions as needed for your queries.

The following snippet illustrates how we’ve partitioned and stored EV data in Amazon S3.

- s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "evbucket"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

The pipeline can be created following the steps in Creating Amazon OpenSearch Ingestion pipelines.

The following is the complete pipeline configuration, combining the configuration of all three steps. Update the Amazon Resource Names (ARNs), AWS Region, OpenSearch Service domain endpoint, and S3 bucket names as needed.

The entire OpenSearch Ingestion pipeline configuration can be copied directly into the Pipeline configuration field in the AWS Management Console when creating the OpenSearch Ingestion pipeline.

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true           # Default is false  
      topics: 
         - name: "<<msk-topic-name>>" 
           group_id: "opensearch-consumer" 
           serde_format: json        
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:us-east-1:<<account-id>>:cluster/<<cluster-name>>/<<cluster-id>>" 
  processor:
      - parse_json:
  sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<opensearch-service-domain-endpoint>>.us-east-1.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
      - s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "<<bucket-name>>"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

Real-time analytics

After the data is available in OpenSearch Service, you can build real-time monitoring and notifications. OpenSearch Service has robust support for multiple notification channels, allowing you to receive alerts through services like Slack, Chime, custom webhooks, Microsoft Teams, email, and Amazon Simple Notification Service (Amazon SNS).

The following screenshot illustrates supported notification channels in OpenSearch Service.

The notification feature in OpenSearch Service allows you to create monitors that will watch for certain conditions or changes in your data and launch alerts, such as monitoring vehicle telemetry data and launching alerts for issues like battery degradation or abnormal energy consumption. For example, you can create a monitor that analyzes battery capacity over time and notifies the on-call team using Slack if capacity drops below expected degradation curves in a significant number of vehicles. This could indicate a potential manufacturing defect requiring investigation.
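As an illustration, monitors can also be defined through the alerting plugin's REST API. The following is a hedged sketch: the index pattern, field name (battery_capacity_pct), threshold, and trigger condition are hypothetical, and the exact request body shape depends on your OpenSearch version:

POST _plugins/_alerting/monitors
{
  "type": "monitor",
  "name": "low-battery-capacity",
  "enabled": true,
  "schedule": { "period": { "interval": 5, "unit": "MINUTES" } },
  "inputs": [{
    "search": {
      "indices": ["index_ev_pipe-*"],
      "query": {
        "size": 0,
        "query": { "range": { "battery_capacity_pct": { "lt": 70 } } }
      }
    }
  }],
  "triggers": [{
    "name": "capacity-below-threshold",
    "severity": "1",
    "condition": {
      "script": {
        "source": "ctx.results[0].hits.total.value > 100",
        "lang": "painless"
      }
    },
    "actions": []
  }]
}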

In addition to notifications, OpenSearch Service makes it easy to build real-time dashboards to visually track metrics across your fleet of vehicles. You can ingest vehicle telemetry data like location, speed, fuel consumption, and so on, and visualize it on maps, charts, and gauges. Dashboards can provide real-time visibility into vehicle health and performance.

The following screenshot illustrates creating a sample dashboard on OpenSearch Service.

Opensearch Dashboard

A key benefit of OpenSearch Service is its ability to handle high sustained ingestion and query rates with millisecond latencies. It distributes incoming vehicle data across data nodes in a cluster for parallel processing. This allows OpenSearch to scale out to handle very large fleets while still delivering the real-time performance needed for operational visibility and alerting.

Batch analytics

After the data is available in Amazon S3, you can build a secure data lake to power a variety of analytics use cases deriving powerful insights. As an immutable store, new data is continually stored in S3 while existing data remains unaltered. This serves as a single source of truth for downstream analytics.

For business intelligence and reporting, you can analyze trends, identify insights, and create rich visualizations powered by the data lake. You can use Amazon QuickSight to build and share dashboards without needing to set up servers or infrastructure. Here's an example of a QuickSight dashboard for IoT device data. For example, you can use a dashboard to gain insights from historical data that can help with better vehicle and battery design.

The Amazon QuickSight public gallery shows examples of dashboards across different domains.

You should consider OpenSearch Service dashboards for your operational day-to-day use cases to identify issues and alert in near real time, whereas Amazon QuickSight should be used to analyze big data stored in a lakehouse and generate actionable insights from it.

Clean up

Delete the OpenSearch pipeline and Amazon MSK cluster to stop incurring costs on these services.

Conclusion

In this post, you learned how Amazon MSK, OpenSearch Ingestion, OpenSearch Service, and Amazon S3 can be integrated to ingest, process, store, analyze, and act on endless streams of EV data efficiently.

With OpenSearch Ingestion as the integration layer between streams and storage, the entire pipeline scales up and down automatically based on demand. No more complex cluster management or lost data from bursts in streams.

See Amazon OpenSearch Ingestion to learn more.


About the authors

Ayush Agrawal is a Startups Solutions Architect from Gurugram, India with 11 years of experience in Cloud Computing. With a keen interest in AI, ML, and Cloud Security, Ayush is dedicated to helping startups navigate and solve complex architectural challenges. His passion for technology drives him to constantly explore new tools and innovations. When he’s not architecting solutions, you’ll find Ayush diving into the latest tech trends, always eager to push the boundaries of what’s possible.

Fraser Sequeira is a Solutions Architect with AWS based in Mumbai, India. In his role at AWS, Fraser works closely with startups to design and build cloud-native solutions on AWS, with a focus on analytics and streaming workloads. With over 10 years of experience in cloud computing, Fraser has deep expertise in big data, real-time analytics, and building event-driven architecture on AWS.

How EchoStar ingests terabytes of data daily across its 5G Open RAN network in near real-time using Amazon Redshift Serverless Streaming Ingestion

Post Syndicated from Balaram Mathukumilli original https://aws.amazon.com/blogs/big-data/how-echostar-ingests-terabytes-of-data-daily-across-its-5g-open-ran-network-in-near-real-time-using-amazon-redshift-serverless-streaming-ingestion/

This post was co-written with Balaram Mathukumilli, Viswanatha Vellaboyana and Keerthi Kambam from DISH Wireless, a wholly owned subsidiary of EchoStar.

EchoStar, a connectivity company providing television entertainment, wireless communications, and award-winning technology to residential and business customers throughout the US, deployed the first standalone, cloud-native Open RAN 5G network on AWS public cloud.

Amazon Redshift Serverless is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, simple, and secure analytics at scale. Amazon Redshift data sharing allows you to share data within and across organizations, AWS Regions, and even third-party providers, without moving or copying the data. Additionally, it allows you to use multiple warehouses of different types and sizes for extract, transform, and load (ETL) jobs so you can tune your warehouses based on your write workloads’ price-performance needs.

You can use the Amazon Redshift Streaming Ingestion capability to update your analytics data warehouse in near real time. Redshift Streaming Ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use SQL to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK), and pull data directly to Amazon Redshift.

EchoStar uses Redshift Streaming Ingestion to ingest over 10 TB of data daily from more than 150 MSK topics in near real time across its Open RAN 5G network. This post provides an overview of real-time data analysis with Amazon Redshift and how EchoStar uses it to ingest hundreds of megabytes per second. As data sources and volumes grew across its network, EchoStar migrated from a single Redshift Serverless workgroup to a multi-warehouse architecture with live data sharing. This resulted in improved performance for ingesting and analyzing their rapidly growing data.

“By adopting the strategy of ‘parse and transform later,’ and establishing an Amazon Redshift data warehouse farm with a multi-cluster architecture, we leveraged the power of Amazon Redshift for direct streaming ingestion and data sharing.

“This innovative approach improved our data latency, reducing it from two–three days to an average of 37 seconds. Additionally, we achieved better scalability, with Amazon Redshift direct streaming ingestion supporting over 150 MSK topics.”

—Sandeep Kulkarni, VP, Software Engineering & Head of Wireless OSS Platforms at EchoStar

EchoStar use case

EchoStar needed to provide near real-time access to 5G network performance data for downstream consumers and interactive analytics applications. This data is sourced from the 5G network EMS observability infrastructure and is streamed in near real-time using AWS services like AWS Lambda and AWS Step Functions. The streaming data produced many small files, ranging from bytes to kilobytes. To efficiently integrate this data, a messaging system like Amazon MSK was required.

EchoStar was processing over 150 MSK topics from their messaging system, with each topic containing around 1 billion rows of data per day. This resulted in an average total data volume of 10 TB per day. To use this data, EchoStar needed to visualize it, perform spatial analysis, join it with third-party data sources, develop end-user applications, and use the insights to make near real-time improvements to their terrestrial 5G network. EchoStar needed a solution that does the following:

  • Optimize parsing and loading of over 150 MSK topics to enable downstream workloads to run simultaneously without impacting each other
  • Allow hundreds of queries to run in parallel with desired query throughput
  • Seamlessly scale capacity with the increase in user base and maintain cost-efficiency

Solution overview

EchoStar migrated from a single Redshift Serverless workgroup to a multi-warehouse Amazon Redshift architecture in partnership with AWS. The new architecture enables workload isolation by separating streaming ingestion and ETL jobs from analytics workloads across multiple Redshift compute instances. At the same time, it provides live data sharing using a single copy of the data between the data warehouse. This architecture takes advantage of AWS capabilities to scale Redshift streaming ingestion jobs and isolate workloads while maintaining data access.

The following diagram shows the high-level end-to-end serverless architecture and overall data pipeline.

Architecture Diagram

The solution consists of the following key components:

  • Primary ETL Redshift Serverless workgroup – A primary ETL producer workgroup of size 392 RPU
  • Secondary Redshift Serverless workgroups – Additional producer workgroups of varying sizes to distribute and scale near real-time data ingestion from over 150 MSK topics based on price-performance requirements
  • Consumer Redshift Serverless workgroup – A consumer workgroup instance to run analytics using Tableau
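To give a feel for how the producer and consumer workgroups are wired together, the following is a minimal data sharing sketch. The share, schema, and namespace identifiers are placeholders rather than EchoStar's actual objects:

-- On a producer workgroup: create and populate the datashare
CREATE DATASHARE telemetry_share;
ALTER DATASHARE telemetry_share ADD SCHEMA public;
ALTER DATASHARE telemetry_share ADD ALL TABLES IN SCHEMA public;
GRANT USAGE ON DATASHARE telemetry_share TO NAMESPACE '<<consumer-namespace-id>>';

-- On the consumer workgroup: expose the shared data as a local database
CREATE DATABASE telemetry_db FROM DATASHARE telemetry_share OF NAMESPACE '<<producer-namespace-id>>';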

To efficiently load multiple MSK topics into Redshift Serverless in parallel, we first identified the topics with the highest data volumes in order to determine the appropriate sizing for secondary workgroups.

We began by initially sizing the system to a Redshift Serverless workgroup of 64 RPU. Then we onboarded a small number of MSK topics, creating the related streaming materialized views. We incrementally added more materialized views, evaluating overall ingestion cost, performance, and latency needs within a single workgroup. This initial benchmarking gave us a solid baseline to onboard the remaining MSK topics across multiple workgroups.

In addition to the multi-warehouse approach and workgroup sizing, we optimized this large-scale ingestion, achieving an average latency of 37 seconds, by splitting the ingestion jobs into two steps:

  • Streaming materialized views – Use JSON_PARSE to ingest data from MSK topics in Amazon Redshift
  • Flattening materialized views – Shred and perform transformations as a second step, reading data from the respective streaming materialized view

The following diagram depicts the high-level approach.

MSK to Redshift
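Before a streaming materialized view can be defined, an external schema must map the workgroup to the MSK cluster. The following is a minimal sketch assuming IAM authentication; the role and cluster ARNs are placeholders:

CREATE EXTERNAL SCHEMA external_<source-name>
FROM MSK
IAM_ROLE 'arn:aws:iam::<<account-id>>:role/<<role-name>>'
AUTHENTICATION iam
CLUSTER_ARN 'arn:aws:kafka:<<region>>:<<account-id>>:cluster/<<name>>/<<id>>';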

Best practices

In this section, we share some of the best practices we observed while implementing this solution:

  • We performed an initial Redshift Serverless workgroup sizing based on three key factors:
    • Number of records per second per MSK topic
    • Average record size per MSK topic
    • Desired latency SLA
  • Additionally, we created only one streaming materialized view for a given MSK topic. Creation of multiple materialized views per MSK topic can slow down the ingestion performance because each materialized view becomes a consumer for that topic and shares the Amazon MSK bandwidth for that topic.
  • While defining the streaming materialized view, we avoided using JSON_EXTRACT_PATH_TEXT to pre-shred data, because json_extract_path_text operates on the data row by row, which significantly impacts ingestion throughput. Instead, we adopted JSON_PARSE with the CAN_JSON_PARSE function to ingest data from the stream at lowest latency and to guard against errors. The following is a sample SQL query we used for the MSK topics (the actual data source names have been masked due to security reasons):
CREATE MATERIALIZED VIEW <source-name>_streaming_mvw AUTO REFRESH YES AS
SELECT
    kafka_partition,
    kafka_offset,
    refresh_time,
    case when CAN_JSON_PARSE(kafka_value) = true then JSON_PARSE(kafka_value) end as Kafka_Data,
    case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end as Invalid_Data
FROM
    external_<source-name>."<source-name>_mvw";
  • We kept the streaming materialized views simple and moved all transformations like unnesting, aggregation, and case expressions to a later step as flattening materialized views. The following is a sample SQL query we used to flatten data by reading the streaming materialized views created in the previous step (the actual data source and column names have been masked due to security reasons):
CREATE MATERIALIZED VIEW <source-name>_flatten_mvw AUTO REFRESH NO AS
SELECT
    kafka_data."<column1>" :: integer as "<column1>",
    kafka_data."<column2>" :: integer as "<column2>",
    kafka_data."<column3>" :: bigint as "<column3>",
    … 
    …
    …
    …
FROM
    <source-name>_streaming_mvw;
  • The streaming materialized views were set to auto refresh so that they can continuously ingest data into Amazon Redshift from MSK topics.
  • The flattening materialized views were set to manual refresh based on SLA requirements using Amazon Managed Workflows for Apache Airflow (Amazon MWAA); an example refresh statement follows this list.
  • We skipped defining any sort key in the streaming materialized views to further accelerate the ingestion speed.
  • Lastly, we used SYS_MV_REFRESH_HISTORY and SYS_STREAM_SCAN_STATES system views to monitor the streaming ingestion refreshes and latencies.
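For the manual refreshes orchestrated by Amazon MWAA, the scheduled task ultimately issues a statement like the following against the owning workgroup (using the masked view name from the earlier example):

REFRESH MATERIALIZED VIEW <source-name>_flatten_mvw;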

For more information about best practices and monitoring techniques, refer to Best practices to implement near-real-time analytics using Amazon Redshift Streaming Ingestion with Amazon MSK.

Results

EchoStar saw improvements with this solution in both performance and scalability across their 5G Open RAN network.

Performance

By isolating and scaling Redshift Streaming Ingestion refreshes across multiple Redshift Serverless workgroups, EchoStar met their latency SLA requirements. We used the following SQL query to measure latencies:

WITH curr_qry as (
    SELECT
        mv_name,
        cast(partition_id as int) as partition_id,
        max(query_id) as current_query_id
    FROM
        sys_stream_scan_states
    GROUP BY
        mv_name,
        cast(partition_id as int)
)
SELECT
    strm.mv_name,
    tmp.partition_id,
    min(datediff(second, stream_record_time_max, record_time)) as min_latency_in_secs,
    max(datediff(second, stream_record_time_min, record_time)) as max_latency_in_secs
FROM
    sys_stream_scan_states strm,
    curr_qry tmp
WHERE
    strm.query_id = tmp.current_query_id
    and strm.mv_name = tmp.mv_name
    and strm.partition_id = tmp.partition_id
GROUP BY 1,2
ORDER BY 1,2;

When we further aggregate the preceding query to the mv_name level only (removing partition_id, which uniquely identifies a partition in an MSK topic), we find the average daily performance results achieved on a Redshift Serverless workgroup of 64 RPU, as shown in the following table. (The actual materialized view names have been hashed for security reasons because they map to external vendor names and data sources.)

S.No. stream_name_hash min_latency_secs max_latency_secs avg_records_per_day
1 e022b6d13d83faff02748d3762013c 1 6 186,395,805
2 a8cc0770bb055a87bbb3d37933fc01 1 6 186,720,769
3 19413c1fc8fd6f8e5f5ae009515ffb 2 4 5,858,356
4 732c2e0b3eb76c070415416c09ffe0 3 27 12,494,175
5 8b4e1ffad42bf77114ab86c2ea91d6 3 4 149,927,136
6 70e627d11eba592153d0f08708c0de 5 5 121,819
7 e15713d6b0abae2b8f6cd1d2663d94 5 31 148,768,006
8 234eb3af376b43a525b7c6bf6f8880 6 64 45,666
9 38e97a2f06bcc57595ab88eb8bec57 7 100 45,666
10 4c345f2f24a201779f43bd585e53ba 9 12 101,934,969
11 a3b4f6e7159d9b69fd4c4b8c5edd06 10 14 36,508,696
12 87190a106e0889a8c18d93a3faafeb 13 69 14,050,727
13 b1388bad6fc98c67748cc11ef2ad35 25 118 509
14 cf8642fccc7229106c451ea33dd64d 28 66 13,442,254
15 c3b2137c271d1ccac084c09531dfcd 29 74 12,515,495
16 68676fc1072f753136e6e992705a4d 29 69 59,565
17 0ab3087353bff28e952cd25f5720f4 37 71 12,775,822
18 e6b7f10ea43ae12724fec3e0e3205c 39 83 2,964,715
19 93e2d6e0063de948cc6ce2fb5578f2 45 45 1,969,271
20 88cba4fffafd085c12b5d0a01d0b84 46 47 12,513,768
21 d0408eae66121d10487e562bd481b9 48 57 12,525,221
22 de552412b4244386a23b4761f877ce 52 52 7,254,633
23 9480a1a4444250a0bc7a3ed67eebf3 58 96 12,522,882
24 db5bd3aa8e1e7519139d2dc09a89a7 60 103 12,518,688
25 e6541f290bd377087cdfdc2007a200 71 83 176,346,585
26 6f519c71c6a8a6311f2525f38c233d 78 115 100,073,438
27 3974238e6aff40f15c2e3b6224ef68 79 82 12,770,856
28 7f356f281fc481976b51af3d76c151 79 96 75,077
29 e2e8e02c7c0f68f8d44f650cd91be2 92 99 12,525,210
30 3555e0aa0630a128dede84e1f8420a 97 105 8,901,014
31 7f4727981a6ba1c808a31bd2789f3a 108 110 11,599,385

All 31 materialized views running and refreshing concurrently and continuously show a minimum latency of 1 second and a maximum latency of 118 seconds over the last 7 days, meeting EchoStar’s SLA requirements.

Scalability

With this data sharing enabled multi-warehouse architecture, EchoStar can now quickly scale their Redshift compute resources on demand to onboard the remaining 150 MSK topics. In addition, as their data sources and MSK topics increase further, they can quickly add more Redshift Serverless workgroups (for example, another 128 RPU workgroup) to meet their desired SLA requirements.

Conclusion

By using the scalability of Amazon Redshift and a multi-warehouse architecture with data sharing, EchoStar delivers near real-time access to over 150 million rows of data across over 150 MSK topics, totaling 10 TB ingested daily, to their users.

This split multi-producer/consumer model of Amazon Redshift can bring benefits to many workloads that have similar performance characteristics as EchoStar’s warehouse. With this pattern, you can scale your workload to meet SLAs while optimizing for price and performance. Please reach out to your AWS Account Team to engage an AWS specialist for additional help or for a proof of concept.


About the authors

Balaram Mathukumilli is Director, Enterprise Data Services at DISH Wireless. He is deeply passionate about Data and Analytics solutions. With 20+ years of experience in Enterprise and Cloud transformation, he has worked across domains such as PayTV, Media Sales, Marketing and Wireless. Balaram works closely with business partners to identify data needs and data sources, determine data governance, develop data infrastructure, build data analytics capabilities, and foster a data-driven culture to ensure their data assets are properly managed, used effectively, and secure.

Viswanatha Vellaboyana, a Solutions Architect at DISH Wireless, is deeply passionate about Data and Analytics solutions. With 20 years of experience in enterprise and cloud transformation, he has worked across domains such as Media, Media Sales, Communication, and Health Insurance. He collaborates with enterprise clients, guiding them in architecting, building, and scaling applications to achieve their desired business outcomes.

Keerthi Kambam is a Senior Engineer at DISH Network specializing in AWS Services. She builds scalable data engineering and analytical solutions for DISH customer-facing applications. She is passionate about solving complex data challenges with cloud solutions.

Raks Khare is a Senior Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers across varying industries and regions architect data analytics solutions at scale on the AWS platform. Outside of work, he likes exploring new travel and food destinations and spending quality time with his family.

Adi Eswar is a core member of the AI/ML and Analytics Specialist team, leading customer experience for customers' existing workloads and leading key initiatives as part of the Analytics Customer Experience Program and Redshift enablement for AWS telco customers. He spends his free time exploring new food, cultures, national parks, and museums with his family.

Shirin Bhambhani is a Senior Solutions Architect at AWS. She works with customers to build solutions and accelerate their cloud migration journey. She enjoys simplifying customer experiences on AWS.

Vinayak Rao is a Senior Customer Solutions Manager at AWS. He collaborates with customers, partners, and internal AWS teams to drive customer success, delivery of technical solutions, and cloud adoption.

Configure a custom domain name for your Amazon MSK cluster

Post Syndicated from Subham Rakshit original https://aws.amazon.com/blogs/big-data/configure-a-custom-domain-name-for-your-amazon-msk-cluster/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data. It runs open-source versions of Apache Kafka. This means existing applications, tooling, and plugins from partners and the Apache Kafka community are supported without requiring changes to application code.

Customers use Amazon MSK for real-time data sharing with their end customers, who could be internal teams or third parties. These end customers manage Kafka clients, which are deployed in AWS, other managed cloud providers, or on premises. When migrating from self-managed Kafka to Amazon MSK or moving clients between MSK clusters, customers want to avoid reconfiguring their Kafka clients to use a different Domain Name System (DNS) name. Therefore, it's important to have a custom domain name for the MSK cluster that the clients can communicate with. Also, having a custom domain name makes the disaster recovery (DR) process less complicated because clients don't need to change the MSK bootstrap address when a new cluster is created or a client connection needs to be redirected to a DR AWS Region.

MSK clusters use AWS-generated DNS names that are unique for each cluster, containing the broker ID, the MSK cluster name, two service-generated subdomains, and the AWS Region, ending with amazonaws.com. The following figure illustrates this naming format.

MSK brokers use the same DNS name for the certificates used for Transport Layer Security (TLS) connections. The DNS name used by clients with TLS encrypted authentication mechanisms must match the primary Common Name (CN), or Subject Alternative Name (SAN) of the certificate presented by the MSK broker, to avoid hostname validation errors.

The solution discussed in this post provides a way for you to use a custom domain name for clients to connect to their MSK clusters when using SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism) authentication only.

Solution overview

Network Load Balancers (NLBs) are a popular addition to the Amazon MSK architecture, along with AWS PrivateLink as a way to expose connectivity to an MSK cluster from other virtual private clouds (VPCs). For more details, see How Goldman Sachs builds cross-account connectivity to their Amazon MSK clusters with AWS PrivateLink. In this post, we run through how to use an NLB to enable the use of a custom domain name with Amazon MSK when using SASL/SCRAM authentication.

The following diagram shows all components used by the solution.

SASL/SCRAM uses TLS to encrypt the Kafka protocol traffic between the client and Kafka broker. To use a custom domain name, the client needs to be presented with a server certificate matching that custom domain name. As of this writing, it isn’t possible to modify the certificate used by the MSK brokers, so this solution uses an NLB to sit between the client and MSK brokers.

An NLB works at the connection layer (Layer 4) and routes the TCP or UDP protocol traffic. It doesn’t validate the application data being sent and forwards the Kafka protocol traffic. The NLB provides the ability to use a TLS listener, where a certificate is imported into AWS Certificate Manager (ACM) and associated with the listener and enables TLS negotiation between the client and the NLB. The NLB performs a separate TLS negotiation between itself and the MSK brokers. This NLB TLS negotiation to the target works exactly the same irrespective of whether certificates are signed by a public or private Certificate Authority (CA).

For the client to resolve DNS queries for the custom domain, an Amazon Route 53 private hosted zone is used to host the DNS records, and is associated with the client’s VPC to enable DNS resolution from the Route 53 VPC resolver.

Kafka listeners and advertised listeners

Kafka listeners (listeners) are the lists of addresses that Kafka binds to for listening. A Kafka listener is composed of a hostname or IP, port, and protocol: <protocol>://<hostname>:<port>.

The Kafka client uses the bootstrap address to connect to one of the brokers in the cluster and issues a metadata request. The broker provides a metadata response containing the address information of each broker that the client needs to connect to talk to these brokers. Advertised listeners (advertised.listeners) is a broker configuration option that specifies the addresses brokers return to clients in that metadata response. By default, advertised listeners are not set. After they're set, Kafka clients will use the advertised listeners instead of the listeners to obtain the connection information for brokers.

When Amazon MSK multi-VPC private connectivity is enabled, AWS sets the advertised.listeners configuration option to include the Amazon MSK multi-VPC DNS alias.

MSK brokers use the listener configuration to tell clients the DNS names to use to connect to the individual brokers for each authentication type enabled. Therefore, when clients are directed to use the custom domain name, you need to set a custom advertised listener for SASL/SCRAM authentication protocol. Advertised listeners are unique to each broker; the cluster won’t start if multiple brokers have the same advertised listener address.

Kafka bootstrap process and setup options

A Kafka client uses the bootstrap addresses to get the metadata from the MSK cluster, which in response provides the broker hostname and port (the listeners information by default or the advertised listener if it’s configured) that the client needs to connect to for subsequent requests. Using this information, the client connects to the appropriate broker for the topic or partition that it needs to send to or fetch from. The following diagram shows the default bootstrap and topic or partition connectivity between a Kafka client and MSK broker.

You have two options when using a custom domain name with Amazon MSK.

Option 1: Only a bootstrap connection through an NLB

You can use a custom domain name only for the bootstrap connection, where the advertised listeners are not set, so the client is directed to the default AWS cluster DNS name. This option is beneficial when the Kafka client has direct network connectivity to both the NLB and the MSK broker’s Elastic Network Interface (ENI). The following diagram illustrates this setup.

No changes are required to the MSK brokers, and the Kafka client has the custom domain set as the bootstrap address. The Kafka client uses the custom domain bootstrap address to send a get metadata request to the NLB. The NLB sends the Kafka protocol traffic received by the Kafka client to a healthy MSK broker’s ENI. That broker responds with metadata where only listeners is set, containing the default MSK cluster DNS name for each broker. The Kafka client then uses the default MSK cluster DNS name for the appropriate broker and connects to that broker’s ENI.
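
One way to see exactly which broker addresses the metadata response advertises is to query it directly. The following is a sketch using the open-source kcat (formerly kafkacat) tool, which isn't part of this solution and must be installed separately; the SASL credentials and CA file path are placeholders for your environment:

# List cluster metadata via the custom bootstrap address; the broker list in the
# output shows the DNS names the client will use for subsequent connections
kcat -L -b bootstrap.example.com:9000 \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=<<msk_user>> \
    -X sasl.password=<<msk_password>> \
    -X ssl.ca.location=<<path-to-ca-certificate.pem>>

With option 1, the broker list shows the default MSK DNS names; with option 2 (described next), it shows the custom domain names.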

Option 2: All connections through an NLB

Alternatively, you can use a custom domain name for the bootstrap and the brokers, where the custom domain name for each broker is set in the advertised listeners configuration. You need to use this option when Kafka clients don’t have direct network connectivity to the MSK brokers ENI. For example, Kafka clients need to use an NLB, AWS PrivateLink, or Amazon MSK multi-VPC endpoints to connect to an MSK cluster. The following diagram illustrates this setup.

The advertised listeners are set to use the custom domain name, and the Kafka client has the custom domain set as the bootstrap address. The Kafka client uses the custom domain bootstrap address to send a get metadata request, which is sent to the NLB. The NLB sends the Kafka protocol traffic received by the Kafka client to a healthy MSK broker’s ENI. That broker responds with metadata where advertised listeners is set. The Kafka client uses the custom domain name for the appropriate broker, which directs the connection to the NLB, for the port set for that broker. The NLB sends the Kafka protocol traffic to that broker.

Network Load Balancer

The following diagram illustrates the NLB port and target configuration. A TLS listener on port 9000 is used for bootstrap connections, with all MSK brokers set as targets; it forwards to a TLS target group on target port 9096. A separate TLS listener port represents each broker in the MSK cluster. In this post, there are three brokers in the MSK cluster, with TLS listener 9001 representing broker 1, up to TLS listener 9003, representing broker 3.

For all TLS listeners on the NLB, a single imported certificate with the domain name bootstrap.example.com is attached to the NLB. bootstrap.example.com is used as the Common Name (CN) so that the certificate is valid for the bootstrap address, and Subject Alternative Names (SANs) are set for all broker DNS names. If the certificate is issued by a private CA, clients need to import the root and intermediate CA certificates to the trust store. If the certificate is issued by a public CA, the root and intermediate CA certificates will be in the default trust store.

The following table shows the required NLB configuration.

NLB Listener Type NLB Listener Port Certificate NLB Target Type NLB Targets
TLS 9000 bootstrap.example.com TLS All Broker ENIs
TLS 9001 bootstrap.example.com TLS Broker 1
TLS 9002 bootstrap.example.com TLS Broker 2
TLS 9003 bootstrap.example.com TLS Broker 3

Domain Name System

For this post, a Route 53 private hosted zone is used to host the DNS records for the custom domain, in this case example.com. The private hosted zone is associated with the Amazon MSK VPC, to enable DNS resolution for the client that is launched in the same VPC. If your client is in a different VPC than the MSK cluster, you need to associate the private hosted zone with that client’s VPC.

The Route 53 private hosted zone is not a required part of the solution. The most crucial part is that the client can perform DNS resolution against the custom domain and get the required responses. You can instead use your organization’s existing DNS, a Route 53 public hosted zone or Route 53 inbound resolver to resolve Route 53 private hosted zones from outside of AWS, or an alternative DNS solution.

The following figure shows the DNS records used by the client to resolve to the NLB. We use bootstrap for the initial client connection, and use b-1, b-2, and b-3 to reference each broker’s name.

The following table lists the DNS records required for a three-broker MSK cluster when using a Route 53 private or public hosted zone.

Record Record Type Value
bootstrap A NLB Alias
b-1 A NLB Alias
b-2 A NLB Alias
b-3 A NLB Alias

The following table lists the DNS records required for a three-broker MSK cluster when using other DNS solutions.

Record Record Type Value
bootstrap CNAME NLB DNS A Record (e.g. name-id.elb.region.amazonaws.com)
b-1 CNAME NLB DNS A Record
b-2 CNAME NLB DNS A Record
b-3 CNAME NLB DNS A Record

In the following sections, we go through the steps to configure a custom domain name for your MSK cluster and clients connecting with the custom domain.

Prerequisites

To deploy the solution, you need the following prerequisites:

Launch the CloudFormation template

Complete the following steps to deploy the CloudFormation template:

  1. Choose Launch Stack.

  2. Provide the stack name as msk-custom-domain.
  3. For MSKClientUserName, enter the user name of the secret used for SASL/SCRAM authentication with Amazon MSK.
  4. For MSKClientUserPassword, enter the password of the secret used for SASL/SCRAM authentication with Amazon MSK.

The CloudFormation template will deploy the following resources:

Set up the EC2 instance

Complete the following steps to configure your EC2 instance:

  1. On the Amazon EC2 console, connect to the instance msk-custom-domain-KafkaClientInstance1 using Session Manager, a capability of AWS Systems Manager.
  2. Switch to ec2-user:
    sudo su - ec2-user 
    cd

  3. Run the following commands to configure the SASL/SCRAM client properties, create Kafka access control lists (ACLs), and create a topic named customer:
    . ./cloudformation_outputs.sh 
    aws configure set region $REGION 
    export BS=$(aws kafka get-bootstrap-brokers --cluster-arn ${MSKClusterArn} | jq -r '.BootstrapBrokerStringSaslScram') 
    export ZOOKEEPER=$(aws kafka describe-cluster --cluster-arn $MSKClusterArn | jq -r '.ClusterInfo.ZookeeperConnectString')
    ./configure_sasl_scram_properties_and_kafka_acl.sh

Create a certificate

For this post, we use self-signed certificates. However, it’s recommended to use either a public certificate or a certificate signed by your organization’s private key infrastructure (PKI).

If you’re are using an AWS private CA for the private key infrastructure, refer to Creating a private CA for instructions to create and install a private CA.

Use the OpenSSL command to create a self-signed certificate. Modify the following command, adding the country code, state, city, and company:

SSLCONFIG="[req]
prompt = no
distinguished_name = req_distinguished_name
x509_extensions = v3_ca

[req_distinguished_name]
C = <<Country_Code>>
ST = <<State>>
L = <<City>>
O = <<Company>>
CN = bootstrap.example.com

[v3_ca]
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alternate_names

[alternate_names]
DNS.1 = bootstrap.example.com
DNS.2 = b-1.example.com
DNS.3 = b-2.example.com
DNS.4 = b-3.example.com
"

openssl req -x509 -newkey rsa:2048 -days 365 -nodes \
    -config <(echo "$SSLCONFIG") \
    -keyout msk-custom-domain-pvt-key.pem \
    -out msk-custom-domain-certificate.pem  

You can check the created certificate using the following command:

openssl x509 -text -noout -in msk-custom-domain-certificate.pem

Import the certificate to ACM

To use the self-signed certificate for the solution, you need to import the certificate to ACM:

export CertificateARN=$(aws acm import-certificate --certificate file://msk-custom-domain-certificate.pem --private-key file://msk-custom-domain-pvt-key.pem | jq -r '.CertificateArn')

echo $CertificateARN

After it’s imported, you can see the certificate in ACM.

Import the certificate to the Kafka client trust store

For the client to validate the server SSL certificate during the TLS handshake, you need to import the self-signed certificate to the client’s trust store.

  1. Run the following command to use the JVM trust store to create your client trust store:
    cp /usr/lib/jvm/jre-1.8.0-openjdk/lib/security/cacerts /home/ec2-user/kafka.client.truststore.jks 
    chmod 700 kafka.client.truststore.jks

  2. Import the self-signed certificate to the trust store by using the following command. Provide the keystore password as changeit.
    /usr/lib/jvm/jre-1.8.0-openjdk/bin/keytool -import \
        -trustcacerts \
        -noprompt \
        -alias msk-cert \
        -file msk-custom-domain-certificate.pem \
        -keystore kafka.client.truststore.jks

  3. Add the trust store location to the configuration properties used by Kafka clients to enable certificate validation:
    echo 'ssl.truststore.location=/home/ec2-user/kafka.client.truststore.jks' >> /home/ec2-user/kafka/config/client_sasl.properties

Set up DNS resolution for clients within the VPC

To set up DNS resolution for clients, create a private hosted zone for the domain and associate the hosted zone with the VPC where the client is deployed:

aws route53 create-hosted-zone \
--name example.com \
--caller-reference "msk-custom-domain" \
--hosted-zone-config Comment="Private Hosted Zone for MSK",PrivateZone=true \
--vpc VPCRegion=$REGION,VPCId=$MSKVPCId

export HostedZoneId=$(aws route53 list-hosted-zones-by-vpc --vpc-id $MSKVPCId --vpc-region $REGION | jq -r '.HostedZoneSummaries[0].HostedZoneId')
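
If your Kafka client runs in a different VPC than the MSK cluster, associate the private hosted zone with that VPC as well. The following sketch assumes a placeholder client VPC ID:

aws route53 associate-vpc-with-hosted-zone \
--hosted-zone-id $HostedZoneId \
--vpc VPCRegion=$REGION,VPCId=<<Client_VPC_Id>>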

Create EC2 target groups

Target groups route requests to individual registered targets, such as EC2 instances, using the protocol and port number that you specify. You can register a target with multiple target groups and you can register multiple targets to one target group.

For this post, you need four target groups: one for each broker instance and one that will point to all the brokers and will be used by clients for Amazon MSK connection bootstrapping.

Each target group will receive traffic on port 9096 (the SASL/SCRAM authentication port) and will be associated with the Amazon MSK VPC:

aws elbv2 create-target-group \
    --name b-all-bootstrap \
    --protocol TLS \
    --port 9096 \
    --target-type ip \
    --vpc-id $MSKVPCId
    
aws elbv2 create-target-group \
    --name b-1 \
    --protocol TLS \
    --port 9096 \
    --target-type ip \
    --vpc-id $MSKVPCId
    
aws elbv2 create-target-group \
    --name b-2 \
    --protocol TLS \
    --port 9096 \
    --target-type ip \
    --vpc-id $MSKVPCId
    
aws elbv2 create-target-group \
    --name b-3 \
    --protocol TLS \
    --port 9096 \
    --target-type ip \
    --vpc-id $MSKVPCId

Register target groups with MSK broker IPs

You need to associate each target group with the broker instance (target) in the MSK cluster so that the traffic going through the target group can be routed to the individual broker instance.

Complete the following steps:

  1. Get the MSK broker hostnames:
echo $BS

This should show the brokers that are part of the bootstrap address. The hostname of broker 1 looks like the following code:

b-1.mskcustomdomaincluster.xxxxx.yy.kafka.region.amazonaws.com

To get the hostname of other brokers in the cluster, replace b-1 with values like b-2, b-3, and so on. For example, if you have six brokers in the cluster, you will have six broker hostnames starting with b-1 to b-6.

  2. To get the IP address of the individual brokers, use the nslookup command:

nslookup b-1.mskcustomdomaincluster.xxxxx.yy.kafka.region.amazonaws.com

Server:         172.16.0.2
Address:        172.16.0.2#53

Non-authoritative answer:
Name:   b-1.mskcustomdomaincluster.xxxxx.yy.kafka.region.amazonaws.com
Address: 172.16.1.225
  3. Modify the following commands with the IP addresses of each broker to create environment variables that will be used later:

export B1=<<b-1_IP_Address>>
export B2=<<b-2_IP_Address>>
export B3=<<b-3_IP_Address>>

Next, you need to register the broker IP with the target group. For broker b-1, you will register the IP address with target group b-1.

  4. Provide the target group name b-1 to get the target group ARN. Then register the broker IP address with the target group:
export TARGET_GROUP_B_1_ARN=$(aws elbv2 describe-target-groups --names b-1 | jq -r '.TargetGroups[0].TargetGroupArn')

aws elbv2 register-targets \
--target-group-arn ${TARGET_GROUP_B_1_ARN} \
--targets Id=$B1
  5. Repeat the steps of obtaining the IP address from the other broker hostnames and register each IP address with the corresponding target group for brokers b-2 and b-3.

For broker b-2:
export TARGET_GROUP_B_2_ARN=$(aws elbv2 describe-target-groups --names b-2 | jq -r '.TargetGroups[0].TargetGroupArn')

aws elbv2 register-targets \
    --target-group-arn ${TARGET_GROUP_B_2_ARN} \
    --targets Id=$B2
For broker b-3:
export TARGET_GROUP_B_3_ARN=$(aws elbv2 describe-target-groups --names b-3 | jq -r '.TargetGroups[0].TargetGroupArn')

aws elbv2 register-targets \
    --target-group-arn ${TARGET_GROUP_B_3_ARN} \
    --targets Id=$B3
  6. Also, register all three broker IP addresses with the target group b-all-bootstrap. This target group will be used for routing the traffic for the Amazon MSK client connection bootstrap process:
export TARGET_GROUP_B_ALL_ARN=$(aws elbv2 describe-target-groups --names b-all-bootstrap | jq -r '.TargetGroups[0].TargetGroupArn')

aws elbv2 register-targets \
--target-group-arn ${TARGET_GROUP_B_ALL_ARN} \
--targets Id=$B1 Id=$B2 Id=$B3
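
If you prefer scripting over the manual per-broker steps, the following sketch performs the lookup and registration for all three brokers in one pass. It assumes a three-broker cluster whose hostnames differ only in their b-<n> prefix; verify the resolved IPs before registering them:

# Derive the common hostname suffix from the bootstrap string
BOOTSTRAP_HOST=$(echo $BS | cut -d',' -f1 | cut -d':' -f1)
SUFFIX=$(echo $BOOTSTRAP_HOST | cut -d'.' -f2-)
TG_ALL_ARN=$(aws elbv2 describe-target-groups --names b-all-bootstrap | jq -r '.TargetGroups[0].TargetGroupArn')

for N in 1 2 3; do
    # Resolve the broker IP, then register it with its own target group and the bootstrap target group
    IP=$(getent hosts b-$N.$SUFFIX | awk '{print $1}')
    TG_ARN=$(aws elbv2 describe-target-groups --names b-$N | jq -r '.TargetGroups[0].TargetGroupArn')
    aws elbv2 register-targets --target-group-arn $TG_ARN --targets Id=$IP
    aws elbv2 register-targets --target-group-arn $TG_ALL_ARN --targets Id=$IP
done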

Set up NLB listeners

Now that you have the target groups created and certificate imported, you’re ready to create the NLB and listeners.

Create the NLB with the following code:

aws elbv2 create-load-balancer \
--name msk-nlb-internal \
--scheme internal \
--type network \
--subnets $MSKVPCPrivateSubnet1 $MSKVPCPrivateSubnet2 $MSKVPCPrivateSubnet3 \
--security-groups $NLBSecurityGroupId

export NLB_ARN=$(aws elbv2 describe-load-balancers --names msk-nlb-internal | jq -r '.LoadBalancers[0].LoadBalancerArn')

Next, you configure the listeners that will be used by the clients to communicate with the MSK cluster. You need to create four listeners, one for each target group for ports 9000–9003. The following table lists the listener configurations.

Protocol Port Certificate NLB Target Type NLB Targets
TLS 9000 bootstrap.example.com TLS b-all-bootstrap
TLS 9001 bootstrap.example.com TLS b-1
TLS 9002 bootstrap.example.com TLS b-2
TLS 9003 bootstrap.example.com TLS b-3

Use the following code for port 9000:

aws elbv2 create-listener \
--load-balancer-arn $NLB_ARN \
--protocol TLS \
--port 9000 \
--certificates CertificateArn=$CertificateARN \
--ssl-policy ELBSecurityPolicy-TLS13-1-2-2021-06 \
--default-actions Type=forward,TargetGroupArn=$TARGET_GROUP_B_ALL_ARN

Use the following code for port 9001:

aws elbv2 create-listener \
--load-balancer-arn $NLB_ARN \
--protocol TLS \
--port 9001 \
--certificates CertificateArn=$CertificateARN \
--ssl-policy ELBSecurityPolicy-TLS13-1-2-2021-06 \
--default-actions Type=forward,TargetGroupArn=$TARGET_GROUP_B_1_ARN

Use the following code for port 9002:

aws elbv2 create-listener \
--load-balancer-arn $NLB_ARN \
--protocol TLS \
--port 9002 \
--certificates CertificateArn=$CertificateARN \
--ssl-policy ELBSecurityPolicy-TLS13-1-2-2021-06 \
--default-actions Type=forward,TargetGroupArn=$TARGET_GROUP_B_2_ARN

Use the following code for port 9003:

aws elbv2 create-listener \
--load-balancer-arn $NLB_ARN \
--protocol TLS \
--port 9003 \
--certificates CertificateArn=$CertificateARN \
--ssl-policy ELBSecurityPolicy-TLS13-1-2-2021-06 \
--default-actions Type=forward,TargetGroupArn=$TARGET_GROUP_B_3_ARN
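
Optionally, you can confirm that the listeners present the imported certificate before wiring up DNS. The following sketch connects to the bootstrap listener using the NLB's DNS name with SNI set to the custom domain; printing the SAN extension this way requires OpenSSL 1.1.1 or later, and the client must be allowed by the NLB security group:

NLB_DNS=$(aws elbv2 describe-load-balancers --load-balancer-arns $NLB_ARN | jq -r '.LoadBalancers[0].DNSName')

# Print the subject and SANs of the certificate presented on port 9000
openssl s_client -connect $NLB_DNS:9000 -servername bootstrap.example.com </dev/null 2>/dev/null \
    | openssl x509 -noout -subject -ext subjectAltName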

Enable cross-zone load balancing

By default, cross-zone load balancing is disabled on NLBs. When disabled, each load balancer node distributes traffic to healthy targets in the same Availability Zone. For example, requests that come into the load balancer node in Availability Zone A will only be forwarded to a healthy target in Availability Zone A. If the only healthy target or the only registered target associated to an NLB listener is in another Availability Zone than the load balancer node receiving the traffic, the traffic is dropped.

Because the NLB has the bootstrap listener that is associated with a target group that has all brokers registered across multiple Availability Zones, Route 53 will respond to DNS queries against the NLB DNS name with the IP address of NLB ENIs in Availability Zones with healthy targets.

When the Kafka client tries to connect to a broker through the broker’s listener on the NLB, there will be a noticeable delay in receiving a response from the broker as the client tries to connect to the broker using all IPs returned by Route 53.

Enabling cross-zone load balancing distributes the traffic across the registered targets in all Availability Zones.

aws elbv2 modify-load-balancer-attributes --load-balancer-arn $NLB_ARN --attributes Key=load_balancing.cross_zone.enabled,Value=true

Create DNS A records in a private hosted zone

Create DNS A records to route the traffic to the network load balancer. The following table lists the records.

Record Record Type Value
bootstrap A NLB Alias
b-1 A NLB Alias
b-2 A NLB Alias
b-3 A NLB Alias

Alias record types will be used, so you need the NLB’s DNS name and hosted zone ID:

export NLB_DNS=$(aws elbv2 describe-load-balancers --names msk-nlb-internal | jq -r '.LoadBalancers[0].DNSName')

export NLB_ZoneId=$(aws elbv2 describe-load-balancers --names msk-nlb-internal | jq -r '.LoadBalancers[0].CanonicalHostedZoneId')

Create the bootstrap record, and then repeat this command to create the b-1, b-2, and b-3 records, modifying the Name field:

aws route53 change-resource-record-sets \
--hosted-zone-id $HostedZoneId \
--change-batch file://<(cat << EOF
{
   "Comment": "Create bootstrap record",
   "Changes": [{
      "Action": "CREATE",
      "ResourceRecordSet": {
         "Name": "bootstrap.example.com",
         "Type": "A",
         "AliasTarget": {
            "HostedZoneId": "$NLB_ZoneId",
            "DNSName": "$NLB_DNS",
            "EvaluateTargetHealth": true
         }
      }
   }]
}
EOF)
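
Alternatively, the following sketch creates the three broker records in a loop instead of editing the command by hand:

for NAME in b-1 b-2 b-3; do
aws route53 change-resource-record-sets \
--hosted-zone-id $HostedZoneId \
--change-batch file://<(cat << EOF
{
   "Comment": "Create $NAME record",
   "Changes": [{
      "Action": "CREATE",
      "ResourceRecordSet": {
         "Name": "$NAME.example.com",
         "Type": "A",
         "AliasTarget": {
            "HostedZoneId": "$NLB_ZoneId",
            "DNSName": "$NLB_DNS",
            "EvaluateTargetHealth": true
         }
      }
   }]
}
EOF)
done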

Optionally, to optimize cross-zone data charges, you can set b-1, b-2, and b-3 to the IP address of the NLB’s ENI that is in the same Availability Zone as each broker. For example, if b-2 is using an IP address that is in subnet 172.16.2.0/24, which is in Availability Zone A, you should use the NLB ENI that is in the same Availability Zone as the value for the DNS record.

The setup so far supports using a custom domain name for bootstrap connectivity only (option 1). If all Kafka traffic needs to go through the NLB (option 2), as discussed earlier, proceed to the next section to set up advertised listeners.

Configure the advertised listener in the MSK cluster

To get the listener details for broker 1, you provide entity-type as brokers and entity-name as 1 for the broker ID:

/home/ec2-user/kafka/bin/kafka-configs.sh --bootstrap-server $BS \
--entity-type brokers \
--entity-name 1 \
--command-config ~/kafka/config/client_sasl.properties \
--all \
--describe | grep 'listeners=CLIENT_SASL_SCRAM'

You will get an output like the following:

Listeners=CLIENT_SASL_SCRAM://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9096,CLIENT_SECURE://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9094,REPLICATION://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9093,REPLICATION_SECURE://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9095 sensitive=false synonyms={STATIC_BROKER_CONFIG:listeners=CLIENT_SASL_SCRAM://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9096,CLIENT_SECURE://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9094,REPLICATION://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9093,REPLICATION_SECURE://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9095}

Going forward, clients will connect through the custom domain name. Therefore, you need to configure the advertised listeners to the custom domain hostname and port. For this, you need to copy the listener details and change the CLIENT_SASL_SCRAM listener to b-1.example.com:9001.

While you’re configuring the advertised listener, you also need to preserve the information about other listener types in the advertised listener because inter-broker communications also use the addresses in the advertised listener.

Based on our configuration, the advertised listener for broker 1 will look like the following code, with everything after sensitive=false removed:

CLIENT_SASL_SCRAM://b-1.example.com:9001,REPLICATION://b-1-internal.mskcustomdomaincluster.xxxxxx.yy.kafka.region.amazonaws.com:9093,REPLICATION_SECURE://b-1-internal.mskcustomdomaincluster.xxxxxx.yy.kafka.region.amazonaws.com:9095

Modify the following command as follows:

  • <<BROKER_NUMBER>> – Set to the broker ID being changed (for example, 1 for broker 1)
  • <<PORT_NUMBER>> – Set to the port number corresponding to broker ID (for example, 9001 for broker 1)
  • <<REPLICATION_DNS_NAME>> – Set to the DNS name for REPLICATION
  • <<REPLICATION_SECURE_DNS_NAME>> – Set to the DNS name for REPLICATION_SECURE
/home/ec2-user/kafka/bin/kafka-configs.sh --alter \
--bootstrap-server $BS \
--entity-type brokers \
--entity-name <<BROKER_NUMBER>> \
--command-config ~/kafka/config/client_sasl.properties \
--add-config advertised.listeners=[CLIENT_SASL_SCRAM://b-<<BROKER_NUMBER>>.example.com:<<PORT_NUMBER>>,REPLICATION://<<REPLICATION_DNS_NAME>>:9093,REPLICATION_SECURE://<<REPLICATION_SECURE_DNS_NAME>>:9095]

The command should look something like the following example:

/home/ec2-user/kafka/bin/kafka-configs.sh --alter \
--bootstrap-server $BS \
--entity-type brokers \
--entity-name 1 \
--command-config ~/kafka/config/client_sasl.properties \
--add-config advertised.listeners=[CLIENT_SASL_SCRAM://b-1.example.com:9001,REPLICATION://b-1-internal.mskcustomdomaincluster.xxxxxx.yy.kafka.region.amazonaws.com:9093,REPLICATION_SECURE://b-1-internal.mskcustomdomaincluster.xxxxxx.yy.kafka.region.amazonaws.com:9095]

Run the command to add the advertised listener for broker 1.

You need to get the listener details for the other brokers and configure the advertised.listener for each.
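
The following sketch automates this for brokers 2 and 3. It extracts the REPLICATION and REPLICATION_SECURE DNS names from each broker's current listener configuration rather than assuming a naming pattern; treat it as illustrative and verify the extracted values before altering a production cluster:

for N in 2 3; do
    # Read the broker's current CLIENT_SASL_SCRAM listener line
    LISTENERS=$(/home/ec2-user/kafka/bin/kafka-configs.sh --bootstrap-server $BS \
        --entity-type brokers --entity-name $N \
        --command-config ~/kafka/config/client_sasl.properties \
        --all --describe | grep 'listeners=CLIENT_SASL_SCRAM' | head -1)

    # Extract the replication hostnames (everything up to the port colon)
    REPL=$(echo "$LISTENERS" | grep -o 'REPLICATION://[^,:]*' | head -1 | sed 's|REPLICATION://||')
    REPL_SEC=$(echo "$LISTENERS" | grep -o 'REPLICATION_SECURE://[^,:]*' | head -1 | sed 's|REPLICATION_SECURE://||')

    # Set the advertised listener for this broker (port 9002 for broker 2, 9003 for broker 3)
    /home/ec2-user/kafka/bin/kafka-configs.sh --alter \
        --bootstrap-server $BS \
        --entity-type brokers --entity-name $N \
        --command-config ~/kafka/config/client_sasl.properties \
        --add-config advertised.listeners=[CLIENT_SASL_SCRAM://b-$N.example.com:900$N,REPLICATION://$REPL:9093,REPLICATION_SECURE://$REPL_SEC:9095]
done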

Test the setup

Set the bootstrap address to the custom domain. This is the A record created in the private hosted zone.

export BS=bootstrap.example.com:9000

List the MSK topics using the custom domain bootstrap address:

/home/ec2-user/kafka/bin/kafka-topics.sh --list \
--bootstrap-server $BS \
--command-config=/home/ec2-user/kafka/config/client_sasl.properties

You should see the topic customer.
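
For an end-to-end check, you can produce and consume a test message through the custom domain using the Kafka console tools already on the client instance. This assumes the ACLs created during setup permit the user to produce to and consume from the customer topic:

# Produce a test message (type a line, then press Ctrl+C to exit)
/home/ec2-user/kafka/bin/kafka-console-producer.sh \
--bootstrap-server $BS \
--producer.config /home/ec2-user/kafka/config/client_sasl.properties \
--topic customer

# Read it back from the beginning of the topic
/home/ec2-user/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server $BS \
--consumer.config /home/ec2-user/kafka/config/client_sasl.properties \
--topic customer \
--from-beginning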

Clean up

To stop incurring costs, it’s recommended to manually delete the private hosted zone, NLB, target groups, and imported certificate in ACM. Also, delete the CloudFormation stack to remove any resources provisioned by CloudFormation.

Use the following code to manually delete the aforementioned resources:

aws route53 change-resource-record-sets \
  --hosted-zone-id $HostedZoneId \
  --change-batch file://<(cat << EOF
{
  "Changes": [
    {
      "Action": "DELETE",
      "ResourceRecordSet": {
        "Name": "bootstrap.example.com",
        "Type": "A",
        "AliasTarget": {
          "HostedZoneId": "$NLB_ZoneId",
          "DNSName": "$NLB_DNS",
          "EvaluateTargetHealth": true
        }
      }
    }
  ]
}
EOF
)
    
aws route53 change-resource-record-sets \
  --hosted-zone-id $HostedZoneId \
  --change-batch file://<(cat << EOF
{
  "Changes": [
    {
      "Action": "DELETE",
      "ResourceRecordSet": {
        "Name": "b-1.example.com",
        "Type": "A",
        "AliasTarget": {
          "HostedZoneId": "$NLB_ZoneId",
          "DNSName": "$NLB_DNS",
          "EvaluateTargetHealth": true
        }
      }
    }
  ]
}
EOF
)
    
aws route53 change-resource-record-sets \
  --hosted-zone-id $HostedZoneId \
  --change-batch file://<(cat << EOF
{
  "Changes": [
    {
      "Action": "DELETE",
      "ResourceRecordSet": {
        "Name": "b-2.example.com",
        "Type": "A",
        "AliasTarget": {
          "HostedZoneId": "$NLB_ZoneId",
          "DNSName": "$NLB_DNS",
          "EvaluateTargetHealth": true
        }
      }
    }
  ]
}
EOF
)
    
aws route53 change-resource-record-sets \
  --hosted-zone-id $HostedZoneId \
  --change-batch file://<(cat << EOF
{
  "Changes": [
    {
      "Action": "DELETE",
      "ResourceRecordSet": {
        "Name": "b-3.example.com",
        "Type": "A",
        "AliasTarget": {
          "HostedZoneId": "$NLB_ZoneId",
          "DNSName": "$NLB_DNS",
          "EvaluateTargetHealth": true
        }
      }
    }
  ]
}
EOF
)
    
aws route53 delete-hosted-zone --id $HostedZoneId
aws elbv2 delete-load-balancer --load-balancer-arn $NLB_ARN
aws elbv2 delete-target-group --target-group-arn $TARGET_GROUP_B_ALL_ARN
aws elbv2 delete-target-group --target-group-arn $TARGET_GROUP_B_1_ARN
aws elbv2 delete-target-group --target-group-arn $TARGET_GROUP_B_2_ARN
aws elbv2 delete-target-group --target-group-arn $TARGET_GROUP_B_3_ARN

Wait up to 5 minutes for the NLB deletion to complete, then delete the certificate:

aws acm delete-certificate --certificate-arn $CertificateARN

Now you can delete the CloudFormation stack.

Summary

This post explains how you can use an NLB, Route 53, and the advertised listener configuration option in Amazon MSK to support custom domain names with MSK clusters when using SASL/SCRAM authentication. You can use this solution to keep your existing Kafka bootstrap DNS name and reduce or remove the need to change client applications because of a migration, recovery process, or multi-cluster high availability. You can also use this solution to have the MSK bootstrap and broker names under your custom domain, enabling you to bring the DNS name in line with your naming convention (for example, msk.prod.example.com).

Try the solution out for yourself, and leave your questions and feedback in the comments section.


About the Authors

Subham Rakshit is a Senior Streaming Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build streaming architectures so they can get value from analyzing their streaming data. His two little daughters keep him occupied most of the time outside work, and he loves solving jigsaw puzzles with them. Connect with him on LinkedIn.

Mark Taylor is a Senior Technical Account Manager at Amazon Web Services, working with enterprise customers to implement best practices, optimize AWS usage, and address business challenges. Prior to joining AWS, Mark spent over 16 years in networking roles across industries, including healthcare, government, education, and payments. Mark lives in Folkestone, England, with his wife and two dogs. Outside of work, he enjoys watching and playing football, watching movies, playing board games, and traveling.

Stream multi-tenant data with Amazon MSK

Post Syndicated from Emanuele Levi original https://aws.amazon.com/blogs/big-data/stream-multi-tenant-data-with-amazon-msk/

Real-time data streaming has become prominent in today’s world of instantaneous digital experiences. Modern software as a service (SaaS) applications across all industries rely more and more on continuously generated data from different data sources such as web and mobile applications, Internet of Things (IoT) devices, social media platforms, and ecommerce sites. Processing these data streams in real time is key to delivering responsive and personalized solutions, and maximizes the value of data by processing it as close to the event time as possible.

AWS helps SaaS vendors by providing the building blocks needed to implement a streaming application with Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), and real-time processing applications with Amazon Managed Service for Apache Flink.

In this post, we look at implementation patterns a SaaS vendor can adopt when using a streaming platform as a means of integration between internal components, where streaming data is not directly exposed to third parties. In particular, we focus on Amazon MSK.

Streaming multi-tenancy patterns

When building streaming applications, you should take the following dimensions into account:

  • Data partitioning – Event streaming and storage needs to be isolated at the appropriate level, physical or logical, based on tenant ownership
  • Performance fairness – The performance coupling of applications processing streaming data for different tenants must be controlled and limited
  • Tenant isolation – A solid authorization strategy needs to be put in place to make sure tenants can access only their data

Underpinning all interactions with a multi-tenant system is the concept of SaaS identity. For more information, refer to SaaS Architecture Fundamentals.

SaaS deployment models

Tenant isolation is not optional for SaaS providers, and tenant isolation approaches will differ depending on your deployment model. The model is influenced by business requirements, and the models are not mutually exclusive. Trade-offs must be weighed across individual services to achieve a proper balance of isolation, complexity, and cost. There is no universal solution, and a SaaS vendor needs to carefully weigh their business and customer needs against three isolation strategies: silo, pool and bridge (or combinations thereof).

In the following sections, we explore these deployment models across data isolation, performance fairness, and tenant isolation dimensions.

Silo model

The silo model represents the highest level of data segregation, but also the highest running cost. Having a dedicated MSK cluster per tenant increases the risk of overprovisioning and requires duplication of management and monitoring tooling.

Having a dedicated MSK cluster per tenant makes sure tenant data partitioning occurs at the disk level when using an Amazon MSK Provisioned model. Both Amazon MSK Provisioned and Serverless clusters support server-side encryption at rest. Amazon MSK Provisioned further allows you to use a customer managed AWS Key Management Service (AWS KMS) key (see Amazon MSK encryption).

In a silo model, Kafka ACLs and quotas are not strictly required unless your business requirements call for them. Performance fairness is guaranteed because the resources of the entire MSK cluster are dedicated to applications producing and consuming the events of a single tenant. This means spikes of traffic on a specific tenant can't impact other tenants, and there is no risk of cross-tenant data access. As a drawback, having a provisioned cluster per tenant requires a right-sizing exercise per tenant, with a higher risk of overprovisioning than in the pool or bridge models.

You can implement tenant isolation at the MSK cluster level with AWS Identity and Access Management (IAM) policies, or by creating per-cluster credentials, depending on the authentication scheme in use.
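
For example, a per-tenant policy in the silo model might grant a tenant's application access only to its own cluster. The following is a hedged sketch for IAM authentication; the cluster name, Region, account ID, and action list are placeholders to adapt to your setup:

# Create an IAM policy scoped to a single tenant's MSK cluster (names are hypothetical)
cat > tenant1-msk-policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:Connect"],
      "Resource": "arn:aws:kafka:<<region>>:<<account-id>>:cluster/tenant1-cluster/*"
    },
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:WriteData"],
      "Resource": "arn:aws:kafka:<<region>>:<<account-id>>:topic/tenant1-cluster/*"
    }
  ]
}
EOF

aws iam create-policy --policy-name tenant1-msk-access --policy-document file://tenant1-msk-policy.json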

Pool model

The pool model is the simplest model where tenants share resources. A single MSK cluster is used for all tenants with data split into topics based on the event type (for example, all events related to orders go to the topic orders), and all tenant’s events are sent to the same topic. The following diagram illustrates this architecture.

Image showing a single streaming topic with multiple producers and consumers

This model maximizes operational simplicity, but reduces the tenant isolation options available because the SaaS provider won’t be able to differentiate per-tenant operational parameters and all responsibilities of isolation are delegated to the applications producing and consuming data from Kafka. The pool model also doesn’t provide any mechanism of physical data partitioning, nor performance fairness. A SaaS provider with these requirements should consider either a bridge or silo model. If you don’t have requirements to account for parameters such as per-tenant encryption keys or tenant-specific data operations, a pool model offers reduced complexity and can be a viable option. Let’s dig deeper into the trade-offs.

A common strategy to implement consumer isolation is to identify the tenant within each event using a tenant ID. The options available with Kafka are passing the tenant ID either as event metadata (header) or part of the payload itself as an explicit field. With this approach, the tenant ID will be used as a standardized field across all applications within both the message payload and the event header. This approach can reduce the risk of semantic divergence when components process and forward messages because event headers are handled differently by different processing frameworks and could be stripped when forwarded. Conversely, the event body is often forwarded as a single object and no contained information is lost unless the event is explicitly transformed. Including the tenant ID in the event header as well may simplify the implementation of services allowing you to specify tenants that need to be recovered or migrated without requiring the provider to deserialize the message payload to filter by tenant.
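
As an illustration of the header approach, the following sketch produces an event carrying a tenant-id header with the open-source kcat tool; the broker address, topic, and header name are placeholders, and authentication options are omitted for brevity:

# Publish one order event for tenant1, carrying the tenant ID both as a
# header and as an explicit field in the payload
echo '{"tenantId": "tenant1", "orderId": 1001, "amount": 42.5}' | kcat -P \
    -b <<bootstrap-address>>:9098 \
    -t orders \
    -H tenant-id=tenant1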

When specifying the tenant ID using either a header or a field in the event, consumer applications will not be able to selectively subscribe to the events of a specific tenant. With Kafka, a consumer subscribes to a topic and receives all events sent to that topic for all tenants. Only after receiving an event will the consumer be able to inspect the tenant ID to filter for the tenant of interest, making access segregation virtually impossible. This means sensitive data must be encrypted to make sure a tenant can't read another tenant's data when viewing these events. In Kafka, server-side encryption can only be set at the cluster level, where all tenants sharing a cluster will share the same server-side encryption key.

In Kafka, data retention can only be set on the topic. In the pool model, events belonging to all tenants are sent to the same topic, so tenant-specific operations like deleting all data for a tenant will not be possible. The immutable, append-only nature of Kafka only allows an entire topic to be deleted, not selective events belonging to a specific tenant. If specific customer data in the stream requires the right to be forgotten, such as for GDPR, a pool model will not work for that data and silo should be considered for that specific data stream.

Bridge model

In the bridge model, a single Kafka cluster is used across all tenants, but events from different tenants are segregated into different topics. With this model, there is a topic for each group of related events per tenant. You can simplify operations by adopting a topic naming convention such as including the tenant ID in the topic name. This will practically create a namespace per tenant, and also allows different administrators to manage different tenants, setting permissions with a prefix ACL, and avoiding naming clashes (for example, events related to orders for tenant 1 go to tenant1.orders and orders of tenant 2 go to tenant2.orders). The following diagram illustrates this architecture.

Image showing multiple producers and consumers each publishing to a stream-per-tenant
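
With this naming convention, a prefix ACL can restrict each application to its tenant's namespace. The following is a sketch using the standard Kafka ACL tool; the principal, prefix, and connection properties are illustrative:

# Allow tenant1's application to read only topics whose names start with "tenant1."
kafka-acls.sh --bootstrap-server <<bootstrap-address>>:9096 \
    --command-config client_sasl.properties \
    --add \
    --allow-principal User:tenant1-app \
    --operation Read \
    --topic tenant1. \
    --resource-pattern-type prefixed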

With the bridge model, server-side encryption using a per-tenant key is not possible. Data from different tenants is stored in the same MSK cluster, and server-side encryption keys can be specified per cluster only. For the same reason, data segregation can only be achieved at file level, because separate topics are stored in separate files. Amazon MSK stores all topics within the same Amazon Elastic Block Store (Amazon EBS) volume.

The bridge model offers per-tenant customization, such as retention policy or max message size, because Kafka allows you to set these parameters per topic. The bridge model also simplifies segregating and decoupling event processing per tenant, allowing a stronger isolation between separate applications that process data of separate tenants.

To summarize, the bridge model offers the following capabilities:

  • Tenant processing segregation – A consumer application can selectively subscribe to the topics belonging to specific tenants and only receive events for those tenants. A SaaS provider will be able to delete data for specific tenants, selectively deleting the topics belonging to that tenant.
  • Selective scaling of the processing – With Kafka, the maximum number of parallel consumers is determined by the number of partitions of a topic. Because the number of partitions is set per topic, and therefore per tenant, you can scale the processing of each tenant independently.
  • Performance fairness – You can implement performance fairness using Kafka quotas, which are supported by Amazon MSK, preventing the services processing a particularly busy tenant from consuming too many cluster resources at the expense of other tenants (see the sketch after this list). Refer to the following two-part series for more details on Kafka quotas in Amazon MSK, and an example implementation for IAM authentication.
  • Tenant isolation – You can implement tenant isolation using IAM access control or Apache Kafka ACLs, depending on the authentication scheme that is used with Amazon MSK. Both IAM and Kafka ACLs allow you to control access per topic. You can authorize an application to access only the topics belonging to the tenant it is supposed to process.
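
As referenced in the performance fairness item above, the following sketch caps a tenant application's throughput using Kafka client quotas; the entity name and byte rates are illustrative values you would tune per tenant tier:

# Limit tenant1's clients to ~1 MB/s produce and ~2 MB/s consume (illustrative values)
kafka-configs.sh --bootstrap-server <<bootstrap-address>>:9096 \
    --command-config client_sasl.properties \
    --alter \
    --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' \
    --entity-type clients \
    --entity-name tenant1-app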

Trade-offs in a SaaS environment

Although each model provides different capabilities for data partitioning, performance fairness, and tenant isolation, they also come with different costs and complexities. During planning, it’s important to identify what trade-offs you are willing to make for typical customers, and provide a tier structure to your client subscriptions.

The following table summarizes the supported capabilities of the three models in a streaming application.

. Pool Bridge Silo
Per-tenant encryption at rest No No Yes
Can implement right to be forgotten for single tenant No Yes Yes
Per-tenant retention policies No Yes Yes
Per-tenant event size limit No Yes Yes
Per-tenant replayability Yes (must implement with logic in consumers) Yes Yes

Anti-patterns

In the bridge model, we discussed tenant segregation by topic. An alternative would be segregating by partition, where all messages of a given type are sent to the same topic (for example, orders), but each tenant has a dedicated partition. This approach has many disadvantages and we strongly discourage it. In Kafka, partitions are the unit of horizontal scaling and balancing of brokers and consumers. Assigning partitions to tenants can unbalance the cluster and introduce operational and performance issues that are hard to overcome.

Some level of data isolation, such as per-tenant encryption keys, could be achieved using client-side encryption, delegating any encryption or decryption to the producer and consumer applications. This approach would allow you to use a separate encryption key per tenant. We don't recommend this approach because it introduces a higher level of complexity in both the consumer and producer applications. It may also prevent you from using most of the standard programming libraries, Kafka tooling, and most Kafka ecosystem services, like Kafka Connect or MSK Connect.

Conclusion

In this post, we explored three patterns that SaaS vendors can use when architecting multi-tenant streaming applications with Amazon MSK: the pool, bridge, and silo models. Each model presents different trade-offs between operational simplicity, tenant isolation level, and cost efficiency.

The silo model dedicates full MSK clusters per tenant, offering a straightforward tenant isolation approach but incurring a higher maintenance and cost per tenant. The pool model offers increased operational and cost-efficiencies by sharing all resources across tenants, but provides limited data partitioning, performance fairness, and tenant isolation capabilities. Finally, the bridge model offers a good compromise between operational and cost-efficiencies while providing a good range of options to create robust tenant isolation and performance fairness strategies.

When architecting your multi-tenant streaming solution, carefully evaluate your requirements around tenant isolation, data privacy, per-tenant customization, and performance guarantees to determine the appropriate model. Combine models if needed to find the right balance for your business. As you scale your application, reassess isolation needs and migrate across models accordingly.

As you’ve seen in this post, there is no one-size-fits-all pattern for streaming data in a multi-tenant architecture. Carefully weighing your streaming outcomes and customer needs will help determine the correct trade-offs you can make while making sure your customer data is secure and auditable. Continue your learning journey on SkillBuilder with our SaaS curriculum, get hands-on with an AWS Serverless SaaS workshop or Amazon EKS SaaS workshop, or dive deep with Amazon MSK Labs.


About the Authors

Emanuele Levi is a Solutions Architect in the Enterprise Software and SaaS team, based in London. Emanuele helps UK customers on their journey to refactor monolithic applications into modern microservices SaaS architectures. Emanuele is mainly interested in event-driven patterns and designs, especially when applied to analytics and AI, where he has expertise in the fraud-detection industry.

Lorenzo Nicora is a Senior Streaming Solution Architect helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working across industries, in consultancies and product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Nicholas Tunney is a Senior Partner Solutions Architect for Worldwide Public Sector at AWS. He works with Global SI partners to develop architectures on AWS for clients in the government, nonprofit healthcare, utility, and education sectors.  He is also a core member of the SaaS Technical Field Community where he gets to meet clients from all over the world who are building SaaS on AWS.

AWS Weekly Roundup: New AWS Heroes, Amazon API Gateway, Amazon Q and more (June 10, 2024)

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-new-aws-heroes-amazon-api-gateway-amazon-q-and-more-june-10-2024/

In the last AWS Weekly Roundup, Channy reminded us that life has ups and downs. It's just how life is. But that doesn't mean we should face it alone. Farouq Mousa, AWS Community Builder, is fighting brain cancer, and the daughter of Allen Helton, AWS Serverless Hero, is fighting leukemia.

If you have a moment, please visit their campaign pages and give your support.

Meanwhile, we’ve just finished a few AWS Summits in India, Korea and also Thailand. As always, I had so much fun working together at Developer Lounge with AWS Heroes, AWS Community Builders, and AWS User Group leaders. Here’s a photo from everyone here.

Last Week’s Launches
Here are some launches that caught my attention last week:

Welcome, new AWS Heroes! — Last week, we announced a new cohort of AWS Heroes, a worldwide group of AWS experts who go above and beyond to share knowledge and empower their communities.

Amazon API Gateway increased integration timeout limit — If you’re using Regional REST APIs or private REST APIs in Amazon API Gateway, you can now increase the integration timeout beyond the previous 29-second limit. This allows you to run workloads that require longer timeouts.

Amazon Q offers inline completion in the command line — Now, Amazon Q Developer provides real-time AI-generated code suggestions as you type in your command line. As a regular command line interface (CLI) user, I’m really excited about this.

New common control library in AWS Audit Manager — This announcement helps you save time when mapping enterprise controls into AWS Audit Manager. Check out Danilo’s post, where he explains how you can simplify risk and compliance assessment with the new common control library.

Amazon Inspector container image scanning for Amazon CodeCatalyst and GitHub Actions — If you need to integrate software vulnerability checks into your CI/CD, you can use Amazon Inspector. This native integration with GitHub Actions and Amazon CodeCatalyst streamlines your development pipeline.

Ingest streaming data with Amazon OpenSearch Ingestion and Amazon Managed Streaming for Apache Kafka — With this new capability, you can build more efficient data pipelines for your complex analytics use cases and seamlessly index data from your Amazon MSK Serverless clusters in Amazon OpenSearch Service.

Amazon Titan Text Embeddings V2 now available in Amazon Bedrock Knowledge Base — You can now embed your data into a vector database using Amazon Titan Text Embeddings V2, which helps you retrieve relevant information for various tasks. Key specifications of the model:

  • Max tokens: 8,192
  • Languages: 100+ in pre-training
  • Fine-tuning supported: No
  • Normalization supported: Yes
  • Vector size: 256, 512, 1,024 (default)

From Community.aws
Here are three of my personal favorite posts from community.aws:

Upcoming AWS events
Check your calendars and sign up for these AWS and AWS Community events:

  • AWS Summits — Join free online and in-person events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Register in your nearest city: Japan (June 20), Washington, DC (June 26–27), and New York (July 10).

  • AWS re:Inforce — Join us for AWS re:Inforce (June 10–12) in Philadelphia, PA. AWS re:Inforce is a learning conference focused on AWS security solutions, cloud security, compliance, and identity. Connect with the AWS teams that build the security tools and meet AWS customers to learn about their security journeys.

  • AWS Community Days — Join community-led conferences that feature technical discussions, workshops, and hands-on labs led by expert AWS users and industry leaders from around the world: Midwest | Columbus (June 13), Sri Lanka (June 27), Cameroon (July 13), New Zealand (August 15), Nigeria (August 24), and New York (August 28).

You can browse all upcoming in-person and virtual events.

That’s all for this week. Check back next Monday for another Weekly Roundup!

Donnie

This post is part of our Weekly Roundup series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Introducing support for Apache Kafka on Raft mode (KRaft) with Amazon MSK clusters

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/introducing-support-for-apache-kafka-on-raft-mode-kraft-with-amazon-msk-clusters/

Organizations are adopting Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK) to capture and analyze data in real time. Amazon MSK helps you build and run production applications on Apache Kafka without needing Kafka infrastructure management expertise or having to deal with the complex overhead associated with setting up and running Apache Kafka on your own. Since its inception, Apache Kafka has depended on Apache ZooKeeper for storing and replicating the metadata of Kafka brokers and topics. Starting from Apache Kafka version 3.3, the Kafka community has adopted KRaft (Apache Kafka on Raft), a consensus protocol, to replace Kafka’s dependency on ZooKeeper for metadata management. In the future, the Apache Kafka community plans to remove the ZooKeeper mode entirely.

Today, we’re excited to launch support for KRaft on new clusters on Amazon MSK starting from version 3.7. In this post, we walk you through how KRaft mode improves on the ZooKeeper approach. We also guide you through the process of creating MSK clusters with KRaft mode and how to connect your application to them.

Why ZooKeeper was replaced with KRaft mode

The traditional Kafka architecture relies on ZooKeeper as the authoritative source for cluster metadata. Read and write access to metadata in ZooKeeper is funneled through a single Kafka controller. For clusters with a large number of partitions, this architecture can create a bottleneck during scenarios such as an uncontrolled broker shutdown or controller failover, due to a single-controller approach.

KRaft mode addresses these limitations by managing metadata within the Kafka cluster itself. Instead of relying on a separate ZooKeeper cluster, KRaft mode stores and replicates the cluster metadata across multiple Kafka controller nodes, forming a metadata quorum. The KRaft controller nodes comprise a Raft quorum that manages the Kafka metadata log. By distributing the metadata management responsibilities across multiple controller nodes, KRaft mode improves recovery time for scenarios such as uncontrolled broker shutdown or controller failover. For more details on KRaft mode and its implementation, refer to KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum.

The following figure compares the three-node MSK cluster architecture with ZooKeeper vs. KRaft mode.

Amazon MSK with KRaft mode

Until now, Amazon MSK has supported Kafka clusters that rely on ZooKeeper for metadata management. One of the key benefits of Amazon MSK is that it handles the complexity of setting up and managing the ZooKeeper cluster at no additional cost. Many organizations use Amazon MSK to run large, business-critical streaming applications that require splitting their traffic across thousands of partitions. As the size of a Kafka cluster grows, the amount of metadata generated within the cluster increases proportionally to the number of partitions.

Two key properties govern the number of partitions a Kafka cluster can support: the per-node partition count limit and the cluster-wide partition limit. As mentioned earlier, ZooKeeper-based metadata management imposed a bottleneck on the cluster-wide partition limit in Apache Kafka. With the introduction of KRaft mode in Amazon MSK starting with version 3.7, Amazon MSK now enables the creation of clusters with up to 60 brokers, vs. the default quota of 30 brokers in ZooKeeper mode. Kafka’s scalability still fundamentally relies on expanding the cluster by adding more nodes to increase overall capacity, so the cluster-wide partition limit continues to define the upper bound of scalability within the Kafka system, because it determines the maximum number of partitions that can be distributed across the available nodes. Amazon MSK manages the KRaft controller nodes at no additional cost.

Create and access an MSK cluster with KRaft mode

Complete the following steps to configure an MSK cluster with KRaft mode:

  1. On the Amazon MSK console, choose Clusters in the navigation pane.
  2. Choose Create cluster.
  3. For Cluster creation method, select Custom create.
  4. For Cluster name, enter a name.
  5. For Cluster type, select Provisioned.
  6. For Apache Kafka version, choose 3.7.x.
  7. For Metadata mode, select KRaft.
  8. Leave the other settings as default and choose Create cluster.

When the cluster creation is successful, you can navigate to the cluster and choose View client integration information, which will provide details about the cluster bootstrap servers.
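You can also retrieve the bootstrap servers programmatically. The following is a minimal sketch using the AWS SDK for Python (Boto3); the cluster ARN is a placeholder:

```python
import boto3

msk = boto3.client("kafka", region_name="us-east-1")

# Placeholder cluster ARN; replace with the ARN shown on the cluster details page.
brokers = msk.get_bootstrap_brokers(
    ClusterArn="arn:aws:kafka:us-east-1:123456789012:cluster/my-kraft-cluster/abcd1234"
)

# For clusters configured with IAM authentication, use the SASL/IAM bootstrap string.
print(brokers.get("BootstrapBrokerStringSaslIam"))
```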

Adapt your client applications and tools for accessing MSK clusters with KRaft mode

With the adoption of KRaft mode in Amazon MSK, customers whose client applications and tools connect to ZooKeeper to interact with MSK clusters will need to update them to reflect the removal of ZooKeeper from the architecture. Starting with version 1.0, Kafka introduced the ability for admin tools to accept the bootstrap servers (brokers) as input parameters instead of a ZooKeeper connection string, and it began deprecating ZooKeeper connection strings with version 2.5. This change was part of the effort to decouple Kafka from ZooKeeper and pave the way for its eventual replacement with KRaft mode for metadata management. Instead of specifying the ZooKeeper connection string, clients need to use the bootstrap.servers configuration option to connect directly to the Kafka brokers. The following table summarizes these changes.

With ZooKeeper:
  Clients and services: bootstrap.servers=broker:<port>, or zookeeper.connect=zookeeper:2181 (deprecated)
  Admin tools: kafka-topics --zookeeper zookeeper:2181 (deprecated), or kafka-topics --bootstrap-server broker:<port> … --command-config

With KRaft:
  Clients and services: bootstrap.servers=broker:<port>
  Admin tools: kafka-topics --bootstrap-server broker:<port> … --command-config
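As an illustration, the following is a minimal sketch of an admin operation that uses only bootstrap servers, based on the kafka-python library. The broker address is a placeholder, and clusters using IAM authentication additionally require a SASL/IAM mechanism (for example, via the aws-msk-iam-sasl-signer-python package):

```python
from kafka import KafkaAdminClient

# Bootstrap servers replace the ZooKeeper connection string entirely.
admin = KafkaAdminClient(
    bootstrap_servers="b-1.mycluster.example.amazonaws.com:9098",  # placeholder
    security_protocol="SSL",  # IAM-authenticated clusters need SASL_SSL plus a token provider
)

# List topics directly from the brokers; no ZooKeeper involved.
print(admin.list_topics())
```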

Summary

In this post, we discussed how Amazon MSK has launched support for KRaft mode for metadata management. We also described how KRaft works and how it’s different from ZooKeeper.

To get started, create a new cluster with KRaft mode using the AWS Management Console, and refer to the Amazon MSK Developer Guide for more information.


About the author

Kalyan Janaki is Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Safely remove Kafka brokers from Amazon MSK provisioned clusters

Post Syndicated from Vidhi Taneja original https://aws.amazon.com/blogs/big-data/safely-remove-kafka-brokers-from-amazon-msk-provisioned-clusters/

Today, we are announcing broker removal capability for Amazon Managed Streaming for Apache Kafka (Amazon MSK) provisioned clusters, which lets you remove multiple brokers from your provisioned clusters. You can now reduce your cluster’s storage and compute capacity by removing sets of brokers, with no availability impact, data durability risk, or disruption to your data streaming applications. Amazon MSK is a fully managed Apache Kafka service that makes it easy for developers to build and run highly available, secure, and scalable streaming applications. Administrators can optimize the costs of their Amazon MSK clusters by reducing broker count and adapting the cluster capacity to the changes in the streaming data demand, without affecting their clusters’ performance, availability, or data durability.

You can use Amazon MSK as a core foundation to build a variety of real-time streaming applications and high-performance event-driven architectures. As business needs and traffic patterns change, cluster capacity is often adjusted to optimize costs. Amazon MSK provides flexibility and elasticity for administrators to right-size MSK clusters. You can increase the broker count or the broker size to manage a surge in traffic during peak events, or decrease the broker size to reduce capacity. However, to reduce the broker count, you previously had to undertake an effort-intensive migration to another cluster.

With the broker removal capability, you can now remove multiple brokers from your provisioned clusters to meet the varying needs of your streaming workloads. During and after broker removal, the cluster continues to handle read and write requests from client applications. Amazon MSK performs the necessary validations to safeguard against data durability risks and gracefully removes the brokers from the cluster. By using the broker removal capability, you can precisely adjust MSK cluster capacity, eliminating the need to change the instance size of every broker in the cluster or migrate to another cluster to reduce broker count.

How the broker removal feature works

Before you execute the broker removal operation, you must make some brokers eligible for removal by moving all partitions off of them. You can use Kafka admin APIs or Cruise Control to move partitions to other brokers that you intend to retain in the cluster.

Choose which brokers to remove and move the partitions off those brokers using Kafka tools, or start with brokers that aren’t hosting any partitions. Then use the Edit number of brokers feature on the AWS Management Console, or the Amazon MSK UpdateBrokerCount API (a sketch follows the list below). Here are details on how you can use this new feature:

  • You can remove a maximum of one broker per Availability Zone (AZ) in a single broker removal operation. To remove more brokers, you can call multiple broker removal operations consecutively after the prior operation has been completed. You must retain at least one broker per AZ in your MSK cluster.
  • The target number of broker nodes in the cluster must be a multiple of the number of AZs in the client subnets parameter. For example, a cluster with subnets in two AZs must have a target number of nodes that is a multiple of two.
  • If the brokers you removed were present in the bootstrap broker string, MSK will perform the necessary routing so that the client’s connectivity to the cluster is not disrupted. You don’t need to make any client changes to change your bootstrap strings.
  • You can add brokers back to your cluster anytime using AWS Console, or the UpdateBrokerCount API.
  • Broker removal is supported on Kafka versions 2.8.1 and above. If you have clusters in lower versions, you must first upgrade to version 2.8.1 or above and then remove brokers.
  • Broker removal doesn’t support the t3.small instance type.
  • You will stop incurring costs for the removed brokers once the broker removal operation is completed successfully.
  • When brokers are removed from a cluster, their associated local storage is removed as well.
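The following is a hedged Boto3 sketch of the removal call itself; the cluster ARN is a placeholder, and the target count assumes a two-AZ cluster going from six brokers to four:

```python
import boto3

msk = boto3.client("kafka", region_name="us-east-1")
cluster_arn = "arn:aws:kafka:us-east-1:123456789012:cluster/demo/abcd1234"  # placeholder

# The update API requires the cluster's current version for optimistic concurrency.
current_version = msk.describe_cluster(ClusterArn=cluster_arn)["ClusterInfo"]["CurrentVersion"]

# The target must be a multiple of the number of AZs; here, 4 brokers across 2 AZs.
msk.update_broker_count(
    ClusterArn=cluster_arn,
    CurrentVersion=current_version,
    TargetNumberOfBrokerNodes=4,
)
```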

Considerations before removing brokers

Removing brokers from an existing Apache Kafka cluster is a critical operation that needs careful planning to avoid service disruption. When deciding how many brokers you should remove from the cluster, determine your cluster’s minimum broker count by considering your requirements around availability, durability, local data retention, and partition count. Here are a few things you should consider:

  • Check the Amazon CloudWatch BytesInPerSec and BytesOutPerSec metrics for your cluster and look for the peak load over a period of 1 month (see the CloudWatch sketch after this list). Use this data with the MSK sizing spreadsheet (Excel file) to identify how many brokers you need to handle your peak load. If the number of brokers listed in the spreadsheet is higher than the number of brokers that would remain after removal, do not proceed with the operation; removing brokers would leave too few brokers for the cluster, which can lead to availability impact for your cluster or applications.
  • Check the UserPartitionExists metric to verify that you have at least one empty broker per AZ in your cluster. If not, make sure to remove partitions from at least one broker per AZ before invoking the operation.
  • If you have more than one broker per AZ with no user partitions on them, MSK will randomly pick one of those during the removal operation.
  • Check the PartitionCount metrics to know the number of partitions that exist on your cluster. Check per broker partition limit. The broker removal feature will not allow the removal of brokers if the service detects that any brokers in the cluster have breached the partition limit. In that case, check if any unused topics could be removed instead to free up broker resources.
  • Check if the estimated storage in the Excel file exceeds the currently provisioned storage for the cluster. In that case, first provision additional storage on that cluster. If you are hitting per-broker storage limits, consider approaches like using MSK tiered storage or removing unused topics. Otherwise, avoid moving partitions to just a few brokers as that may lead to a disk full issue.
  • If the brokers you are planning to remove host partitions, make sure those partitions are reassigned to other brokers in the cluster. Use the kafka-reassign-partitions.sh tool or Cruise Control to initiate partition reassignment, and monitor the reassignment through to completion. Disregard the internal topics __amazon_msk_canary and __amazon_msk_canary_state, because they are managed by the service and are automatically removed by MSK while executing the operation.
  • Verify that the cluster status is Active before starting the removal process.
  • Check the performance of the workload on your production environment after you move those partitions. We recommend monitoring this for a week before you remove the brokers to make sure that the other brokers in your cluster can safely handle your traffic patterns.
  • If you experience any impact on your applications or cluster availability after removing brokers, you can add the same number of brokers that you removed earlier by using the UpdateBrokerCount API, and then reassign partitions to the newly added brokers.
  • We recommend you test the entire process in a non-production environment, to identify and resolve any issues before making changes in the production environment.
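To illustrate the first check in this list, the following is a minimal Boto3 sketch that pulls the one-month peak of BytesInPerSec from CloudWatch; the cluster name is a placeholder, and BytesOutPerSec can be queried the same way:

```python
from datetime import datetime, timedelta

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

response = cloudwatch.get_metric_statistics(
    Namespace="AWS/Kafka",
    MetricName="BytesInPerSec",
    Dimensions=[{"Name": "Cluster Name", "Value": "demo-cluster"}],  # placeholder name
    StartTime=datetime.utcnow() - timedelta(days=30),
    EndTime=datetime.utcnow(),
    Period=3600,  # hourly data points over the last month
    Statistics=["Maximum"],
)

# Peak hourly ingress over the period; feed this into the sizing spreadsheet.
peak_bytes_in = max(dp["Maximum"] for dp in response["Datapoints"])
print(peak_bytes_in)
```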

Conclusion

Amazon MSK’s new broker removal capability provides a safe way to reduce the capacity of your provisioned Apache Kafka clusters. By allowing you to remove brokers without impacting availability, data durability, or disrupting your streaming applications, this feature enables you to optimize costs and right-size your MSK clusters based on changing business needs and traffic patterns. With careful planning and by following the recommended best practices, you can confidently use this capability to manage your MSK resources more efficiently.

Start taking advantage of the broker removal feature in Amazon MSK today. Review the documentation and follow the step-by-step guide to test the process in a non-production environment. Once you are comfortable with the workflow, plan and execute broker removal in your production MSK clusters to optimize costs and align your streaming infrastructure with your evolving workload requirements.


About the Authors


Vidhi Taneja is a Principal Product Manager for Amazon Managed Streaming for Apache Kafka (Amazon MSK) at AWS. She is passionate about helping customers build streaming applications at scale and derive value from real-time data. Before joining AWS, Vidhi worked at Apple, Goldman Sachs and Nutanix in product management and engineering roles. She holds an MS degree from Carnegie Mellon University.


Anusha Dasarakothapalli is a Principal Software Engineer for Amazon Managed Streaming for Apache Kafka (Amazon MSK) at AWS. She started her software engineering career with Amazon in 2015 and worked on products such as S3-Glacier and S3 Glacier Deep Archive, before transitioning to MSK in 2022. Her primary areas of focus lie in streaming technology, distributed systems, and storage.


Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Nexthink scales to trillions of events per day with Amazon MSK

Post Syndicated from Moe Haidar original https://aws.amazon.com/blogs/big-data/nexthink-scales-to-trillions-of-events-per-day-with-amazon-msk/

Real-time data streaming and event processing present scalability and management challenges. AWS offers a broad selection of managed real-time data streaming services to effortlessly run these workloads at any scale.

In this post, Nexthink shares how Amazon Managed Streaming for Apache Kafka (Amazon MSK) empowered them to achieve massive scale in event processing. Experiencing business hyper-growth, Nexthink migrated to AWS to overcome the scaling limitations of on-premises solutions. With Amazon MSK, Nexthink now seamlessly processes trillions of events per day, reaching over 5 GB per second of aggregated throughput.

In the following sections, Nexthink introduces their product and the need for scalability. They then highlight the challenges of their legacy on-premises application and present their transition to a cloud-centered software as a service (SaaS) architecture powered by Amazon MSK. Finally, Nexthink details the benefits achieved by adopting Amazon MSK.

Nexthink’s need to scale

Nexthink is the leader in digital employee experience (DeX). The company is shaping the future of work by providing IT leaders and C-levels with insights into employees’ daily technology experiences at the device and application level. This allows IT to evolve from reactive problem-solving to proactive optimization.

The Nexthink Infinity platform combines analytics, monitoring, automation, and more to manage the employee digital experience. By collecting device and application events, processing them in real time, and storing them, our platform analyzes data to solve problems and boost experiences for over 15 million employees across five continents.

In just 3 years, Nexthink’s business grew tenfold, and with the introduction of more real-time data, our application had to scale from processing 200 MB per second to 5 GB per second and trillions of events daily. To enable this growth, we modernized our application from an on-premises single-tenant monolith to a cloud-based scalable SaaS solution powered by Amazon MSK.

The next sections detail our modernization journey, including the challenges we faced and the benefits we realized with our new cloud-centered, AWS-based architecture.

The on-premises solution and its challenges

Let’s first explore our previous on-premises solution, Nexthink V6, before examining how Amazon MSK addressed its challenges. The following diagram illustrates its architecture.

Nexthink v6

V6 was made up of two monolithic, single-tenant Java and C++ applications that were tightly coupled. The portal was a backend-for-frontend Java application, and the core engine was an in-house C++ in-memory database application that was also handling device connections, data ingestion, aggregation, and querying. By bundling all these functions together, the engine became difficult to manage and improve.

V6 also lacked scalability. Each engine initially supported 10,000 devices, but some new tenants had over 300,000 devices. We reacted by deploying multiple V6 engines per tenant, which increased complexity and cost, hampered the user experience, and delayed time to market. This also led to longer proof of concept and onboarding cycles, which hurt the business.

Furthermore, the absence of a streaming platform like Kafka created dependencies between teams through tight HTTP/gRPC coupling. Additionally, teams couldn’t access real-time events before ingestion into the database, limiting feature development. We also lacked a data buffer, risking potential data loss during outages. Such constraints impeded innovation and increased risks.

In summary, although the V6 system served its initial purpose, reinventing it with cloud-centered technologies became imperative to enhance scalability, reliability, and foster innovation by our engineering and product teams.

Transitioning to a cloud-centered architecture with Amazon MSK

To achieve our modernization goals, after thorough research and iterations, we implemented an event-driven microservices design on Amazon Elastic Kubernetes Service (Amazon EKS), using Kafka on Amazon MSK for distributed event storage and streaming.

Our transition from the V6 on-premises solution to the cloud-centered platform was phased over four iterations:

  • Phase 1 – We lifted and shifted from on premises to virtual machines in the cloud, reducing operational complexities and accelerating proof of concept cycles while transparently migrating customers.
  • Phase 2 – We extended the cloud architecture by implementing new product features with microservices and self-managed Kafka on Kubernetes. However, operating Kafka clusters ourselves proved overly difficult, leading us to Phase 3.
  • Phase 3 – We switched from self-managed Kafka to Amazon MSK, improving stability and reducing operational costs. We realized that managing Kafka wasn’t our core competency or differentiator, and the overhead was high. Amazon MSK enabled us to focus on our core application, freeing us from the burden of undifferentiated Kafka management.
  • Phase 4 – Finally, we eliminated all legacy components, completing the transition to a fully cloud-centered SaaS platform. This journey of learning and transformation took 3 years.

Today, after our successful transition, we use Amazon MSK for two key functions:

  • Real-time data ingestion and processing of trillions of daily events from over 15 million devices worldwide, as illustrated in the following figure.

Nexthink Architecture Ingestion

  • Enabling an event-driven system that decouples data producers and consumers, as depicted in the following figure.

Nexthink Architecture Event Driven

To further enhance our scalability and resilience, we adopted a cell-based architecture using the wide availability of Amazon MSK across AWS Regions. We currently operate over 10 cells, each representing an independent regional deployment of our SaaS solution. This cell-based approach minimizes the area of impact in case of issues, addresses data residency requirements, and enables horizontal scaling across AWS Regions, as illustrated in the following figure.

Nexthink Architecture Cells

Benefits of Amazon MSK

Amazon MSK has been critical in enabling our event-driven design. In this section, we outline the main benefits we gained from its adoption.

Improved data resilience

In our new architecture, data from devices is pushed directly to Kafka topics in Amazon MSK, which provides high availability and resilience. This makes sure that events can be safely received and stored at any time. Our services consuming this data inherit the same resilience from Amazon MSK. If our backend ingestion services face disruptions, no event is lost, because Kafka retains all published messages. When our services resume, they seamlessly continue processing from where they left off, thanks to Kafka’s delivery semantics, which allow applications to process messages exactly once, at least once, or at most once based on their needs.

Amazon MSK enables us to tailor the data retention duration to our specific requirements, ranging from seconds to unlimited duration. This flexibility grants uninterrupted data availability to our application, which wasn’t possible with our previous architecture. Furthermore, to safeguard data integrity in the event of processing errors or corruption, Kafka enabled us to implement a data replay mechanism, ensuring data consistency and reliability.

Organizational scaling

By adopting an event-driven architecture with Amazon MSK, we decomposed our monolithic application into loosely coupled, stateless microservices communicating asynchronously via Kafka topics. This approach enabled our engineering organization to scale rapidly from just 4–5 teams in 2019 to over 40 teams and approximately 350 engineers today.

The loose coupling between event publishers and subscribers empowered teams to focus on distinct domains, such as data ingestion, identification services, and data lakes. Teams could develop solutions independently within their domains, communicating through Kafka topics without tight coupling. This architecture accelerated feature development by minimizing the risk of new features impacting existing ones. Teams could efficiently consume events published by others, offering new capabilities more rapidly while reducing cross-team dependencies.

The following figure illustrates the seamless workflow of adding new domains to our system.

Adding domains

Furthermore, the event-driven design allowed teams to build stateless services that could seamlessly auto scale based on MSK metrics like messages per second. This event-driven scalability eliminated the need for extensive capacity planning and manual scaling efforts, freeing up development time.

By using an event-driven microservices architecture on Amazon MSK, we achieved organizational agility, enhanced scalability, and accelerated innovation while minimizing operational overhead.

Seamless infrastructure scaling

Nexthink’s business grew tenfold in 3 years, and many new capabilities were added to the product, leading to a substantial increase in traffic from 200 MB per second to 5 GB per second. This exponential data growth was enabled by the robust scalability of Amazon MSK. Achieving such scale with an on-premises solution would have been challenging and expensive, if not infeasible.

Attempting to self-manage Kafka imposed unnecessary operational overhead without providing business value. Running it with just 5% of today’s traffic was already complex and required two engineers. At today’s volumes, we estimated needing 6–10 dedicated staff, increasing costs and diverting resources away from core priorities.

Real-time capabilities

By channeling all our data through Amazon MSK, we enabled real-time processing of events. This unlocked capabilities like real-time alerts, event-driven triggers, and webhooks that were previously unattainable. As such, Amazon MSK was instrumental in facilitating our event-driven architecture and powering impactful innovations.

Secure data access

Transitioning to our new architecture, we met our security and data integrity goals. With Kafka ACLs, we enforced strict access controls, allowing consumers and producers to only interact with authorized topics. We based these granular data access controls on criteria like data type, domain, and team.

To securely scale decentralized management of topics, we introduced proprietary Kubernetes Custom Resource Definitions (CRDs). These CRDs enabled teams to independently manage their own topics, settings, and ACLs without compromising security.

Amazon MSK encryption made sure that the data remained encrypted at rest and in transit. We also introduced a Bring Your Own Key (BYOK) option, allowing application-level encryption with customer keys for all single-tenant and multi-tenant topics.

Enhanced observability

Amazon MSK gave us great visibility into our data flows. The out-of-the-box Amazon CloudWatch metrics let us see the amount and types of data flowing through each topic and cluster. This helped us quantify the usage of our product features by tracking data volumes at the topic level. The Amazon MSK operational metrics enabled effortless monitoring and right-sizing of clusters and brokers. Overall, the rich observability of Amazon MSK facilitated data-driven decisions about architecture and product features.

Conclusion

Nexthink’s journey from an on-premises monolith to a cloud SaaS was streamlined by using Amazon MSK, a fully managed Kafka service. Amazon MSK allowed us to scale seamlessly while benefiting from enterprise-grade reliability and security. By offloading Kafka management to AWS, we could stay focused on our core business and innovate faster.

Going forward, we plan to further improve performance, costs, and scalability by adopting Amazon MSK capabilities such as tiered storage and AWS Graviton-based EC2 instance types.

We are also working closely with the Amazon MSK team to prepare for upcoming service features. Rapidly adopting new capabilities will help us remain at the forefront of innovation while continuing to grow our business.

To learn more about how Nexthink uses AWS to serve its global customer base, explore the Nexthink on AWS case study. Additionally, discover other customer success stories with Amazon MSK by visiting the Amazon MSK blog category.


About the Authors

Moe Haidar is a principal engineer and special projects lead at the CTO office of Nexthink. He has been involved with AWS since 2018 and is a key contributor to the cloud transformation of the Nexthink platform to AWS. His focus is on product and technology incubation and architecture, but he also loves doing hands-on activities to keep his knowledge of technologies sharp and up to date. He still contributes heavily to the code base and loves to tackle complex problems.
Simone Pomata is a Senior Solutions Architect at AWS. He has worked enthusiastically in the tech industry for more than 10 years. At AWS, he helps customers succeed in building new technologies every day.
Magdalena Gargas is a Solutions Architect passionate about technology and solving customer challenges. At AWS, she works mostly with software companies, helping them innovate in the cloud. She participates in industry events, sharing insights and contributing to the advancement of the containerization field.

Exploring real-time streaming for generative AI Applications

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/exploring-real-time-streaming-for-generative-ai-applications/

Foundation models (FMs) are large machine learning (ML) models trained on a broad spectrum of unlabeled and generalized datasets. FMs, as the name suggests, provide the foundation to build more specialized downstream applications, and are unique in their adaptability. They can perform a wide range of different tasks, such as natural language processing, classifying images, forecasting trends, analyzing sentiment, and answering questions. This scale and general-purpose adaptability are what makes FMs different from traditional ML models. FMs are multimodal; they work with different data types such as text, video, audio, and images. Large language models (LLMs) are a type of FM and are pre-trained on vast amounts of text data and typically have application uses such as text generation, intelligent chatbots, or summarization.

Streaming data facilitates the constant flow of diverse and up-to-date information, enhancing the models’ ability to adapt and generate more accurate, contextually relevant outputs. This dynamic integration of streaming data enables generative AI applications to respond promptly to changing conditions, improving their adaptability and overall performance in various tasks.

To better understand this, imagine a chatbot that helps travelers book their travel. In this scenario, the chatbot needs real-time access to airline inventory, flight status, hotel inventory, latest price changes, and more. This data usually comes from third parties, and developers need to find a way to ingest this data and process the data changes as they happen.

Batch processing is not the best fit in this scenario. When data changes rapidly, processing it in a batch may result in stale data being used by the chatbot, providing inaccurate information to the customer, which impacts the overall customer experience. Stream processing, however, can enable the chatbot to access real-time data and adapt to changes in availability and price, providing the best guidance to the customer and enhancing the customer experience.

Another example is an AI-driven observability and monitoring solution where FMs monitor real-time internal metrics of a system and produce alerts. When the model finds an anomaly or abnormal metric value, it should immediately produce an alert and notify the operator. However, the value of such important data diminishes significantly over time. Ideally, these notifications should be received within seconds or even as the event happens. If operators receive them minutes or hours later, the insight is no longer actionable and has potentially lost its value. You can find similar use cases in other industries such as retail, car manufacturing, energy, and the financial industry.

In this post, we discuss why data streaming is a crucial component of generative AI applications due to its real-time nature. We discuss the value of AWS data streaming services such as Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Firehose in building generative AI applications.

In-context learning

LLMs are trained with point-in-time data and have no inherent ability to access fresh data at inference time. As new data appears, you would have to continuously fine-tune or further train the model. This is not only an expensive operation, but also very limiting in practice, because the rate of new data generation far exceeds the speed of fine-tuning. Additionally, LLMs lack contextual understanding, rely solely on their training data, and are therefore prone to hallucinations. This means they can generate a fluent, coherent, and syntactically sound but factually incorrect response. They are also devoid of relevance, personalization, and context.

LLMs, however, have the capacity to learn from the data they receive from the context to more accurately respond without modifying the model weights. This is called in-context learning, and can be used to produce personalized answers or provide an accurate response in the context of organization policies.

For example, in a chatbot, data events could pertain to an inventory of flights and hotels or price changes that are constantly ingested to a streaming storage engine. Furthermore, data events are filtered, enriched, and transformed to a consumable format using a stream processor. The result is made available to the application by querying the latest snapshot. The snapshot constantly updates through stream processing; therefore, the up-to-date data is provided in the context of a user prompt to the model. This allows the model to adapt to the latest changes in price and availability. The following diagram illustrates a basic in-context learning workflow.

A commonly used in-context learning approach is to use a technique called Retrieval Augmented Generation (RAG). In RAG, you provide relevant information, such as the most relevant policies and customer records, along with the user question in the prompt. This way, the LLM generates an answer to the user question using the additional information provided as context. To learn more about RAG, refer to Question answering using Retrieval Augmented Generation with foundation models in Amazon SageMaker JumpStart.
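The following is a minimal RAG sketch in Python to make the flow concrete. The retriever, model ID, and prompt format are assumptions for illustration, not a prescribed implementation:

```python
import json

import boto3

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

def answer(question: str, retrieve) -> str:
    # retrieve() stands in for a vector-store lookup returning the top-k relevant documents.
    context = "\n".join(retrieve(question))
    prompt = (
        "Answer the question using only the context below.\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )
    # The model ID is an assumption; any Amazon Bedrock text model can fill this role.
    response = bedrock.invoke_model(
        modelId="amazon.titan-text-express-v1",
        body=json.dumps({"inputText": prompt}),
    )
    return json.loads(response["body"].read())["results"][0]["outputText"]
```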

A RAG-based generative AI application can only produce generic responses based on its training data and the relevant documents in the knowledge base. This solution falls short when a near-real-time personalized response is expected from the application. For example, a travel chatbot is expected to consider the user’s current bookings, available hotel and flight inventory, and more. Moreover, the relevant customer personal data (commonly known as the unified customer profile) is usually subject to change. If a batch process is employed to update the generative AI’s user profile database, the customer may receive dissatisfying responses based on old data.

In this post, we discuss the application of stream processing to enhance a RAG solution used for building question answering agents with context from real-time access to unified customer profiles and organizational knowledge base.

Near-real-time customer profile updates

Customer records are typically distributed across data stores within an organization. For your generative AI application to provide a relevant, accurate, and up-to-date customer profile, it is vital to build streaming data pipelines that can perform identity resolution and profile aggregation across the distributed data stores. Streaming jobs constantly ingest new data to synchronize across systems and can perform enrichment, transformations, joins, and aggregations across windows of time more efficiently. Change data capture (CDC) events contain information about the source record, updates, and metadata such as time, source, classification (insert, update, or delete), and the initiator of the change.

The following diagram illustrates an example workflow for CDC streaming ingestion and processing for unified customer profiles.

In this section, we discuss the main components of a CDC streaming pattern required to support RAG-based generative AI applications.

CDC streaming ingestion

A CDC replicator is a process that collects data changes from a source system (usually by reading transaction logs or binlogs) and writes CDC events, in the exact order they occurred, to a streaming data stream or topic. This involves a log-based capture with tools such as AWS Database Migration Service (AWS DMS) or open source connectors such as Debezium for Apache Kafka Connect. Apache Kafka Connect is part of the Apache Kafka environment, allowing data to be ingested from various sources and delivered to a variety of destinations. You can run your Apache Kafka connector on Amazon MSK Connect within minutes without worrying about configuration, setup, and operating an Apache Kafka cluster. You only need to upload your connector’s compiled code to Amazon Simple Storage Service (Amazon S3) and set up your connector with your workload’s specific configuration.

There are also other methods for capturing data changes. For example, Amazon DynamoDB provides a feature for streaming CDC data to Amazon DynamoDB Streams or Kinesis Data Streams. Amazon S3 provides a trigger to invoke an AWS Lambda function when a new document is stored.

Streaming storage

Streaming storage functions as an intermediate buffer to store CDC events before they get processed. Streaming storage provides reliable storage for streaming data. By design, it is highly available and resilient to hardware or node failures and maintains the order of the events as they are written. Streaming storage can store data events either permanently or for a set period of time. This allows stream processors to read from part of the stream if there is a failure or a need for re-processing. Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at scale. Amazon MSK is a fully managed, highly available, and secure service provided by AWS for running Apache Kafka.

Stream processing

Stream processing systems should be designed for parallelism to handle high data throughput. They should partition the input stream between multiple tasks running on multiple compute nodes. Tasks should be able to send the result of one operation to the next one over the network, making it possible to process data in parallel while performing operations such as joins, filtering, enrichment, and aggregations. Stream processing applications should be able to process events with regard to event time for use cases where events could arrive late or where correct computation relies on the time events occur rather than the system time. For more information, refer to Notions of Time: Event Time and Processing Time.

Stream processes continuously produce results in the form of data events that need to be output to a target system. A target system could be any system that integrates directly with the process, or via streaming storage as an intermediary. Depending on the framework you choose for stream processing, you will have different options for target systems depending on available sink connectors. If you decide to write the results to an intermediary streaming storage, you can build a separate process that reads events and applies changes to the target system, such as running an Apache Kafka sink connector. Regardless of which option you choose, CDC data needs extra handling due to its nature. Because CDC events carry information about updates or deletes, it’s important that they are merged into the target system in the right order. If changes are applied in the wrong order, the target system will be out of sync with its source.

Apache Flink is a powerful stream processing framework known for its low latency and high throughput capabilities. It supports event time processing, exactly-once processing semantics, and high fault tolerance. Additionally, it provides native support for CDC data via a special structure called dynamic tables. Dynamic tables mimic the source database tables and provide a columnar representation of the streaming data. The data in dynamic tables changes with every event that is processed. New records can be appended, updated, or deleted at any time. Dynamic tables abstract away the extra logic you need to implement for each record operation (insert, update, delete) separately. For more information, refer to Dynamic Tables.
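To make dynamic tables concrete, the following is a hedged PyFlink sketch that declares a table over a Debezium-formatted CDC topic; the topic, broker address, and schema are assumptions for illustration:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# A dynamic table backed by CDC events; inserts, updates, and deletes in the
# source database appear here as changelog rows.
t_env.execute_sql("""
    CREATE TABLE customers (
        customer_id STRING,
        email STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'cdc.customers',
        'properties.bootstrap.servers' = 'broker:9098',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-json'
    )
""")

# Continuously reflects the latest state of the stream as it evolves.
t_env.execute_sql("SELECT customer_id, email FROM customers").print()
```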

With Amazon Managed Service for Apache Flink, you can run Apache Flink jobs and integrate with other AWS services. There are no servers and clusters to manage, and there is no compute and storage infrastructure to set up.

AWS Glue is a fully managed extract, transform, and load (ETL) service, which means AWS handles the infrastructure provisioning, scaling, and maintenance for you. Although it’s primarily known for its ETL capabilities, AWS Glue can also be used for Spark streaming applications. AWS Glue can interact with streaming data services such as Kinesis Data Streams and Amazon MSK for processing and transforming CDC data. AWS Glue can also seamlessly integrate with other AWS services such as Lambda, AWS Step Functions, and DynamoDB, providing you with a comprehensive ecosystem for building and managing data processing pipelines.

Unified customer profile

Unifying the customer profile across a variety of source systems requires the development of robust data pipelines. You need data pipelines that can bring and synchronize all records into one data store. This data store provides your organization with the holistic view of customer records that is needed for operational efficiency of RAG-based generative AI applications. For building such a data store, an unstructured data store works best.

An identity graph is a useful structure for creating a unified customer profile because it consolidates and integrates customer data from various sources, ensures data accuracy and deduplication, offers real-time updates, connects cross-systems insights, enables personalization, enhances customer experience, and supports regulatory compliance. This unified customer profile empowers the generative AI application to understand and engage with customers effectively, and adhere to data privacy regulations, ultimately enhancing customer experiences and driving business growth. You can build your identity graph solution using Amazon Neptune, a fast, reliable, fully managed graph database service.

AWS provides a few other managed and serverless NoSQL storage service offerings for unstructured key-value objects. Amazon DocumentDB (with MongoDB compatibility) is a fast, scalable, highly available, and fully managed enterprise document database service that supports native JSON workloads. DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability.

Near-real-time organizational knowledge base updates

Similar to customer records, internal knowledge repositories such as company policies and organizational documents are siloed across storage systems. This is typically unstructured data and is updated in a non-incremental fashion. The use of unstructured data for AI applications is effective using vector embeddings, a technique for representing high-dimensional data such as text files, images, and audio files as multi-dimensional numeric vectors.

AWS provides several vector engine services, such as Amazon OpenSearch Serverless, Amazon Kendra, and Amazon Aurora PostgreSQL-Compatible Edition with the pgvector extension, for storing vector embeddings. Generative AI applications can enhance the user experience by transforming the user prompt into a vector and using it to query the vector engine to retrieve contextually relevant information. Both the prompt and the vector data retrieved are then passed to the LLM to receive a more precise and personalized response.

The following diagram illustrates an example stream-processing workflow for vector embeddings.

Knowledge base contents need to be converted to vector embeddings before being written to the vector data store. Amazon Bedrock or Amazon SageMaker can help you access the model of your choice and expose a private endpoint for this conversion. Furthermore, you can use libraries such as LangChain to integrate with these endpoints. Building a batch process can help you convert your knowledge base content to vector data and store it in a vector database initially. However, you need to rely on an interval to reprocess the documents to synchronize your vector database with changes in your knowledge base content. With a large number of documents, this process can be inefficient. Between these intervals, your generative AI application users will receive answers according to the old content, or will receive an inaccurate answer because the new content is not vectorized yet.

Stream processing is an ideal solution for these challenges. It initially produces events for the existing documents, then monitors the source system and creates a document change event as soon as changes occur. These events can be stored in streaming storage and wait to be processed by a streaming job. A streaming job reads these events, loads the content of the document, and transforms the contents into an array of related tokens of words. Each token is further transformed into vector data via an API call to an embedding FM. Results are sent for storage to the vector storage via a sink operator.
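The embedding step of such a job could look like the following hedged sketch, which calls Amazon Titan Text Embeddings V2 through Amazon Bedrock; where and how the function runs inside the streaming job is an assumption for illustration:

```python
import json

import boto3

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

def embed(text: str) -> list:
    # One API call per chunk of document text; a sink operator would then write
    # the returned vector to the vector store.
    response = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",  # assumed embedding model
        body=json.dumps({"inputText": text}),
    )
    return json.loads(response["body"].read())["embedding"]
```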

If you’re using Amazon S3 for storing your documents, you can build an event-source architecture based on S3 object change triggers for Lambda. A Lambda function can create an event in the desired format and write that to your streaming storage.

You can also use Apache Flink to run as a streaming job. Apache Flink provides the native FileSystem source connector, which can discover existing files and read their contents initially. After that, it can continuously monitor your file system for new files and capture their content. The connector supports reading a set of files from distributed file systems such as Amazon S3 or HDFS in formats such as plain text, Avro, CSV, Parquet, and more, producing streaming records. As a fully managed service, Managed Service for Apache Flink removes the operational overhead of deploying and maintaining Flink jobs, allowing you to focus on building and scaling your streaming applications. With seamless integration into AWS streaming services such as Amazon MSK or Kinesis Data Streams, it provides features like automatic scaling, security, and resiliency, providing reliable and efficient Flink applications for handling real-time streaming data.

Based on your DevOps preference, you can choose between Kinesis Data Streams or Amazon MSK for storing the streaming records. Kinesis Data Streams simplifies the complexities of building and managing custom streaming data applications, allowing you to focus on deriving insights from your data rather than infrastructure maintenance. Customers using Apache Kafka often opt for Amazon MSK due to its straightforwardness, scalability, and dependability in overseeing Apache Kafka clusters within the AWS environment. As a fully managed service, Amazon MSK takes on the operational complexities associated with deploying and maintaining Apache Kafka clusters, enabling you to concentrate on constructing and expanding your streaming applications.

Because a RESTful API integration suits the nature of this process, you need a framework that supports a stateful enrichment pattern via RESTful API calls, tracking failures and retrying failed requests. Apache Flink again is a framework that can perform stateful operations at in-memory speed. To understand the best ways to make API calls via Apache Flink, refer to Common streaming data enrichment patterns in Amazon Kinesis Data Analytics for Apache Flink.

Apache Flink provides native sink connectors for writing data to vector datastores such as Amazon Aurora for PostgreSQL with pgvector or Amazon OpenSearch Service with VectorDB. Alternatively, you can stage the Flink job’s output (vectorized data) in an MSK topic or a Kinesis data stream. OpenSearch Service provides support for native ingestion from Kinesis data streams or MSK topics. For more information, refer to Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion and Loading streaming data from Amazon Kinesis Data Streams.

Feedback analytics and fine-tuning

It’s important for data operation managers and AI/ML developers to get insight about the performance of the generative AI application and the FMs in use. To achieve that, you need to build data pipelines that calculate important key performance indicator (KPI) data based on the user feedback and variety of application logs and metrics. This information is useful for stakeholders to gain real-time insight about the performance of the FM, the application, and overall user satisfaction about the quality of support they receive from your application. You also need to collect and store the conversation history for further fine-tuning your FMs to improve their ability in performing domain-specific tasks.

This use case fits very well in the streaming analytics domain. Your application should store each conversation in streaming storage. Your application can prompt users for a rating of each answer’s accuracy and for their overall satisfaction. This data can be in the form of a binary choice or free-form text. It can be stored in a Kinesis data stream or an MSK topic and processed to generate KPIs in real time. You can put FMs to work on user sentiment analysis; FMs can analyze each answer and assign a category of user satisfaction.

Apache Flink’s architecture allows for complex data aggregation over windows of time. It also provides support for SQL querying over streams of data events. Therefore, by using Apache Flink, you can quickly analyze raw user inputs and generate KPIs in real time by writing familiar SQL queries. For more information, refer to Table API & SQL.
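As an example of such a KPI query, the following is a hedged PyFlink sketch computing per-minute rating counts over an assumed feedback stream; the topic name, schema, and broker address are placeholders:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Hypothetical feedback stream: one event per user rating, read from Kafka,
# with a watermark so windows are computed on event time.
t_env.execute_sql("""
    CREATE TABLE feedback (
        rating STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-feedback',
        'properties.bootstrap.servers' = 'broker:9098',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
""")

# Per-minute counts of each rating value, computed over tumbling event-time windows.
t_env.execute_sql("""
    SELECT window_start, rating, COUNT(*) AS responses
    FROM TABLE(TUMBLE(TABLE feedback, DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
    GROUP BY window_start, window_end, rating
""").print()
```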

With Amazon Managed Service for Apache Flink Studio, you can build and run Apache Flink stream processing applications using standard SQL, Python, and Scala in an interactive notebook. Studio notebooks are powered by Apache Zeppelin and use Apache Flink as the stream processing engine. Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. With support for user-defined functions (UDFs), Apache Flink allows for building custom operators to integrate with external resources such as FMs for performing complex tasks such as sentiment analysis. You can use UDFs to compute various metrics or enrich user feedback raw data with additional insights such as user sentiment. To learn more about this pattern, refer to Proactively addressing customer concern in real-time with GenAI, Flink, Apache Kafka, and Kinesis.

With Managed Service for Apache Flink Studio, you can deploy your Studio notebook as a streaming job with one click. You can use native sink connectors provided by Apache Flink to send the output to your storage of choice or stage it in a Kinesis data stream or MSK topic. Amazon Redshift and OpenSearch Service are both ideal for storing analytical data. Both engines provide native ingestion support from Kinesis Data Streams and Amazon MSK via a separate streaming pipeline to a data lake or data warehouse for analysis.

Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses and data lakes, using AWS-designed hardware and machine learning to deliver the best price-performance at scale. OpenSearch Service offers visualization capabilities powered by OpenSearch Dashboards and Kibana (versions 1.5 to 7.10).

You can use the outcome of such analysis, combined with user prompt data, for fine-tuning the FM when needed. SageMaker is the most straightforward way to fine-tune your FMs. Using Amazon S3 with SageMaker provides a powerful and seamless integration for fine-tuning your models. Amazon S3 serves as a scalable and durable object storage solution, enabling straightforward storage and retrieval of large datasets, training data, and model artifacts. SageMaker is a fully managed ML service that simplifies the entire ML lifecycle. By using Amazon S3 as the storage backend for SageMaker, you can benefit from the scalability, reliability, and cost-effectiveness of Amazon S3, while seamlessly integrating it with SageMaker training and deployment capabilities. This combination enables efficient data management, facilitates collaborative model development, and makes sure that ML workflows are streamlined and scalable, ultimately enhancing the overall agility and performance of the ML process. For more information, refer to Fine-tune Falcon 7B and other LLMs on Amazon SageMaker with @remote decorator.

With a file system sink connector, Apache Flink jobs can deliver data to Amazon S3 in open format (such as JSON, Avro, Parquet, and more) files as data objects. If you prefer to manage your data lake using a transactional data lake framework (such as Apache Hudi, Apache Iceberg, or Delta Lake), all of these frameworks provide a custom connector for Apache Flink. For more details, refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi.
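
Continuing the earlier PyFlink sketch (the schema and bucket name remain illustrative assumptions), a filesystem sink that delivers Parquet files to Amazon S3 can be declared directly in SQL:

# Sink table over S3; Parquet files are rolled on each checkpoint,
# so checkpointing must be enabled on the job
table_env.execute_sql("""
    CREATE TABLE feedback_lake (
        session_id STRING,
        sentiment  STRING,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 's3://<your-bucket>/feedback/',
        'format' = 'parquet'
    )
""")

table_env.execute_sql("""
    INSERT INTO feedback_lake
    SELECT session_id, classify_sentiment(feedback), event_time
    FROM user_feedback
""")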

Summary

For a generative AI application based on a RAG model, you need to consider building two data storage systems, and you need to build data operations that keep them up to date with all the source systems. Traditional batch jobs are not sufficient to process the size and diversity of the data you need to integrate with your generative AI application. Delays in processing the changes in source systems result in an inaccurate response and reduce the efficiency of your generative AI application. Data streaming enables you to ingest data from a variety of databases across various systems. It also allows you to transform, enrich, join, and aggregate data across many sources efficiently in near-real time. Data streaming provides a simplified data architecture to collect and transform users’ real-time reactions or comments on the application responses, helping you deliver and store the results in a data lake for model fine-tuning. Data streaming also helps you optimize data pipelines by processing only the change events, allowing you to respond to data changes more quickly and efficiently.

Learn more about AWS data streaming services and get started building your own data streaming solution.


About the Authors

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Imtiaz (Taz) Sayed is the World-Wide Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Introducing enhanced functionality for worker configuration management in Amazon MSK Connect

Post Syndicated from Chinmayi Narasimhadevara original https://aws.amazon.com/blogs/big-data/introducing-enhanced-functionality-for-worker-configuration-management-in-amazon-msk-connect/

Amazon MSK Connect is a fully managed service for Apache Kafka Connect. With a few clicks, MSK Connect allows you to deploy connectors that move data between Apache Kafka and external systems.

MSK Connect now supports the ability to delete MSK Connect worker configurations, tag resources, and manage worker configurations and custom plugins using AWS CloudFormation. Together, these new capabilities make it straightforward to manage your MSK Connect resources and automate deployments through CI/CD pipelines.

MSK Connect makes it effortless to stream data to and from Apache Kafka over a private connection without requiring infrastructure management expertise. With a few clicks, you can deploy connectors like an Amazon S3 sink connector for loading streaming data to Amazon Simple Storage Service (Amazon S3), deploy connectors developed by third parties like Debezium for streaming change logs from databases into Apache Kafka, or deploy your own connector customized for your use case.

MSK Connect integrates external systems or AWS services with Apache Kafka by continuously copying streaming data from a data source into your Apache Kafka cluster, or continuously copying data from your Apache Kafka cluster into a data sink. The connector can also perform lightweight tasks such as transformation, format conversion, or filtering data before delivering the data to a destination. You use a plugin to create the connector; these custom plugins are resources that contain the code that defines the connector logic.

The primary components of MSK Connect are workers. Each worker is a Java virtual machine (JVM) process that runs the connector logic based on the worker configuration provided. Worker configurations are resources that contain your connector configuration properties and can be reused across multiple connectors. Each worker runs a set of tasks that copy the data in parallel.

Today, we are announcing three new capabilities in MSK Connect:

  • The ability to delete worker configurations
  • Support for resource tags for enabling resource grouping, cost allocation and reporting, and access control with tag-based policies
  • Support in AWS CloudFormation to manage worker configurations and custom plugins

In the following sections, we look at the new functionalities in more detail.

Delete worker configurations

Connectors for integrating Amazon Managed Streaming for Apache Kafka (Amazon MSK) with other AWS and partner services are usually created using a worker configuration (default or custom). The number of these configurations can grow as connectors are created and deleted, potentially causing configuration management issues.

You can now use the new delete worker configuration API to delete unused configurations. The service checks that the worker configuration is not in use by any connectors before deleting it. Additionally, you can now use a prefix filter to list worker configurations and custom plugins using the ListWorkerConfigurations and ListCustomPlugins API calls. The prefix filter lets you list only the resources whose names start with a given prefix, so you can quickly find and delete a selective set of resources.
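
As a sketch of how this flow might look with the AWS SDK for Python (Boto3), the following lists worker configurations by a hypothetical dev- prefix and deletes each match; the parameter names follow the API calls named above:

import boto3

client = boto3.client("kafkaconnect", region_name="us-east-1")

# List only worker configurations whose names start with "dev-"
response = client.list_worker_configurations(namePrefix="dev-")

for config in response["workerConfigurations"]:
    arn = config["workerConfigurationArn"]
    print(f"Deleting {config['name']} ({arn})")
    # The call fails if the configuration is still used by a connector
    client.delete_worker_configuration(workerConfigurationArn=arn)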

To test the new delete API, complete the following steps:

  1. On the Amazon MSK console, create a new worker configuration.
  2. Provide a name and optional description.
  3. In the Worker configuration section, enter your configuration code.

MSK Connect Worker Configuration

After you create the configuration, a Delete option is available on the configuration detail page (see the following screenshot) if the configuration is not being used in any connector.

To support this new API, an additional workerConfigurationState has been added, so you can more easily track the state of the worker configuration. This new state will be returned in the API call responses for CreateWorkerConfiguration, DescribeWorkerConfiguration, and ListWorkerConfigurations.
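
As a quick illustration, the following Boto3 sketch reads the state back; the ARN is a placeholder:

import boto3

client = boto3.client("kafkaconnect", region_name="us-east-1")

state = client.describe_worker_configuration(
    workerConfigurationArn="<worker-configuration-arn>"
)["workerConfigurationState"]
print(state)  # for example, ACTIVE or DELETING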

MSK Connect Worker Configuration

  4. Choose Delete to delete the worker configuration.
  5. In the confirmation pop-up, enter the name of the worker configuration, then choose Delete.

Delete MSKC Worker Configuration

If the worker configuration is being used with any connector, the Delete option is disabled, as shown in the following screenshot.

Resource tags

MSK Connect now also has support for resource tags. Tags are key-value metadata that can be associated with AWS service resources. You can add tags to connectors, custom plugins, and worker configurations to organize and find resources used across AWS services. In the following screenshots, our example MSK Connect connector, plugin, and worker configuration have been tagged with the resource tag key project and value demo-tags.

You can now tag your Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3 resources with the same project name, for example. Then you can use the tag to search for all resources linked to this particular project for cost allocation, reporting, resource grouping, or access control. MSK Connect supports adding tags when creating resources, applying tags to an existing resource, removing tags from a resource, and querying tags associated with a resource.
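
As an illustration, the following Boto3 sketch applies, queries, and removes the project tag on a connector; the ARN is a placeholder, and custom plugins and worker configurations are tagged the same way:

import boto3

client = boto3.client("kafkaconnect", region_name="us-east-1")
connector_arn = "<your-connector-arn>"  # placeholder

# Apply a tag to an existing MSK Connect resource
client.tag_resource(resourceArn=connector_arn, tags={"project": "demo-tags"})

# Query the tags associated with the resource
print(client.list_tags_for_resource(resourceArn=connector_arn)["tags"])

# Remove the tag by key
client.untag_resource(resourceArn=connector_arn, tagKeys=["project"])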

AWS CloudFormation support

Previously, you were only able to provision an MSK Connect connector with AWS CloudFormation by using an existing worker configuration. With this new feature, you can now perform CREATE, READ, UPDATE, DELETE, and LIST operations on connectors, and create and add new worker configurations using AWS CloudFormation.

The following code is an example of creating a worker configuration:

{
  "Type": "AWS::KafkaConnect::WorkerConfiguration",
  "Properties": {
    "Name": "WorkerConfigurationName",
    "Description": "WorkerConfigurationDescription",
    "PropertiesFileContent": String,
    "Tags": [ Tag, ... ]
  }
}

The return values are as follows:

  • ARN of the newly created worker configuration
  • State of the new worker configuration
  • Creation time of the new worker configuration
  • Latest revision of the new worker configuration

Conclusion

MSK Connect is a fully managed service that provisions the required resources, monitors the health and delivery state of connectors, maintains the underlying hardware, and auto scales connectors to balance the workloads. In this post, we discussed the new features that were added to MSK Connect, which streamline connector and worker management with the introduction of APIs for deleting worker configurations, tagging MSK Connect resources, and support in AWS CloudFormation to create non-default worker configurations.

These capabilities are available in all AWS Regions where Amazon MSK Connect is available. For a list of Region availability, refer to AWS Services by Region. To learn more about MSK Connect, visit the Amazon MSK Connect Developer Guide.


About the Authors

Chinmayi Narasimhadevara is a Solutions Architect focused on Big Data and Analytics at Amazon Web Services. Chinmayi has over 20 years of experience in information technology. She helps AWS customers build advanced, highly scalable, and performant solutions.

Harita Pappu is a Technical Account Manager based out of California. She has over 18 years of experience working in the software industry building and scaling applications. She is passionate about new technologies and focused on helping customers achieve cost optimization and operational excellence.

Build an end-to-end serverless streaming pipeline with Apache Kafka on Amazon MSK using Python

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-serverless-streaming-pipeline-with-apache-kafka-on-amazon-msk-using-python/

The volume of data generated globally continues to surge, from gaming, retail, and finance, to manufacturing, healthcare, and travel. Organizations are looking for more ways to quickly use the constant inflow of data to innovate for their businesses and customers. They have to reliably capture, process, analyze, and load the data into a myriad of data stores, all in real time.

Apache Kafka is a popular choice for these real-time streaming needs. However, it can be challenging to set up a Kafka cluster along with other data processing components that scale automatically depending on your application’s needs. You risk under-provisioning for peak traffic, which can lead to downtime, or over-provisioning for base load, leading to wastage. AWS offers multiple serverless services like Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Data Firehose, Amazon DynamoDB, and AWS Lambda that scale automatically depending on your needs.

In this post, we explain how you can use some of these services, including MSK Serverless, to build a serverless data platform to meet your real-time needs.

Solution overview

Let’s imagine a scenario. You’re responsible for managing thousands of modems for an internet service provider deployed across multiple geographies. You want to monitor modem connectivity quality, which has a significant impact on customer productivity and satisfaction. Your deployment includes different modems that need to be monitored and maintained to ensure minimal downtime. Each device transmits thousands of 1 KB records every second, such as CPU usage, memory usage, alarm, and connection status. You want real-time access to this data so you can monitor performance in real time, and detect and mitigate issues quickly. You also need longer-term access to this data for machine learning (ML) models to run predictive maintenance assessments, find optimization opportunities, and forecast demand.

Your clients that gather the data onsite are written in Python, and they can send all the data to Apache Kafka topics in Amazon MSK. For your application’s low-latency and real-time data access, you can use Lambda and DynamoDB. For longer-term data storage, you can use Amazon Data Firehose, a managed serverless delivery service, to send data to your data lake.

The following diagram shows how you can build this end-to-end serverless application.

end-to-end serverless application

Let’s follow the steps in the following sections to implement this architecture.

Create a serverless Kafka cluster on Amazon MSK

We use Amazon MSK to ingest real-time telemetry data from modems. Creating a serverless Kafka cluster is straightforward on Amazon MSK. It only takes a few minutes using the AWS Management Console or AWS SDK. To use the console, refer to Getting started using MSK Serverless clusters. You create a serverless cluster, AWS Identity and Access Management (IAM) role, and client machine.
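
If you prefer the SDK over the console, the following Boto3 sketch creates an IAM-authenticated MSK Serverless cluster; the subnet and security group IDs are placeholders for your own VPC resources:

import boto3

kafka = boto3.client("kafka", region_name="us-east-1")

response = kafka.create_cluster_v2(
    ClusterName="myCluster",
    Serverless={
        "VpcConfigs": [
            {
                "SubnetIds": ["subnet-0123456789abcdef0", "subnet-0abcdef0123456789"],
                "SecurityGroupIds": ["sg-0123456789abcdef0"],
            }
        ],
        "ClientAuthentication": {"Sasl": {"Iam": {"Enabled": True}}},
    },
)
print(response["ClusterArn"])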

Create a Kafka topic using Python

When your cluster and client machine are ready, SSH to your client machine and install Kafka Python and the MSK IAM library for Python.

  • Run the following commands to install Kafka Python and the MSK IAM library:
pip install kafka-python

pip install aws-msk-iam-sasl-signer-python
  • Create a new file called createTopic.py.
  • Copy the following code into this file, replacing the bootstrap_servers and region information with the details for your cluster. For instructions on retrieving the bootstrap_servers information for your MSK cluster, see Getting the bootstrap brokers for an Amazon MSK cluster.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("topic already exists!. List of topics are:" + str(existing_topics))
  • Run the createTopic.py script to create a new Kafka topic called mytopic on your serverless cluster:
python createTopic.py

Produce records using Python

Let’s generate some sample modem telemetry data.

  • Create a new file called kafkaDataGen.py.
  • Copy the following code into this file, updating the BROKERS and region information with the details for your cluster:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e)
  • Run the kafkaDataGen.py to continuously generate random data and publish it to the specified Kafka topic:
python kafkaDataGen.py
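
Optionally, you can confirm that records are arriving on the topic with a quick consumer sketch that mirrors the producer’s IAM authentication setup:

from kafka import KafkaConsumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

region = '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

consumer = KafkaConsumer(
    'mytopic',
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=MSKTokenProvider(),
    auto_offset_reset='earliest',
    value_deserializer=lambda v: v.decode('utf-8'),
)

# Print each record as it arrives (Ctrl+C to stop)
for message in consumer:
    print(message.value)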

Store events in Amazon S3

Now you store all the raw event data in an Amazon Simple Storage Service (Amazon S3) data lake for analytics. You can use the same data to train ML models. The integration with Amazon Data Firehose allows Amazon MSK to seamlessly load data from your Apache Kafka clusters into an S3 data lake. Complete the following steps to continuously stream data from Kafka to Amazon S3, eliminating the need to build or manage your own connector applications:

  • On the Amazon S3 console, create a new bucket. You can also use an existing bucket.
  • Create a new folder in your S3 bucket called streamingDataLake.
  • On the Amazon MSK console, choose your MSK Serverless cluster.
  • On the Actions menu, choose Edit cluster policy.

cluster policy

  • Select Include Firehose service principal and choose Save changes.

firehose service principal

  • On the S3 delivery tab, choose Create delivery stream.

delivery stream

  • For Source, choose Amazon MSK.
  • For Destination, choose Amazon S3.

source and destination

  • For Amazon MSK cluster connectivity, select Private bootstrap brokers.
  • For Topic, enter a topic name (for this post, mytopic).

source settings

  • For S3 bucket, choose Browse and choose your S3 bucket.
  • Enter streamingDataLake as your S3 bucket prefix.
  • Enter streamingDataLakeErr as your S3 bucket error output prefix.

destination settings

  • Choose Create delivery stream.

create delivery stream

You can verify that the data was written to your S3 bucket. You should see that the streamingDataLake directory was created and the files are stored in partitions.

amazon s3
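
If you want to script the delivery stream instead of using the console, the following Boto3 sketch shows the general shape of the call. All ARNs and the bucket name are placeholders, and only a minimal set of options is shown; consult the Firehose API reference for the full configuration:

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

firehose.create_delivery_stream(
    DeliveryStreamName="msk-to-s3",
    DeliveryStreamType="MSKAsSource",
    MSKSourceConfiguration={
        "MSKClusterARN": "<msk-cluster-arn>",
        "TopicName": "mytopic",
        "AuthenticationConfiguration": {
            "RoleARN": "<firehose-msk-access-role-arn>",
            "Connectivity": "PRIVATE",
        },
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "<firehose-s3-delivery-role-arn>",
        "BucketARN": "arn:aws:s3:::<your-bucket>",
        "Prefix": "streamingDataLake/",
        "ErrorOutputPrefix": "streamingDataLakeErr/",
    },
)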

Store events in DynamoDB

For the last step, you store the most recent modem data in DynamoDB. This allows the client application to access the modem status and interact with the modem remotely from anywhere, with low latency and high availability. Lambda seamlessly works with Amazon MSK. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.
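
For reference, the event Lambda hands to your function groups base64-encoded records by topic-partition. The following is an abridged, illustrative example with made-up values; some metadata fields are omitted:

# Illustrative shape of an Amazon MSK event delivered to Lambda
sample_event = {
    "eventSource": "aws:kafka",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1696000000000,
                "value": "eyJkZXZpY2VpZCI6ICJkdmMxMDAxIn0="  # base64-encoded JSON payload
            }
        ]
    }
}

print(sample_event["records"]["mytopic-0"][0]["offset"])  # -> 15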

Let’s first create a table in DynamoDB. Refer to DynamoDB API permissions: Actions, resources, and conditions reference to verify that your client machine has the necessary permissions.

  • Create a new file called createTable.py.
  • Copy the following code into the file, updating the region information:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")
  • Run the createTable.py script to create a table called device_status in DynamoDB:
python createTable.py

Now let’s configure the Lambda function.

  • On the Lambda console, choose Functions in the navigation pane.
  • Choose Create function.
  • Select Author from scratch.
  • For Function name, enter a name (for example, my-notification-kafka).
  • For Runtime, choose Python 3.11.
  • For Permissions, select Use an existing role and choose a role with permissions to read from your cluster.
  • Create the function.

On the Lambda function configuration page, you can now configure sources, destinations, and your application code.

  • Choose Add trigger.
  • For Trigger configuration, select MSK to configure Amazon MSK as a trigger for the Lambda function.
  • For MSK cluster, enter myCluster.
  • Deselect Activate trigger, because you haven’t configured your Lambda function yet.
  • For Batch size, enter 100.
  • For Starting position, choose Latest.
  • For Topic name, enter a name (for example, mytopic).
  • Choose Add.
  • On the Lambda function details page, on the Code tab, enter the following code:
import base64
import boto3
import json

# Helper to parse a JSON string, returning 'err' for invalid payloads
def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except Exception:
        return 'err'

def lambda_handler(event, context):
    dynamodb = boto3.client('dynamodb')
    table_name = 'device_status'

    # Records arrive grouped by topic-partition (for example, 'mytopic-0'),
    # so iterate over every partition instead of hard-coding one
    for topic_partition, records in event['records'].items():
        for record in records:
            payload = base64.b64decode(record["value"]).decode('utf-8')
            item = convertjson(payload)
            if item == 'err':
                # Skip records that are not valid JSON
                continue

            # Write the item to the DynamoDB table
            dynamodb.put_item(
                TableName=table_name,
                Item={
                    'deviceid': {'S': item['deviceid']},
                    'interface': {'S': item['interface']},
                    'interfacestatus': {'S': item['interfacestatus']},
                    'cpuusage': {'S': item['cpuusage']},
                    'memoryusage': {'S': item['memoryusage']},
                    'event_time': {'S': item['event_time']},
                }
            )
            print("Item written to DynamoDB")
  • Deploy the Lambda function.
  • On the Configuration tab, choose Edit to edit the trigger.

edit trigger

  • Select Activate trigger, then choose Save.
  • On the DynamoDB console, choose Explore items in the navigation pane.
  • Select the table device_status.

You will see that Lambda is writing events from the Kafka topic to DynamoDB.

ddb table

Summary

Streaming data pipelines are critical for building real-time applications. However, setting up and managing the infrastructure can be daunting. In this post, we walked through how to build a serverless streaming pipeline on AWS using Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose, and other services. The key benefits are no servers to manage, automatic scalability of the infrastructure, and a pay-as-you-go model using fully managed services.

Ready to build your own real-time pipeline? Get started today with a free AWS account. With the power of serverless, you can focus on your application logic while AWS handles the undifferentiated heavy lifting. Let’s build something awesome on AWS!


About the Authors

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Michael Oguike is a Product Manager for Amazon MSK. He is passionate about using data to uncover insights that drive action. He enjoys helping customers from a wide range of industries improve their businesses using data streaming. Michael also loves learning about behavioral science and psychology from books and podcasts.

How VMware Tanzu CloudHealth migrated from self-managed Kafka to Amazon MSK

Post Syndicated from Rivlin Pereira original https://aws.amazon.com/blogs/big-data/how-vmware-tanzu-cloudhealth-migrated-from-self-managed-kafka-to-amazon-msk/

This is a post co-written with Rivlin Pereira & Vaibhav Pandey from Tanzu CloudHealth (VMware by Broadcom).

VMware Tanzu CloudHealth is the cloud cost management platform of choice for more than 20,000 organizations worldwide, who rely on it to optimize and govern their largest and most complex multi-cloud environments. In this post, we discuss how the VMware Tanzu CloudHealth DevOps team migrated their self-managed Apache Kafka workloads (running version 2.0) to Amazon Managed Streaming for Apache Kafka (Amazon MSK) running version 2.6.2. We discuss the system architectures, deployment pipelines, topic creation, observability, access control, topic migration, and all the issues we faced with the existing infrastructure, along with how and why we migrated to the new Kafka setup and some lessons learned.

Kafka cluster overview

In the fast-evolving landscape of distributed systems, VMware Tanzu CloudHealth’s next-generation microservices platform relies on Kafka as its messaging backbone. For us, Kafka’s high-performance distributed log system excels in handling massive data streams, making it indispensable for seamless communication. Serving as a distributed log system, Kafka efficiently captures and stores diverse logs, from HTTP server access logs to security event audit logs.

Kafka’s versatility shines in supporting key messaging patterns, treating messages as basic logs or structured key-value stores. Dynamic partitioning and consistent ordering ensure efficient message organization. The unwavering reliability of Kafka aligns with our commitment to data integrity.

The integration of Ruby services with Kafka is streamlined through the Karafka library, acting as a higher-level wrapper. Our other language stack services use similar wrappers. Kafka’s robust debugging features and administrative commands play a pivotal role in ensuring smooth operations and infrastructure health.

Kafka as an architectural pillar

In VMware Tanzu CloudHealth’s next-generation microservices platform, Kafka emerges as a critical architectural pillar. Its ability to handle high data rates, support diverse messaging patterns, and guarantee message delivery aligns seamlessly with our operational needs. As we continue to innovate and scale, Kafka remains a steadfast companion, enabling us to build a resilient and efficient infrastructure.

Why we migrated to Amazon MSK

For us, migrating to Amazon MSK came down to three key decision points:

  • Simplified technical operations – Running Kafka on a self-managed infrastructure was an operational overhead for us. We hadn’t upgraded from Kafka version 2.0.0 in a while, and Kafka brokers were going down in production, causing issues with topics going offline. We also had to run scripts manually for increasing replication factors and rebalancing leaders, which was additional manual effort.
  • Deprecated legacy pipelines and simplified permissions – We were looking to move away from our existing pipelines written in Ansible to create Kafka topics on the cluster. We also had a cumbersome process of giving team members access to Kafka machines in staging and production, and we wanted to simplify this.
  • Cost, patching, and support – Because Apache Zookeeper is completely managed and patched by AWS, moving to Amazon MSK was going to save us time and money. In addition, we discovered that running Kafka on Amazon MSK with the same type of brokers was cheaper than running it ourselves on Amazon Elastic Compute Cloud (Amazon EC2). Combined with the fact that we get security patches applied on brokers by AWS, migrating to Amazon MSK was an easy decision. This also meant that the team was freed up to work on other important things. Finally, getting enterprise support from AWS was also critical in our final decision to move to a managed solution.

How we migrated to Amazon MSK

With the key drivers identified, we moved ahead with a proposed design to migrate existing self-managed Kafka to Amazon MSK. We conducted the following pre-migration steps before the actual implementation:

  • Assessment:
    • Conducted a meticulous assessment of the existing EC2 Kafka cluster, understanding its configurations and dependencies
    • Verified Kafka version compatibility with Amazon MSK
  • Amazon MSK setup with Terraform
  • Network configuration:
    • Ensured seamless network connectivity between the EC2 Kafka and MSK clusters, fine-tuning security groups and firewall settings

After the pre-migration steps, we implemented the following for the new design:

  • Automated deployment, upgrade, and topic creation pipelines for MSK clusters:
    • In the new setup, we wanted to have automated deployments and upgrades of the MSK clusters in a repeatable fashion using an IaC tool. Therefore, we created custom Terraform modules for MSK cluster deployments as well as upgrades. These modules were called from a Jenkins pipeline to deploy and upgrade the MSK clusters automatically. For Kafka topic creation, we were using an Ansible-based home-grown pipeline, which wasn’t stable and led to a lot of complaints from dev teams. As a result, we evaluated options for deployments to Kubernetes clusters and used the Strimzi Topic Operator to create topics on MSK clusters. Topic creation was automated using Jenkins pipelines, which dev teams could self-service.
  • Better observability for clusters:
    • The old Kafka clusters didn’t have good observability. We only had alerts on Kafka broker disk size. With Amazon MSK, we took advantage of open monitoring using Prometheus. We stood up a standalone Prometheus server that scraped metrics from MSK clusters and sent them to our internal observability tool. As a result of improved observability, we were able to set up robust alerting for Amazon MSK, which wasn’t possible with our old setup.
  • Improved COGS and better compute infrastructure:
    • For our old Kafka infrastructure, we had to pay for managing Kafka and Zookeeper instances, plus any additional broker storage costs and data transfer costs. With the move to Amazon MSK, because Zookeeper is completely managed by AWS, we only have to pay for Kafka nodes, broker storage, and data transfer costs. As a result, in the final Amazon MSK setup for production, we saved not only on infrastructure costs but also on operational costs.
  • Simplified operations and enhanced security:
    • With the move to Amazon MSK, we didn’t have to manage any Zookeeper instances. Broker security patching was also taken care by AWS for us.
    • Cluster upgrades became simpler with the move to Amazon MSK; it’s a straightforward process to initiate from the Amazon MSK console.
    • With Amazon MSK, we got automatic broker storage scaling out of the box. As a result, we didn’t have to worry about brokers running out of disk space, leading to additional stability of the MSK cluster.
    • We also got additional security for the cluster because Amazon MSK supports encryption at rest by default, and various options for encryption in transit are also available. For more information, refer to Data protection in Amazon Managed Streaming for Apache Kafka.

During our pre-migration steps, we validated the setup on the staging environment before moving ahead with production.

Kafka topic migration strategy

With the MSK cluster setup complete, we performed a data migration of Kafka topics from the old cluster running on Amazon EC2 to the new MSK cluster. To achieve this, we performed the following steps:

  • Set up MirrorMaker with Terraform – We used Terraform to orchestrate the deployment of a MirrorMaker cluster consisting of 15 nodes. This gave us the scalability and flexibility to adjust the number of nodes based on the migration’s concurrent replication needs.
  • Implement a concurrent replication strategy – We implemented a concurrent replication strategy with 15 MirrorMaker nodes to expedite the migration process. Our Terraform-driven approach contributed to cost optimization by efficiently managing resources during the migration and ensured the reliability and consistency of the MSK and MirrorMaker clusters. It also showcased how the chosen setup accelerates data transfer, optimizing both time and resources.
  • Migrate data – We successfully migrated 2 TB of data in a remarkably short timeframe, minimizing downtime and showcasing the efficiency of the concurrent replication strategy.
  • Set up post-migration monitoring – We implemented robust monitoring and alerting during the migration, contributing to a smooth process by identifying and addressing issues promptly.

The following diagram illustrates the architecture after the topic migration was complete.
Mirror-maker setup

Challenges and lessons learned

Embarking on a migration journey, especially with large datasets, is often accompanied by unforeseen challenges. In this section, we delve into the challenges encountered during the migration of topics from EC2 Kafka to Amazon MSK using MirrorMaker, and share valuable insights and solutions that shaped the success of our migration.

Challenge 1: Offset discrepancies

One of the challenges we encountered was the mismatch in topic offsets between the source and destination clusters, even with offset synchronization enabled in MirrorMaker. The lesson learned here was that offset values don’t necessarily need to be identical, as long as offset sync is enabled, which makes sure the topics have the correct position to read the data from.

We addressed this problem by using a custom tool to run tests on consumer groups, confirming that the translated offsets were either smaller than the source offsets or fully caught up, indicating synchronization as reported by MirrorMaker.

Challenge 2: Slow data migration

The migration process faced a bottleneck—data transfer was slower than anticipated, especially with a substantial 2 TB dataset. Despite a 20-node MirrorMaker cluster, the speed was insufficient.

To overcome this, the team strategically grouped MirrorMaker nodes based on unique port numbers. Clusters of five MirrorMaker nodes, each with a distinct port, significantly boosted throughput, allowing us to migrate data within hours instead of days.

Challenge 3: Lack of detailed process documentation

Navigating the uncharted territory of migrating large datasets using MirrorMaker highlighted the absence of detailed documentation for such scenarios.

Through trial and error, the team crafted an IaC module using Terraform. This module streamlined the entire cluster creation process with optimized settings, enabling a seamless start to the migration within minutes.

Final setup and next steps

As a result of the move to Amazon MSK, our final setup after topic migration looked like the following diagram.
MSK Blog
We’re considering the following future improvements:

Conclusion

In this post, we discussed how VMware Tanzu CloudHealth migrated their existing Amazon EC2-based Kafka infrastructure to Amazon MSK. We walked you through the new architecture, deployment and topic creation pipelines, improvements to observability and access control, topic migration challenges, and the issues we faced with the existing infrastructure, along with how and why we migrated to the new Amazon MSK setup. We also talked about all the advantages that Amazon MSK gave us, the final architecture we achieved with this migration, and lessons learned.

For us, the interplay of offset synchronization, strategic node grouping, and IaC proved pivotal in overcoming obstacles and ensuring a successful migration from Amazon EC2 Kafka to Amazon MSK. This post serves as a testament to the power of adaptability and innovation in migration challenges, offering insights for others navigating a similar path.

If you’re running self-managed Kafka on AWS, we encourage you to try the managed Kafka offering, Amazon MSK.


About the Authors

Rivlin Pereira is a Staff DevOps Engineer in the VMware Tanzu Division. He is very passionate about Kubernetes and works on the CloudHealth Platform, building and operating cloud solutions that are scalable, reliable, and cost-effective.

Vaibhav Pandey, a Staff Software Engineer at Broadcom, is a key contributor to the development of cloud computing solutions. Specializing in architecting and engineering data storage layers, he is passionate about building and scaling SaaS applications for optimal performance.

Raj Ramasubbu is a Senior Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 18 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Todd McGrath is a data streaming specialist at Amazon Web Services where he advises customers on their streaming strategies, integration, architecture, and solutions. On the personal side, he enjoys watching and supporting his 3 teenagers in their preferred activities as well as following his own pursuits such as fishing, pickleball, ice hockey, and happy hour with friends and family on pontoon boats. Connect with him on LinkedIn.

Satya Pattanaik is a Sr. Solutions Architect at AWS. He has been helping ISVs build scalable and resilient applications on the AWS Cloud. Prior to joining AWS, he played a significant role in helping enterprise customers grow and succeed. Outside of work, he spends time learning “how to cook a flavorful BBQ” and trying out new recipes.

Best practices to implement near-real-time analytics using Amazon Redshift Streaming Ingestion with Amazon MSK

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/best-practices-to-implement-near-real-time-analytics-using-amazon-redshift-streaming-ingestion-with-amazon-msk/

Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, straightforward, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it the most widely used cloud data warehouse. You can run and scale analytics in seconds on all your data, without having to manage your data warehouse infrastructure.

You can use the Amazon Redshift Streaming Ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use Structured Query Language (SQL) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.

In this post, we discuss the best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK.

Overview of solution

We walk through an example pipeline to ingest data from an MSK topic into Amazon Redshift using Amazon Redshift streaming ingestion. We also show how to unnest JSON data using dot notation in Amazon Redshift. The following diagram illustrates our solution architecture.

The process flow consists of the following steps:

  1. Create a streaming materialized view in your Redshift cluster to consume live streaming data from the MSK topics.
  2. Use a stored procedure to implement change data capture (CDC) using the unique combination of Kafka Partition and Kafka Offset at the record level for the ingested MSK topic.
  3. Create a user-facing table in the Redshift cluster and use dot notation to unnest the JSON document from the streaming materialized view into data columns of the table. You can continuously load fresh data by calling the stored procedure at regular intervals.
  4. Establish connectivity between an Amazon QuickSight dashboard and Amazon Redshift to deliver visualization and insights.

As part of this post, we also discuss the following topics:

  • Steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift
  • Best practices to achieve optimized performance from streaming materialized views
  • Monitoring techniques to track failures in Amazon Redshift streaming ingestion

Prerequisites

You must have an MSK cluster with a topic that is receiving data, as well as an Amazon Redshift cluster or Amazon Redshift Serverless workgroup.

Considerations while setting up your MSK topic

Keep in mind the following considerations when configuring your MSK topic:

  • Make sure that the name of your MSK topic is no longer than 128 characters.
  • As of this writing, MSK records containing compressed data can’t be directly queried in Amazon Redshift. Amazon Redshift doesn’t support any native decompression methods for client-side compressed data in an MSK topic.
  • Follow best practices while setting up your MSK cluster.
  • Review the streaming ingestion limitations for any other considerations.

Set up streaming ingestion

To set up streaming ingestion, complete the following steps:

  1. Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Setting up IAM and performing streaming ingestion from Kafka.
  2. Make sure that data is flowing into your MSK topic using Amazon CloudWatch metrics (for example, BytesOutPerSec).
  3. Launch the query editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Redshift cluster for the next steps. The following steps were run in query editor v2.
  4. Create an external schema to map to the MSK cluster. Replace your IAM role ARN and the MSK cluster ARN in the following statement:
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. Optionally, if your topic names are case sensitive, you need to enable enable_case_sensitive_identifier to be able to access them in Amazon Redshift. To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session, user, or cluster level:
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. Create a materialized view to consume the stream data from the MSK topic:
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

The metadata column kafka_value that arrives from Amazon MSK is stored in VARBYTE format in Amazon Redshift. For this post, you use the JSON_PARSE function to convert kafka_value to a SUPER data type. You also use the CAN_JSON_PARSE function in the filter condition to skip invalid JSON records and guard against errors due to JSON parsing failures. We discuss how to store the invalid data for future debugging later in this post.

  7. Refresh the streaming materialized view, which triggers Amazon Redshift to read from the MSK topic and load data into the materialized view:
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions to create a materialized view with auto refresh.

Unnest the JSON document

The following is a sample of a JSON document that was ingested from the MSK topic to the Data column of SUPER type in the streaming materialized view Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

Use dot notation as shown in the following code to unnest your JSON payload:

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

The following screenshot shows what the result looks like after unnesting.

If you have arrays in your JSON document, consider unnesting your data using PartiQL statements in Amazon Redshift. For more information, refer to the section Unnest the JSON document in the post Near-real-time analytics using Amazon Redshift streaming ingestion with Amazon Kinesis Data Streams and Amazon DynamoDB.

Incremental data load strategy

Complete the following steps to implement an incremental data load:

  1. Create a table called Orders in Amazon Redshift, which end-users will use for visualization and business analysis:
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

Next, you create a stored procedure called SP_Orders_Load to implement CDC from a streaming materialized view and load into the final Orders table. You use the combination of Kafka_Partition and Kafka_Offset available in the streaming materialized view as system columns to implement CDC. The combination of these two columns will always be unique within an MSK topic, which makes sure that none of the records are missed during the process. The stored procedure contains the following components:

  • To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session, user, or cluster level.
  • Refresh the streaming materialized view manually if auto refresh is not enabled.
  • Create an audit table called Orders_Streaming_Audit if it doesn’t exist to keep track of the last offset for a partition that was loaded into Orders table during the last run of the stored procedure.
  • Unnest and insert only new or changed data into a staging table called Orders_Staging_Table, reading from the streaming materialized view Orders_Stream_MV, where Kafka_Offset is greater than the last processed Kafka_Offset recorded in the audit table Orders_Streaming_Audit for the Kafka_Partition being processed.
  • When loading for the first time using this stored procedure, there will be no data in the Orders_Streaming_Audit table and all the data from Orders_Stream_MV will get loaded into the Orders table.
  • Insert only business-relevant columns to the user-facing Orders table, selecting from the staging table Orders_Staging_Table.
  • Insert the max Kafka_Offset for every loaded Kafka_Partition into the audit table Orders_Streaming_Audit

We have added the intermediate staging table Orders_Staging_Table in this solution to help with the debugging in case of unexpected failures and trackability. Skipping the staging step and directly loading into the final table from Orders_Stream_MV can provide lower latency depending on your use case.

  2. Create the stored procedure with the following code:
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"."N"::INT4 as OrderID
        ,data."ProductID"."S"::VARCHAR(36) as ProductID
        ,data."ProductName"."S"::VARCHAR(36) as ProductName
        ,data."CustomerID"."S"::VARCHAR(36) as CustomerID
        ,data."CustomerName"."S"::VARCHAR(36) as CustomerName
        ,data."Store_Name"."S"::VARCHAR(36) as Store_Name
        ,data."OrderDate"."S"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"."N"::INT4 as Quatity
        ,data."Price"."N"::DOUBLE PRECISION as Price
        ,data."OrderStatus"."S"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  3. Run the stored procedure to load data into the Orders table:
    call SP_Orders_Load();

  4. Validate data in the Orders table.
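
To run the stored procedure at regular intervals, one option is a scheduled AWS Lambda function (for example, triggered by Amazon EventBridge) that invokes it through the Redshift Data API. The following is a minimal sketch; the cluster identifier, database name, and secret ARN are placeholders:

import boto3

redshift_data = boto3.client("redshift-data", region_name="us-east-1")

# Submit the CALL asynchronously; Redshift runs it in the background
response = redshift_data.execute_statement(
    ClusterIdentifier="my-redshift-cluster",
    Database="dev",
    SecretArn="<secret-arn-with-db-credentials>",
    Sql="CALL SP_Orders_Load();",
)
print(response["Id"])  # statement ID for tracking the run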

Establish cross-account streaming ingestion

If your MSK cluster belongs to a different account, complete the following steps to create IAM roles to set up cross-account streaming ingestion. Let’s assume the Redshift cluster is in account A and the MSK cluster is in account B, as shown in the following diagram.

Complete the following steps:

  1. In account B, create an IAM role called MyRedshiftMSKRole that allows Amazon Redshift (account A) to communicate with the MSK cluster (account B) named MyTestCluster. Depending on whether your MSK cluster uses IAM authentication or unauthenticated access to connect, you need to create an IAM role with one of the following policies:
    • An IAM policy for Amazon MSK when using unauthenticated access:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • An IAM policy for Amazon MSK when using IAM authentication:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

The resource section in the preceding example gives access to all topics in the MyTestCluster MSK cluster. If you need to restrict the IAM role to specific topics, you need to replace the topic resource with a more restrictive resource policy.

  2. After you create the IAM role in account B, take note of the IAM role ARN (for example, arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  3. In account A, create a Redshift customizable IAM role called MyRedshiftRole that Amazon Redshift will assume when connecting to Amazon MSK. The role should have a policy like the following, which allows the Amazon Redshift IAM role in account A to assume the Amazon MSK role in account B:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  4. Take note of the role ARN for the Amazon Redshift IAM role (for example, arn:aws:iam::9876543210:role/MyRedshiftRole).
  5. Go back to account B and add this role in the trust policy of the IAM role arn:aws:iam::0123456789:role/MyRedshiftMSKRole to allow account B to trust the IAM role from account A. The trust policy should look like the following code:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  6. Sign in to the Amazon Redshift console as account A.
  7. Launch the query editor v2 or your preferred SQL client and run the following statements to access the MSK topic in account B. To map to the MSK cluster, create an external schema using role chaining by specifying IAM role ARNs, separated by a comma without any spaces around it. The role attached to the Redshift cluster comes first in the chain.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
    

Performance considerations

Keep in mind the following performance considerations:

  • Keep the streaming materialized view simple and move transformations like unnesting, aggregation, and case expressions to a later step—for example, by creating another materialized view on top of the streaming materialized view.
  • Consider creating only one streaming materialized view in a single Redshift cluster or workgroup for a given MSK topic. Creation of multiple materialized views per MSK topic can slow down the ingestion performance because each materialized view becomes a consumer for that topic and shares the Amazon MSK bandwidth for that topic. Live streaming data in a streaming materialized view can be shared across multiple Redshift clusters or Redshift Serverless workgroups using data sharing.
  • While defining your streaming materialized view, avoid using JSON_EXTRACT_PATH_TEXT to pre-shred data, because it operates on the data row by row, which significantly impacts ingestion throughput. It is preferable to land the data as is from the stream and then shred it later.
  • Where possible, consider skipping the sort key in the streaming materialized view to accelerate the ingestion speed. When a streaming materialized view has a sort key, a sort operation occurs with every batch of ingested data from the stream. Sorting has a performance overhead depending on the sort key data type, the number of sort key columns, and the amount of data ingested in each batch. This sorting step can increase the latency before the streaming data is available to query. You should weigh which is more important: latency on ingestion or latency on querying the data.
  • For optimized performance of the streaming materialized view and to reduce storage usage, occasionally purge data from the materialized view using delete, truncate, or alter table append.
  • If you need to ingest multiple MSK topics in parallel into Amazon Redshift, start with a smaller number of streaming materialized views and keep adding more materialized views to evaluate the overall ingestion performance within a cluster or workgroup.
  • Increasing the number of nodes in a Redshift provisioned cluster or the base RPU of a Redshift Serverless workgroup can help boost the ingestion performance of a streaming materialized view. For optimal performance, you should aim to have as many slices in your Redshift provisioned cluster as there are partitions in your MSK topic, or 8 RPU for every four partitions in your MSK topic.
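As an illustration of moving transformations downstream and purging the streaming materialized view, the following is a minimal sketch. It assumes the Orders_Stream_MV streaming materialized view created in this post; the attribute names inside the SUPER Data column (order_id, product_id, quantity) are hypothetical, so adjust them to your topic's payload:

-- Downstream materialized view: unnest and type-cast in a second step,
-- keeping the streaming materialized view itself simple.
CREATE MATERIALIZED VIEW orders_extracted_mv AS
SELECT
    kafka_partition,
    kafka_offset,
    refresh_time,
    data.order_id::INT AS order_id,
    data.product_id::INT AS product_id,
    data.quantity::INT AS quantity
FROM Orders_Stream_MV
WHERE data IS NOT NULL;

-- Refresh on your own schedule, for example from a scheduled query.
REFRESH MATERIALIZED VIEW orders_extracted_mv;

-- Occasionally purge older rows from the streaming materialized view
-- to reduce storage usage.
DELETE FROM Orders_Stream_MV
WHERE refresh_time < DATEADD(day, -7, GETDATE());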

Monitoring techniques

Records in the topic that exceed the size of the target materialized view column at the time of ingestion will be skipped. Records that are skipped by the materialized view refresh will be logged in the SYS_STREAM_SCAN_ERRORS system table.

Errors that occur when processing a record (due to a calculation, a data type conversion, or other logic in the materialized view definition) will cause the materialized view refresh to fail until the offending record has expired from the topic. To avoid these types of issues, test the logic of your materialized view definition carefully; otherwise, land the records into the default VARBYTE column and process them later.

The following are available monitoring views:

  • SYS_MV_REFRESH_HISTORY – Use this view to gather information about the refresh history of your streaming materialized views. The results include the refresh type, such as manual or auto, and the status of the most recent refresh. The following query shows the refresh history for a streaming materialized view:
    select mv_name, refresh_type, status, duration from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales';

  • SYS_STREAM_SCAN_ERRORS – Use this view to check the reason why a record failed to load via streaming ingestion from an MSK topic. As of writing this post, when ingesting from Amazon MSK, this view only logs errors when the record is larger than the materialized view column size. This view will also show the unique identifier (offset) of the MSK record in the position column. The following query shows the error code and error reason when a record exceeded the maximum size limit:
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS where mv_name='test_mv' and external_schema_name='streaming_schema';
    

  • SYS_STREAM_SCAN_STATES – Use this view to monitor the number of records scanned at a given record_time. This view also tracks the offset of the last record read in the batch. The following query shows topic data for a specific materialized view:
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – Use this view to check the overall metrics for a streaming materialized view refresh. This will also log errors in the error_message column for errors that don’t show up in SYS_STREAM_SCAN_ERRORS. The following query shows the error causing the refresh failure of a streaming materialized view:
    select query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc;

Additional considerations for implementation

You can optionally create a materialized view on top of a streaming materialized view to unnest and precompute results for end-users. This approach eliminates the need to store the results in a final table using a stored procedure.

In this post, you use the CAN_JSON_PARSE function to guard against parsing errors during ingestion; streaming records that can't be parsed are skipped by Amazon Redshift. However, if you want to keep track of your error records, consider storing them in a column using the following SQL when creating the streaming materialized view:

CREATE MATERIALIZED VIEW Orders_Stream_MV AS
SELECT
kafka_partition,
kafka_offset,
refresh_time,
case when CAN_JSON_PARSE(kafka_value) = true then JSON_PARSE(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";
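With this definition, records that fail CAN_JSON_PARSE land in the Invalid_Data column instead of being dropped, so you can inspect them later, for example:

select kafka_partition, kafka_offset, Invalid_Data
from Orders_Stream_MV
where Invalid_Data is not null;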

You can also consider unloading data from the view SYS_STREAM_SCAN_ERRORS into an Amazon Simple Storage Service (Amazon S3) bucket and get alerts by sending a report via email using Amazon Simple Notification Service (Amazon SNS) notifications whenever a new S3 object is created.
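The following is a minimal sketch of that unload step; the history table name, S3 bucket, and IAM role ARN are placeholders. Staging the errors in a regular table first also gives you a persistent copy of the errors beyond the retention period of the system view:

-- One-time setup: create an empty history table with the same columns.
create table stream_scan_errors_history as
select * from sys_stream_scan_errors where 1 = 0;

-- Copy new errors; in practice, filter on record_time so you don't
-- re-insert rows you already copied.
insert into stream_scan_errors_history
select * from sys_stream_scan_errors;

-- Unload the staged errors to Amazon S3.
unload ('select * from stream_scan_errors_history')
to 's3://amzn-s3-demo-bucket/stream-scan-errors/'
iam_role 'arn:aws:iam::0123456789:role/MyRedshiftRole'
format as parquet;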

Lastly, based on your data freshness requirement, you can use Amazon EventBridge to schedule the jobs in your data warehouse to call the aforementioned SP_Orders_Load stored procedure on a regular basis. EventBridge does this at fixed intervals, and you may need a mechanism (for example, an AWS Step Functions state machine) to monitor whether the previous call to the procedure completed. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. You can also refer to Accelerate orchestration of an ELT process using AWS Step Functions and Amazon Redshift Data API. Another option is to use Amazon Redshift query editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.
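If you go the EventBridge or query editor v2 route, the recurring statement itself is just the procedure call, and a lightweight completion check can query the system views referenced earlier (a sketch, assuming the procedure name from this post):

-- The statement to schedule:
CALL SP_Orders_Load();

-- Check for a still-running invocation before starting a new one:
select query_id, status, start_time
from sys_query_history
where query_text ilike 'call sp_orders_load%' and status = 'running';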

Conclusion

In this post, we discussed best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK. We showed you an example pipeline to ingest data from an MSK topic into Amazon Redshift using streaming ingestion. We also showed a reliable strategy to perform incremental streaming data load into Amazon Redshift using the Kafka partition and Kafka offset values. Additionally, we demonstrated the steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift and discussed performance considerations for an optimized ingestion rate. Lastly, we discussed monitoring techniques to track failures in Amazon Redshift streaming ingestion.

If you have any questions, leave them in the comments section.


About the Authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Adekunle Adedotun is a Sr. Database Engineer with Amazon Redshift service. He has been working on MPP databases for 6 years with a focus on performance tuning. He also provides guidance to the development team for new and existing service features.

Simplify data streaming ingestion for analytics using Amazon MSK and Amazon Redshift

Post Syndicated from Sebastian Vlad original https://aws.amazon.com/blogs/big-data/simplify-data-streaming-ingestion-for-analytics-using-amazon-msk-and-amazon-redshift/

Towards the end of 2022, AWS announced the general availability of real-time streaming ingestion to Amazon Redshift for Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), eliminating the need to stage streaming data in Amazon Simple Storage Service (Amazon S3) before ingesting it into Amazon Redshift.

Streaming ingestion from Amazon MSK into Amazon Redshift represents a cutting-edge approach to real-time data processing and analysis. Amazon MSK serves as a highly scalable and fully managed service for Apache Kafka, allowing for seamless collection and processing of vast streams of data. Integrating streaming data into Amazon Redshift brings immense value by enabling organizations to harness the potential of real-time analytics and data-driven decision-making.

This integration enables you to achieve low latency, measured in seconds, while ingesting hundreds of megabytes of streaming data per second into Amazon Redshift. At the same time, this integration helps make sure that the most up-to-date information is readily available for analysis. Because the integration doesn’t require staging data in Amazon S3, Amazon Redshift can ingest streaming data at a lower latency and without intermediary storage cost.

You can configure Amazon Redshift streaming ingestion on a Redshift cluster using SQL statements to authenticate and connect to an MSK topic. This solution is an excellent option for data engineers who are looking to simplify data pipelines and reduce operational costs.

In this post, we provide a complete overview on how to configure Amazon Redshift streaming ingestion from Amazon MSK.

Solution overview

The following architecture diagram describes the AWS services and features you will be using.

architecture diagram describing the AWS services and features you will be using

The workflow includes the following steps:

  1. You start by configuring an Amazon MSK Connect source connector to create an MSK topic, generate mock data, and write it to the MSK topic. For this post, we work with mock customer data.
  2. The next step is to connect to a Redshift cluster using the Query Editor v2.
  3. Finally, you configure an external schema and create a materialized view in Amazon Redshift to consume the data from the MSK topic. This solution does not rely on an MSK Connect sink connector to export the data from Amazon MSK to Amazon Redshift.

The following solution architecture diagram describes in more detail the configuration and integration of the AWS services you will be using.
solution architecture diagram describing in more detail the configuration and integration of the AWS services you will be using
The workflow includes the following steps:

  1. You deploy an MSK Connect source connector, an MSK cluster, and a Redshift cluster within the private subnets on a VPC.
  2. The MSK Connect source connector uses granular permissions defined in an AWS Identity and Access Management (IAM) in-line policy attached to an IAM role, which allows the source connector to perform actions on the MSK cluster.
  3. The MSK Connect source connector logs are captured and sent to an Amazon CloudWatch log group.
  4. The MSK cluster uses a custom MSK cluster configuration, allowing the MSK Connect connector to create topics on the MSK cluster.
  5. The MSK cluster logs are captured and sent to an Amazon CloudWatch log group.
  6. The Redshift cluster uses granular permissions defined in an IAM in-line policy attached to an IAM role, which allows the Redshift cluster to perform actions on the MSK cluster.
  7. You can use the Query Editor v2 to connect to the Redshift cluster.

Prerequisites

To simplify the provisioning and configuration of the prerequisite resources, you can use the following AWS CloudFormation template:

Complete the following steps when launching the stack:

  1. For Stack name, enter a meaningful name for the stack, for example, prerequisites.
  2. Choose Next.
  3. Choose Next.
  4. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Submit.

The CloudFormation stack creates the following resources:

  • A VPC custom-vpc, created across three Availability Zones, with three public subnets and three private subnets:
    • The public subnets are associated with a public route table, and outbound traffic is directed to an internet gateway.
    • The private subnets are associated with a private route table, and outbound traffic is sent to a NAT gateway.
  • An internet gateway attached to the Amazon VPC.
  • A NAT gateway that is associated with an elastic IP and is deployed in one of the public subnets.
  • Three security groups:
    • msk-connect-sg, which will be later associated with the MSK Connect connector.
    • redshift-sg, which will be later associated with the Redshift cluster.
    • msk-cluster-sg, which will be later associated with the MSK cluster. It allows inbound traffic from msk-connect-sg, and redshift-sg.
  • Two CloudWatch log groups:
    • msk-connect-logs, to be used for the MSK Connect logs.
    • msk-cluster-logs, to be used for the MSK cluster logs.
  • Two IAM Roles:
    • msk-connect-role, which includes granular IAM permissions for MSK Connect.
    • redshift-role, which includes granular IAM permissions for Amazon Redshift.
  • A custom MSK cluster configuration, allowing the MSK Connect connector to create topics on the MSK cluster.
  • An MSK cluster, with three brokers deployed across the three private subnets of custom-vpc. The msk-cluster-sg security group and the custom-msk-cluster-configuration configuration are applied to the MSK cluster. The broker logs are delivered to the msk-cluster-logs CloudWatch log group.
  • A Redshift cluster subnet group, which is using the three private subnets of custom-vpc.
  • A Redshift cluster, with one single node deployed in a private subnet within the Redshift cluster subnet group. The redshift-sg security group and redshift-role IAM role are applied to the Redshift cluster.

Create an MSK Connect custom plugin

For this post, we use the Amazon MSK data generator deployed in MSK Connect to generate mock customer data and write it to an MSK topic.

Complete the following steps:

  1. Download the Amazon MSK data generator JAR file with dependencies from GitHub.
    awslabs github page for downloading the jar file of the amazon msk data generator
  2. Upload the JAR file into an S3 bucket in your AWS account.
    amazon s3 console image showing the uploaded jar file in an s3 bucket
  3. On the Amazon MSK console, choose Custom plugins under MSK Connect in the navigation pane.
  4. Choose Create custom plugin.
  5. Choose Browse S3, search for the Amazon MSK data generator JAR file you uploaded to Amazon S3, then choose Choose.
  6. For Custom plugin name, enter msk-datagen-plugin.
  7. Choose Create custom plugin.

When the custom plugin is created, you will see that its status is Active, and you can move to the next step.
amazon msk console showing the msk connect custom plugin being successfully created

Create an MSK Connect connector

Complete the following steps to create your connector:

  1. On the Amazon MSK console, choose Connectors under MSK Connect in the navigation pane.
  2. Choose Create connector.
  3. For Custom plugin type, choose Use existing plugin.
  4. Select msk-datagen-plugin, then choose Next.
  5. For Connector name, enter msk-datagen-connector.
  6. For Cluster type, choose Self-managed Apache Kafka cluster.
  7. For VPC, choose custom-vpc.
  8. For Subnet 1, choose the private subnet within your first Availability Zone.

For the custom-vpc created by the CloudFormation template, we are using odd CIDR ranges for public subnets, and even CIDR ranges for the private subnets:

    • The CIDRs for the public subnets are 10.10.1.0/24, 10.10.3.0/24, and 10.10.5.0/24
    • The CIDRs for the private subnets are 10.10.2.0/24, 10.10.4.0/24, and 10.10.6.0/24
  9. For Subnet 2, select the private subnet within your second Availability Zone.
  10. For Subnet 3, select the private subnet within your third Availability Zone.
  11. For Bootstrap servers, enter the list of bootstrap servers for TLS authentication of your MSK cluster.

To retrieve the bootstrap servers for your MSK cluster, navigate to the Amazon MSK console, choose Clusters, choose msk-cluster, then choose View client information. Copy the TLS values for the bootstrap servers.

  12. For Security groups, choose Use specific security groups with access to this cluster, and choose msk-connect-sg.
  13. For Connector configuration, replace the default settings with the following:
connector.class=com.amazonaws.mskdatagen.GeneratorSourceConnector
tasks.max=2
genkp.customer.with=#{Code.isbn10}
genv.customer.name.with=#{Name.full_name}
genv.customer.gender.with=#{Demographic.sex}
genv.customer.favorite_beer.with=#{Beer.name}
genv.customer.state.with=#{Address.state}
genkp.order.with=#{Code.isbn10}
genv.order.product_id.with=#{number.number_between '101','109'}
genv.order.quantity.with=#{number.number_between '1','5'}
genv.order.customer_id.matching=customer.key
global.throttle.ms=2000
global.history.records.max=1000
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
  14. For Connector capacity, choose Provisioned.
  15. For MCU count per worker, choose 1.
  16. For Number of workers, choose 1.
  17. For Worker configuration, choose Use the MSK default configuration.
  18. For Access permissions, choose msk-connect-role.
  19. Choose Next.
  20. For Encryption, select TLS encrypted traffic.
  21. Choose Next.
  22. For Log delivery, choose Deliver to Amazon CloudWatch Logs.
  23. Choose Browse, select msk-connect-logs, and choose Choose.
  24. Choose Next.
  25. Review and choose Create connector.

After the custom connector is created, you will see that its status is Running, and you can move to the next step.
amazon msk console showing the msk connect connector being successfully created

Configure Amazon Redshift streaming ingestion for Amazon MSK

Complete the following steps to set up streaming ingestion:

  1. Connect to your Redshift cluster using Query Editor v2, and authenticate with the database user name awsuser and password Awsuser123.
  2. Create an external schema from Amazon MSK using the following SQL statement.

In the following code, enter the values for the redshift-role IAM role, and the msk-cluster cluster ARN.

CREATE EXTERNAL SCHEMA msk_external_schema
FROM MSK
IAM_ROLE '<insert your redshift-role arn>'
AUTHENTICATION iam
CLUSTER_ARN '<insert your msk-cluster arn>';
  3. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to create an external schema from amazon msk

  4. Create a materialized view using the following SQL statement:
CREATE MATERIALIZED VIEW msk_mview AUTO REFRESH YES AS
SELECT
    "kafka_partition",
    "kafka_offset",
    "kafka_timestamp_type",
    "kafka_timestamp",
    "kafka_key",
    JSON_PARSE(kafka_value) as Data,
    "kafka_headers"
FROM
    "dev"."msk_external_schema"."customer"
  1. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to create a materialized view

  6. You can now query the materialized view using the following SQL statement (see the sketch after these steps for querying individual JSON attributes):
select * from msk_mview LIMIT 100;
  7. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to query the materialized view

  8. To monitor the progress of records loaded via streaming ingestion, you can take advantage of the SYS_STREAM_SCAN_STATES monitoring view using the following SQL statement:
select * from SYS_STREAM_SCAN_STATES;
  9. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to query the sys stream scan states monitoring view

  10. To monitor errors encountered on records loaded via streaming ingestion, you can take advantage of the SYS_STREAM_SCAN_ERRORS monitoring view using the following SQL statement:
select * from SYS_STREAM_SCAN_ERRORS;
  11. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to query the sys stream scan errors monitoring view
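Because the payload lands in the Data SUPER column, you can also navigate into individual JSON attributes directly in your queries. The attribute names in the following sketch (name, state) come from the connector configuration used earlier; adjust them to your payload:

select
    kafka_offset,
    data.name::varchar as customer_name,
    data.state::varchar as customer_state
from msk_mview
limit 10;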

Clean up

After following along, if you no longer need the resources you created, delete them in the following order to prevent incurring additional charges:

  1. Delete the MSK Connect connector msk-datagen-connector.
  2. Delete the MSK Connect plugin msk-datagen-plugin.
  3. Delete the Amazon MSK data generator JAR file you downloaded, and delete the S3 bucket you created.
  4. After you delete your MSK Connect connector, you can delete the CloudFormation stack. All the resources created by the CloudFormation template will be automatically deleted from your AWS account.

Conclusion

In this post, we demonstrated how to configure Amazon Redshift streaming ingestion from Amazon MSK, with a focus on privacy and security.

The combination of the ability of Amazon MSK to handle high-throughput data streams with the robust analytical capabilities of Amazon Redshift empowers businesses to derive actionable insights promptly. This real-time data integration enhances the agility and responsiveness of organizations in understanding changing data trends, customer behaviors, and operational patterns. It allows for timely and informed decision-making, thereby gaining a competitive edge in today's dynamic business landscape.

This solution is also applicable for customers that are looking to use Amazon MSK Serverless and Amazon Redshift Serverless.

We hope this post was a good opportunity to learn more about AWS service integration and configuration. Let us know your feedback in the comments section.


About the authors

Sebastian Vlad is a Senior Partner Solutions Architect with Amazon Web Services, with a passion for data and analytics solutions and customer success. Sebastian works with enterprise customers to help them design and build modern, secure, and scalable solutions to achieve their business outcomes.

Sharad Pai is a Lead Technical Consultant at AWS. He specializes in streaming analytics and helps customers build scalable solutions using Amazon MSK and Amazon Kinesis. He has over 16 years of industry experience and is currently working with media customers who are hosting live streaming platforms on AWS, managing peak concurrency of over 50 million. Prior to joining AWS, Sharad’s career as a lead software developer included 9 years of coding, working with open source technologies like JavaScript, Python, and PHP.

Secure connectivity patterns for Amazon MSK Serverless cross-account access

Post Syndicated from Tamer Soliman original https://aws.amazon.com/blogs/big-data/secure-connectivity-patterns-for-amazon-msk-serverless-cross-account-access/

Amazon MSK Serverless is a cluster type of Amazon Managed Streaming for Apache Kafka (Amazon MSK) that makes it straightforward for you to run Apache Kafka without having to manage and scale cluster capacity. MSK Serverless automatically provisions and scales compute and storage resources. With MSK Serverless, you can use Apache Kafka on demand and pay for the data you stream and retain on a usage basis.

Deploying infrastructure across multiple VPCs and multiple accounts is considered best practice, facilitating scalability while maintaining isolation boundaries. In a multi-account environment, Kafka producers and consumers can exist within the same VPC—however, they are often located in different VPCs, sometimes within the same account, in a different account, or even in multiple different accounts. There is a need for a solution that can extend access to MSK Serverless clusters to producers and consumers from multiple VPCs within the same AWS account and across multiple AWS accounts. The solution needs to be scalable and straightforward to maintain.

In this post, we walk you through multiple solution approaches that address the MSK Serverless cross-VPC and cross-account access connectivity options, and we discuss the advantages and limitations of each approach.

MSK Serverless connectivity and authentication

When an MSK Serverless cluster is created, AWS manages the cluster infrastructure on your behalf and extends private connectivity back to your VPCs through VPC endpoints powered by AWS PrivateLink. You bootstrap your connection to the cluster through a bootstrap server that holds a record of all your underlying brokers.

At creation, a fully qualified domain name (FQDN) is assigned to your cluster bootstrap server. The bootstrap server FQDN has the general format of boot-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, and your cluster brokers follow the format of bxxxx-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, where ClusterUniqueID.xx is unique to your cluster and bxxxx is a dynamic broker range (b0001, b0037, and b0523 can be some of your assigned brokers at a point of time). It’s worth noting that the brokers assigned to your cluster are dynamic and change over time, but your bootstrap address remains the same for the cluster. All your communication with the cluster starts with the bootstrap server that can respond with the list of active brokers when required. For proper Kafka communication, your MSK client needs to be able to resolve the domain names of your bootstrap server as well as all your brokers.

At cluster creation, you specify the VPCs that you would like the cluster to communicate with (up to five VPCs in the same account as your cluster). For each VPC specified during cluster creation, cluster VPC endpoints are created along with a private hosted zone that includes a list of your bootstrap server and all dynamic brokers kept up to date. The private hosted zones facilitate resolving the FQDNs of your bootstrap server and brokers, from within the associated VPCs defined during cluster creation, to the respective VPC endpoints for each.

Cross-account access

To be able to extend private connectivity of your Kafka producers and consumers to your MSK Serverless cluster, you need to consider three main aspects: private connectivity, authentication and authorization, and DNS resolution.

The following diagram highlights the possible connectivity options. Although the diagram shows them all here for demonstration purposes, in most cases you would use one or more of these options depending on your architecture, not necessarily all in the same setup.

MSK cross account connectivity options

In this section, we discuss the different connectivity options along with their pros and cons. We also cover the authentication and DNS resolution aspects associated with the relevant connectivity options.

Private connectivity layer

This is the underlying private network connectivity. You can achieve this connectivity using VPC peering, AWS Transit Gateway, or PrivateLink, as indicated in the preceding diagram. VPC peering simplifies the setup, but it lacks the support for transitive routing. In most cases, peering is used when you have a limited number of VPCs or if your VPCs generally communicate with some limited number of core services VPCs without the need of lateral connectivity or transitive routing. On the other hand, AWS Transit Gateway facilitates transitive routing and can simplify the architecture when you have a large number of VPCs, and especially when lateral connectivity is required. PrivateLink is more suited for extending connectivity to a specific resource unidirectionally across VPCs or accounts without exposing full VPC-to-VPC connectivity, thereby adding a layer of isolation. PrivateLink is useful if you have overlapping CIDRs, which is a case that is not supported by Transit Gateway or VPC peering. PrivateLink is also useful when your connected parties are administrated separately, and when one-way connectivity and isolation are required.

If you choose PrivateLink as a connectivity option, you need to use a Network Load Balancer (NLB) with an IP-type target group whose registered targets are the IP addresses of the zonal endpoints of your MSK Serverless cluster.

Cluster authentication and authorization

In addition to having private connectivity and being able to resolve the bootstrap server and brokers domain names, for your producers and consumers to have access to your cluster, you need to configure your clients with proper credentials. MSK Serverless supports AWS Identity and Access Management (IAM) authentication and authorization. For cross-account access, your MSK client needs to assume a role that has proper credentials to access the cluster. This post focuses mainly on the cross-account connectivity and name resolution aspects. For more details on cross-account authentication and authorization, refer to the following GitHub repo.

DNS resolution

For Kafka producers and consumers located in accounts across the organization to be able to produce and consume to and from the centralized MSK Serverless cluster, they need to be able to resolve the FQDNs of the cluster bootstrap server as well as each of the cluster brokers. Understanding the dynamic nature of broker allocation, the solution will have to accommodate such a requirement. In the next section, we address how we can satisfy this part of the requirements.

Cluster cross-account DNS resolution

Now that we have discussed how MSK Serverless works, how private connectivity is extended, and the authentication and authorization requirements, let’s discuss how DNS resolution works for your cluster.

For every VPC associated with your cluster during cluster creation, a VPC endpoint is created along with a private hosted zone. The private hosted zones enable name resolution of the FQDNs of the cluster bootstrap server and the dynamically allocated brokers from within each respective VPC. This works well when requests come from within any of the VPCs that were added during cluster creation, because they already have the required VPC endpoints and relevant private hosted zones.

Let’s discuss how you can extend name resolution to other VPCs within the same account that were not included during cluster creation, and to others that may be located in other accounts.

You’ve already made your choice of the private connectivity option that best fits your architecture requirements, be it VPC peering, PrivateLink, or Transit Gateway. Assuming that you have also configured your MSK clients to assume roles that have the right IAM credentials in order to facilitate cluster access, you now need to address the name resolution aspect of connectivity. It’s worth noting that, although we list different connectivity options using VPC peering, Transit Gateway, and PrivateLink, in most cases only one or two of these connectivity options are present. You don’t necessarily need to have them all; they are listed here to demonstrate your options, and you are free to choose the ones that best fit your architecture and requirements.

In the following sections, we describe two different methods to address DNS resolution. For each method, there are advantages and limitations.

Private hosted zones

The following diagram highlights the solution architecture and its components. Note that, to simplify the diagram, and to make room for more relevant details required in this section, we have eliminated some of the connectivity options.

Cross-account access using Private Hosted Zones

The solution starts with creating a private hosted zone, followed by creating a VPC association.

Create a private hosted zone

We start by creating a private hosted zone for name resolution. To make the solution scalable and straightforward to maintain, you can create this private hosted zone in the same account as the MSK Serverless cluster, which facilitates centralized management of the private hosted zone alongside the MSK cluster. Alternatively, centralizing your private hosted zones in a dedicated networking account is also a viable option. You can then associate the private hosted zone with VPCs within the same account, or in different accounts.

The purpose of the private hosted zone is to be able to resolve the FQDNs of the bootstrap server as well as all the dynamically assigned cluster-associated brokers. As discussed earlier, the bootstrap server FQDN format is boot-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, and the cluster brokers use the format bxxxx-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, with bxxxx being the broker ID. You need to create the new private hosted zone with the primary domain set as kafka-serverless.Region.amazonaws.com, with an A-Alias record called *.kafka-serverless.Region.amazonaws.com pointing to the Regional VPC endpoint of the MSK Serverless cluster in the MSK cluster VPC. This should be sufficient to direct all traffic targeting your cluster to the primary cluster VPC endpoints that you specified in your private hosted zone.

Now that you have created the private hosted zone, for name resolution to work, you need to associate the private hosted zone with every VPC where you have clients for the MSK cluster (producer or consumer).

Associate a private hosted zone with VPCs in the same account

For VPCs that are in the same account as the MSK cluster and weren’t included in the configuration during cluster creation, you can associate them to the private hosted zone created using the AWS Management Console by editing the private hosted zone settings and adding the respective VPCs. For more information, refer to Associating more VPCs with a private hosted zone.

Associate a private hosted zone in cross-account VPCs

For VPCs that are in a different account other than the MSK cluster account, refer to Associating an Amazon VPC and a private hosted zone that you created with different AWS accounts. The key steps are as follows:

  1. Create a VPC association authorization in the account where the private hosted zone is created (in this case, it’s the same account as the MSK Serverless cluster account) to authorize the remote VPCs to be associated with the hosted zone:
aws route53 create-vpc-association-authorization --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID
  2. Associate the VPC with the private hosted zone in the account where you have the VPCs with the MSK clients (remote account), referencing the association authorization you created earlier:
aws route53 list-vpc-association-authorizations --hosted-zone-id HostedZoneID
aws route53 associate-vpc-with-hosted-zone --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID
  3. Delete the VPC authorization to associate the VPC with the hosted zone:
aws route53 delete-vpc-association-authorization --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID

Deleting the authorization doesn't affect the association; it just prevents you from re-associating the VPC with the hosted zone in the future. If you want to re-associate the VPC with the hosted zone, repeat steps 1 and 2 of this procedure.

Note that your VPC needs to have the enableDnsSupport and enableDnsHostnames DNS attributes enabled for this to work. These two settings can be configured under the VPC DNS settings. For more information, refer to DNS attributes in your VPC.

These procedures work well for all remote accounts when connectivity is extended using VPC peering or Transit Gateway. If your connectivity option uses PrivateLink, the private hosted zone needs to be created in the remote account instead (the account where the PrivateLink VPC endpoints are). In addition, an A-Alias record that resolves to the PrivateLink endpoint instead of the MSK cluster endpoint needs to be created as indicated in the earlier diagram. This will facilitate name resolution to the PrivateLink endpoint. If other VPCs need access to the cluster through that same PrivateLink setup, you need to follow the same private hosted zone association procedure as described earlier and associate your other VPCs with the private hosted zone created for your PrivateLink VPC.

Limitations

The private hosted zones solution has some key limitations.

Firstly, because you’re using kafka-serverless.Region.amazonaws.com as the primary domain for our private hosted zone, and your A-Alias record uses *.kafka-serverless.Region.amazonaws.com, all traffic to the MSK Serverless service originating from any VPC associated with this private hosted zone will be directed to the one specific cluster VPC Regional endpoint that you specified in the hosted zone A-Alias record.

This solution is valid if you have one MSK Serverless cluster in your centralized service VPC. If you need to provide access to multiple MSK Serverless clusters, you can use the same solution but adapt a distributed private hosted zone approach as opposed to a centralized approach. In a distributed private hosted zone approach, each private hosted zone can point to a specific cluster. The VPCs associated with that specific private hosted zone will communicate only to the respective cluster listed under the specific private hosted zone.

In addition, after you establish a VPC association with a private hosted zone resolving *.kafka-serverless.Region.amazonaws.com, the respective VPC will only be able to communicate with the cluster defined in that specific private hosted zone and no other cluster. An exception to this rule is if a local cluster is created within the same client VPC, in which case the clients within the VPC will be able to communicate with only the local cluster.

You can also use PrivateLink to accommodate multiple clusters by creating a PrivateLink plus private hosted zone per cluster, replicating the configuration steps described earlier.

Both solutions, using distributed private hosted zones or PrivateLink, are still subject to the limitation that for each client VPC, you can only communicate with the one MSK Serverless cluster that your associated private hosted zone is configured for.

In the next section, we discuss another possible solution.

Resolver rules and AWS Resource Access Manager

The following diagram shows a high-level overview of the solution using Amazon Route 53 resolver rules and AWS Resource Access Manager.

Cross-account access using Resolver Rules and Resolver Endpoints

The solution starts with creating Route 53 inbound and outbound resolver endpoints, which are associated with the MSK cluster VPC. Then you create a resolver forwarding rule in the MSK account that is not associated with any VPC. Next, you share the resolver rule across accounts using Resource Access Manager. In the remote account where you need to extend name resolution (the account where the MSK clients are located), you accept the resource share and associate the resolver rules with your target VPCs.

For more information about this approach, refer to the third use case in Simplify DNS management in a multi-account environment with Route 53 Resolver.

This solution accommodates multiple centralized MSK Serverless clusters in a more scalable and flexible approach. It relies on directing DNS requests to be resolved by the VPC where the MSK clusters are located. Multiple MSK Serverless clusters can coexist, where clients in a particular VPC can communicate with one or more of them at the same time. This option is not supported with the private hosted zone approach.

Limitations

Although this solution has its advantages, it also has a few limitations.

Firstly, for a particular target consumer or producer account, all your MSK Serverless clusters need to be in the same core service VPC in the MSK account. This is because the resolver rule is set at an account level and uses kafka-serverless.Region.amazonaws.com as the primary domain, directing its resolution to one specific VPC resolver endpoint inbound/outbound pair within that service VPC. If you need to have separate clusters in different VPCs, consider creating separate accounts.

The second limitation is that all your client VPCs need to be in the same Region as your core MSK Serverless VPC. The reason behind this limitation is that resolver rules pointing to a resolver endpoint pair (in reality, they point to the outbound endpoint, which loops into the inbound endpoint) need to be in the same Region as the resolver endpoints, and Resource Access Manager will extend the share only within the same Region. However, this solution is a good fit when you have multiple MSK clusters in the same core VPC and your remote clients, although in different VPCs and accounts, are still within the same Region. A workaround for this limitation is to duplicate the resolver rules and outbound resolver endpoint in a second Region, where the outbound endpoint loops back through the original first-Region inbound resolver endpoint associated with the MSK Serverless cluster VPC (assuming IP connectivity is facilitated). This second-Region resolver rule can then be shared using Resource Access Manager within the second Region.

Conclusion

You can configure MSK Serverless cross-VPC and cross-account access in multi-account environments using private hosted zones or Route 53 resolver rules. The solutions discussed in this post allow you to centralize your configuration while extending cross-account access, making them scalable and straightforward to maintain. You can create your MSK Serverless clusters with cross-account access for producers and consumers, keep your focus on your business outcomes, and gain insights from sources of data across your organization without having to right-size and manage a Kafka infrastructure.


About the Author

Tamer Soliman is a Senior Solutions Architect at AWS. He helps Independent Software Vendor (ISV) customers innovate, build, and scale on AWS. He has over two decades of industry experience, and is an inventor with three granted patents. His experience spans multiple technology domains including telecom, networking, application integration, data analytics, AI/ML, and cloud deployments. He specializes in AWS Networking and has a profound passion for machine learning, AI, and generative AI.