Tag Archives: Amazon Managed Streaming for Apache Kafka (Amazon MSK)

Building a scalable streaming data platform that enables real-time and batch analytics of electric vehicles on AWS

Post Syndicated from Ayush Agrawal original https://aws.amazon.com/blogs/big-data/building-a-scalable-streaming-data-platform-that-enables-real-time-and-batch-analytics-of-electric-vehicles-on-aws/

The automobile industry has undergone a remarkable transformation because of the increasing adoption of electric vehicles (EVs). EVs, known for their sustainability and eco-friendliness, are paving the way for a new era in transportation. As environmental concerns and the push for greener technologies have gained momentum, the adoption of EVs has surged, promising to reshape our mobility landscape.

The surge in EVs brings with it a profound need for data acquisition and analysis to optimize their performance, reliability, and efficiency. In the rapidly evolving EV industry, the ability to harness, process, and derive insights from the massive volume of data generated by EVs has become essential for manufacturers, service providers, and researchers alike.

As the EV market is expanding with many new and incumbent players trying to capture the market, the major differentiating factor will be the performance of the vehicles.

Modern EVs are equipped with an array of sensors and systems that continuously monitor various aspects of their operation, including parameters such as voltage, temperature, vibration, and speed. From battery management to motor performance, these data-rich machines provide a wealth of information that, when effectively captured and analyzed, can revolutionize vehicle design, enhance safety, and optimize energy consumption. The data can be used for predictive maintenance, device anomaly detection, real-time customer alerts, and remote device management and monitoring.

However, managing this deluge of data isn’t without its challenges. As the adoption of EVs accelerates, the need for robust data pipelines capable of collecting, storing, and processing data from an exponentially growing number of vehicles becomes more pronounced. Moreover, the granularity of data generated by each vehicle has increased significantly, making it essential to efficiently handle the ever-increasing number of data points. The challenges include not only the technical intricacies of data management but also concerns related to data security, privacy, and compliance with evolving regulations.

In this blog post, we delve into the intricacies of building a reliable data analytics pipeline with Amazon OpenSearch Ingestion that can scale to accommodate millions of vehicles, each generating hundreds of metrics every second. We also provide guidelines and sample configurations to help you implement a solution.

Of the prerequisites that follow, you can set up the AWS IoT topic rule and the Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster by following How to integrate AWS IoT Core with Amazon MSK. The steps to create an Amazon OpenSearch Service domain are available in Creating and managing Amazon OpenSearch Service domains.

Prerequisites

Before you begin implementing the solution, you need the following:

  • AWS IoT topic rule
  • Amazon MSK cluster with Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) authentication
  • Amazon OpenSearch Service domain

Solution overview

The following architecture diagram provides a scalable and fully managed modern data streaming platform. The architecture uses Amazon OpenSearch Ingestion to stream data into OpenSearch Service and Amazon Simple Storage Service (Amazon S3) to store the data. The data in OpenSearch powers real-time dashboards. The data can also be used to notify customers of any failures occurring on the vehicle (see Configuring alerts in Amazon OpenSearch Service). The data in Amazon S3 is used for business intelligence and long-term storage.

Architecture diagram

In the following sections, we examine three critical pieces of the architecture in depth:

1. Amazon MSK to OpenSearch Ingestion pipeline

2. Amazon OpenSearch Ingestion pipeline to OpenSearch Service

3. Amazon OpenSearch Ingestion to Amazon S3

Solution Walkthrough

Step 1: MSK to Amazon OpenSearch Ingestion pipeline

Because each electric vehicle streams massive volumes of data to Amazon MSK clusters through AWS IoT Core, making sense of this data avalanche is critical. OpenSearch Ingestion provides a fully managed serverless integration to tap into these data streams.

The Amazon MSK source in OpenSearch Ingestion uses Kafka’s Consumer API to read records from one or more MSK topics. The MSK source in OpenSearch Ingestion seamlessly connects to MSK to ingest the streaming data into OpenSearch Ingestion’s processing pipeline.

The following snippet illustrates the pipeline configuration for an OpenSearch Ingestion pipeline used to ingest data from an MSK cluster.

While creating an OpenSearch Ingestion pipeline, add the following snippet in the Pipeline configuration section.

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true                  
      topics: 
         - name: "ev-device-topic"
           group_id: "opensearch-consumer" 
           serde_format: json                 
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam::<<account-id>>:role/opensearch-pipeline-Role"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:<<region>>:<<account-id>>:cluster/<<name>>/<<id>>" 
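
A stray space or extra colon in an ARN is easy to miss when you fill in placeholders by hand. The following Python sketch shows one way to sanity-check the two ARNs before pasting the configuration into the console. The regular expressions are simplified assumptions based on the ARN shapes shown in the configuration above, and the function names and sample values (including the account ID 123456789012) are hypothetical; this isn't an official validator.

```python
import re

# Simplified pattern for an IAM role ARN:
# arn:<partition>:iam::<12-digit account id>:role/<optional path and role name>
IAM_ROLE_ARN = re.compile(r"arn:aws[a-z-]*:iam::\d{12}:role/[\w+=,.@/-]+")

# Simplified pattern for an MSK cluster ARN, following the shape used in the
# pipeline configuration: arn:<partition>:kafka:<region>:<account>:cluster/<name>/<id>
MSK_CLUSTER_ARN = re.compile(
    r"arn:aws[a-z-]*:kafka:[a-z0-9-]+:\d{12}:cluster/[^/\s]+/[^/\s]+"
)


def is_valid_role_arn(arn: str) -> bool:
    """Return True if the whole string looks like an IAM role ARN."""
    return IAM_ROLE_ARN.fullmatch(arn) is not None


def is_valid_msk_arn(arn: str) -> bool:
    """Return True if the whole string looks like an MSK cluster ARN."""
    return MSK_CLUSTER_ARN.fullmatch(arn) is not None
```

Because the check uses a full match, any embedded whitespace or duplicated separator causes it to fail, which is the class of mistake this sketch is meant to catch.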

When configuring Amazon MSK and OpenSearch Ingestion, it’s essential to establish an optimal relationship between the number of partitions in your Kafka topics and the number of OpenSearch Compute Units (OCUs) allocated to your ingestion pipelines. This optimal configuration ensures efficient data processing and maximizes throughput. You can read more about it in Configure recommended compute units (OCUs) for the Amazon MSK pipeline.

Step 2: OpenSearch Ingestion pipeline to OpenSearch Service

OpenSearch Ingestion offers a direct method for streaming EV data into OpenSearch. The OpenSearch sink plugin channels data from multiple sources directly into the OpenSearch domain. Instead of manually provisioning the pipeline, you define its capacity in OCUs. Each OCU provides 6 GB of memory and two virtual CPUs.

To use OpenSearch Ingestion auto scaling optimally, configure the maximum number of OCUs for a pipeline based on the number of partitions in the topics being ingested:

  • If a topic has a large number of partitions (for example, more than 96, which is the maximum number of OCUs per pipeline), configure the pipeline with a minimum of 1 and a maximum of 96 OCUs. The pipeline can then scale up or down within this range as needed.
  • If a topic has fewer than 96 partitions, set the maximum number of OCUs equal to the number of partitions. This way, each partition can be processed by a dedicated OCU, enabling parallel processing and optimal performance.
  • If a pipeline ingests data from multiple topics, use the topic with the highest number of partitions as the reference for the maximum OCUs.

If you need higher throughput, you can create another pipeline with a new set of OCUs for the same topic and consumer group, enabling near-linear scalability.
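
The OCU sizing guidance above can be condensed into a small helper. This is a minimal Python sketch of that rule of thumb, not an official sizing tool; the function name and the topic names in the usage note are hypothetical, and the limit of 96 is the per-pipeline maximum quoted in this post.

```python
MAX_OCUS_PER_PIPELINE = 96  # per-pipeline limit quoted in this post


def recommended_max_ocus(partitions_per_topic: dict[str, int]) -> int:
    """Suggest a maximum OCU setting for a pipeline.

    Uses the topic with the most partitions as the reference and caps
    the result at the per-pipeline OCU limit.
    """
    if not partitions_per_topic:
        raise ValueError("at least one topic is required")
    if any(count < 1 for count in partitions_per_topic.values()):
        raise ValueError("every topic needs at least one partition")
    busiest = max(partitions_per_topic.values())
    return min(busiest, MAX_OCUS_PER_PIPELINE)
```

For a single topic with 24 partitions, `recommended_max_ocus({"ev-device-topic": 24})` returns 24. For two topics with 12 and 200 partitions, it returns 96.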

OpenSearch Ingestion provides several pre-defined configuration blueprints that can help you quickly build your ingestion pipeline on AWS.

The following snippet illustrates pipeline configuration for an OpenSearch Ingestion pipeline using OpenSearch as a sink with a dead letter queue (DLQ) to Amazon S3. When a pipeline encounters write errors, it creates DLQ objects in the configured S3 bucket. DLQ objects exist within a JSON file as an array of failed events.

sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<domain-name>>.<<region>>.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket 
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"

Step 3: OpenSearch Ingestion to Amazon S3

OpenSearch Ingestion offers a built-in sink for loading streaming data directly into S3. The service can compress, partition, and optimize the data for cost-effective storage and analytics in Amazon S3. Data loaded into S3 can be partitioned for easier query isolation and lifecycle management. Partitions can be based on vehicle ID, date, geographic region, or other dimensions as needed for your queries.

The following snippet illustrates how we’ve partitioned and stored EV data in Amazon S3.

- s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "evbucket"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

The pipeline can be created following the steps in Creating Amazon OpenSearch Ingestion pipelines.

The following is the complete pipeline configuration, combining the configuration of all three steps. Update the Amazon Resource Names (ARNs), AWS Region, OpenSearch Service domain endpoint, and S3 bucket names as needed.

You can copy the entire OpenSearch Ingestion pipeline configuration directly into the ‘Pipeline configuration’ field in the AWS Management Console while creating the OpenSearch Ingestion pipeline.

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true           # Default is false  
      topics: 
         - name: "<<msk-topic-name>>" 
           group_id: "opensearch-consumer" 
           serde_format: json        
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:<<region>>:<<account-id>>:cluster/<<cluster-name>>/<<cluster-id>>" 
  processor:
      - parse_json:
  sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<opensearch-service-domain-endpoint>>.<<region>>.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket 
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
      - s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "<<bucket-name>>"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

Real-time analytics

After the data is available in OpenSearch Service, you can build real-time monitoring and notifications. OpenSearch Service has robust support for multiple notification channels, allowing you to receive alerts through services like Slack, Amazon Chime, custom webhooks, Microsoft Teams, email, and Amazon Simple Notification Service (Amazon SNS).

The following screenshot illustrates supported notification channels in OpenSearch Service.

The alerting feature in OpenSearch Service lets you create monitors that watch for certain conditions or changes in your data and send notifications when they occur. For vehicle telemetry, this could mean alerting on issues such as battery degradation or abnormal energy consumption. For example, you can create a monitor that analyzes battery capacity over time and notifies the on-call team using Slack if capacity drops below expected degradation curves in a significant number of vehicles. This could indicate a potential manufacturing defect requiring investigation.

In addition to notifications, OpenSearch Service makes it easy to build real-time dashboards to visually track metrics across your fleet of vehicles. You can ingest vehicle telemetry data like location, speed, and energy consumption, and visualize it on maps, charts, and gauges. Dashboards can provide real-time visibility into vehicle health and performance.

The following screenshot illustrates creating a sample dashboard on OpenSearch Service.

OpenSearch dashboard

A key benefit of OpenSearch Service is its ability to handle high sustained ingestion and query rates with millisecond latencies. It distributes incoming vehicle data across data nodes in a cluster for parallel processing. This allows OpenSearch to scale out to handle very large fleets while still delivering the real-time performance needed for operational visibility and alerting.

Batch analytics

After the data is available in Amazon S3, you can build a secure data lake to power a variety of analytics use cases. Because the data lake acts as an immutable store, new data is continually added to Amazon S3 while existing data remains unaltered. This serves as a single source of truth for downstream analytics.

For business intelligence and reporting, you can analyze trends, identify insights, and create rich visualizations powered by the data lake. You can use Amazon QuickSight to build and share dashboards without needing to set up servers or infrastructure. Here’s an example of a QuickSight dashboard for IoT device data. With such a dashboard, you can gain insights from historical data that can help with better vehicle and battery design.

The Amazon QuickSight public gallery shows examples of dashboards across different domains.

Consider OpenSearch Dashboards for your operational day-to-day use cases, where you need to identify issues and alert in near real time. Use Amazon QuickSight to analyze big data stored in a lake house and generate actionable insights from it.

Clean up

Delete the OpenSearch pipeline and Amazon MSK cluster to stop incurring costs on these services.

Conclusion

In this post, you learned how Amazon MSK, OpenSearch Ingestion, OpenSearch Service, and Amazon S3 can be integrated to ingest, process, store, analyze, and act on endless streams of EV data efficiently.

With OpenSearch Ingestion as the integration layer between streams and storage, the entire pipeline scales up and down automatically based on demand. No more complex cluster management or lost data from bursts in streams.

See Amazon OpenSearch Ingestion to learn more.


About the authors

Ayush Agrawal is a Startups Solutions Architect from Gurugram, India with 11 years of experience in Cloud Computing. With a keen interest in AI, ML, and Cloud Security, Ayush is dedicated to helping startups navigate and solve complex architectural challenges. His passion for technology drives him to constantly explore new tools and innovations. When he’s not architecting solutions, you’ll find Ayush diving into the latest tech trends, always eager to push the boundaries of what’s possible.

Fraser Sequeira is a Solutions Architect with AWS based in Mumbai, India. In his role at AWS, Fraser works closely with startups to design and build cloud-native solutions on AWS, with a focus on analytics and streaming workloads. With over 10 years of experience in cloud computing, Fraser has deep expertise in big data, real-time analytics, and building event-driven architecture on AWS.

How EchoStar ingests terabytes of data daily across its 5G Open RAN network in near real-time using Amazon Redshift Serverless Streaming Ingestion

Post Syndicated from Balaram Mathukumilli original https://aws.amazon.com/blogs/big-data/how-echostar-ingests-terabytes-of-data-daily-across-its-5g-open-ran-network-in-near-real-time-using-amazon-redshift-serverless-streaming-ingestion/

This post was co-written with Balaram Mathukumilli, Viswanatha Vellaboyana and Keerthi Kambam from DISH Wireless, a wholly owned subsidiary of EchoStar.

EchoStar, a connectivity company providing television entertainment, wireless communications, and award-winning technology to residential and business customers throughout the US, deployed the first standalone, cloud-native Open RAN 5G network on AWS public cloud.

Amazon Redshift Serverless is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, simple, and secure analytics at scale. Amazon Redshift data sharing allows you to share data within and across organizations, AWS Regions, and even third-party providers, without moving or copying the data. Additionally, it allows you to use multiple warehouses of different types and sizes for extract, transform, and load (ETL) jobs so you can tune your warehouses based on your write workloads’ price-performance needs.

You can use the Amazon Redshift Streaming Ingestion capability to update your analytics data warehouse in near real time. Redshift Streaming Ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use SQL to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK), and pull data directly to Amazon Redshift.

EchoStar uses Redshift Streaming Ingestion to ingest over 10 TB of data daily from more than 150 MSK topics in near real time across its Open RAN 5G network. This post provides an overview of real-time data analysis with Amazon Redshift and how EchoStar uses it to ingest hundreds of megabytes per second. As data sources and volumes grew across its network, EchoStar migrated from a single Redshift Serverless workgroup to a multi-warehouse architecture with live data sharing. This resulted in improved performance for ingesting and analyzing their rapidly growing data.

“By adopting the strategy of ‘parse and transform later,’ and establishing an Amazon Redshift data warehouse farm with a multi-cluster architecture, we leveraged the power of Amazon Redshift for direct streaming ingestion and data sharing.

“This innovative approach improved our data latency, reducing it from two–three days to an average of 37 seconds. Additionally, we achieved better scalability, with Amazon Redshift direct streaming ingestion supporting over 150 MSK topics.”

—Sandeep Kulkarni, VP, Software Engineering & Head of Wireless OSS Platforms at EchoStar

EchoStar use case

EchoStar needed to provide near real-time access to 5G network performance data for downstream consumers and interactive analytics applications. This data is sourced from the 5G network EMS observability infrastructure and is streamed in near real-time using AWS services like AWS Lambda and AWS Step Functions. The streaming data produced many small files, ranging from bytes to kilobytes. To efficiently integrate this data, a messaging system like Amazon MSK was required.

EchoStar was processing over 150 MSK topics from their messaging system, with each topic containing around 1 billion rows of data per day. This resulted in an average total data volume of 10 TB per day. To use this data, EchoStar needed to visualize it, perform spatial analysis, join it with third-party data sources, develop end-user applications, and use the insights to make near real-time improvements to their terrestrial 5G network. EchoStar needed a solution that does the following:

  • Optimize parsing and loading of over 150 MSK topics to enable downstream workloads to run simultaneously without impacting each other
  • Allow hundreds of queries to run in parallel with desired query throughput
  • Seamlessly scale capacity with the increase in user base and maintain cost-efficiency
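
To put the volumes quoted above in perspective, the following Python sketch converts daily totals into sustained per-second averages. It's plain arithmetic under two assumptions: decimal units (1 TB = 1,000,000 MB) and a perfectly even load across the day. Real traffic is bursty, so peak rates will be higher than these averages. The function names are hypothetical.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def average_mb_per_second(terabytes_per_day: float) -> float:
    """Average throughput in MB/s for a daily volume in TB (decimal units)."""
    return terabytes_per_day * 1_000_000 / SECONDS_PER_DAY


def average_rows_per_second(rows_per_day: float) -> float:
    """Average row rate for a single topic given its daily row count."""
    return rows_per_day / SECONDS_PER_DAY


print(round(average_mb_per_second(10), 1))          # prints 115.7
print(round(average_rows_per_second(1_000_000_000)))  # prints 11574
```

In other words, 10 TB per day corresponds to a sustained average of roughly 116 MB per second, and a topic with about 1 billion rows per day averages roughly 11,600 rows per second.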

Solution overview

EchoStar migrated from a single Redshift Serverless workgroup to a multi-warehouse Amazon Redshift architecture in partnership with AWS. The new architecture enables workload isolation by separating streaming ingestion and ETL jobs from analytics workloads across multiple Redshift compute instances. At the same time, it provides live data sharing using a single copy of the data between the data warehouses. This architecture takes advantage of AWS capabilities to scale Redshift streaming ingestion jobs and isolate workloads while maintaining data access.

The following diagram shows the high-level end-to-end serverless architecture and overall data pipeline.

Architecture Diagram

The solution consists of the following key components:

  • Primary ETL Redshift Serverless workgroup – A primary ETL producer workgroup of size 392 RPU
  • Secondary Redshift Serverless workgroups – Additional producer workgroups of varying sizes to distribute and scale near real-time data ingestion from over 150 MSK topics based on price-performance requirements
  • Consumer Redshift Serverless workgroup – A consumer workgroup instance to run analytics using Tableau

To efficiently load multiple MSK topics into Redshift Serverless in parallel, we first identified the topics with the highest data volumes in order to determine the appropriate sizing for secondary workgroups.

We began by sizing the system at a single Redshift Serverless workgroup of 64 RPU. Then we onboarded a small number of MSK topics, creating related streaming materialized views. We incrementally added more materialized views, evaluating overall ingestion cost, performance, and latency needs within a single workgroup. This initial benchmarking gave us a solid baseline to onboard the remaining MSK topics across multiple workgroups.

In addition to a multi-warehouse approach and workgroup sizing, we optimized such large-scale data volume ingestion with an average latency of 37 seconds by splitting ingestion jobs into two steps:

  • Streaming materialized views – Use JSON_PARSE to ingest data from MSK topics in Amazon Redshift
  • Flattening materialized views – Shred and perform transformations as a second step, reading data from the respective streaming materialized view

The following diagram depicts the high-level approach.

MSK to Redshift

Best practices

In this section, we share some of the best practices we observed while implementing this solution:

  • We performed an initial Redshift Serverless workgroup sizing based on three key factors:
    • Number of records per second per MSK topic
    • Average record size per MSK topic
    • Desired latency SLA
  • Additionally, we created only one streaming materialized view for a given MSK topic. Creation of multiple materialized views per MSK topic can slow down the ingestion performance because each materialized view becomes a consumer for that topic and shares the Amazon MSK bandwidth for that topic.
  • While defining the streaming materialized view, we avoided using JSON_EXTRACT_PATH_TEXT to pre-shred data, because JSON_EXTRACT_PATH_TEXT operates on the data row by row, which significantly impacts ingestion throughput. Instead, we adopted JSON_PARSE with the CAN_JSON_PARSE function to ingest data from the stream at the lowest latency and to guard against errors. The following is a sample SQL query we used for the MSK topics (the actual data source names have been masked for security reasons):
CREATE MATERIALIZED VIEW <source-name>_streaming_mvw AUTO REFRESH YES AS
SELECT
    kafka_partition,
    kafka_offset,
    refresh_time,
    case when CAN_JSON_PARSE(kafka_value) = true then JSON_PARSE(kafka_value) end as Kafka_Data,
    case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end as Invalid_Data
FROM
    external_<source-name>."<source-name>_mvw";
  • We kept the streaming materialized views simple and moved all transformations like unnesting, aggregation, and case expressions to a later step as flattening materialized views. The following is a sample SQL query we used to flatten data by reading the streaming materialized views created in the previous step (the actual data source and column names have been masked for security reasons):
CREATE MATERIALIZED VIEW <source-name>_flatten_mvw AUTO REFRESH NO AS
SELECT
    kafka_data."<column1>" :: integer as "<column1>",
    kafka_data."<column2>" :: integer as "<column2>",
    kafka_data."<column3>" :: bigint as "<column3>",
    … 
    …
    …
    …
FROM
    <source-name>_streaming_mvw;
  • The streaming materialized views were set to auto refresh so that they can continuously ingest data into Amazon Redshift from MSK topics.
  • The flattening materialized views were set to manual refresh based on SLA requirements using Amazon Managed Workflows for Apache Airflow (Amazon MWAA).
  • We skipped defining any sort key in the streaming materialized views to further accelerate the ingestion speed.
  • Lastly, we used SYS_MV_REFRESH_HISTORY and SYS_STREAM_SCAN_STATES system views to monitor the streaming ingestion refreshes and latencies.
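
The CAN_JSON_PARSE guard in the streaming materialized view above routes each record into one of two columns: parsed data or the raw invalid payload. The following Python sketch is an analogy for that routing logic only, not how Amazon Redshift implements it. Python's JSON parser and Redshift's can disagree on edge cases, and the function name and the sample "speed" field are hypothetical.

```python
import json
from typing import Any, Optional, Tuple


def split_record(kafka_value: str) -> Tuple[Optional[Any], Optional[str]]:
    """Mimic the two CASE expressions in the streaming materialized view.

    Returns (kafka_data, invalid_data): valid JSON is parsed into the first
    slot, and anything that fails to parse is kept verbatim in the second.
    """
    try:
        return json.loads(kafka_value), None
    except json.JSONDecodeError:
        return None, kafka_value
```

Keeping the unparseable payload rather than dropping it means a malformed record doesn't fail the refresh and can still be inspected later.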

For more information about best practices and monitoring techniques, refer to Best practices to implement near-real-time analytics using Amazon Redshift Streaming Ingestion with Amazon MSK.

Results

EchoStar saw improvements with this solution in both performance and scalability across their 5G Open RAN network.

Performance

By isolating and scaling Redshift Streaming Ingestion refreshes across multiple Redshift Serverless workgroups, EchoStar met their latency SLA requirements. We used the following SQL query to measure latencies:

WITH curr_qry as (
    SELECT
        mv_name,
        cast(partition_id as int) as partition_id,
        max(query_id) as current_query_id
    FROM
        sys_stream_scan_states
    GROUP BY
        mv_name,
        cast(partition_id as int)
)
SELECT
    strm.mv_name,
    tmp.partition_id,
    min(datediff(second, stream_record_time_max, record_time)) as min_latency_in_secs,
    max(datediff(second, stream_record_time_min, record_time)) as max_latency_in_secs
FROM
    sys_stream_scan_states strm,
    curr_qry tmp
WHERE
    strm.query_id = tmp.current_query_id
    and strm.mv_name = tmp.mv_name
    and strm.partition_id = tmp.partition_id
GROUP BY 1,2
ORDER BY 1,2;

When we further aggregate the preceding query to only the mv_name level (removing partition_id, which uniquely identifies a partition in an MSK topic), we find the average daily performance results we achieved on a Redshift Serverless workgroup size of 64 RPU, as shown in the following table. (The actual materialized view names have been hashed for security reasons because they map to external vendor names and data sources.)

S.No. stream_name_hash min_latency_secs max_latency_secs avg_records_per_day
1 e022b6d13d83faff02748d3762013c 1 6 186,395,805
2 a8cc0770bb055a87bbb3d37933fc01 1 6 186,720,769
3 19413c1fc8fd6f8e5f5ae009515ffb 2 4 5,858,356
4 732c2e0b3eb76c070415416c09ffe0 3 27 12,494,175
5 8b4e1ffad42bf77114ab86c2ea91d6 3 4 149,927,136
6 70e627d11eba592153d0f08708c0de 5 5 121,819
7 e15713d6b0abae2b8f6cd1d2663d94 5 31 148,768,006
8 234eb3af376b43a525b7c6bf6f8880 6 64 45,666
9 38e97a2f06bcc57595ab88eb8bec57 7 100 45,666
10 4c345f2f24a201779f43bd585e53ba 9 12 101,934,969
11 a3b4f6e7159d9b69fd4c4b8c5edd06 10 14 36,508,696
12 87190a106e0889a8c18d93a3faafeb 13 69 14,050,727
13 b1388bad6fc98c67748cc11ef2ad35 25 118 509
14 cf8642fccc7229106c451ea33dd64d 28 66 13,442,254
15 c3b2137c271d1ccac084c09531dfcd 29 74 12,515,495
16 68676fc1072f753136e6e992705a4d 29 69 59,565
17 0ab3087353bff28e952cd25f5720f4 37 71 12,775,822
18 e6b7f10ea43ae12724fec3e0e3205c 39 83 2,964,715
19 93e2d6e0063de948cc6ce2fb5578f2 45 45 1,969,271
20 88cba4fffafd085c12b5d0a01d0b84 46 47 12,513,768
21 d0408eae66121d10487e562bd481b9 48 57 12,525,221
22 de552412b4244386a23b4761f877ce 52 52 7,254,633
23 9480a1a4444250a0bc7a3ed67eebf3 58 96 12,522,882
24 db5bd3aa8e1e7519139d2dc09a89a7 60 103 12,518,688
25 e6541f290bd377087cdfdc2007a200 71 83 176,346,585
26 6f519c71c6a8a6311f2525f38c233d 78 115 100,073,438
27 3974238e6aff40f15c2e3b6224ef68 79 82 12,770,856
28 7f356f281fc481976b51af3d76c151 79 96 75,077
29 e2e8e02c7c0f68f8d44f650cd91be2 92 99 12,525,210
30 3555e0aa0630a128dede84e1f8420a 97 105 8,901,014
31 7f4727981a6ba1c808a31bd2789f3a 108 110 11,599,385

All 31 materialized views running and refreshing concurrently and continuously show a minimum latency of 1 second and a maximum latency of 118 seconds over the last 7 days, meeting EchoStar’s SLA requirements.
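
The roll-up from partition level to materialized view level described above can be sketched in Python as follows. The post doesn't show the aggregated query, so this sketch assumes the roll-up takes the smallest minimum and the largest maximum across partitions; the authors may have averaged instead. The function name and sample rows are hypothetical.

```python
from typing import Dict, Iterable, Tuple

# (mv_name, partition_id, min_latency_in_secs, max_latency_in_secs)
Row = Tuple[str, int, int, int]


def roll_up(rows: Iterable[Row]) -> Dict[str, Tuple[int, int]]:
    """Collapse per-partition latencies into one (min, max) pair per view."""
    result: Dict[str, Tuple[int, int]] = {}
    for mv_name, _partition_id, low, high in rows:
        if mv_name in result:
            current_low, current_high = result[mv_name]
            result[mv_name] = (min(current_low, low), max(current_high, high))
        else:
            result[mv_name] = (low, high)
    return result


# Hypothetical per-partition rows for two materialized views.
sample_rows = [("mv_a", 0, 1, 4), ("mv_a", 1, 2, 6), ("mv_b", 0, 25, 118)]
print(roll_up(sample_rows))
# prints {'mv_a': (1, 6), 'mv_b': (25, 118)}
```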

Scalability

With this multi-warehouse architecture built on Redshift data sharing, EchoStar can now quickly scale their Redshift compute resources on demand to onboard the remaining 150 MSK topics. In addition, as their data sources and MSK topics increase further, they can quickly add Redshift Serverless workgroups (for example, another Redshift Serverless 128 RPU workgroup) to meet their desired SLA requirements.

Conclusion

By using the scalability of Amazon Redshift and a multi-warehouse architecture with data sharing, EchoStar delivers near real-time access to over 150 million rows of data across over 150 MSK topics, totaling 10 TB ingested daily, to their users.

This split multi-producer/consumer model of Amazon Redshift can bring benefits to many workloads that have similar performance characteristics as EchoStar’s warehouse. With this pattern, you can scale your workload to meet SLAs while optimizing for price and performance. Please reach out to your AWS Account Team to engage an AWS specialist for additional help or for a proof of concept.


About the authors

Balaram Mathukumilli is Director, Enterprise Data Services at DISH Wireless. He is deeply passionate about Data and Analytics solutions. With 20+ years of experience in Enterprise and Cloud transformation, he has worked across domains such as PayTV, Media Sales, Marketing and Wireless. Balaram works closely with the business partners to identify data needs, data sources, determine data governance, develop data infrastructure, build data analytics capabilities, and foster a data-driven culture to ensure their data assets are properly managed, used effectively, and are secure.

Viswanatha Vellaboyana, a Solutions Architect at DISH Wireless, is deeply passionate about Data and Analytics solutions. With 20 years of experience in enterprise and cloud transformation, he has worked across domains such as Media, Media Sales, Communication, and Health Insurance. He collaborates with enterprise clients, guiding them in architecting, building, and scaling applications to achieve their desired business outcomes.

Keerthi Kambam is a Senior Engineer at DISH Network specializing in AWS Services. She builds scalable data engineering and analytical solutions for DISH customer-facing applications. She is passionate about solving complex data challenges with cloud solutions.

Raks Khare is a Senior Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers across varying industries and regions architect data analytics solutions at scale on the AWS platform. Outside of work, he likes exploring new travel and food destinations and spending quality time with his family.

Adi Eswar has been a core member of the AI/ML and Analytics Specialist team, leading the customer experience for customers’ existing workloads and leading key initiatives as part of the Analytics Customer Experience Program and Redshift enablement in AWS-TELCO customers. He spends his free time exploring new food, cultures, national parks and museums with his family.

Shirin Bhambhani is a Senior Solutions Architect at AWS. She works with customers to build solutions and accelerate their cloud migration journey. She enjoys simplifying customer experiences on AWS.

Vinayak Rao is a Senior Customer Solutions Manager at AWS. He collaborates with customers, partners, and internal AWS teams to drive customer success, delivery of technical solutions, and cloud adoption.

Configure a custom domain name for your Amazon MSK cluster

Post Syndicated from Subham Rakshit original https://aws.amazon.com/blogs/big-data/configure-a-custom-domain-name-for-your-amazon-msk-cluster/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data. It runs open-source versions of Apache Kafka. This means existing applications, tooling, and plugins from partners and the Apache Kafka community are supported without requiring changes to application code.

Customers use Amazon MSK for real-time data sharing with their end customers, who could be internal teams or third parties. These end customers manage Kafka clients, which are deployed in AWS, other managed cloud providers, or on premises. When migrating from self-managed Kafka to Amazon MSK or moving clients between MSK clusters, customers want to avoid reconfiguring Kafka clients to use a different Domain Name System (DNS) name. Therefore, it’s important to have a custom domain name for the MSK cluster that the clients can connect to. Also, having a custom domain name makes the disaster recovery (DR) process less complicated because clients don’t need to change the MSK bootstrap address when either a new cluster is created or a client connection needs to be redirected to a DR AWS Region.

MSK clusters use AWS-generated DNS names that are unique for each cluster, containing the broker ID, MSK cluster name, two service generated sub-domains, and the AWS Region, ending with amazonaws.com. The following figure illustrates this naming format.
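
As a rough illustration of that naming format, the following sketch splits a broker DNS name into the parts listed above. The function name describe_msk_broker_host and the sample hostname are hypothetical placeholders, not values from a real cluster, and the sketch assumes the name has exactly the shape described in this post.

```shell
# Hypothetical helper: split an MSK broker DNS name into its components.
# Assumed shape: b-<id>.<cluster>.<sub1>.<sub2>.kafka.<region>.amazonaws.com
describe_msk_broker_host() {
  host=$1
  # Turn dots into spaces so the labels become positional parameters.
  set -- $(printf '%s' "$host" | tr '.' ' ')
  echo "broker_id=${1#b-}"
  echo "cluster=$2"
  echo "subdomains=$3.$4"
  echo "region=$6"
}

# Placeholder hostname for illustration only.
describe_msk_broker_host "b-1.mskcustomdomaincluster.abc123.c2.kafka.us-east-1.amazonaws.com"
```

Because every label except the broker ID is shared by all brokers in a cluster, replacing b-1 with b-2 or b-3 yields the other broker names.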

MSK brokers use the same DNS name for the certificates used for Transport Layer Security (TLS) connections. The DNS name used by clients with TLS encrypted authentication mechanisms must match the primary Common Name (CN), or Subject Alternative Name (SAN) of the certificate presented by the MSK broker, to avoid hostname validation errors.

The solution discussed in this post provides a way for you to use a custom domain name for clients to connect to their MSK clusters when using SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism) authentication only.

Solution overview

Network Load Balancers (NLBs) are a popular addition to the Amazon MSK architecture, along with AWS PrivateLink as a way to expose connectivity to an MSK cluster from other virtual private clouds (VPCs). For more details, see How Goldman Sachs builds cross-account connectivity to their Amazon MSK clusters with AWS PrivateLink. In this post, we run through how to use an NLB to enable the use of a custom domain name with Amazon MSK when using SASL/SCRAM authentication.

The following diagram shows all components used by the solution.

SASL/SCRAM uses TLS to encrypt the Kafka protocol traffic between the client and Kafka broker. To use a custom domain name, the client needs to be presented with a server certificate matching that custom domain name. As of this writing, it isn’t possible to modify the certificate used by the MSK brokers, so this solution uses an NLB to sit between the client and MSK brokers.

An NLB works at the connection layer (Layer 4) and routes the TCP or UDP protocol traffic. It doesn’t validate the application data being sent and forwards the Kafka protocol traffic. The NLB provides the ability to use a TLS listener, where a certificate is imported into AWS Certificate Manager (ACM) and associated with the listener and enables TLS negotiation between the client and the NLB. The NLB performs a separate TLS negotiation between itself and the MSK brokers. This NLB TLS negotiation to the target works exactly the same irrespective of whether certificates are signed by a public or private Certificate Authority (CA).

For the client to resolve DNS queries for the custom domain, an Amazon Route 53 private hosted zone is used to host the DNS records, and is associated with the client’s VPC to enable DNS resolution from the Route 53 VPC resolver.

Kafka listeners and advertised listeners

Kafka listeners (listeners) are the lists of addresses that Kafka binds to for listening. A Kafka listener is composed of a hostname or IP, port, and protocol: <protocol>://<hostname>:<port>.
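
To make the listener format concrete, here is a minimal sketch that splits one listener string into its three parts. The function name parse_listener is hypothetical. Note that in Amazon MSK output the first part is a listener name such as CLIENT_SASL_SCRAM rather than a bare protocol.

```shell
# Split a Kafka listener of the form <protocol>://<hostname>:<port>.
parse_listener() {
  listener=$1
  protocol=${listener%%://*}   # text before "://"
  rest=${listener#*://}        # "<hostname>:<port>"
  hostname=${rest%:*}          # text before the last ":"
  port=${rest##*:}             # text after the last ":"
  echo "$protocol $hostname $port"
}

parse_listener "CLIENT_SASL_SCRAM://b-1.example.com:9001"
```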

The Kafka client uses the bootstrap address to connect to one of the brokers in the cluster and issues a metadata request. The broker returns a metadata response containing the address of each broker that the client needs to connect to. Advertised listeners (advertised.listeners) is a broker configuration option that determines the addresses returned to Kafka clients. By default, an advertised listener is not set. After it’s set, Kafka clients will use the advertised listener instead of listeners to obtain the connection information for brokers.

When Amazon MSK multi-VPC private connectivity is enabled, AWS sets the advertised.listeners configuration option to include the Amazon MSK multi-VPC DNS alias.

MSK brokers use the listener configuration to tell clients the DNS names to use to connect to the individual brokers for each authentication type enabled. Therefore, when clients are directed to use the custom domain name, you need to set a custom advertised listener for SASL/SCRAM authentication protocol. Advertised listeners are unique to each broker; the cluster won’t start if multiple brokers have the same advertised listener address.

Kafka bootstrap process and setup options

A Kafka client uses the bootstrap addresses to get the metadata from the MSK cluster, which in response provides the broker hostname and port (the listeners information by default or the advertised listener if it’s configured) that the client needs to connect to for subsequent requests. Using this information, the client connects to the appropriate broker for the topic or partition that it needs to send to or fetch from. The following diagram shows the default bootstrap and topic or partition connectivity between a Kafka client and MSK broker.

You have two options when using a custom domain name with Amazon MSK.

Option 1: Only a bootstrap connection through an NLB

You can use a custom domain name only for the bootstrap connection, where the advertised listeners are not set, so the client is directed to the default AWS cluster DNS name. This option is beneficial when the Kafka client has direct network connectivity to both the NLB and the MSK broker’s Elastic Network Interface (ENI). The following diagram illustrates this setup.

No changes are required to the MSK brokers, and the Kafka client has the custom domain set as the bootstrap address. The Kafka client uses the custom domain bootstrap address to send a get metadata request to the NLB. The NLB forwards the Kafka protocol traffic received from the Kafka client to a healthy MSK broker’s ENI. That broker responds with metadata where only listeners is set, containing the default MSK cluster DNS name for each broker. The Kafka client then uses the default MSK cluster DNS name for the appropriate broker and connects to that broker’s ENI.

Option 2: All connections through an NLB

Alternatively, you can use a custom domain name for the bootstrap and the brokers, where the custom domain name for each broker is set in the advertised listeners configuration. You need to use this option when Kafka clients don’t have direct network connectivity to the MSK brokers’ ENIs. For example, this applies when Kafka clients need to use an NLB, AWS PrivateLink, or Amazon MSK multi-VPC endpoints to connect to an MSK cluster. The following diagram illustrates this setup.

The advertised listeners are set to use the custom domain name, and the Kafka client has the custom domain set as the bootstrap address. The Kafka client uses the custom domain bootstrap address to send a get metadata request, which is sent to the NLB. The NLB forwards the Kafka protocol traffic received from the Kafka client to a healthy MSK broker’s ENI. That broker responds with metadata where advertised listeners is set. The Kafka client uses the custom domain name for the appropriate broker, which directs the connection to the NLB, on the port set for that broker. The NLB sends the Kafka protocol traffic to that broker.

Network Load Balancer

The following diagram illustrates the NLB port and target configuration. A TLS listener with port 9000 is used for bootstrap connections with all MSK brokers set as targets. The listener uses TLS target type with target port as 9096. A TLS listener port is used to represent each broker in the MSK cluster. In this post, there are three brokers in the MSK cluster with TLS 9001, representing broker 1, up to TLS 9003, representing broker 3.

For all TLS listeners on the NLB, a single imported certificate with the domain name bootstrap.example.com is attached to the NLB. bootstrap.example.com is used as the Common Name (CN) so that the certificate is valid for the bootstrap address, and Subject Alternative Names (SANs) are set for all broker DNS names. If the certificate is issued by a private CA, clients need to import the root and intermediate CA certificates to the trust store. If the certificate is issued by a public CA, the root and intermediate CA certificates will be in the default trust store.

The following table shows the required NLB configuration.

NLB Listener Type NLB Listener Port Certificate NLB Target Type NLB Targets
TLS 9000 bootstrap.example.com TLS All Broker ENIs
TLS 9001 bootstrap.example.com TLS Broker 1
TLS 9002 bootstrap.example.com TLS Broker 2
TLS 9003 bootstrap.example.com TLS Broker 3
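
The port convention in the table (9000 for bootstrap, 9000 plus the broker ID for each broker) can be sketched as a small helper. The function name nlb_endpoint_for_broker is hypothetical, and the convention is the one chosen for this post rather than anything Amazon MSK or the NLB requires.

```shell
# Assumed convention from this post: NLB port 9000 + N maps to broker N,
# reached through the custom hostname b-N.<domain>.
nlb_endpoint_for_broker() {
  broker_id=$1
  domain=${2:-example.com}
  echo "b-${broker_id}.${domain}:$((9000 + $broker_id))"
}

for id in 1 2 3; do
  nlb_endpoint_for_broker "$id"
done
```

A cluster with more brokers would need one additional listener port and target group per broker under the same scheme.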

Domain Name System

For this post, a Route 53 private hosted zone is used to host the DNS records for the custom domain, in this case example.com. The private hosted zone is associated with the Amazon MSK VPC, to enable DNS resolution for the client that is launched in the same VPC. If your client is in a different VPC than the MSK cluster, you need to associate the private hosted zone with that client’s VPC.

The Route 53 private hosted zone is not a required part of the solution. The most crucial part is that the client can perform DNS resolution against the custom domain and get the required responses. You can instead use your organization’s existing DNS, a Route 53 public hosted zone or Route 53 inbound resolver to resolve Route 53 private hosted zones from outside of AWS, or an alternative DNS solution.

The following figure shows the DNS records used by the client to resolve to the NLB. We use bootstrap for the initial client connection, and use b-1, b-2, and b-3 to reference each broker’s name.

The following table lists the DNS records required for a three-broker MSK cluster when using a Route 53 private or public hosted zone.

Record Record Type Value
bootstrap A NLB Alias
b-1 A NLB Alias
b-2 A NLB Alias
b-3 A NLB Alias

The following table lists the DNS records required for a three-broker MSK cluster when using other DNS solutions.

Record Record Type Value
bootstrap CNAME NLB DNS A Record (e.g. name-id.elb.region.amazonaws.com)
b-1 CNAME NLB DNS A Record
b-2 CNAME NLB DNS A Record
b-3 CNAME NLB DNS A Record

In the following sections, we go through the steps to configure a custom domain name for your MSK cluster and clients connecting with the custom domain.

Prerequisites

To deploy the solution, you need the following prerequisites:

Launch the CloudFormation template

Complete the following steps to deploy the CloudFormation template:

  1. Choose Launch Stack.
  2. Provide the stack name as msk-custom-domain.
  3. For MSKClientUserName, enter the user name of the secret used for SASL/SCRAM authentication with Amazon MSK.
  4. For MSKClientUserPassword, enter the password of the secret used for SASL/SCRAM authentication with Amazon MSK.

The CloudFormation template will deploy the following resources:

Set up the EC2 instance

Complete the following steps to configure your EC2 instance:

  1. On the Amazon EC2 console, connect to the instance msk-custom-domain-KafkaClientInstance1 using Session Manager, a capability of AWS Systems Manager.
  2. Switch to ec2-user:
    sudo su - ec2-user 
    cd

  3. Run the following commands to configure the SASL/SCRAM client properties, create Kafka access control lists (ACLs), and create a topic named customer:
    . ./cloudformation_outputs.sh 
    aws configure set region $REGION 
    export BS=$(aws kafka get-bootstrap-brokers --cluster-arn ${MSKClusterArn} | jq -r '.BootstrapBrokerStringSaslScram') 
    export ZOOKEEPER=$(aws kafka describe-cluster --cluster-arn $MSKClusterArn | jq -r '.ClusterInfo.ZookeeperConnectString')
    ./configure_sasl_scram_properties_and_kafka_acl.sh

Create a certificate

For this post, we use self-signed certificates. However, it’s recommended to use either a public certificate or a certificate signed by your organization’s public key infrastructure (PKI).

If you’re using an AWS private CA for the public key infrastructure, refer to Creating a private CA for instructions to create and install a private CA.

Use the openssl command to create a self-signed certificate. Modify the following command, adding the country code, state, city, and company:

SSLCONFIG="[req]
prompt = no
distinguished_name = req_distinguished_name
x509_extensions = v3_ca

[req_distinguished_name]
C = <<Country_Code>>
ST = <<State>>
L = <<City>>
O = <<Company>>
OU = 
emailAddress = 
CN = bootstrap.example.com

[v3_ca]
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alternate_names

[alternate_names]
DNS.1 = bootstrap.example.com
DNS.2 = b-1.example.com
DNS.3 = b-2.example.com
DNS.4 = b-3.example.com
"

openssl req -x509 -newkey rsa:2048 -days 365 -nodes \
    -config <(echo "$SSLCONFIG") \
    -keyout msk-custom-domain-pvt-key.pem \
    -out msk-custom-domain-certificate.pem  

You can check the created certificate using the following command:

openssl x509 -text -noout -in msk-custom-domain-certificate.pem
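
Because a missing Subject Alternative Name only surfaces later as a hostname validation error on the client, it can help to check the names up front. The following is a minimal sketch that reads the text output of the previous command on standard input; the function name missing_sans is hypothetical.

```shell
# Reads `openssl x509 -text -noout` output on stdin and prints every
# required DNS name that is absent from the certificate's SAN list.
missing_sans() {
  # Put each comma-separated SAN entry on its own line and trim whitespace.
  sans=$(tr ',' '\n' | sed 's/^[[:space:]]*//; s/[[:space:]]*$//')
  missing=""
  for name in "$@"; do
    # Exact, fixed-string match so dots are not treated as wildcards.
    if ! printf '%s\n' "$sans" | grep -Fxq "DNS:$name"; then
      missing="$missing $name"
    fi
  done
  echo "${missing# }"
}

# Example with a hand-written excerpt of certificate text.
printf '%s\n' "    DNS:bootstrap.example.com, DNS:b-1.example.com" |
  missing_sans bootstrap.example.com b-1.example.com b-2.example.com
```

With the real certificate, you would pipe the output of openssl x509 -text -noout -in msk-custom-domain-certificate.pem into the function. An empty result means every listed name is present.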

Import the certificate to ACM

To use the self-signed certificate for the solution, you need to import the certificate to ACM:

export CertificateARN=$(aws acm import-certificate --certificate file://msk-custom-domain-certificate.pem --private-key file://msk-custom-domain-pvt-key.pem | jq -r '.CertificateArn')

echo $CertificateARN

After it’s imported, you can see the certificate in ACM.

Import the certificate to the Kafka client trust store

For the client to validate the server SSL certificate during the TLS handshake, you need to import the self-signed certificate to the client’s trust store.

  1. Run the following command to use the JVM trust store to create your client trust store:
    cp /usr/lib/jvm/jre-1.8.0-openjdk/lib/security/cacerts /home/ec2-user/kafka.client.truststore.jks 
    chmod 700 kafka.client.truststore.jks

  2. Import the self-signed certificate to the trust store by using the following command. Provide the keystore password as changeit.
    /usr/lib/jvm/jre-1.8.0-openjdk/bin/keytool -import \
        -trustcacerts \
        -noprompt \
        -alias msk-cert \
        -file msk-custom-domain-certificate.pem \
        -keystore kafka.client.truststore.jks

  3. You need to include the trust store location config property used by Kafka clients to enable certificate validation:
    echo 'ssl.truststore.location=/home/ec2-user/kafka.client.truststore.jks' >> /home/ec2-user/kafka/config/client_sasl.properties
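
For orientation, a SASL/SCRAM client properties file typically ends up looking like the sketch below. The file written by the setup script in this post is not shown, so the exact contents here are an assumption based on standard Kafka client settings, and the user name and password are placeholders. The sketch writes to a temporary file rather than to client_sasl.properties.

```shell
# Assumption: typical Kafka client settings for SASL/SCRAM over TLS.
# Written to a temporary file so the real properties file is untouched.
props_file=$(mktemp)
cat > "$props_file" << 'EOF'
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<<user>>" password="<<password>>";
ssl.truststore.location=/home/ec2-user/kafka.client.truststore.jks
EOF

# Confirm the trust store setting is present before using the file.
grep '^ssl.truststore.location=' "$props_file"
```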

Set up DNS resolution for clients within the VPC

To set up DNS resolution for clients, create a private hosted zone for the domain and associate the hosted zone with the VPC where the client is deployed:

aws route53 create-hosted-zone \
--name example.com \
--caller-reference "msk-custom-domain" \
--hosted-zone-config Comment="Private Hosted Zone for MSK",PrivateZone=true \
--vpc VPCRegion=$REGION,VPCId=$MSKVPCId

export HostedZoneId=$(aws route53 list-hosted-zones-by-vpc --vpc-id $MSKVPCId --vpc-region $REGION | jq -r '.HostedZoneSummaries[0].HostedZoneId')

Create EC2 target groups

Target groups route requests to individual registered targets, such as EC2 instances, using the protocol and port number that you specify. You can register a target with multiple target groups and you can register multiple targets to one target group.

For this post, you need four target groups: one for each broker instance and one that will point to all the brokers and will be used by clients for Amazon MSK connection bootstrapping.

Targets in each group receive traffic on port 9096 (SASL/SCRAM authentication), and each group is associated with the Amazon MSK VPC:

aws elbv2 create-target-group \
    --name b-all-bootstrap \
    --protocol TLS \
    --port 9096 \
    --target-type ip \
    --vpc-id $MSKVPCId
    
aws elbv2 create-target-group \
    --name b-1 \
    --protocol TLS \
    --port 9096 \
    --target-type ip \
    --vpc-id $MSKVPCId
    
aws elbv2 create-target-group \
    --name b-2 \
    --protocol TLS \
    --port 9096 \
    --target-type ip \
    --vpc-id $MSKVPCId
    
aws elbv2 create-target-group \
    --name b-3 \
    --protocol TLS \
    --port 9096 \
    --target-type ip \
    --vpc-id $MSKVPCId

Register target groups with MSK broker IPs

You need to associate each target group with the broker instance (target) in the MSK cluster so that the traffic going through the target group can be routed to the individual broker instance.

Complete the following steps:

  1. Get the MSK broker hostnames:
    echo $BS

This should show the brokers that are part of the bootstrap address. The hostname of broker 1 looks like the following code:

    b-1.mskcustomdomaincluster.xxxxx.yy.kafka.region.amazonaws.com

To get the hostname of other brokers in the cluster, replace b-1 with values like b-2, b-3, and so on. For example, if you have six brokers in the cluster, you will have six broker hostnames starting with b-1 to b-6.

  2. To get the IP address of the individual brokers, use the nslookup command:
    nslookup b-1.mskcustomdomaincluster.xxxxx.yy.kafka.region.amazonaws.com

The output looks like the following:

    Server: 172.16.0.2
    Address: 172.16.0.2#53

    Non-authoritative answer:
    Name: b-1.mskcustomdomaincluster.xxxxx.yy.kafka.region.amazonaws.com
    Address: 172.16.1.225

  3. Modify the following commands with the IP addresses of each broker to create environment variables that will be used later:
    export B1=<<b-1_IP_Address>>
    export B2=<<b-2_IP_Address>>
    export B3=<<b-3_IP_Address>>

Next, you need to register the broker IP with the target group. For broker b-1, you will register the IP address with target group b-1.

  4. Provide the target group name b-1 to get the target group ARN. Then register the broker IP address with the target group.
    export TARGET_GROUP_B_1_ARN=$(aws elbv2 describe-target-groups --names b-1 | jq -r '.TargetGroups[0].TargetGroupArn')

    aws elbv2 register-targets \
        --target-group-arn ${TARGET_GROUP_B_1_ARN} \
        --targets Id=$B1

  5. Repeat these steps for the other brokers: obtain the IP address from each broker hostname and register it with the corresponding target group for brokers b-2 and b-3:
    # Broker b-2
    export TARGET_GROUP_B_2_ARN=$(aws elbv2 describe-target-groups --names b-2 | jq -r '.TargetGroups[0].TargetGroupArn')

    aws elbv2 register-targets \
        --target-group-arn ${TARGET_GROUP_B_2_ARN} \
        --targets Id=$B2

    # Broker b-3
    export TARGET_GROUP_B_3_ARN=$(aws elbv2 describe-target-groups --names b-3 | jq -r '.TargetGroups[0].TargetGroupArn')

    aws elbv2 register-targets \
        --target-group-arn ${TARGET_GROUP_B_3_ARN} \
        --targets Id=$B3

  6. Also, you need to register all three broker IP addresses with the target group b-all-bootstrap. This target group will be used for routing the traffic for the Amazon MSK client connection bootstrap process.
    export TARGET_GROUP_B_ALL_ARN=$(aws elbv2 describe-target-groups --names b-all-bootstrap | jq -r '.TargetGroups[0].TargetGroupArn')

    aws elbv2 register-targets \
        --target-group-arn ${TARGET_GROUP_B_ALL_ARN} \
        --targets Id=$B1 Id=$B2 Id=$B3
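
If you would rather not copy each broker IP address by hand from the nslookup output, the answer can be extracted with a small filter. This is a sketch only: the function name answer_ip is hypothetical, and it assumes nslookup output shaped like the example shown in the steps above, with one address per broker.

```shell
# Read nslookup output on stdin and print the first address from the
# answer section, that is, the "Address:" line that follows a "Name:" line.
# The resolver's own "Address:" line near the top is skipped.
answer_ip() {
  awk '/^Name:/ { seen = 1; next } seen && /^Address:/ { print $2; exit }'
}

# Example using output shaped like the sample in this post.
printf '%s\n' \
  "Server: 172.16.0.2" \
  "Address: 172.16.0.2#53" \
  "" \
  "Non-authoritative answer:" \
  "Name: b-1.mskcustomdomaincluster.xxxxx.yy.kafka.region.amazonaws.com" \
  "Address: 172.16.1.225" | answer_ip
```

On the client instance, the same filter could be used as export B1=$(nslookup <broker hostname> | answer_ip), repeated for B2 and B3.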

Set up NLB listeners

Now that you have the target groups created and certificate imported, you’re ready to create the NLB and listeners.

Create the NLB with the following code:

aws elbv2 create-load-balancer \
--name msk-nlb-internal \
--scheme internal \
--type network \
--subnets $MSKVPCPrivateSubnet1 $MSKVPCPrivateSubnet2 $MSKVPCPrivateSubnet3 \
--security-groups $NLBSecurityGroupId

export NLB_ARN=$(aws elbv2 describe-load-balancers --names msk-nlb-internal | jq -r '.LoadBalancers[0].LoadBalancerArn')

Next, you configure the listeners that will be used by the clients to communicate with the MSK cluster. You need to create four listeners, one for each target group for ports 9000–9003. The following table lists the listener configurations.

Protocol Port Certificate NLB Target Type NLB Targets
TLS 9000 bootstrap.example.com TLS b-all-bootstrap
TLS 9001 bootstrap.example.com TLS b-1
TLS 9002 bootstrap.example.com TLS b-2
TLS 9003 bootstrap.example.com TLS b-3

Use the following code for port 9000:

aws elbv2 create-listener \
--load-balancer-arn $NLB_ARN \
--protocol TLS \
--port 9000 \
--certificates CertificateArn=$CertificateARN \
--ssl-policy ELBSecurityPolicy-TLS13-1-2-2021-06 \
--default-actions Type=forward,TargetGroupArn=$TARGET_GROUP_B_ALL_ARN

Use the following code for port 9001:

aws elbv2 create-listener \
--load-balancer-arn $NLB_ARN \
--protocol TLS \
--port 9001 \
--certificates CertificateArn=$CertificateARN \
--ssl-policy ELBSecurityPolicy-TLS13-1-2-2021-06 \
--default-actions Type=forward,TargetGroupArn=$TARGET_GROUP_B_1_ARN

Use the following code for port 9002:

aws elbv2 create-listener \
--load-balancer-arn $NLB_ARN \
--protocol TLS \
--port 9002 \
--certificates CertificateArn=$CertificateARN \
--ssl-policy ELBSecurityPolicy-TLS13-1-2-2021-06 \
--default-actions Type=forward,TargetGroupArn=$TARGET_GROUP_B_2_ARN

Use the following code for port 9003:

aws elbv2 create-listener \
--load-balancer-arn $NLB_ARN \
--protocol TLS \
--port 9003 \
--certificates CertificateArn=$CertificateARN \
--ssl-policy ELBSecurityPolicy-TLS13-1-2-2021-06 \
--default-actions Type=forward,TargetGroupArn=$TARGET_GROUP_B_3_ARN
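
The four commands differ only in the port and the target group, so they can also be generated in a loop. The sketch below only prints the commands so that you can review them before running anything. The function name print_listener_cmds and the sample argument values are hypothetical.

```shell
# Print one create-listener command per target group ARN.
# Arguments: NLB ARN, certificate ARN, then the target group ARNs in
# port order (bootstrap first, then broker 1, broker 2, and so on).
print_listener_cmds() {
  nlb_arn=$1
  cert_arn=$2
  shift 2
  port=9000
  for tg_arn in "$@"; do
    echo "aws elbv2 create-listener --load-balancer-arn $nlb_arn --protocol TLS --port $port --certificates CertificateArn=$cert_arn --ssl-policy ELBSecurityPolicy-TLS13-1-2-2021-06 --default-actions Type=forward,TargetGroupArn=$tg_arn"
    port=$(($port + 1))
  done
}

# Placeholder values; substitute the variables exported earlier.
print_listener_cmds "nlb-arn" "cert-arn" "tg-all" "tg-b-1" "tg-b-2" "tg-b-3"
```

With the variables from this post, the call would pass $NLB_ARN, $CertificateARN, $TARGET_GROUP_B_ALL_ARN, and the three broker target group ARNs in order.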

Enable cross-zone load balancing

By default, cross-zone load balancing is disabled on NLBs. When disabled, each load balancer node distributes traffic to healthy targets in the same Availability Zone. For example, requests that come into the load balancer node in Availability Zone A will only be forwarded to a healthy target in Availability Zone A. If the only healthy target or the only registered target associated to an NLB listener is in another Availability Zone than the load balancer node receiving the traffic, the traffic is dropped.

Because the NLB has the bootstrap listener that is associated with a target group that has all brokers registered across multiple Availability Zones, Route 53 will respond to DNS queries against the NLB DNS name with the IP address of NLB ENIs in Availability Zones with healthy targets.

When the Kafka client tries to connect to a broker through the broker’s listener on the NLB, there will be a noticeable delay in receiving a response from the broker as the client tries to connect to the broker using all IPs returned by Route 53.

Enabling cross-zone load balancing distributes the traffic across the registered targets in all Availability Zones.

aws elbv2 modify-load-balancer-attributes --load-balancer-arn $NLB_ARN --attributes Key=load_balancing.cross_zone.enabled,Value=true

Create DNS A records in a private hosted zone

Create DNS A records to route the traffic to the network load balancer. The following table lists the records.

Record Record Type Value
bootstrap A NLB Alias
b-1 A NLB Alias
b-2 A NLB Alias
b-3 A NLB Alias

Alias record types will be used, so you need the NLB’s DNS name and hosted zone ID:

export NLB_DNS=$(aws elbv2 describe-load-balancers --names msk-nlb-internal | jq -r '.LoadBalancers[0].DNSName')

export NLB_ZoneId=$(aws elbv2 describe-load-balancers --names msk-nlb-internal | jq -r '.LoadBalancers[0].CanonicalHostedZoneId')

Create the bootstrap record, and then repeat this command to create the b-1, b-2, and b-3 records, modifying the Name field:

aws route53 change-resource-record-sets \
--hosted-zone-id $HostedZoneId \
--change-batch file://<(cat << EOF
{
   "Comment": "Create bootstrap record",
   "Changes": [{
      "Action": "CREATE",
      "ResourceRecordSet": {
         "Name": "bootstrap.example.com",
         "Type": "A",
         "AliasTarget": {
            "HostedZoneId": "$NLB_ZoneId",
            "DNSName": "$NLB_DNS",
            "EvaluateTargetHealth": true
         }
      }
   }]
}
EOF
)
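
Rather than editing the Name field by hand for each record, the change batch can be produced by a small function. This is a sketch under the assumptions of this post (alias A records that all point at the same NLB); the function name alias_change_batch and the sample hosted zone ID and DNS name are hypothetical.

```shell
# Print a Route 53 change batch for one alias A record.
# Arguments: action (CREATE or DELETE), record name,
# NLB hosted zone ID, NLB DNS name.
alias_change_batch() {
  cat << EOF
{
  "Changes": [{
    "Action": "$1",
    "ResourceRecordSet": {
      "Name": "$2",
      "Type": "A",
      "AliasTarget": {
        "HostedZoneId": "$3",
        "DNSName": "$4",
        "EvaluateTargetHealth": true
      }
    }
  }]
}
EOF
}

# Placeholder zone ID and DNS name for illustration only.
for record in bootstrap b-1 b-2 b-3; do
  alias_change_batch CREATE "$record.example.com" "ZEXAMPLE123" "example-nlb.elb.us-east-1.amazonaws.com"
done
```

Each generated document can be passed to aws route53 change-resource-record-sets through the --change-batch option, and the same function with DELETE as the action covers the clean-up step.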

Optionally, to optimize cross-zone data charges, you can set b-1, b-2, and b-3 to the IP address of the NLB’s ENI that is in the same Availability Zone as each broker. For example, if b-2 is using an IP address that is in subnet 172.16.2.0/24, which is in Availability Zone A, you should use the NLB ENI that is in the same Availability Zone as the value for the DNS record.

The next step details how to use a custom domain name for bootstrap connectivity only. If all Kafka traffic needs to go through the NLB, as discussed earlier, proceed to the subsequent section to set up advertised listeners.

Configure the advertised listener in the MSK cluster

To get the listener details for broker 1, you provide entity-type as brokers and entity-name as 1 for the broker ID:

/home/ec2-user/kafka/bin/kafka-configs.sh --bootstrap-server $BS \
--entity-type brokers \
--entity-name 1 \
--command-config ~/kafka/config/client_sasl.properties \
--all \
--describe | grep 'listeners=CLIENT_SASL_SCRAM'

You will get an output like the following:

listeners=CLIENT_SASL_SCRAM://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9096,CLIENT_SECURE://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9094,REPLICATION://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9093,REPLICATION_SECURE://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9095 sensitive=false synonyms={STATIC_BROKER_CONFIG:listeners=CLIENT_SASL_SCRAM://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9096,CLIENT_SECURE://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9094,REPLICATION://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9093,REPLICATION_SECURE://b-1.mskcustomdomaincluster.XXXX.yy.kafka.region.amazonaws.com:9095}

Going forward, clients will connect through the custom domain name. Therefore, you need to configure the advertised listeners to the custom domain hostname and port. For this, you need to copy the listener details and change the CLIENT_SASL_SCRAM listener to b-1.example.com:9001.

While you’re configuring the advertised listener, you also need to preserve the information about other listener types in the advertised listener because inter-broker communications also use the addresses in the advertised listener.

Based on our configuration, the advertised listener for broker 1 will look like the following code, with everything after sensitive=false removed:

CLIENT_SASL_SCRAM://b-1.example.com:9001,REPLICATION://b-1-internal.mskcustomdomaincluster.xxxxxx.yy.kafka.region.amazonaws.com:9093,REPLICATION_SECURE://b-1-internal.mskcustomdomaincluster.xxxxxx.yy.kafka.region.amazonaws.com:9095

Modify the following command as follows:

  • <<BROKER_NUMBER>> – Set to the broker ID being changed (for example, 1 for broker 1)
  • <<PORT_NUMBER>> – Set to the port number corresponding to broker ID (for example, 9001 for broker 1)
  • <<REPLICATION_DNS_NAME>> – Set to the DNS name for REPLICATION
  • <<REPLICATION_SECURE_DNS_NAME>> – Set to the DNS name for REPLICATION_SECURE
/home/ec2-user/kafka/bin/kafka-configs.sh --alter \
--bootstrap-server $BS \
--entity-type brokers \
--entity-name <<BROKER_NUMBER>> \
--command-config ~/kafka/config/client_sasl.properties \
--add-config advertised.listeners=[CLIENT_SASL_SCRAM://b-<<BROKER_NUMBER>>.example.com:<<PORT_NUMBER>>,REPLICATION://<<REPLICATION_DNS_NAME>>:9093,REPLICATION_SECURE://<<REPLICATION_SECURE_DNS_NAME>>:9095]

The command should look something like the following example:

/home/ec2-user/kafka/bin/kafka-configs.sh --alter \
--bootstrap-server $BS \
--entity-type brokers \
--entity-name 1 \
--command-config ~/kafka/config/client_sasl.properties \
--add-config advertised.listeners=[CLIENT_SASL_SCRAM://b-1.example.com:9001,REPLICATION://b-1-internal.mskcustomdomaincluster.xxxxxx.yy.kafka.region.amazonaws.com:9093,REPLICATION_SECURE://b-1-internal.mskcustomdomaincluster.xxxxxx.yy.kafka.region.amazonaws.com:9095]

Run the command to add the advertised listener for broker 1.

You need to get the listener details for the other brokers and configure advertised.listeners for each.
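
To avoid typing mistakes when repeating this for each broker, the advertised.listeners value can be assembled by a helper. This is a sketch under the conventions of this post (custom hostname b-N.example.com and NLB port 9000 plus N); the function name advertised_listeners_for and the sample replication hostnames are hypothetical, and the real replication DNS names must be taken from each broker’s listener details.

```shell
# Build the advertised.listeners value for one broker.
# Arguments: broker ID, REPLICATION DNS name, REPLICATION_SECURE DNS name,
# and an optional custom domain (defaults to example.com).
advertised_listeners_for() {
  id=$1
  repl=$2
  repl_secure=$3
  domain=${4:-example.com}
  echo "[CLIENT_SASL_SCRAM://b-${id}.${domain}:$((9000 + $id)),REPLICATION://${repl}:9093,REPLICATION_SECURE://${repl_secure}:9095]"
}

# Placeholder replication hostnames for illustration only.
advertised_listeners_for 2 "b-2-internal.cluster.placeholder" "b-2-internal.cluster.placeholder"
```

The printed value is what follows advertised.listeners= in the --add-config argument of the kafka-configs.sh command shown earlier.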

Test the setup

Set the bootstrap address to the custom domain. This is the A record created in the private hosted zone.

export BS=bootstrap.example.com:9000

List the MSK topics using the custom domain bootstrap address:

/home/ec2-user/kafka/bin/kafka-topics.sh --list \
--bootstrap-server $BS \
--command-config=/home/ec2-user/kafka/config/client_sasl.properties

You should see the topic customer.

Clean up

To stop incurring costs, it’s recommended to manually delete the private hosted zone, NLB, target groups, and imported certificate in ACM. Also, delete the CloudFormation stack to remove any resources provisioned by CloudFormation.

Use the following code to manually delete the aforementioned resources:

aws route53 change-resource-record-sets \
  --hosted-zone-id $HostedZoneId \
  --change-batch file://<(cat << EOF
{
  "Changes": [
    {
      "Action": "DELETE",
      "ResourceRecordSet": {
        "Name": "bootstrap.example.com",
        "Type": "A",
        "AliasTarget": {
          "HostedZoneId": "$NLB_ZoneId",
          "DNSName": "$NLB_DNS",
          "EvaluateTargetHealth": true
        }
      }
    }
  ]
}
EOF
)
    
aws route53 change-resource-record-sets \
  --hosted-zone-id $HostedZoneId \
  --change-batch file://<(cat << EOF
{
  "Changes": [
    {
      "Action": "DELETE",
      "ResourceRecordSet": {
        "Name": "b-1.example.com",
        "Type": "A",
        "AliasTarget": {
          "HostedZoneId": "$NLB_ZoneId",
          "DNSName": "$NLB_DNS",
          "EvaluateTargetHealth": true
        }
      }
    }
  ]
}
EOF
)
    
aws route53 change-resource-record-sets \
  --hosted-zone-id $HostedZoneId \
  --change-batch file://<(cat << EOF
{
  "Changes": [
    {
      "Action": "DELETE",
      "ResourceRecordSet": {
        "Name": "b-2.example.com",
        "Type": "A",
        "AliasTarget": {
          "HostedZoneId": "$NLB_ZoneId",
          "DNSName": "$NLB_DNS",
          "EvaluateTargetHealth": true
        }
      }
    }
  ]
}
EOF
)
    
aws route53 change-resource-record-sets \
  --hosted-zone-id $HostedZoneId \
  --change-batch file://<(cat << EOF
{
  "Changes": [
    {
      "Action": "DELETE",
      "ResourceRecordSet": {
        "Name": "b-3.example.com",
        "Type": "A",
        "AliasTarget": {
          "HostedZoneId": "$NLB_ZoneId",
          "DNSName": "$NLB_DNS",
          "EvaluateTargetHealth": true
        }
      }
    }
  ]
}
EOF
)
    
aws route53 delete-hosted-zone --id $HostedZoneId
aws elbv2 delete-load-balancer --load-balancer-arn $NLB_ARN
aws elbv2 delete-target-group --target-group-arn $TARGET_GROUP_B_ALL_ARN
aws elbv2 delete-target-group --target-group-arn $TARGET_GROUP_B_1_ARN
aws elbv2 delete-target-group --target-group-arn $TARGET_GROUP_B_2_ARN
aws elbv2 delete-target-group --target-group-arn $TARGET_GROUP_B_3_ARN

Wait up to 5 minutes for the NLB deletion to complete, then delete the imported certificate:

aws acm delete-certificate --certificate-arn $CertificateARN

Now you can delete the CloudFormation stack.
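The four Route 53 DELETE calls in the cleanup code differ only in the record name, so the change batch could be generated by one helper. The following is a sketch under that assumption; the function name and the fallback placeholder values are hypothetical, and the loop only prints the batches rather than calling AWS.

```shell
# Print a Route 53 change batch that deletes one alias A record.
# $1 = record name, $2 = NLB hosted zone ID, $3 = NLB DNS name
delete_change_batch() {
  cat << EOF
{
  "Changes": [
    {
      "Action": "DELETE",
      "ResourceRecordSet": {
        "Name": "$1",
        "Type": "A",
        "AliasTarget": {
          "HostedZoneId": "$2",
          "DNSName": "$3",
          "EvaluateTargetHealth": true
        }
      }
    }
  ]
}
EOF
}

# Dry run: print the batch for each record. The fallback values are placeholders.
for record in bootstrap b-1 b-2 b-3; do
  delete_change_batch "$record.example.com" "${NLB_ZoneId:-ZONE_ID_PLACEHOLDER}" "${NLB_DNS:-nlb-dns-placeholder}"
done
```

In bash, the output can be passed to the same command used earlier through process substitution, for example --change-batch file://<(delete_change_batch b-1.example.com "$NLB_ZoneId" "$NLB_DNS").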

Summary

This post explains how you can use an NLB, Route 53, and the advertised listener configuration option in Amazon MSK to support custom domain names with MSK clusters when using SASL/SCRAM authentication. You can use this solution to keep your existing Kafka bootstrap DNS name and reduce or remove the need to change client applications because of a migration, recovery process, or multi-cluster high availability. You can also use this solution to have the MSK bootstrap and broker names under your custom domain, enabling you to bring the DNS name in line with your naming convention (for example, msk.prod.example.com).

Try the solution out for yourself, and leave your questions and feedback in the comments section.


About the Authors

Subham Rakshit is a Senior Streaming Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build streaming architectures so they can get value from analyzing their streaming data. His two little daughters keep him occupied most of the time outside work, and he loves solving jigsaw puzzles with them. Connect with him on LinkedIn.

Mark Taylor is a Senior Technical Account Manager at Amazon Web Services, working with enterprise customers to implement best practices, optimize AWS usage, and address business challenges. Prior to joining AWS, Mark spent over 16 years in networking roles across industries, including healthcare, government, education, and payments. Mark lives in Folkestone, England, with his wife and two dogs. Outside of work, he enjoys watching and playing football, watching movies, playing board games, and traveling.

Stream multi-tenant data with Amazon MSK

Post Syndicated from Emanuele Levi original https://aws.amazon.com/blogs/big-data/stream-multi-tenant-data-with-amazon-msk/

Real-time data streaming has become prominent in today’s world of instantaneous digital experiences. Modern software as a service (SaaS) applications across all industries rely more and more on continuously generated data from different data sources such as web and mobile applications, Internet of Things (IoT) devices, social media platforms, and ecommerce sites. Processing these data streams in real time is key to delivering responsive and personalized solutions, and maximizes the value of data by processing it as close to the event time as possible.

AWS helps SaaS vendors by providing the building blocks needed to implement a streaming application with Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), and real-time processing applications with Amazon Managed Service for Apache Flink.

In this post, we look at implementation patterns a SaaS vendor can adopt when using a streaming platform as a means of integration between internal components, where streaming data is not directly exposed to third parties. In particular, we focus on Amazon MSK.

Streaming multi-tenancy patterns

When building streaming applications, you should take the following dimensions into account:

  • Data partitioning – Event streaming and storage need to be isolated at the appropriate level, physical or logical, based on tenant ownership
  • Performance fairness – The performance coupling of applications processing streaming data for different tenants must be controlled and limited
  • Tenant isolation – A solid authorization strategy needs to be put in place to make sure tenants can access only their data

Underpinning all interactions with a multi-tenant system is the concept of SaaS identity. For more information, refer to SaaS Architecture Fundamentals.

SaaS deployment models

Tenant isolation is not optional for SaaS providers, and tenant isolation approaches will differ depending on your deployment model. The model is influenced by business requirements, and the models are not mutually exclusive. Trade-offs must be weighed across individual services to achieve a proper balance of isolation, complexity, and cost. There is no universal solution, and a SaaS vendor needs to carefully weigh their business and customer needs against three isolation strategies: silo, pool and bridge (or combinations thereof).

In the following sections, we explore these deployment models across the data partitioning, performance fairness, and tenant isolation dimensions.

Silo model

The silo model represents the highest level of data segregation, but also the highest running cost. Having a dedicated MSK cluster per tenant increases the risk of overprovisioning and requires duplication of management and monitoring tooling.

Having a dedicated MSK cluster per tenant makes sure tenant data partitioning occurs at the disk level when using an Amazon MSK Provisioned model. Both Amazon MSK Provisioned and Serverless clusters support server-side encryption at rest. Amazon MSK Provisioned further allows you to use a customer managed AWS Key Management Service (AWS KMS) key (see Amazon MSK encryption).

In a silo model, Kafka ACLs and quotas are not strictly required unless your business requirements call for them. Performance fairness is guaranteed because the resources of the entire MSK cluster are dedicated to applications producing and consuming events of a single tenant. This means spikes of traffic on a specific tenant can’t impact other tenants, and there is no risk of cross-tenant data access. As a drawback, having a provisioned cluster per tenant requires a right-sizing exercise per tenant, with a higher risk of overprovisioning than in the pool or bridge models.

You can implement tenant isolation at the MSK cluster level with AWS Identity and Access Management (IAM) policies or by creating per-cluster credentials, depending on the authentication scheme in use.

Pool model

The pool model is the simplest model where tenants share resources. A single MSK cluster is used for all tenants with data split into topics based on the event type (for example, all events related to orders go to the topic orders), and all tenants’ events are sent to the same topic. The following diagram illustrates this architecture.

Image showing a single streaming topic with multiple producers and consumers

This model maximizes operational simplicity, but reduces the tenant isolation options available because the SaaS provider won’t be able to differentiate per-tenant operational parameters and all responsibilities of isolation are delegated to the applications producing and consuming data from Kafka. The pool model also doesn’t provide any mechanism of physical data partitioning, nor performance fairness. A SaaS provider with these requirements should consider either a bridge or silo model. If you don’t have requirements to account for parameters such as per-tenant encryption keys or tenant-specific data operations, a pool model offers reduced complexity and can be a viable option. Let’s dig deeper into the trade-offs.

A common strategy to implement consumer isolation is to identify the tenant within each event using a tenant ID. The options available with Kafka are passing the tenant ID either as event metadata (header) or part of the payload itself as an explicit field. With this approach, the tenant ID will be used as a standardized field across all applications within both the message payload and the event header. This approach can reduce the risk of semantic divergence when components process and forward messages because event headers are handled differently by different processing frameworks and could be stripped when forwarded. Conversely, the event body is often forwarded as a single object and no contained information is lost unless the event is explicitly transformed. Including the tenant ID in the event header as well may simplify the implementation of services allowing you to specify tenants that need to be recovered or migrated without requiring the provider to deserialize the message payload to filter by tenant.

When specifying the tenant ID using either a header or a field in the event, consumer applications will not be able to selectively subscribe to the events of a specific tenant. With Kafka, a consumer subscribes to a topic and receives all events sent to that topic for all tenants. Only after receiving an event will the consumer be able to inspect the tenant ID and filter for the tenant of interest, making access segregation virtually impossible. This means sensitive data must be encrypted to make sure a tenant can’t read another tenant’s data when viewing these events. In Kafka, server-side encryption can only be set at the cluster level, so all tenants sharing a cluster will share the same server-side encryption key.

In Kafka, data retention can only be set on the topic. In the pool model, events belonging to all tenants are sent to the same topic, so tenant-specific operations like deleting all data for a tenant will not be possible. The immutable, append-only nature of Kafka only allows an entire topic to be deleted, not selective events belonging to a specific tenant. If specific customer data in the stream requires the right to be forgotten, such as for GDPR, a pool model will not work for that data and silo should be considered for that specific data stream.

Bridge model

In the bridge model, a single Kafka cluster is used across all tenants, but events from different tenants are segregated into different topics. With this model, there is a topic for each group of related events per tenant. You can simplify operations by adopting a topic naming convention such as including the tenant ID in the topic name. This will practically create a namespace per tenant, and also allows different administrators to manage different tenants, setting permissions with a prefix ACL, and avoiding naming clashes (for example, events related to orders for tenant 1 go to tenant1.orders and orders of tenant 2 go to tenant2.orders). The following diagram illustrates this architecture.

Image showing multiple producers and consumers each publishing to a stream-per-tenant
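The naming convention and prefix ACL described above can be sketched as follows, assuming Kafka ACLs are in use rather than IAM access control. The helper names, the principal name, and the client.properties file are hypothetical; the kafka-acls.sh command is only printed, not run.

```shell
# Build a topic name following the tenant-prefix convention, e.g. tenant1.orders.
tenant_topic() {
  printf '%s.%s\n' "$1" "$2"
}

# Prefix that matches all topics of one tenant. The trailing dot keeps the
# prefix for tenant1 from also matching topics such as tenant10.orders.
tenant_prefix() {
  printf '%s.\n' "$1"
}

# Print (do not run) a kafka-acls.sh command granting read access on a tenant's topics.
# $1 = tenant ID, $2 = Kafka principal name (hypothetical)
print_prefix_acl() {
  printf '%s\n' "kafka-acls.sh --bootstrap-server \$BS --command-config client.properties --add --allow-principal User:$2 --operation Read --topic $(tenant_prefix "$1") --resource-pattern-type prefixed"
}

tenant_topic tenant1 orders
print_prefix_acl tenant1 tenant1-consumer
```

Keeping the separator inside the prefix is a small design choice that matters once tenant IDs share leading characters.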

With the bridge model, server-side encryption using a per-tenant key is not possible. Data from different tenants is stored in the same MSK cluster, and server-side encryption keys can be specified per cluster only. For the same reason, data segregation can only be achieved at file level, because separate topics are stored in separate files. Amazon MSK stores all topics within the same Amazon Elastic Block Store (Amazon EBS) volume.

The bridge model offers per-tenant customization, such as retention policy or max message size, because Kafka allows you to set these parameters per topic. The bridge model also simplifies segregating and decoupling event processing per tenant, allowing a stronger isolation between separate applications that process data of separate tenants.

To summarize, the bridge model offers the following capabilities:

  • Tenant processing segregation – A consumer application can selectively subscribe to the topics belonging to specific tenants and only receive events for those tenants. A SaaS provider will be able to delete data for specific tenants, selectively deleting the topics belonging to that tenant.
  • Selective scaling of the processing – With Kafka, the maximum number of parallel consumers is determined by the number of partitions of a topic, and the number of partitions can be set per topic, and therefore per tenant.
  • Performance fairness – You can implement performance fairness using Kafka quotas, supported by Amazon MSK, preventing the services processing a particularly busy tenant from consuming too many cluster resources at the expense of other tenants. Refer to the following two-part series for more details on Kafka quotas in Amazon MSK, and an example implementation for IAM authentication.
  • Tenant isolation – You can implement tenant isolation using IAM access control or Apache Kafka ACLs, depending on the authentication scheme that is used with Amazon MSK. Both IAM and Kafka ACLs allow you to control access per topic. You can authorize an application to access only the topics belonging to the tenant it is supposed to process.
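As an illustration of the performance fairness point, the following sketch prints a kafka-configs.sh command that sets produce and consume byte-rate quotas for the Kafka user of one tenant's application. The user name, the client.properties file, and the limits are hypothetical, and the sketch assumes one Kafka user per tenant application; how the principal is named depends on the authentication scheme in use.

```shell
# Convert MiB per second to the bytes-per-second value Kafka quotas expect.
mib_to_bytes() {
  echo $(( $1 * 1024 * 1024 ))
}

# Print (do not run) a kafka-configs.sh command that sets per-user quotas.
# $1 = Kafka user of the tenant's application (hypothetical name)
# $2 = producer limit in MiB/s, $3 = consumer limit in MiB/s
print_quota_command() {
  printf '%s\n' "kafka-configs.sh --alter --bootstrap-server \$BS --command-config client.properties --entity-type users --entity-name $1 --add-config producer_byte_rate=$(mib_to_bytes "$2"),consumer_byte_rate=$(mib_to_bytes "$3")"
}

print_quota_command tenant1-orders-service 1 2
```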

Trade-offs in a SaaS environment

Although each model provides different capabilities for data partitioning, performance fairness, and tenant isolation, they also come with different costs and complexities. During planning, it’s important to identify what trade-offs you are willing to make for typical customers, and provide a tier structure to your client subscriptions.

The following table summarizes the supported capabilities of the three models in a streaming application.

Capability | Pool | Bridge | Silo
Per-tenant encryption at rest | No | No | Yes
Can implement right to be forgotten for single tenant | No | Yes | Yes
Per-tenant retention policies | No | Yes | Yes
Per-tenant event size limit | No | Yes | Yes
Per-tenant replayability | Yes (must implement with logic in consumers) | Yes | Yes

Anti-patterns

In the bridge model, we discussed tenant segregation by topic. An alternative would be segregating by partition, where all messages of a given type are sent to the same topic (for example, orders), but each tenant has a dedicated partition. This approach has many disadvantages and we strongly discourage it. In Kafka, partitions are the unit of horizontal scaling and balancing of brokers and consumers. Assigning partitions per tenant can unbalance the cluster and cause operational and performance issues that will be hard to overcome.

Some level of data isolation, such as per-tenant encryption keys, could be achieved using client-side encryption, delegating any encryption or decryption to the producer and consumer applications. This approach would allow you to use a separate encryption key per tenant. We don’t recommend this approach because it introduces a higher level of complexity in both the consumer and producer applications. It may also prevent you from using most of the standard programming libraries, Kafka tooling, and most Kafka ecosystem services, like Kafka Connect or MSK Connect.

Conclusion

In this post, we explored three patterns that SaaS vendors can use when architecting multi-tenant streaming applications with Amazon MSK: the pool, bridge, and silo models. Each model presents different trade-offs between operational simplicity, tenant isolation level, and cost efficiency.

The silo model dedicates a full MSK cluster to each tenant, offering a straightforward tenant isolation approach but incurring higher maintenance overhead and cost per tenant. The pool model offers increased operational and cost efficiencies by sharing all resources across tenants, but provides limited data partitioning, performance fairness, and tenant isolation capabilities. Finally, the bridge model offers a good compromise between operational and cost efficiencies while providing a good range of options to create robust tenant isolation and performance fairness strategies.

When architecting your multi-tenant streaming solution, carefully evaluate your requirements around tenant isolation, data privacy, per-tenant customization, and performance guarantees to determine the appropriate model. Combine models if needed to find the right balance for your business. As you scale your application, reassess isolation needs and migrate across models accordingly.

As you’ve seen in this post, there is no one-size-fits-all pattern for streaming data in a multi-tenant architecture. Carefully weighing your streaming outcomes and customer needs will help determine the correct trade-offs you can make while making sure your customer data is secure and auditable. Continue your learning journey on SkillBuilder with our SaaS curriculum, get hands-on with an AWS Serverless SaaS workshop or Amazon EKS SaaS workshop, or dive deep with Amazon MSK Labs.


About the Authors

Emanuele Levi is a Solutions Architect in the Enterprise Software and SaaS team, based in London. Emanuele helps UK customers on their journey to refactor monolithic applications into modern microservices SaaS architectures. Emanuele is mainly interested in event-driven patterns and designs, especially when applied to analytics and AI, where he has expertise in the fraud-detection industry.

Lorenzo Nicora is a Senior Streaming Solution Architect helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working across industries, in consultancies and product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Nicholas Tunney is a Senior Partner Solutions Architect for Worldwide Public Sector at AWS. He works with Global SI partners to develop architectures on AWS for clients in the government, nonprofit healthcare, utility, and education sectors.  He is also a core member of the SaaS Technical Field Community where he gets to meet clients from all over the world who are building SaaS on AWS.

AWS Weekly Roundup: New AWS Heroes, Amazon API Gateway, Amazon Q and more (June 10, 2024)

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-new-aws-heroes-amazon-api-gateway-amazon-q-and-more-june-10-2024/

In the last AWS Weekly Roundup, Channy reminded us that life has its ups and downs. It’s just how life is. But that doesn’t mean we should face them alone. Farouq Mousa, AWS Community Builder, is fighting brain cancer, and the daughter of Allen Helton, AWS Serverless Hero, is fighting leukemia.

If you have a moment, please visit their campaign pages and give your support.

Meanwhile, we’ve just finished a few AWS Summits in India, Korea and also Thailand. As always, I had so much fun working together at Developer Lounge with AWS Heroes, AWS Community Builders, and AWS User Group leaders. Here’s a photo from everyone here.

Last Week’s Launches
Here are some launches that caught my attention last week:

Welcome, new AWS Heroes! — Last week, we announced a new cohort of AWS Heroes, a worldwide group of AWS experts who go above and beyond to share knowledge and empower their communities.

Amazon API Gateway increased integration timeout limit — If you’re using Regional REST APIs and private REST APIs in Amazon API Gateway, you can now raise the integration timeout limit beyond 29 seconds. This allows you to run various workloads requiring longer timeouts.

Amazon Q offers inline completion in the command line — Now, Amazon Q Developer provides real-time AI-generated code suggestions as you type in your command line. As a regular command line interface (CLI) user, I’m really excited about this.

New common control library in AWS Audit Manager — This announcement helps you save time when mapping enterprise controls into AWS Audit Manager. Check out Danilo’s post, where he explains how you can simplify risk and compliance assessment with the new common control library.

Amazon Inspector container image scanning for Amazon CodeCatalyst and GitHub Actions — If you need to integrate software vulnerability checks into your CI/CD pipeline, you can use Amazon Inspector. This native integration with GitHub Actions and Amazon CodeCatalyst streamlines your development pipeline.

Ingest streaming data with Amazon OpenSearch Ingestion and Amazon Managed Streaming for Apache Kafka — With this new capability, you can build more efficient data pipelines for your complex analytics use cases. You can now seamlessly index the data from your Amazon MSK Serverless clusters in Amazon OpenSearch Service.

Amazon Titan Text Embeddings V2 now available in Amazon Bedrock Knowledge Base — You now can embed your data into a vector database using Amazon Titan Text Embeddings V2. This will be helpful for you to retrieve relevant information for various tasks.

Max tokens | 8,192
Languages | 100+ in pre-training
Fine-tuning supported | No
Normalization supported | Yes
Vector size | 256, 512, 1,024 (default)

From Community.aws
Here are my three personal favorite posts from community.aws:

Upcoming AWS events
Check your calendars and sign up for these AWS and AWS Community events:

  • AWS Summits — Join free online and in-person events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Register in your nearest city: Japan (June 20), Washington, DC (June 26–27), and New York (July 10).

  • AWS re:Inforce — Join us for AWS re:Inforce (June 10–12) in Philadelphia, PA. AWS re:Inforce is a learning conference focused on AWS security solutions, cloud security, compliance, and identity. Connect with the AWS teams that build the security tools and meet AWS customers to learn about their security journeys.

  • AWS Community Days — Join community-led conferences that feature technical discussions, workshops, and hands-on labs led by expert AWS users and industry leaders from around the world: Midwest | Columbus (June 13), Sri Lanka (June 27), Cameroon (July 13), New Zealand (August 15), Nigeria (August 24), and New York (August 28).

You can browse all upcoming in-person and virtual events.

That’s all for this week. Check back next Monday for another Weekly Roundup!

Donnie

This post is part of our Weekly Roundup series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Introducing support for Apache Kafka on Raft mode (KRaft) with Amazon MSK clusters

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/introducing-support-for-apache-kafka-on-raft-mode-kraft-with-amazon-msk-clusters/

Organizations are adopting Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK) to capture and analyze data in real time. Amazon MSK helps you build and run production applications on Apache Kafka without needing Kafka infrastructure management expertise or having to deal with the complex overhead associated with setting up and running Apache Kafka on your own. Since its inception, Apache Kafka has depended on Apache ZooKeeper for storing and replicating the metadata of Kafka brokers and topics. Starting from Apache Kafka version 3.3, the Kafka community has adopted KRaft (Apache Kafka on Raft), a consensus protocol, to replace Kafka’s dependency on ZooKeeper for metadata management. In the future, the Apache Kafka community plans to remove the ZooKeeper mode entirely.

Today, we’re excited to launch support for KRaft on new clusters on Amazon MSK starting from version 3.7. In this post, we walk you through some details around how KRaft mode helps over the ZooKeeper approach. We also guide you through the process of creating MSK clusters with KRaft mode and how to connect your application to MSK clusters with KRaft mode.

Why was ZooKeeper replaced with KRaft mode

The traditional Kafka architecture relies on ZooKeeper as the authoritative source for cluster metadata. Read and write access to metadata in ZooKeeper is funneled through a single Kafka controller. For clusters with a large number of partitions, this architecture can create a bottleneck during scenarios such as an uncontrolled broker shutdown or controller failover, due to a single-controller approach.

KRaft mode addresses these limitations by managing metadata within the Kafka cluster itself. Instead of relying on a separate ZooKeeper cluster, KRaft mode stores and replicates the cluster metadata across multiple Kafka controller nodes, forming a metadata quorum. The KRaft controller nodes comprise a Raft quorum that manages the Kafka metadata log. By distributing the metadata management responsibilities across multiple controller nodes, KRaft mode improves recovery time for scenarios such as uncontrolled broker shutdown or controller failover. For more details on KRaft mode and its implementation, refer to the KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum.

The following figure compares the three-node MSK cluster architecture with ZooKeeper vs. KRaft mode.

Amazon MSK with KRaft mode

Until now, Amazon MSK has supported Kafka clusters that rely on ZooKeeper for metadata management. One of the key benefits of Amazon MSK is that it handles the complexity of setting up and managing the ZooKeeper cluster at no additional cost. Many organizations use Amazon MSK to run large, business-critical streaming applications that require splitting their traffic across thousands of partitions. As the size of a Kafka cluster grows, the amount of metadata generated within the cluster increases proportionally to the number of partitions.

Two key properties govern the number of partitions a Kafka cluster can support: the per-node partition count limit and the cluster-wide partition limit. As mentioned earlier, the metadata management system based on ZooKeeper imposed a bottleneck on the cluster-wide partition limitation in Apache Kafka. However, with the introduction of KRaft mode in Amazon MSK starting with version 3.7, Amazon MSK now enables the creation of clusters with up to 60 brokers vs. the default quota of 30 brokers in ZooKeeper mode. Kafka’s scalability still fundamentally relies on expanding the cluster by adding more nodes to increase overall capacity. Consequently, the cluster-wide partition limit continues to define the upper bounds of scalability within the Kafka system, because it determines the maximum number of partitions that can be distributed across the available nodes. Amazon MSK manages the KRaft controller nodes at no additional cost.

Create and access an MSK cluster with KRaft mode

Complete the following steps to configure an MSK cluster with KRaft mode:

  1. On the Amazon MSK console, choose Clusters in the navigation pane.
  2. Choose Create cluster.
  3. For Cluster creation method, select Custom create.
  4. For Cluster name, enter a name.
  5. For Cluster type, select Provisioned.
  6. For Apache Kafka version, choose 3.7.x.
  7. For Metadata mode, select KRaft.
  8. Leave the other settings as default and choose Create cluster.

When the cluster creation is successful, you can navigate to the cluster and choose View client integration information, which will provide details about the cluster bootstrap servers.

Adapt your client applications and tools for accessing MSK clusters with KRaft mode

With the adoption of KRaft mode in Amazon MSK, customers using client applications and tools that connect to ZooKeeper to interact with MSK clusters will need to update them to reflect the removal of ZooKeeper from the architecture. Starting with version 1.0, Kafka introduced the ability for admin tools to use the bootstrap servers (brokers) as input parameters instead of a ZooKeeper connection string, and started deprecating ZooKeeper connection strings starting with version 2.5. This change was part of the efforts to decouple Kafka from ZooKeeper and pave the way for its eventual replacement with KRaft mode for metadata management. Instead of specifying the ZooKeeper connection string, clients will need to use the bootstrap.servers configuration option to connect directly to the Kafka brokers. The following table summarizes these changes.

Component | With ZooKeeper | With KRaft
Client and Services | bootstrap.servers=broker:<port> or zookeeper.connect=zookeeper:2181 (deprecated) | bootstrap.servers=broker:<port>
Admin Tools | kafka-topics --zookeeper zookeeper:2181 (deprecated) or kafka-topics --bootstrap-server broker:<port> … --command-config | kafka-topics --bootstrap-server broker:<port> … --command-config
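One way to find scripts and configuration files that still use the ZooKeeper-based options from the table is to search them for the --zookeeper flag and the zookeeper.connect property. The following is a minimal sketch; the function name is hypothetical, and the sample lines (including the port 9092 and the --list option) are illustrative only.

```shell
# Print every line that still references ZooKeeper, with its line number.
# Reads the files given as arguments, or standard input if none are given.
find_zookeeper_refs() {
  grep -nE -e '--zookeeper' -e 'zookeeper\.connect' "$@"
}

# Demo on two sample command lines; only the first one is reported.
find_zookeeper_refs << 'EOF'
kafka-topics --zookeeper zookeeper:2181 --list
kafka-topics --bootstrap-server broker:9092 --list
EOF
```

Each reported line is a candidate for switching to the bootstrap server form before moving to a cluster in KRaft mode.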

Summary

In this post, we discussed how Amazon MSK has launched support for KRaft mode for metadata management. We also described how KRaft works and how it’s different from ZooKeeper.

To get started, create a new cluster with KRaft mode using the AWS Management Console, and refer to the Amazon MSK Developer Guide for more information.


About the author

Kalyan Janaki is a Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Safely remove Kafka brokers from Amazon MSK provisioned clusters

Post Syndicated from Vidhi Taneja original https://aws.amazon.com/blogs/big-data/safely-remove-kafka-brokers-from-amazon-msk-provisioned-clusters/

Today, we are announcing broker removal capability for Amazon Managed Streaming for Apache Kafka (Amazon MSK) provisioned clusters, which lets you remove multiple brokers from your provisioned clusters. You can now reduce your cluster’s storage and compute capacity by removing sets of brokers, with no availability impact, data durability risk, or disruption to your data streaming applications. Amazon MSK is a fully managed Apache Kafka service that makes it easy for developers to build and run highly available, secure, and scalable streaming applications. Administrators can optimize the costs of their Amazon MSK clusters by reducing broker count and adapting the cluster capacity to the changes in the streaming data demand, without affecting their clusters’ performance, availability, or data durability.

You can use Amazon MSK as a core foundation to build a variety of real-time streaming applications and high-performance event-driven architectures. As business needs and traffic patterns change, cluster capacity is often adjusted to optimize costs. Amazon MSK provides flexibility and elasticity for administrators to right-size MSK clusters. You can increase the broker count or the broker size to manage a surge in traffic during peak events, or decrease the instance size of the brokers in the cluster to reduce capacity. Previously, however, reducing the broker count required an effort-intensive migration to another cluster.

With the broker removal capability, you can now remove multiple brokers from your provisioned clusters to meet the varying needs of your streaming workloads. During and post broker removal, the cluster continues to handle read and write requests from the client applications. MSK performs the necessary validations to safeguard against data durability risks and gracefully removes the brokers from the cluster. By using broker removal capability, you can precisely adjust MSK cluster capacity, eliminating the need to change the instance size of every broker in the cluster or having to migrate to another cluster to reduce broker count.

How the broker removal feature works

Before you execute the broker removal operation, you must make some brokers eligible for removal by moving all partitions off of them. You can use Kafka admin APIs or Cruise Control to move partitions to other brokers that you intend to retain in the cluster.

You choose which brokers to remove and move the partitions from those brokers to other brokers using Kafka tools. Alternatively, you may already have brokers that are not hosting any partitions. Then use the Edit number of brokers feature on the AWS Management Console, or the Amazon MSK UpdateBrokerCount API. Here are details on how you can use this new feature:

  • You can remove a maximum of one broker per Availability Zone (AZ) in a single broker removal operation. To remove more brokers, you can call multiple broker removal operations consecutively after the prior operation has been completed. You must retain at least one broker per AZ in your MSK cluster.
  • The target number of broker nodes in the cluster must be a multiple of the number of Availability Zones in the client subnets parameter. For example, a cluster with subnets in two AZs must have a target number of nodes that is a multiple of two.
  • If the brokers you removed were present in the bootstrap broker string, MSK will perform the necessary routing so that the client’s connectivity to the cluster is not disrupted. You don’t need to make any client changes to change your bootstrap strings.
  • You can add brokers back to your cluster anytime using the AWS Management Console or the UpdateBrokerCount API.
  • Broker removal is supported on Kafka versions 2.8.1 and above. If you have clusters in lower versions, you must first upgrade to version 2.8.1 or above and then remove brokers.
  • Broker removal doesn’t support the t3.small instance type.
  • You will stop incurring costs for the removed brokers once the broker removal operation is completed successfully.
  • When brokers are removed from a cluster, their associated local storage is removed as well.
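The constraints above can be sketched as a small planning helper. This is only an illustration: the function name `plan_removal_steps` is hypothetical, and it assumes that brokers are spread evenly across AZs, so each removal operation takes away exactly one broker per AZ.

```python
def plan_removal_steps(current_brokers: int, target_brokers: int, num_azs: int) -> list[int]:
    """Return the target broker count for each consecutive removal operation.

    Hypothetical helper that encodes the rules listed above:
    - broker counts must be a multiple of the number of AZs,
    - at least one broker per AZ must remain,
    - one operation removes at most one broker per AZ.
    """
    if num_azs <= 0:
        raise ValueError("num_azs must be positive")
    if current_brokers % num_azs or target_brokers % num_azs:
        raise ValueError("broker counts must be multiples of the number of AZs")
    if target_brokers < num_azs:
        raise ValueError("at least one broker per AZ must remain")
    if target_brokers > current_brokers:
        raise ValueError("target exceeds the current broker count")
    # Step down by one broker per AZ until the target is reached.
    return list(range(current_brokers - num_azs, target_brokers - 1, -num_azs))


if __name__ == "__main__":
    # A 9-broker cluster across 3 AZs shrinking to 3 brokers needs two operations.
    print(plan_removal_steps(9, 3, 3))
```

Each value in the returned list could then be used as the target broker count of one UpdateBrokerCount call, waiting for the previous operation to complete before issuing the next.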

Considerations before removing brokers

Removing brokers from an existing Apache Kafka cluster is a critical operation that needs careful planning to avoid service disruption. When deciding how many brokers you should remove from the cluster, determine your cluster’s minimum broker count by considering your requirements around availability, durability, local data retention, and partition count. Here are a few things you should consider:

  • Check Amazon CloudWatch BytesInPerSec and BytesOutPerSec metrics for your cluster. Look for the peak load over a period of 1 month. Use this data with the MSK sizing Excel file to identify how many brokers you need to handle your peak load. If the number of brokers listed in the Excel file is higher than the number of brokers that would remain after removing brokers, do not proceed with this operation. This indicates that removing brokers would leave too few brokers in the cluster, which can impact the availability of your cluster or applications.
  • Check UserPartitionExists metrics to verify that you have at least 1 empty broker per AZ in your cluster. If not, make sure to remove partitions from at least one broker per AZ before invoking the operation.
  • If you have more than one broker per AZ with no user partitions on them, MSK will randomly pick one of those during the removal operation.
  • Check the PartitionCount metrics to know the number of partitions that exist on your cluster. Check per broker partition limit. The broker removal feature will not allow the removal of brokers if the service detects that any brokers in the cluster have breached the partition limit. In that case, check if any unused topics could be removed instead to free up broker resources.
  • Check if the estimated storage in the Excel file exceeds the currently provisioned storage for the cluster. In that case, first provision additional storage on that cluster. If you are hitting per-broker storage limits, consider approaches like using MSK tiered storage or removing unused topics. Otherwise, avoid moving partitions to just a few brokers as that may lead to a disk full issue.
  • If the brokers you are planning to remove host partitions, make sure those partitions are reassigned to other brokers in the cluster. Use the kafka-reassign-partitions.sh tool or Cruise Control to initiate partition reassignment. Monitor the progress of reassignment to completion. Disregard the __amazon_msk_canary and __amazon_msk_canary_state internal topics, because they are managed by the service and will be automatically removed by MSK while executing the operation.
  • Verify that the cluster status is Active before starting the removal process.
  • Check the performance of the workload on your production environment after you move those partitions. We recommend monitoring this for a week before you remove the brokers to make sure that the other brokers in your cluster can safely handle your traffic patterns.
  • If you experience any impact on your applications or cluster availability after removing brokers, you can add the same number of brokers that you removed earlier by using the UpdateBrokerCount API, and then reassign partitions to the newly added brokers.
  • We recommend you test the entire process in a non-production environment, to identify and resolve any issues before making changes in the production environment.
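As an illustration of the reassignment step, the following sketch builds the JSON document that kafka-reassign-partitions.sh accepts. The function `build_reassignment` and the sample topic are hypothetical, and the sketch makes simplifying assumptions: replacements go to the least-loaded retained broker, and AZ (rack) placement is ignored, which a real plan should take into account.

```python
import json
from collections import Counter

# Service-managed topics that can be disregarded, as noted above.
INTERNAL_TOPIC_PREFIX = "__amazon_msk_canary"


def build_reassignment(assignment, brokers_to_remove, retained_brokers):
    """Build a reassignment plan that moves replicas off the brokers to remove.

    assignment: list of {"topic": str, "partition": int, "replicas": [int, ...]}
    Returns a dict in the reassignment JSON format (version 1).
    """
    remove = set(brokers_to_remove)
    retained = list(retained_brokers)
    # Count how many replicas each retained broker already hosts.
    load = Counter()
    for part in assignment:
        for broker in part["replicas"]:
            if broker in retained:
                load[broker] += 1

    moves = []
    for part in assignment:
        if part["topic"].startswith(INTERNAL_TOPIC_PREFIX):
            continue  # managed by the service
        if not remove.intersection(part["replicas"]):
            continue  # nothing to move for this partition
        new_replicas = []
        for broker in part["replicas"]:
            if broker not in remove:
                new_replicas.append(broker)
                continue
            # A replacement must not already hold a replica of this partition.
            candidates = [b for b in retained
                          if b not in part["replicas"] and b not in new_replicas]
            if not candidates:
                raise ValueError("not enough retained brokers for the replica set")
            replacement = min(candidates, key=lambda b: (load[b], b))
            load[replacement] += 1
            new_replicas.append(replacement)
        moves.append({"topic": part["topic"],
                      "partition": part["partition"],
                      "replicas": new_replicas})
    return {"version": 1, "partitions": moves}


if __name__ == "__main__":
    current = [{"topic": "orders", "partition": 0, "replicas": [1, 4]}]
    print(json.dumps(build_reassignment(current, [4], [1, 2, 3])))
```

The printed JSON could be saved to a file and passed to kafka-reassign-partitions.sh with the --reassignment-json-file and --execute options.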

Conclusion

Amazon MSK’s new broker removal capability provides a safe way to reduce the capacity of your provisioned Apache Kafka clusters. By allowing you to remove brokers without impacting availability, data durability, or disrupting your streaming applications, this feature enables you to optimize costs and right-size your MSK clusters based on changing business needs and traffic patterns. With careful planning and by following the recommended best practices, you can confidently use this capability to manage your MSK resources more efficiently.

Start taking advantage of the broker removal feature in Amazon MSK today. Review the documentation and follow the step-by-step guide to test the process in a non-production environment. Once you are comfortable with the workflow, plan and execute broker removal in your production MSK clusters to optimize costs and align your streaming infrastructure with your evolving workload requirements.


About the Authors


Vidhi Taneja is a Principal Product Manager for Amazon Managed Streaming for Apache Kafka (Amazon MSK) at AWS. She is passionate about helping customers build streaming applications at scale and derive value from real-time data. Before joining AWS, Vidhi worked at Apple, Goldman Sachs and Nutanix in product management and engineering roles. She holds an MS degree from Carnegie Mellon University.


Anusha Dasarakothapalli is a Principal Software Engineer for Amazon Managed Streaming for Apache Kafka (Amazon MSK) at AWS. She started her software engineering career with Amazon in 2015 and worked on products such as S3-Glacier and S3 Glacier Deep Archive, before transitioning to MSK in 2022. Her primary areas of focus lie in streaming technology, distributed systems, and storage.


Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Nexthink scales to trillions of events per day with Amazon MSK

Post Syndicated from Moe Haidar original https://aws.amazon.com/blogs/big-data/nexthink-scales-to-trillions-of-events-per-day-with-amazon-msk/

Real-time data streaming and event processing present scalability and management challenges. AWS offers a broad selection of managed real-time data streaming services to effortlessly run these workloads at any scale.

In this post, Nexthink shares how Amazon Managed Streaming for Apache Kafka (Amazon MSK) empowered them to achieve massive scale in event processing. Experiencing business hyper-growth, Nexthink migrated to AWS to overcome the scaling limitations of on-premises solutions. With Amazon MSK, Nexthink now seamlessly processes trillions of events per day, reaching over 5 GB per second of aggregated throughput.

In the following sections, Nexthink introduces their product and the need for scalability. They then highlight the challenges of their legacy on-premises application and present their transition to a cloud-centered software as a service (SaaS) architecture powered by Amazon MSK. Finally, Nexthink details the benefits achieved by adopting Amazon MSK.

Nexthink’s need to scale

Nexthink is the leader in digital employee experience (DeX). The company is shaping the future of work by providing IT leaders and C-levels with insights into employees’ daily technology experiences at the device and application level. This allows IT to evolve from reactive problem-solving to proactive optimization.

The Nexthink Infinity platform combines analytics, monitoring, automation, and more to manage the employee digital experience. By collecting device and application events, processing them in real time, and storing them, our platform analyzes data to solve problems and boost experiences for over 15 million employees across five continents.

In just 3 years, Nexthink’s business grew tenfold, and with the introduction of more real-time data our application had to scale from processing 200 MB per second to 5 GB per second and trillions of events daily. To enable this growth, we modernized our application from an on-premises single-tenant monolith to a cloud-based scalable SaaS solution powered by Amazon MSK.

The next sections detail our modernization journey, including the challenges we faced and the benefits we realized with our new cloud-centered, AWS-based architecture.

The on-premises solution and its challenges

Let’s first explore our previous on-premises solution, Nexthink V6, before examining how Amazon MSK addressed its challenges. The following diagram illustrates its architecture.

Nexthink v6

V6 was made up of two monolithic, single-tenant Java and C++ applications that were tightly coupled. The portal was a backend-for-frontend Java application, and the core engine was an in-house C++ in-memory database application that was also handling device connections, data ingestion, aggregation, and querying. By bundling all these functions together, the engine became difficult to manage and improve.

V6 also lacked scalability. It initially supported 10,000 devices, but some new tenants had over 300,000 devices. We reacted by deploying multiple V6 engines per tenant, increasing complexity and cost, hampering user experience, and delaying time to market. This also led to longer proof of concept and onboarding cycles, which hurt the business.

Furthermore, the absence of a streaming platform like Kafka created dependencies between teams through tight HTTP/gRPC coupling. Additionally, teams couldn’t access real-time events before ingestion into the database, limiting feature development. We also lacked a data buffer, risking potential data loss during outages. Such constraints impeded innovation and increased risks.

In summary, although the V6 system served its initial purpose, reinventing it with cloud-centered technologies became imperative to enhance scalability and reliability and to foster innovation by our engineering and product teams.

Transitioning to a cloud-centered architecture with Amazon MSK

To achieve our modernization goals, after thorough research and iterations, we implemented an event-driven microservices design on Amazon Elastic Kubernetes Service (Amazon EKS), using Kafka on Amazon MSK for distributed event storage and streaming.

Our transition from the V6 on-premises solution to the cloud-centered platform was phased over four iterations:

  • Phase 1 – We lifted and shifted from on premises to virtual machines in the cloud, reducing operational complexities and accelerating proof of concept cycles while transparently migrating customers.
  • Phase 2 – We extended the cloud architecture by implementing new product features with microservices and self-managed Kafka on Kubernetes. However, operating Kafka clusters ourselves proved overly difficult, leading us to Phase 3.
  • Phase 3 – We switched from self-managed Kafka to Amazon MSK, improving stability and reducing operational costs. We realized that managing Kafka wasn’t our core competency or differentiator, and the overhead was high. Amazon MSK enabled us to focus on our core application, freeing us from the burden of undifferentiated Kafka management.
  • Phase 4 – Finally, we eliminated all legacy components, completing the transition to a fully cloud-centered SaaS platform. This journey of learning and transformation took 3 years.

Today, after our successful transition, we use Amazon MSK for two key functions:

  • Real-time data ingestion and processing of trillions of daily events from over 15 million devices worldwide, as illustrated in the following figure.

Nexthink Architecture Ingestion

  • Enabling an event-driven system that decouples data producers and consumers, as depicted in the following figure.

Nexthink Architecture Event Driven

To further enhance our scalability and resilience, we adopted a cell-based architecture using the wide availability of Amazon MSK across AWS Regions. We currently operate over 10 cells, each representing an independent regional deployment of our SaaS solution. This cell-based approach minimizes the area of impact in case of issues, addresses data residency requirements, and enables horizontal scaling across AWS Regions, as illustrated in the following figure.

Nexthink Architecture Cells

Benefits of Amazon MSK

Amazon MSK has been critical in enabling our event-driven design. In this section, we outline the main benefits we gained from its adoption.

Improved data resilience

In our new architecture, data from devices is pushed directly to Kafka topics in Amazon MSK, which provides high availability and resilience. This makes sure that events can be safely received and stored at any time. Our services consuming this data inherit the same resilience from Amazon MSK. If our backend ingestion services face disruptions, no event is lost, because Kafka retains all published messages. When our services resume, they seamlessly continue processing from where they left off, thanks to Kafka’s committed consumer offsets and delivery semantics, which allow processing messages exactly-once, at-least-once, or at-most-once based on application needs.
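The resume behavior described here can be illustrated with a small in-memory simulation of at-least-once processing. This is not Nexthink's code and uses no Kafka client. The list `log` stands in for a topic partition, and `run_consumer` is a hypothetical consumer that commits its offset only after a record has been processed.

```python
def run_consumer(log, committed, sink, crash_before_commit_at=None):
    """Process records starting at the committed offset.

    The offset is committed only after a record has been handled, so a crash
    between handling and committing causes that record to be handled again on
    restart (at-least-once), but never skipped.
    Returns the last committed offset.
    """
    offset = committed
    while offset < len(log):
        sink.append(log[offset])            # handle the record
        if offset == crash_before_commit_at:
            return committed                # simulated crash before the commit
        committed = offset + 1              # commit progress
        offset += 1
    return committed


if __name__ == "__main__":
    log = ["a", "b", "c"]                   # retained records in the partition
    sink = []
    committed = run_consumer(log, 0, sink, crash_before_commit_at=1)
    committed = run_consumer(log, committed, sink)   # restart after the crash
    print(sink, committed)
```

In this run the record "b" is handled twice and nothing is lost, which is why downstream processing in an at-least-once design is usually made idempotent.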

Amazon MSK enables us to tailor the data retention duration to our specific requirements, ranging from seconds to unlimited duration. This flexibility grants uninterrupted data availability to our application, which wasn’t possible with our previous architecture. Furthermore, to safeguard data integrity in the event of processing errors or corruption, Kafka enabled us to implement a data replay mechanism, ensuring data consistency and reliability.

Organizational scaling

By adopting an event-driven architecture with Amazon MSK, we decomposed our monolithic application into loosely coupled, stateless microservices communicating asynchronously via Kafka topics. This approach enabled our engineering organization to scale rapidly from just 4–5 teams in 2019 to over 40 teams and approximately 350 engineers today.

The loose coupling between event publishers and subscribers empowered teams to focus on distinct domains, such as data ingestion, identification services, and data lakes. Teams could develop solutions independently within their domains, communicating through Kafka topics without tight coupling. This architecture accelerated feature development by minimizing the risk of new features impacting existing ones. Teams could efficiently consume events published by others, offering new capabilities more rapidly while reducing cross-team dependencies.

The following figure illustrates the seamless workflow of adding new domains to our system.

Adding domains

Furthermore, the event-driven design allowed teams to build stateless services that could seamlessly auto scale based on MSK metrics like messages per second. This event-driven scalability eliminated the need for extensive capacity planning and manual scaling efforts, freeing up development time.
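As a rough sketch of metric-driven scaling, the replica count of a stateless consumer service could be derived from the observed message rate. The function name and the per-replica capacity figure below are hypothetical. The cap at the partition count reflects the fact that consumers in a group beyond the number of partitions sit idle.

```python
import math


def desired_replicas(messages_per_sec, per_replica_capacity, partitions, min_replicas=1):
    """Return how many consumer replicas to run for the observed message rate.

    per_replica_capacity is an assumed, measured throughput of one replica.
    """
    if per_replica_capacity <= 0:
        raise ValueError("per_replica_capacity must be positive")
    needed = math.ceil(messages_per_sec / per_replica_capacity)
    # More consumers than partitions would not add throughput.
    return max(min_replicas, min(needed, partitions))


if __name__ == "__main__":
    print(desired_replicas(45_000, 10_000, partitions=12))
```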

By using an event-driven microservices architecture on Amazon MSK, we achieved organizational agility, enhanced scalability, and accelerated innovation while minimizing operational overhead.

Seamless infrastructure scaling

Nexthink’s business grew tenfold in 3 years, and many new capabilities were added to the product, leading to a substantial increase in traffic from 200 MB per second to 5 GB per second. This exponential data growth was enabled by the robust scalability of Amazon MSK. Achieving such scale with an on-premises solution would have been challenging and expensive, if not infeasible.

Attempting to self-manage Kafka imposed unnecessary operational overhead without providing business value. Running it with just 5% of today’s traffic was already complex and required two engineers. At today’s volumes, we estimated needing 6–10 dedicated staff, increasing costs and diverting resources away from core priorities.

Real-time capabilities

By channeling all our data through Amazon MSK, we enabled real-time processing of events. This unlocked capabilities like real-time alerts, event-driven triggers, and webhooks that were previously unattainable. As such, Amazon MSK was instrumental in facilitating our event-driven architecture and powering impactful innovations.

Secure data access

Transitioning to our new architecture, we met our security and data integrity goals. With Kafka ACLs, we enforced strict access controls, allowing consumers and producers to only interact with authorized topics. We based these granular data access controls on criteria like data type, domain, and team.
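To make the idea of topic-level access control concrete, here is a simplified in-memory model of how allow and deny rules with literal or prefixed topic patterns could be evaluated. It is not the Kafka authorizer and omits hosts, wildcards, and super users. The principals and the domain-based topic naming are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str
    operation: str               # "Read" or "Write"
    pattern: str                 # topic name or topic name prefix
    pattern_type: str            # "LITERAL" or "PREFIXED"
    permission: str = "ALLOW"    # "ALLOW" or "DENY"


def _matches(acl, principal, operation, topic):
    if acl.principal != principal or acl.operation != operation:
        return False
    if acl.pattern_type == "PREFIXED":
        return topic.startswith(acl.pattern)
    return topic == acl.pattern


def is_authorized(acls, principal, operation, topic):
    """Deny wins over allow; with no matching rule, access is denied."""
    matching = [a for a in acls if _matches(a, principal, operation, topic)]
    if any(a.permission == "DENY" for a in matching):
        return False
    return any(a.permission == "ALLOW" for a in matching)


if __name__ == "__main__":
    acls = [
        Acl("User:ingestion-svc", "Write", "ingestion.", "PREFIXED"),
        Acl("User:datalake-svc", "Read", "ingestion.devices", "LITERAL"),
    ]
    print(is_authorized(acls, "User:datalake-svc", "Read", "ingestion.devices"))
```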

To securely scale decentralized management of topics, we introduced proprietary Kubernetes Custom Resource Definitions (CRDs). These CRDs enabled teams to independently manage their own topics, settings, and ACLs without compromising security.

Amazon MSK encryption made sure that the data remained encrypted at rest and in transit. We also introduced a Bring Your Own Key (BYOK) option, allowing application-level encryption with customer keys for all single-tenant and multi-tenant topics.

Enhanced observability

Amazon MSK gave us great visibility into our data flows. The out-of-the-box Amazon CloudWatch metrics let us see the amount and types of data flowing through each topic and cluster. This helped us quantify the usage of our product features by tracking data volumes at the topic level. The Amazon MSK operational metrics enabled effortless monitoring and right-sizing of clusters and brokers. Overall, the rich observability of Amazon MSK facilitated data-driven decisions about architecture and product features.

Conclusion

Nexthink’s journey from an on-premises monolith to a cloud SaaS was streamlined by using Amazon MSK, a fully managed Kafka service. Amazon MSK allowed us to scale seamlessly while benefiting from enterprise-grade reliability and security. By offloading Kafka management to AWS, we could stay focused on our core business and innovate faster.

Going forward, we plan to further improve performance, costs, and scalability by adopting Amazon MSK capabilities such as tiered storage and AWS Graviton-based EC2 instance types.

We are also working closely with the Amazon MSK team to prepare for upcoming service features. Rapidly adopting new capabilities will help us remain at the forefront of innovation while continuing to grow our business.

To learn more about how Nexthink uses AWS to serve its global customer base, explore the Nexthink on AWS case study. Additionally, discover other customer success stories with Amazon MSK by visiting the Amazon MSK blog category.


About the Authors

Moe Haidar is a principal engineer and special projects lead at the CTO office of Nexthink. He has been involved with AWS since 2018 and is a key contributor to the cloud transformation of the Nexthink platform to AWS. His focus is on product and technology incubation and architecture, but he also loves doing hands-on activities to keep his knowledge of technologies sharp and up to date. He still contributes heavily to the code base and loves to tackle complex problems.
Simone Pomata is a Senior Solutions Architect at AWS. He has worked enthusiastically in the tech industry for more than 10 years. At AWS, he helps customers succeed in building new technologies every day.
Magdalena Gargas is a Solutions Architect passionate about technology and solving customer challenges. At AWS, she works mostly with software companies, helping them innovate in the cloud. She participates in industry events, sharing insights and contributing to the advancement of the containerization field.

Exploring real-time streaming for generative AI Applications

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/exploring-real-time-streaming-for-generative-ai-applications/

Foundation models (FMs) are large machine learning (ML) models trained on a broad spectrum of unlabeled and generalized datasets. FMs, as the name suggests, provide the foundation to build more specialized downstream applications, and are unique in their adaptability. They can perform a wide range of different tasks, such as natural language processing, classifying images, forecasting trends, analyzing sentiment, and answering questions. This scale and general-purpose adaptability are what make FMs different from traditional ML models. FMs are multimodal; they work with different data types such as text, video, audio, and images. Large language models (LLMs) are a type of FM that is pre-trained on vast amounts of text data and is typically used for applications such as text generation, intelligent chatbots, or summarization.

Streaming data facilitates the constant flow of diverse and up-to-date information, enhancing the models’ ability to adapt and generate more accurate, contextually relevant outputs. This dynamic integration of streaming data enables generative AI applications to respond promptly to changing conditions, improving their adaptability and overall performance in various tasks.

To better understand this, imagine a chatbot that helps travelers book their travel. In this scenario, the chatbot needs real-time access to airline inventory, flight status, hotel inventory, latest price changes, and more. This data usually comes from third parties, and developers need to find a way to ingest this data and process the data changes as they happen.

Batch processing is not the best fit in this scenario. When data changes rapidly, processing it in a batch may result in stale data being used by the chatbot, providing inaccurate information to the customer, which impacts the overall customer experience. Stream processing, however, can enable the chatbot to access real-time data and adapt to changes in availability and price, providing the best guidance to the customer and enhancing the customer experience.

Another example is an AI-driven observability and monitoring solution where FMs monitor real-time internal metrics of a system and produce alerts. When the model finds an anomaly or abnormal metric value, it should immediately produce an alert and notify the operator. However, the value of such important data diminishes significantly over time. These notifications should ideally be received within seconds of the event, or even while it is still happening. If operators receive these notifications minutes or hours after the event, the insight is no longer actionable and has potentially lost its value. You can find similar use cases in other industries such as retail, car manufacturing, energy, and the financial industry.

In this post, we discuss why data streaming is a crucial component of generative AI applications due to its real-time nature. We discuss the value of AWS data streaming services such as Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Firehose in building generative AI applications.

In-context learning

LLMs are trained with point-in-time data and have no inherent ability to access fresh data at inference time. As new data appears, you will have to continuously fine-tune or further train the model. This is not only an expensive operation, but also very limiting in practice because the rate of new data generation far exceeds the speed of fine-tuning. Additionally, LLMs lack contextual understanding and rely solely on their training data, and are therefore prone to hallucinations. This means they can generate a fluent, coherent, and syntactically sound but factually incorrect response. Their responses can also lack relevance, personalization, and context.

LLMs, however, have the capacity to learn from the data they receive from the context to more accurately respond without modifying the model weights. This is called in-context learning, and can be used to produce personalized answers or provide an accurate response in the context of organization policies.

For example, in a chatbot, data events could pertain to an inventory of flights and hotels or price changes that are constantly ingested to a streaming storage engine. Furthermore, data events are filtered, enriched, and transformed to a consumable format using a stream processor. The result is made available to the application by querying the latest snapshot. The snapshot constantly updates through stream processing; therefore, the up-to-date data is provided in the context of a user prompt to the model. This allows the model to adapt to the latest changes in price and availability. The following diagram illustrates a basic in-context learning workflow.
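The workflow described above can be sketched in a few lines. This is a minimal illustration, assuming a hypothetical event shape (`key` and `value`, where a value of `None` means the item is no longer available) and a hypothetical prompt layout. A real application would query the snapshot from a data store kept up to date by the stream processor.

```python
def apply_event(snapshot, event):
    """Update the latest snapshot with one processed data event."""
    if event["value"] is None:
        snapshot.pop(event["key"], None)    # item no longer available
    else:
        snapshot[event["key"]] = event["value"]


def build_prompt(question, snapshot):
    """Place the latest snapshot in the context of the user prompt."""
    context = "\n".join(f"- {key}: {value}" for key, value in sorted(snapshot.items()))
    return f"Context (latest data):\n{context}\n\nQuestion: {question}"


if __name__ == "__main__":
    snapshot = {}
    events = [
        {"key": "flight:ZRH-LIS", "value": {"price": 210, "seats": 9}},
        {"key": "hotel:lisbon-central", "value": {"price": 95, "rooms": 2}},
        {"key": "flight:ZRH-LIS", "value": {"price": 180, "seats": 4}},  # price change
    ]
    for event in events:
        apply_event(snapshot, event)
    print(build_prompt("What is the cheapest way to get to Lisbon?", snapshot))
```

Because the prompt is built from the snapshot at request time, the model sees the most recent price and availability without any change to its weights.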

A commonly used in-context learning approach is a technique called Retrieval Augmented Generation (RAG). In RAG, you add relevant information, such as the most relevant policy and customer records, to the prompt along with the user question. This way, the LLM generates an answer to the user question using additional information provided as context. To learn more about RAG, refer to Question answering using Retrieval Augmented Generation with foundation models in Amazon SageMaker JumpStart.

A RAG-based generative AI application can only produce generic responses based on its training data and the relevant documents in the knowledge base. This solution falls short when a near-real-time personalized response is expected from the application. For example, a travel chatbot is expected to consider the user’s current bookings, available hotel and flight inventory, and more. Moreover, the relevant customer personal data (commonly known as the unified customer profile) is usually subject to change. If a batch process is employed to update the generative AI’s user profile database, the customer may receive dissatisfying responses based on old data.

In this post, we discuss the application of stream processing to enhance a RAG solution used for building question answering agents with context from real-time access to unified customer profiles and an organizational knowledge base.

Near-real-time customer profile updates

Customer records are typically distributed across data stores within an organization. For your generative AI application to provide a relevant, accurate, and up-to-date customer profile, it is vital to build streaming data pipelines that can perform identity resolution and profile aggregation across the distributed data stores. Streaming jobs constantly ingest new data to synchronize across systems and can perform enrichment, transformations, joins, and aggregations across windows of time more efficiently. Change data capture (CDC) events contain information about the source record, updates, and metadata such as time, source, classification (insert, update, or delete), and the initiator of the change.

The following diagram illustrates an example workflow for CDC streaming ingestion and processing for unified customer profiles.

In this section, we discuss the main components of a CDC streaming pattern required to support RAG-based generative AI applications.

CDC streaming ingestion

A CDC replicator is a process that collects data changes from a source system (usually by reading transaction logs or binlogs) and writes CDC events to a data stream or topic in the exact order in which they occurred. This involves a log-based capture with tools such as AWS Database Migration Service (AWS DMS) or open source connectors such as Debezium for Apache Kafka Connect. Apache Kafka Connect is part of the Apache Kafka environment, allowing data to be ingested from various sources and delivered to a variety of destinations. You can run your Apache Kafka connector on Amazon MSK Connect within minutes without worrying about configuring, setting up, and operating an Apache Kafka Connect cluster. You only need to upload your connector’s compiled code to Amazon Simple Storage Service (Amazon S3) and set up your connector with your workload’s specific configuration.

There are also other methods for capturing data changes. For example, Amazon DynamoDB provides a feature for streaming CDC data to Amazon DynamoDB Streams or Kinesis Data Streams. Amazon S3 provides a trigger to invoke an AWS Lambda function when a new document is stored.

Streaming storage

Streaming storage functions as an intermediate buffer to store CDC events before they get processed. Streaming storage provides reliable storage for streaming data. By design, it is highly available and resilient to hardware or node failures and maintains the order of the events as they are written. Streaming storage can store data events either permanently or for a set period of time. This allows stream processors to read from part of the stream if there is a failure or a need for re-processing. Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at scale. Amazon MSK is a fully managed, highly available, and secure service provided by AWS for running Apache Kafka.

Stream processing

Stream processing systems should be designed for parallelism to handle high data throughput. They should partition the input stream between multiple tasks running on multiple compute nodes. Tasks should be able to send the result of one operation to the next one over the network, making it possible to process data in parallel while performing operations such as joins, filtering, enrichment, and aggregations. Stream processing applications should be able to process events with regard to the event time for use cases where events could arrive late or correct computation relies on the time events occur rather than the system time. For more information, refer to Notions of Time: Event Time and Processing Time.
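The difference between event time and processing time can be shown with a small sketch that counts events per tumbling window based on the timestamp carried by each event. The function name and data are hypothetical. Unlike a real stream processor, which uses mechanisms such as watermarks to decide when a window can be closed, this sketch simply keeps every window open.

```python
from collections import defaultdict


def tumbling_window_counts(events, window_seconds):
    """Count events per (window_start, key) using the event time.

    events: iterable of (event_time_seconds, key) in arrival order.
    An event that arrives late is still counted in the window in which
    it occurred, not in the window during which it arrived.
    """
    counts = defaultdict(int)
    for event_time, key in events:
        window_start = (event_time // window_seconds) * window_seconds
        counts[(window_start, key)] += 1
    return dict(counts)


if __name__ == "__main__":
    # The event stamped 10 arrives after the one stamped 65 (late arrival).
    arrivals = [(5, "sensor-1"), (65, "sensor-1"), (10, "sensor-1")]
    print(tumbling_window_counts(arrivals, window_seconds=60))
```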

Stream processes continuously produce results in the form of data events that need to be output to a target system. A target system could be any system that can integrate directly with the process or via streaming storage as an intermediary. The framework you choose for stream processing determines your options for target systems, based on the sink connectors it offers. If you decide to write the results to an intermediary streaming storage, you can build a separate process that reads events and applies changes to the target system, such as running an Apache Kafka sink connector. Regardless of which option you choose, CDC data needs extra handling due to its nature. Because CDC events carry information about updates or deletes, it’s important that they merge in the target system in the right order. If changes are applied in the wrong order, the target system will be out of sync with its source.
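One way to guard against out-of-order application is sketched below. It assumes, hypothetically, that each CDC event carries the full row image and a per-key sequence number that grows with every change at the source (for example, a log position). The event fields and the function name are illustrative and not tied to a specific connector format.

```python
def apply_cdc(target, last_seq, event):
    """Merge one CDC event into the target store, rejecting stale events.

    event: {"op": "insert" | "update" | "delete", "key": ..., "seq": int, "row": dict or None}
    target: dict that stands in for the target table.
    last_seq: dict that tracks the highest sequence number applied per key.
    Returns True if the event was applied, False if it was stale or a duplicate.
    """
    key, seq = event["key"], event["seq"]
    if seq <= last_seq.get(key, -1):
        return False                     # an equal or newer change was already applied
    if event["op"] == "delete":
        target.pop(key, None)
    else:                                # insert and update are both treated as upserts
        target[key] = event["row"]
    last_seq[key] = seq
    return True


if __name__ == "__main__":
    table, seen = {}, {}
    events = [
        {"op": "update", "key": 42, "seq": 2, "row": {"tier": "gold"}},
        {"op": "insert", "key": 42, "seq": 1, "row": {"tier": "basic"}},  # arrives late
    ]
    for event in events:
        apply_cdc(table, seen, event)
    print(table)
```

Without the sequence check, the late insert would overwrite the newer update and leave the target out of sync with its source.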

Apache Flink is a powerful stream processing framework known for its low latency and high throughput capabilities. It supports event time processing, exactly-once processing semantics, and high fault tolerance. Additionally, it provides native support for CDC data via a special structure called dynamic tables. Dynamic tables mimic the source database tables and provide a columnar representation of the streaming data. The data in dynamic tables changes with every event that is processed. New records can be appended, updated, or deleted at any time. Dynamic tables abstract away the extra logic you need to implement for each record operation (insert, update, delete) separately. For more information, refer to Dynamic Tables.

With Amazon Managed Service for Apache Flink, you can run Apache Flink jobs and integrate with other AWS services. There are no servers and clusters to manage, and there is no compute and storage infrastructure to set up.

AWS Glue is a fully managed extract, transform, and load (ETL) service, which means AWS handles the infrastructure provisioning, scaling, and maintenance for you. Although it’s primarily known for its ETL capabilities, AWS Glue can also be used for Spark streaming applications. AWS Glue can interact with streaming data services such as Kinesis Data Streams and Amazon MSK for processing and transforming CDC data. AWS Glue can also seamlessly integrate with other AWS services such as Lambda, AWS Step Functions, and DynamoDB, providing you with a comprehensive ecosystem for building and managing data processing pipelines.

Unified customer profile

Unifying the customer profile across a variety of source systems requires the development of robust data pipelines. You need data pipelines that can bring and synchronize all records into one data store. This data store provides your organization with the holistic customer records view that is needed for operational efficiency of RAG-based generative AI applications. For building such a data store, an unstructured data store would be best.

An identity graph is a useful structure for creating a unified customer profile because it consolidates and integrates customer data from various sources, ensures data accuracy and deduplication, offers real-time updates, connects cross-systems insights, enables personalization, enhances customer experience, and supports regulatory compliance. This unified customer profile empowers the generative AI application to understand and engage with customers effectively, and adhere to data privacy regulations, ultimately enhancing customer experiences and driving business growth. You can build your identity graph solution using Amazon Neptune, a fast, reliable, fully managed graph database service.
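As a rough illustration of how an identity graph consolidates records, the sketch below links source records that share an identifier and then asks whether two records belong to the same customer. It uses an in-memory union-find structure as a stand-in; it is not how Amazon Neptune stores or queries a graph, and the record identifiers are invented for the example.

```python
class IdentityGraph:
    """Groups source records that share an identifier, using union-find."""

    def __init__(self):
        self.parent = {}

    def _find(self, node):
        # Register unseen nodes as their own group, then walk to the group root.
        self.parent.setdefault(node, node)
        while self.parent[node] != node:
            self.parent[node] = self.parent[self.parent[node]]  # path halving
            node = self.parent[node]
        return node

    def link(self, a, b):
        """Record that a and b refer to the same customer."""
        self.parent[self._find(a)] = self._find(b)

    def same_customer(self, a, b):
        return self._find(a) == self._find(b)


graph = IdentityGraph()
graph.link("crm:1001", "email:jane@example.com")
graph.link("billing:77", "email:jane@example.com")
graph.link("support:5", "phone:+15550100")
```

Here the CRM and billing records resolve to one customer because both are linked to the same email address, while the support record stays separate until a shared identifier appears.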

AWS provides a few other managed and serverless NoSQL storage service offerings for unstructured key-value objects. Amazon DocumentDB (with MongoDB compatibility) is a fast, scalable, highly available, and fully managed enterprise document database service that supports native JSON workloads. DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability.

Near-real-time organizational knowledge base updates

Similar to customer records, internal knowledge repositories such as company policies and organizational documents are siloed across storage systems. This is typically unstructured data and is updated in a non-incremental fashion. Unstructured data becomes usable by AI applications through vector embeddings, a technique for representing high-dimensional data such as text files, images, and audio files as multi-dimensional numeric vectors.

AWS provides several vector engine services, such as Amazon OpenSearch Serverless, Amazon Kendra, and Amazon Aurora PostgreSQL-Compatible Edition with the pgvector extension for storing vector embeddings. Generative AI applications can enhance the user experience by transforming the user prompt into a vector and use it to query the vector engine to retrieve contextually relevant information. Both the prompt and the vector data retrieved are then passed to the LLM to receive a more precise and personalized response.
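The retrieval step can be sketched without any vector engine. The example below ranks a handful of documents by cosine similarity to a prompt vector and returns the closest matches, which would then be passed to the LLM along with the prompt. The three-dimensional vectors and document IDs are toy values chosen for illustration; real embeddings come from an embedding model and have far more dimensions, and a managed vector engine would typically use an approximate nearest-neighbor index instead of a full scan.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def retrieve(query_vector, store, top_k=2):
    """Return the top_k document IDs ranked by similarity to query_vector."""
    ranked = sorted(
        store,
        key=lambda doc_id: cosine_similarity(query_vector, store[doc_id]),
        reverse=True,
    )
    return ranked[:top_k]


# Toy embeddings keyed by document ID (hypothetical values).
store = {
    "refund-policy": [0.9, 0.1, 0.0],
    "shipping-times": [0.1, 0.9, 0.1],
    "office-hours": [0.0, 0.2, 0.9],
}
prompt_vector = [0.8, 0.2, 0.1]
context_ids = retrieve(prompt_vector, store)
```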

The following diagram illustrates an example stream-processing workflow for vector embeddings.

Knowledge base contents need to be converted to vector embeddings before being written to the vector data store. Amazon Bedrock or Amazon SageMaker can help you access the model of your choice and expose a private endpoint for this conversion. Furthermore, you can use libraries such as LangChain to integrate with these endpoints. Building a batch process can help you convert your knowledge base content to vector data and store it in a vector database initially. However, you need to rely on an interval to reprocess the documents to synchronize your vector database with changes in your knowledge base content. With a large number of documents, this process can be inefficient. Between these intervals, your generative AI application users will receive answers according to the old content, or will receive an inaccurate answer because the new content is not vectorized yet.

Stream processing is an ideal solution for these challenges. It initially produces an event for each existing document, then monitors the source system and creates a document change event as soon as a change occurs. These events can be stored in streaming storage until a streaming job processes them. A streaming job reads these events, loads the content of the document, and transforms the contents to an array of related tokens of words. Each token is then transformed into vector data via an API call to an embedding FM. Results are sent to the vector storage via a sink operator.
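The shape of such a streaming job can be sketched in a few lines of plain Python. The example splits a document into word chunks, converts each chunk to a vector, and appends the result to a sink. The function fake_embed is a placeholder for the API call to an embedding FM, the list vector_store stands in for the vector database, and the chunk size and event fields are assumptions; none of these names come from a specific library.

```python
def chunk_text(text, max_words=5):
    """Split text into chunks of at most max_words words."""
    words = text.split()
    return [" ".join(words[i:i + max_words]) for i in range(0, len(words), max_words)]


def fake_embed(chunk):
    """Placeholder for an embedding FM call; returns a toy two-dimensional vector."""
    return [float(len(chunk)), float(len(chunk.split()))]


def process_change_event(event, embed, sink):
    """Chunk the document in a change event, embed each chunk, and write to the sink."""
    for index, chunk in enumerate(chunk_text(event["content"])):
        sink.append({"doc_id": event["doc_id"], "chunk": index, "vector": embed(chunk)})


vector_store = []
process_change_event(
    {"doc_id": "policy-1", "content": "Employees may work remotely up to three days per week"},
    fake_embed,
    vector_store,
)
```

Because the job reacts to one change event at a time, only the changed document is re-embedded, rather than the whole knowledge base.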

If you’re using Amazon S3 for storing your documents, you can build an event-source architecture based on S3 object change triggers for Lambda. A Lambda function can create an event in the desired format and write that to your streaming storage.
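A minimal sketch of such a Lambda function follows. It reads the standard S3 event notification fields (bucket name, object key, event name, and event time) and builds a document change event for each record. The output field names are hypothetical, and the step that writes to streaming storage is left as a comment because it depends on whether you use Kinesis Data Streams or Amazon MSK.

```python
from urllib.parse import unquote_plus


def build_change_events(s3_event):
    """Turn an S3 event notification into a list of document change events."""
    events = []
    for record in s3_event.get("Records", []):
        events.append({
            "bucket": record["s3"]["bucket"]["name"],
            # Object keys arrive URL-encoded in S3 notifications.
            "key": unquote_plus(record["s3"]["object"]["key"]),
            "change_type": record["eventName"],
            "event_time": record["eventTime"],
        })
    return events


def lambda_handler(event, context):
    change_events = build_change_events(event)
    # A real function would write change_events to your streaming storage here.
    return {"count": len(change_events)}


# Sample notification with only the fields this sketch reads.
sample = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "eventTime": "2024-01-01T12:00:00.000Z",
            "s3": {
                "bucket": {"name": "kb-docs"},
                "object": {"key": "policies/travel+policy.pdf"},
            },
        }
    ]
}
```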

You can also run the streaming job with Apache Flink. Apache Flink provides the native FileSystem source connector, which can discover existing files and read their contents initially. After that, it can continuously monitor your file system for new files and capture their content. The connector supports reading a set of files from distributed file systems such as Amazon S3 or HDFS with a format of plain text, Avro, CSV, Parquet, and more, and produces a streaming record. As a fully managed service, Managed Service for Apache Flink removes the operational overhead of deploying and maintaining Flink jobs, allowing you to focus on building and scaling your streaming applications. With seamless integration into the AWS streaming services such as Amazon MSK or Kinesis Data Streams, it provides features like automatic scaling, security, and resiliency, so your Flink applications can handle real-time streaming data reliably and efficiently.

Based on your DevOps preference, you can choose between Kinesis Data Streams or Amazon MSK for storing the streaming records. Kinesis Data Streams simplifies the complexities of building and managing custom streaming data applications, allowing you to focus on deriving insights from your data rather than infrastructure maintenance. Customers using Apache Kafka often opt for Amazon MSK due to its straightforwardness, scalability, and dependability in overseeing Apache Kafka clusters within the AWS environment. As a fully managed service, Amazon MSK takes on the operational complexities associated with deploying and maintaining Apache Kafka clusters, enabling you to concentrate on constructing and expanding your streaming applications.

Because a RESTful API integration suits the nature of this process, you need a framework that supports a stateful enrichment pattern via RESTful API calls so that it can track failures and retry failed requests. Apache Flink, again, is a framework that can perform stateful operations at in-memory speed. To understand the best ways to make API calls via Apache Flink, refer to Common streaming data enrichment patterns in Amazon Kinesis Data Analytics for Apache Flink.
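The track-and-retry behavior can be sketched as a small helper. This is plain synchronous Python with exponential backoff, not Apache Flink's asynchronous I/O API, and flaky_embedding_api is a made-up stand-in for a RESTful embedding endpoint that fails twice before succeeding. The retry count and delays are illustrative values.

```python
import time


def call_with_retry(func, payload, max_attempts=3, base_delay_s=0.01):
    """Call func(payload), retrying with exponential backoff on any exception."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func(payload)
        except Exception:
            if attempt == max_attempts:
                raise  # give up and surface the last error
            time.sleep(base_delay_s * 2 ** (attempt - 1))


attempts = {"count": 0}


def flaky_embedding_api(text):
    """Hypothetical endpoint that fails on the first two calls."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return [0.1, 0.2]


vector = call_with_retry(flaky_embedding_api, "hello")
```

In a stream processor, the attempt counter would live in managed state so that retries survive a restart of the job.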

Apache Flink provides native sink connectors for writing data to vector datastores such as Amazon Aurora for PostgreSQL with pgvector or Amazon OpenSearch Service with VectorDB. Alternatively, you can stage the Flink job’s output (vectorized data) in an MSK topic or a Kinesis data stream. OpenSearch Service provides support for native ingestion from Kinesis data streams or MSK topics. For more information, refer to Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion and Loading streaming data from Amazon Kinesis Data Streams.

Feedback analytics and fine-tuning

It’s important for data operation managers and AI/ML developers to get insight about the performance of the generative AI application and the FMs in use. To achieve that, you need to build data pipelines that calculate important key performance indicator (KPI) data based on the user feedback and variety of application logs and metrics. This information is useful for stakeholders to gain real-time insight about the performance of the FM, the application, and overall user satisfaction about the quality of support they receive from your application. You also need to collect and store the conversation history for further fine-tuning your FMs to improve their ability in performing domain-specific tasks.

This use case fits very well in the streaming analytics domain. Your application should store each conversation in streaming storage. Your application can prompt users about their rating of each answer’s accuracy and their overall satisfaction. This data can take the form of a binary choice or free-form text. This data can be stored in a Kinesis data stream or MSK topic and processed to generate KPIs in real time. You can put FMs to work for users’ sentiment analysis. FMs can analyze each answer and assign a category of user satisfaction.
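As a simple example of a KPI derived from binary feedback, the sketch below computes the share of positive ratings per model. The event fields model_id and thumbs_up are assumptions about how an application might record feedback; in a streaming job the same aggregation would run continuously over a window of events instead of over a list.

```python
from collections import defaultdict


def satisfaction_rate(feedback_events):
    """Return the share of positive ratings per model ID."""
    totals = defaultdict(lambda: [0, 0])  # model_id -> [positive, total]
    for event in feedback_events:
        counts = totals[event["model_id"]]
        counts[0] += 1 if event["thumbs_up"] else 0
        counts[1] += 1
    return {model: positive / total for model, (positive, total) in totals.items()}


# Hypothetical feedback events read from streaming storage.
feedback = [
    {"model_id": "model-a", "thumbs_up": True},
    {"model_id": "model-a", "thumbs_up": True},
    {"model_id": "model-a", "thumbs_up": False},
    {"model_id": "model-a", "thumbs_up": True},
    {"model_id": "model-b", "thumbs_up": True},
    {"model_id": "model-b", "thumbs_up": False},
]
kpis = satisfaction_rate(feedback)
```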

Apache Flink’s architecture allows for complex data aggregation over windows of time. It also provides support for SQL querying over streams of data events. Therefore, by using Apache Flink, you can quickly analyze raw user inputs and generate KPIs in real time by writing familiar SQL queries. For more information, refer to Table API & SQL.

With Amazon Managed Service for Apache Flink Studio, you can build and run Apache Flink stream processing applications using standard SQL, Python, and Scala in an interactive notebook. Studio notebooks are powered by Apache Zeppelin and use Apache Flink as the stream processing engine. Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. With support for user-defined functions (UDFs), Apache Flink allows for building custom operators to integrate with external resources such as FMs for performing complex tasks such as sentiment analysis. You can use UDFs to compute various metrics or enrich user feedback raw data with additional insights such as user sentiment. To learn more about this pattern, refer to Proactively addressing customer concern in real-time with GenAI, Flink, Apache Kafka, and Kinesis.

With Managed Service for Apache Flink Studio, you can deploy your Studio notebook as a streaming job with one click. You can use native sink connectors provided by Apache Flink to send the output to your storage of choice or stage it in a Kinesis data stream or MSK topic. Amazon Redshift and OpenSearch Service are both ideal for storing analytical data. Both engines provide native ingestion support from Kinesis Data Streams and Amazon MSK via a separate streaming pipeline to a data lake or data warehouse for analysis.

Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses and data lakes, using AWS-designed hardware and machine learning to deliver the best price-performance at scale. OpenSearch Service offers visualization capabilities powered by OpenSearch Dashboards and Kibana (1.5 to 7.10 versions).

You can use the outcome of such analysis combined with user prompt data for fine-tuning the FM when needed. SageMaker is the most straightforward way to fine-tune your FMs. Using Amazon S3 with SageMaker provides a powerful and seamless integration for fine-tuning your models. Amazon S3 serves as a scalable and durable object storage solution, enabling straightforward storage and retrieval of large datasets, training data, and model artifacts. SageMaker is a fully managed ML service that simplifies the entire ML lifecycle. By using Amazon S3 as the storage backend for SageMaker, you can benefit from the scalability, reliability, and cost-effectiveness of Amazon S3, while seamlessly integrating it with SageMaker training and deployment capabilities. This combination enables efficient data management, facilitates collaborative model development, and makes sure that ML workflows are streamlined and scalable, ultimately enhancing the overall agility and performance of the ML process. For more information, refer to Fine-tune Falcon 7B and other LLMs on Amazon SageMaker with @remote decorator.

With a file system sink connector, Apache Flink jobs can deliver data to Amazon S3 in open format (such as JSON, Avro, Parquet, and more) files as data objects. If you prefer to manage your data lake using a transactional data lake framework (such as Apache Hudi, Apache Iceberg, or Delta Lake), all of these frameworks provide a custom connector for Apache Flink. For more details, refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi.

Summary

For a generative AI application based on a RAG model, you need to consider building two data storage systems, and you need to build data operations that keep them up to date with all the source systems. Traditional batch jobs are not sufficient to process the size and diversity of the data you need to integrate with your generative AI application. Delays in processing the changes in source systems result in an inaccurate response and reduce the efficiency of your generative AI application. Data streaming enables you to ingest data from a variety of databases across various systems. It also allows you to transform, enrich, join, and aggregate data across many sources efficiently in near-real time. Data streaming provides a simplified data architecture to collect and transform users’ real-time reactions or comments on the application responses, helping you deliver and store the results in a data lake for model fine-tuning. Data streaming also helps you optimize data pipelines by processing only the change events, allowing you to respond to data changes more quickly and efficiently.

Learn more about AWS data streaming services and get started building your own data streaming solution.


About the Authors

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems which are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

Imtiaz (Taz) Sayed is the World-Wide Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Introducing enhanced functionality for worker configuration management in Amazon MSK Connect

Post Syndicated from Chinmayi Narasimhadevara original https://aws.amazon.com/blogs/big-data/introducing-enhanced-functionality-for-worker-configuration-management-in-amazon-msk-connect/

Amazon MSK Connect is a fully managed service for Apache Kafka Connect. With a few clicks, MSK Connect allows you to deploy connectors that move data between Apache Kafka and external systems.

MSK Connect now supports the ability to delete MSK Connect worker configurations, tag resources, and manage worker configurations and custom plugins using AWS CloudFormation. Together, these new capabilities make it straightforward to manage your MSK Connect resources and automate deployments through CI/CD pipelines.

MSK Connect makes it effortless to stream data to and from Apache Kafka over a private connection without requiring infrastructure management expertise. With a few clicks, you can deploy connectors like an Amazon S3 sink connector for loading streaming data to Amazon Simple Storage Service (Amazon S3), deploy connectors developed by third parties like Debezium for streaming change logs from databases into Apache Kafka, or deploy your own connector customized for your use case.

MSK Connect integrates external systems or AWS services with Apache Kafka by continuously copying streaming data from a data source into your Apache Kafka cluster, or continuously copying data from your Apache Kafka cluster into a data sink. The connector can also perform lightweight tasks such as transformation, format conversion, or filtering data before delivering the data to a destination. You can use a plugin to create the connector; these custom plugins are resources that contain the code that defines connector logic.

The primary components of MSK Connect are workers. Each worker is a Java virtual machine (JVM) process that runs the connector logic based on the worker configuration provided. Worker configurations are resources that contain your connector configuration properties that can be reused across multiple connectors. Each worker is comprised of a set of tasks that copy the data in parallel.

Today, we are announcing three new capabilities in MSK Connect:

  • The ability to delete worker configurations
  • Support for resource tags for enabling resource grouping, cost allocation and reporting, and access control with tag-based policies
  • Support in AWS CloudFormation to manage worker configurations and custom plugins

In the following sections, we look at the new functionalities in more detail.

Delete worker configurations

Connectors for integrating Amazon Managed Streaming for Apache Kafka (Amazon MSK) with other AWS and partner services are usually created using a worker configuration (default or custom). These configurations can grow with the creation and deletion of connectors, potentially creating configuration management issues.

You can now use the new delete worker configuration API to delete unused configurations. The service checks that the worker configuration is not in use by any connectors before deleting the configuration. Additionally, you can now use a prefix filter to list worker configurations and custom plugins using the ListWorkerConfigurations and ListCustomPlugins API calls. The prefix filter lets you list only the resources whose names start with the prefix so you can perform quick selective deletes.
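The following sketch shows how a prefix-filtered cleanup might look with the AWS SDK for Python. The function expects a client that behaves like boto3.client("kafkaconnect"). The parameter names namePrefix and workerConfigurationArn, and the response key workerConfigurations, reflect our understanding of that API and should be verified against the SDK reference before use. The sketch does not handle pagination, and it treats any error from the delete call (for example, a configuration still in use) as a reason to skip that configuration.

```python
def delete_unused_worker_configurations(client, name_prefix):
    """Delete worker configurations whose names start with name_prefix.

    client is assumed to behave like boto3.client("kafkaconnect").
    Returns the names that were deleted and the names that were skipped.
    """
    deleted, skipped = [], []
    response = client.list_worker_configurations(namePrefix=name_prefix)
    for config in response.get("workerConfigurations", []):
        arn = config["workerConfigurationArn"]
        try:
            client.delete_worker_configuration(workerConfigurationArn=arn)
            deleted.append(config["name"])
        except Exception as error:
            # The service rejects the delete if a connector still uses the configuration.
            skipped.append((config["name"], str(error)))
    return deleted, skipped


# Example usage (requires boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("kafkaconnect")
#   deleted, skipped = delete_unused_worker_configurations(client, "demo-")
```

Passing the client in as an argument keeps the function easy to exercise with a stub before pointing it at a real account.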

To test the new delete API, complete the following steps:

  1. On the Amazon MSK console, create a new worker configuration.
  2. Provide a name and optional description.
  3. In the Worker configuration section, enter your configuration code.

MSK Connect Worker Configuration

After you create the configuration, a Delete option is available on the configuration detail page (see the following screenshot) if the configuration is not being used in any connector.

To support this new API, an additional workerConfigurationState has been added, so you can more easily track the state of the worker configuration. This new state will be returned in the API call responses for CreateWorkerConfiguration, DescribeWorkerConfiguration, and ListWorkerConfigurations.

MSK Connect Worker Configuration

  4. Choose Delete to delete the worker configuration.
  5. In the confirmation pop-up, enter the name of the worker configuration, then choose Delete.

Delete MSKC Worker Configuration

If the worker configuration is being used with any connector, the Delete option is disabled, as shown in the following screenshot.

Resource tags

MSK Connect now also has support for resource tags. Tags are key-value metadata that can be associated with AWS service resources. You can add tags to connectors, custom plugins, and worker configurations to organize and find resources used across AWS services. In the following screenshots, our example MSK Connect connector, plugin, and worker configuration have been tagged with the resource tag key project and value demo-tags.

You can now tag your Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3 resources with the same project name, for example. Then you can use the tag to search for all resources linked to this particular project for cost allocation, reporting, resource grouping, or access control. MSK Connect supports adding tags when creating resources, applying tags to an existing resource, removing tags from a resource, and querying tags associated with a resource.
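Tagging can also be scripted. The sketch below applies a project tag to a list of MSK Connect resource ARNs and reads the tags back. As in the AWS SDK for Python, the client is assumed to behave like boto3.client("kafkaconnect"); the operation names tag_resource and list_tags_for_resource and their resourceArn and tags parameters reflect our understanding of that API and should be checked against the SDK reference. The tag key project matches the example in this post.

```python
def tag_project_resources(client, resource_arns, project):
    """Apply a project tag to each ARN and return the tags now on each resource.

    client is assumed to behave like boto3.client("kafkaconnect").
    """
    for arn in resource_arns:
        client.tag_resource(resourceArn=arn, tags={"project": project})
    return {
        arn: client.list_tags_for_resource(resourceArn=arn).get("tags", {})
        for arn in resource_arns
    }


# Example usage (requires boto3 and AWS credentials; the ARN is a placeholder):
#   import boto3
#   client = boto3.client("kafkaconnect")
#   tag_project_resources(client, ["<YOUR_CONNECTOR_ARN>"], "demo-tags")
```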

AWS CloudFormation support

Previously, you were only able to provision an MSK Connect connector with AWS CloudFormation by using an existing worker configuration. With this new feature, you can now perform CREATE, READ, UPDATE, DELETE, and LIST operations on connectors, and create and add new worker configurations using AWS CloudFormation.

The following code is an example of creating a worker configuration:

{
  "Type": "AWS::KafkaConnect::WorkerConfiguration",
  "Properties": {
    "Name": "WorkerConfigurationName",
    "Description": "WorkerConfigurationDescription",
    "PropertiesFileContent": String,
    "Tags": [Tag,…]
  }
}

The return values are as follows:

  • ARN of the newly created worker configuration
  • State of the new worker configuration
  • Creation time of new worker configuration
  • Latest revision of the new worker configuration
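To show how the syntax above might be filled in, the following sketch builds a complete template as a Python dictionary and serializes it to JSON. It assumes that PropertiesFileContent expects the Base64-encoded contents of the worker properties file and that tags use the usual Key and Value fields; confirm both against the CloudFormation resource reference. The logical ID DemoWorkerConfiguration, the configuration name, and the converter settings are illustrative values.

```python
import base64
import json


def worker_configuration_resource(name, description, properties_text, tags=None):
    """Build an AWS::KafkaConnect::WorkerConfiguration resource definition."""
    encoded = base64.b64encode(properties_text.encode("utf-8")).decode("ascii")
    resource = {
        "Type": "AWS::KafkaConnect::WorkerConfiguration",
        "Properties": {
            "Name": name,
            "Description": description,
            # Assumption: the service expects Base64-encoded properties content.
            "PropertiesFileContent": encoded,
        },
    }
    if tags:
        resource["Properties"]["Tags"] = [
            {"Key": key, "Value": value} for key, value in tags.items()
        ]
    return resource


properties = (
    "key.converter=org.apache.kafka.connect.storage.StringConverter\n"
    "value.converter=org.apache.kafka.connect.storage.StringConverter\n"
)
template = {
    "Resources": {
        "DemoWorkerConfiguration": worker_configuration_resource(
            "demo-worker-config",
            "Example worker configuration",
            properties,
            {"project": "demo-tags"},
        )
    }
}
template_json = json.dumps(template, indent=2)
```

The resulting template_json string can be saved to a file and deployed like any other CloudFormation template.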

Conclusion

MSK Connect is a fully managed service that provisions the required resources, monitors the health and delivery state of connectors, maintains the underlying hardware, and auto scales connectors to balance the workloads. In this post, we discussed the new features that were added to MSK Connect, which streamline connector and worker management with the introduction of APIs for deleting worker configurations, tagging MSK Connect resources, and support in AWS CloudFormation to create non-default worker configurations.

These capabilities are available in all AWS Regions where Amazon MSK Connect is available. For a list of Region availability, refer to AWS Services by Region. To learn more about MSK Connect, visit the Amazon MSK Connect Developer Guide.


About the Authors

Chinmayi Narasimhadevara is a Solutions Architect focused on Big Data and Analytics at Amazon Web Services. Chinmayi has over 20 years of experience in information technology. She helps AWS customers build advanced, highly scalable and performant solutions.

Harita Pappu is a Technical Account Manager based out of California. She has over 18 years of experience working in the software industry building and scaling applications. She is passionate about new technologies and focused on helping customers achieve cost optimization and operational excellence.

Build an end-to-end serverless streaming pipeline with Apache Kafka on Amazon MSK using Python

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-serverless-streaming-pipeline-with-apache-kafka-on-amazon-msk-using-python/

The volume of data generated globally continues to surge, from gaming, retail, and finance, to manufacturing, healthcare, and travel. Organizations are looking for more ways to quickly use the constant inflow of data to innovate for their businesses and customers. They have to reliably capture, process, analyze, and load the data into a myriad of data stores, all in real time.

Apache Kafka is a popular choice for these real-time streaming needs. However, it can be challenging to set up a Kafka cluster along with other data processing components that scale automatically depending on your application’s needs. You risk under-provisioning for peak traffic, which can lead to downtime, or over-provisioning for base load, leading to wastage. AWS offers multiple serverless services like Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Data Firehose, Amazon DynamoDB, and AWS Lambda that scale automatically depending on your needs.

In this post, we explain how you can use some of these services, including MSK Serverless, to build a serverless data platform to meet your real-time needs.

Solution overview

Let’s imagine a scenario. You’re responsible for managing thousands of modems for an internet service provider deployed across multiple geographies. You want to monitor the modem connectivity quality that has a significant impact on customer productivity and satisfaction. Your deployment includes different modems that need to be monitored and maintained to ensure minimal downtime. Each device transmits thousands of 1 KB records every second, such as CPU usage, memory usage, alarm, and connection status. You want real-time access to this data so you can monitor performance in real time, and detect and mitigate issues quickly. You also need longer-term access to this data for machine learning (ML) models to run predictive maintenance assessments, find optimization opportunities, and forecast demand.

Your clients that gather the data onsite are written in Python, and they can send all the data to Apache Kafka topics in Amazon MSK. For your application’s low-latency and real-time data access, you can use Lambda and DynamoDB. For longer-term data storage, you can use Amazon Data Firehose, a managed serverless connector service, to send data to your data lake.

The following diagram shows how you can build this end-to-end serverless application.

end-to-end serverless application

Let’s follow the steps in the following sections to implement this architecture.

Create a serverless Kafka cluster on Amazon MSK

We use Amazon MSK to ingest real-time telemetry data from modems. Creating a serverless Kafka cluster is straightforward on Amazon MSK. It only takes a few minutes using the AWS Management Console or AWS SDK. To use the console, refer to Getting started using MSK Serverless clusters. You create a serverless cluster, AWS Identity and Access Management (IAM) role, and client machine.

Create a Kafka topic using Python

When your cluster and client machine are ready, SSH to your client machine and install Kafka Python and the MSK IAM library for Python.

  • Run the following commands to install Kafka Python and the MSK IAM library:
pip install kafka-python

pip install aws-msk-iam-sasl-signer-python
  • Create a new file called createTopic.py.
  • Copy the following code into this file, replacing the bootstrap_servers and region information with the details for your cluster. For instructions on retrieving the bootstrap_servers information for your MSK cluster, see Getting the bootstrap brokers for an Amazon MSK cluster.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# Create the topic only if it does not already exist
topic_name = "mytopic"
topic_list = [NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if topic_name not in existing_topics:
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("Topic already exists. List of topics: " + str(existing_topics))
  • Run the createTopic.py script to create a new Kafka topic called mytopic on your serverless cluster:
python createTopic.py

Produce records using Python

Let’s generate some sample modem telemetry data.

  • Create a new file called kafkaDataGen.py.
  • Copy the following code into this file, updating the BROKERS and region information with the details for your cluster:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
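    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        # Block until the broker acknowledges the record or the timeout expires
        record_metadata = future.get(timeout=10)

    except Exception as e:
        # Log the failure and keep producing
        print(f"Failed to send record: {e}")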
  • Run the kafkaDataGen.py script to continuously generate random data and publish it to the specified Kafka topic:
python kafkaDataGen.py

Store events in Amazon S3

Now you store all the raw event data in an Amazon Simple Storage Service (Amazon S3) data lake for analytics. You can use the same data to train ML models. The integration with Amazon Data Firehose allows Amazon MSK to seamlessly load data from your Apache Kafka clusters into an S3 data lake. Complete the following steps to continuously stream data from Kafka to Amazon S3, eliminating the need to build or manage your own connector applications:

  • On the Amazon S3 console, create a new bucket. You can also use an existing bucket.
  • Create a new folder in your S3 bucket called streamingDataLake.
  • On the Amazon MSK console, choose your MSK Serverless cluster.
  • On the Actions menu, choose Edit cluster policy.

cluster policy

  • Select Include Firehose service principal and choose Save changes.

firehose service principal

  • On the S3 delivery tab, choose Create delivery stream.

delivery stream

  • For Source, choose Amazon MSK.
  • For Destination, choose Amazon S3.

source and destination

  • For Amazon MSK cluster connectivity, select Private bootstrap brokers.
  • For Topic, enter a topic name (for this post, mytopic).

source settings

  • For S3 bucket, choose Browse and choose your S3 bucket.
  • Enter streamingDataLake as your S3 bucket prefix.
  • Enter streamingDataLakeErr as your S3 bucket error output prefix.

destination settings

  • Choose Create delivery stream.

create delivery stream

You can verify that the data was written to your S3 bucket. You should see that the streamingDataLake directory was created and the files are stored in partitions.

amazon s3

Store events in DynamoDB

For the last step, you store the most recent modem data in DynamoDB. This allows the client application to access the modem status and interact with the modem remotely from anywhere, with low latency and high availability. Lambda seamlessly works with Amazon MSK. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.
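
As a rough illustration of the batch payload described above, the following sketch builds a hand-made event in the shape Lambda uses for Amazon MSK triggers (a records map keyed by topic and partition, with base64-encoded values) and decodes it. The helper names decode_msk_event and _encode and all sample values are assumptions for illustration only; they are not part of the original solution.

```python
import base64
import json


def decode_msk_event(event):
    """Return (topic_partition, payload_dict) pairs from an MSK trigger event."""
    decoded = []
    # Each key looks like "<topic>-<partition>"; each value is a list of records.
    for topic_partition, records in event["records"].items():
        for record in records:
            # Kafka message values are delivered base64-encoded.
            raw = base64.b64decode(record["value"]).decode("utf-8")
            decoded.append((topic_partition, json.loads(raw)))
    return decoded


def _encode(payload):
    """Helper for the sample event: JSON-encode, then base64-encode."""
    return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")


# Hand-made sample event with two partitions (values are illustrative).
sample_event = {
    "eventSource": "aws:kafka",
    "records": {
        "mytopic-0": [
            {"topic": "mytopic", "partition": 0, "offset": 15,
             "value": _encode({"deviceid": "dvc1001", "cpuusage": "72"})},
        ],
        "mytopic-1": [
            {"topic": "mytopic", "partition": 1, "offset": 4,
             "value": _encode({"deviceid": "dvc2002", "cpuusage": "91"})},
        ],
    },
}

for topic_partition, payload in decode_msk_event(sample_event):
    print(topic_partition, payload["deviceid"], payload["cpuusage"])
```

Because a single batch can contain records from several partitions, a handler that reads only one topic-partition key would skip the rest of the batch.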

Let’s first create a table in DynamoDB. Refer to DynamoDB API permissions: Actions, resources, and conditions reference to verify that your client machine has the necessary permissions.

  • Create a new file called createTable.py.
  • Copy the following code into the file, updating the region information:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")
  • Run the createTable.py script to create a table called device_status in DynamoDB:
python createTable.py

Now let’s configure the Lambda function.

  • On the Lambda console, choose Functions in the navigation pane.
  • Choose Create function.
  • Select Author from scratch.
  • For Function name, enter a name (for example, my-notification-kafka).
  • For Runtime, choose Python 3.11.
  • For Permissions, select Use an existing role and choose a role with permissions to read from your cluster.
  • Create the function.

On the Lambda function configuration page, you can now configure sources, destinations, and your application code.

  • Choose Add trigger.
  • For Trigger configuration, enter MSK to configure Amazon MSK as a trigger for the Lambda source function.
  • For MSK cluster, enter myCluster.
  • Deselect Activate trigger, because you haven’t configured your Lambda function yet.
  • For Batch size, enter 100.
  • For Starting position, choose Latest.
  • For Topic name, enter a name (for example, mytopic).
  • Choose Add.
  • On the Lambda function details page, on the Code tab, enter the following code:
import base64
import json

import boto3

# Create the client once per execution environment instead of once per record
dynamodb = boto3.client('dynamodb')
table_name = 'device_status'

def lambda_handler(event, context):
    # event['records'] maps each "<topic>-<partition>" key to a list of records,
    # so this loop handles every partition included in the batch
    for base64records in event['records'].values():
        for x in base64records:
            # Kafka record values arrive base64-encoded
            record = json.loads(base64.b64decode(x['value']).decode('utf-8'))

            item = {
                'deviceid': {'S': record['deviceid']},
                'interface': {'S': record['interface']},
                'interfacestatus': {'S': record['interfacestatus']},
                'cpuusage': {'S': record['cpuusage']},
                'memoryusage': {'S': record['memoryusage']},
                'event_time': {'S': record['event_time']},
            }

            # Write the item to the DynamoDB table
            dynamodb.put_item(TableName=table_name, Item=item)
            print("Item written to DynamoDB")
  • Deploy the Lambda function.
  • On the Configuration tab, choose Edit to edit the trigger.

edit trigger

  • Select the trigger, then choose Save.
  • On the DynamoDB console, choose Explore items in the navigation pane.
  • Select the table device_status.

You will see that Lambda is writing the events published to the Kafka topic to DynamoDB.

ddb table

Summary

Streaming data pipelines are critical for building real-time applications. However, setting up and managing the infrastructure can be daunting. In this post, we walked through how to build a serverless streaming pipeline on AWS using Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose, and other services. The key benefits are no servers to manage, automatic scalability of the infrastructure, and a pay-as-you-go model using fully managed services.

Ready to build your own real-time pipeline? Get started today with a free AWS account. With the power of serverless, you can focus on your application logic while AWS handles the undifferentiated heavy lifting. Let’s build something awesome on AWS!


About the Authors

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Michael Oguike is a Product Manager for Amazon MSK. He is passionate about using data to uncover insights that drive action. He enjoys helping customers from a wide range of industries improve their businesses using data streaming. Michael also loves learning about behavioral science and psychology from books and podcasts.

How VMware Tanzu CloudHealth migrated from self-managed Kafka to Amazon MSK

Post Syndicated from Rivlin Pereira original https://aws.amazon.com/blogs/big-data/how-vmware-tanzu-cloudhealth-migrated-from-self-managed-kafka-to-amazon-msk/

This is a post co-written with Rivlin Pereira & Vaibhav Pandey from Tanzu CloudHealth (VMware by Broadcom).

VMware Tanzu CloudHealth is the cloud cost management platform of choice for more than 20,000 organizations worldwide, who rely on it to optimize and govern their largest and most complex multi-cloud environments. In this post, we discuss how the VMware Tanzu CloudHealth DevOps team migrated their self-managed Apache Kafka workloads (running version 2.0) to Amazon Managed Streaming for Apache Kafka (Amazon MSK) running version 2.6.2. We discuss the system architectures, deployment pipelines, topic creation, observability, access control, topic migration, and all the issues we faced with the existing infrastructure, along with how and why we migrated to the new Kafka setup and some lessons learned.

Kafka cluster overview

In the fast-evolving landscape of distributed systems, VMware Tanzu CloudHealth’s next-generation microservices platform relies on Kafka as its messaging backbone. For us, Kafka’s high-performance distributed log system excels in handling massive data streams, making it indispensable for seamless communication. Serving as a distributed log system, Kafka efficiently captures and stores diverse logs, from HTTP server access logs to security event audit logs.

Kafka’s versatility shines in supporting key messaging patterns, treating messages as basic logs or structured key-value stores. Dynamic partitioning and consistent ordering ensure efficient message organization. The unwavering reliability of Kafka aligns with our commitment to data integrity.

The integration of Ruby services with Kafka is streamlined through the Karafka library, acting as a higher-level wrapper. Our other language stack services use similar wrappers. Kafka’s robust debugging features and administrative commands play a pivotal role in ensuring smooth operations and infrastructure health.

Kafka as an architectural pillar

In VMware Tanzu CloudHealth’s next-generation microservices platform, Kafka emerges as a critical architectural pillar. Its ability to handle high data rates, support diverse messaging patterns, and guarantee message delivery aligns seamlessly with our operational needs. As we continue to innovate and scale, Kafka remains a steadfast companion, enabling us to build a resilient and efficient infrastructure.

Why we migrated to Amazon MSK

For us, migrating to Amazon MSK came down to three key decision points:

  • Simplified technical operations – Running Kafka on a self-managed infrastructure was an operational overhead for us. We hadn’t updated Kafka version 2.0.0 for a while, and Kafka brokers were going down in production, causing issues with topics going offline. We also had to run scripts manually for increasing replication factors and rebalancing leaders, which was additional manual effort.
  • Deprecated legacy pipelines and simplified permissions – We were looking to move away from our existing pipelines written in Ansible to create Kafka topics on the cluster. We also had a cumbersome process of giving team members access to Kafka machines in staging and production, and we wanted to simplify this.
  • Cost, patching, and support – Because Apache Zookeeper is completely managed and patched by AWS, moving to Amazon MSK was going to save us time and money. In addition, we discovered that Amazon MSK was cheaper to run than a self-managed cluster with the same type of brokers on Amazon Elastic Compute Cloud (Amazon EC2). Combined with the fact that AWS applies security patches to the brokers, migrating to Amazon MSK was an easy decision. This also freed up the team to work on other important things. Finally, getting enterprise support from AWS was also critical in our final decision to move to a managed solution.

How we migrated to Amazon MSK

With the key drivers identified, we moved ahead with a proposed design to migrate existing self-managed Kafka to Amazon MSK. We conducted the following pre-migration steps before the actual implementation:

  • Assessment:
    • Conducted a meticulous assessment of the existing EC2 Kafka cluster, understanding its configurations and dependencies
    • Verified Kafka version compatibility with Amazon MSK
  • Amazon MSK setup with Terraform
  • Network configuration:
    • Ensured seamless network connectivity between the EC2 Kafka and MSK clusters, fine-tuning security groups and firewall settings

After the pre-migration steps, we implemented the following for the new design:

  • Automated deployment, upgrade, and topic creation pipelines for MSK clusters:
    • In the new setup, we wanted to have automated deployments and upgrades of the MSK clusters in a repeatable fashion using an IaC tool. Therefore, we created custom Terraform modules for MSK cluster deployments as well as upgrades. These modules were called from a Jenkins pipeline for automated deployments and upgrades of the MSK clusters. For Kafka topic creation, we were using an Ansible-based home-grown pipeline, which wasn’t stable and led to a lot of complaints from dev teams. As a result, we evaluated options for deployments to Kubernetes clusters and used the Strimzi Topic Operator to create topics on MSK clusters. Topic creation was automated using Jenkins pipelines, which dev teams could self-service.
  • Better observability for clusters:
    • The old Kafka clusters didn’t have good observability. We only had alerts on Kafka broker disk size. With Amazon MSK, we took advantage of open monitoring using Prometheus. We stood up a standalone Prometheus server that scraped metrics from MSK clusters and sent them to our internal observability tool. As a result of improved observability, we were able to set up robust alerting for Amazon MSK, which wasn’t possible with our old setup.
  • Improved COGS and better compute infrastructure:
    • For our old Kafka infrastructure, we had to pay for managing the Kafka and Zookeeper instances, plus any additional broker storage costs and data transfer costs. With the move to Amazon MSK, because Zookeeper is completely managed by AWS, we only have to pay for Kafka nodes, broker storage, and data transfer costs. As a result, in the final Amazon MSK setup for production, we saved not only on infrastructure costs but also on operational costs.
  • Simplified operations and enhanced security:
    • With the move to Amazon MSK, we didn’t have to manage any Zookeeper instances. Broker security patching was also taken care of by AWS for us.
    • Cluster upgrades became simpler with the move to Amazon MSK; it’s a straightforward process to initiate from the Amazon MSK console.
    • With Amazon MSK, we got broker automatic scaling out of the box. As a result, we didn’t have to worry about brokers running out of disk space, thereby leading to additional stability of the MSK cluster.
    • We also got additional security for the cluster because Amazon MSK supports encryption at rest by default, and various options for encryption in transit are also available. For more information, refer to Data protection in Amazon Managed Streaming for Apache Kafka.

During our pre-migration steps, we validated the setup on the staging environment before moving ahead with production.

Kafka topic migration strategy

With the MSK cluster setup complete, we performed a data migration of Kafka topics from the old cluster running on Amazon EC2 to the new MSK cluster. To achieve this, we performed the following steps:

  • Set up MirrorMaker with Terraform – We used Terraform to orchestrate the deployment of a MirrorMaker cluster consisting of 15 nodes. Because the cluster was defined in Terraform, we could adjust the number of nodes to match the migration’s concurrent replication needs.
  • Implement a concurrent replication strategy – We implemented a concurrent replication strategy with 15 MirrorMaker nodes to expedite the migration process. Our Terraform-driven approach contributed to cost optimization by efficiently managing resources during the migration and ensured the reliability and consistency of the MSK and MirrorMaker clusters. It also showcased how the chosen setup accelerates data transfer, optimizing both time and resources.
  • Migrate data – We successfully migrated 2 TB of data in a remarkably short timeframe, minimizing downtime and showcasing the efficiency of the concurrent replication strategy.
  • Set up post-migration monitoring – We implemented robust monitoring and alerting during the migration, contributing to a smooth process by identifying and addressing issues promptly.

The following diagram illustrates the architecture after the topic migration was complete.
Mirror-maker setup

Challenges and lessons learned

Embarking on a migration journey, especially with large datasets, is often accompanied by unforeseen challenges. In this section, we delve into the challenges encountered during the migration of topics from EC2 Kafka to Amazon MSK using MirrorMaker, and share valuable insights and solutions that shaped the success of our migration.

Challenge 1: Offset discrepancies

One of the challenges we encountered was the mismatch in topic offsets between the source and destination clusters, even with offset synchronization enabled in MirrorMaker. The lesson learned here was that offset values don’t necessarily need to be identical, as long as offset sync is enabled, which makes sure the topics have the correct position to read the data from.

We addressed this problem by using a custom tool to run tests on consumer groups, confirming that the translated offsets were either smaller or caught up, indicating synchronization as per MirrorMaker.
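
The custom tool isn’t described in detail, so the following is only a hypothetical sketch of the kind of check it could perform: for each partition, compare a consumer group’s translated offset on the destination cluster against that partition’s log end offset and classify the result. The function name, the input dictionaries, and the sample numbers are all assumptions; a real tool would read these values from the clusters.

```python
def check_translated_offsets(translated, log_end):
    """Classify each partition's translated offset against the destination log end offset.

    translated: {(topic, partition): translated consumer offset on the destination}
    log_end:    {(topic, partition): log end offset on the destination}
    """
    report = {}
    for tp, offset in translated.items():
        end = log_end.get(tp)
        if end is None:
            report[tp] = "missing"      # partition not found on the destination
        elif offset == end:
            report[tp] = "caught_up"    # consumer would resume at the log end
        elif offset < end:
            report[tp] = "behind"       # consumer still has records to read
        else:
            report[tp] = "invalid"      # offset beyond the log end needs investigation
    return report


# Illustrative values only
translated = {("orders", 0): 120, ("orders", 1): 95, ("orders", 2): 300}
log_end = {("orders", 0): 120, ("orders", 1): 100, ("orders", 2): 250}

for tp, status in check_translated_offsets(translated, log_end).items():
    print(tp, status)
```

In this sketch, only the "invalid" and "missing" results would call for follow-up; "caught_up" and "behind" correspond to the two acceptable outcomes described above.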

Challenge 2: Slow data migration

The migration process faced a bottleneck—data transfer was slower than anticipated, especially with a substantial 2 TB dataset. Despite a 20-node MirrorMaker cluster, the speed was insufficient.

To overcome this, the team strategically grouped MirrorMaker nodes based on unique port numbers. Clusters of five MirrorMaker nodes, each with a distinct port, significantly boosted throughput, allowing us to migrate data within hours instead of days.
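
As a sketch of the grouping idea only (the post doesn’t list the actual hosts or ports), the following splits a list of MirrorMaker nodes into groups of five and assigns each group its own port. The node names, the base port 8083, and the helper name group_nodes are hypothetical.

```python
def group_nodes(nodes, group_size, base_port):
    """Split nodes into fixed-size groups and give each group a distinct port."""
    groups = []
    for index in range(0, len(nodes), group_size):
        groups.append({
            "port": base_port + index // group_size,
            "nodes": nodes[index:index + group_size],
        })
    return groups


# Hypothetical inventory of 20 MirrorMaker nodes
nodes = [f"mm-node-{n:02d}" for n in range(1, 21)]

for group in group_nodes(nodes, group_size=5, base_port=8083):
    print(group["port"], group["nodes"])
```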

Challenge 3: Lack of detailed process documentation

Navigating the uncharted territory of migrating large datasets using MirrorMaker highlighted the absence of detailed documentation for such scenarios.

Through trial and error, the team crafted an IaC module using Terraform. This module streamlined the entire cluster creation process with optimized settings, enabling a seamless start to the migration within minutes.

Final setup and next steps

As a result of the move to Amazon MSK, our final setup after topic migration looked like the following diagram.
MSK Blog
We’re considering the following future improvements:

Conclusion

In this post, we discussed how VMware Tanzu CloudHealth migrated their existing Amazon EC2-based Kafka infrastructure to Amazon MSK. We walked you through the new architecture, deployment and topic creation pipelines, improvements to observability and access control, topic migration challenges, and the issues we faced with the existing infrastructure, along with how and why we migrated to the new Amazon MSK setup. We also talked about all the advantages that Amazon MSK gave us, the final architecture we achieved with this migration, and lessons learned.

For us, the interplay of offset synchronization, strategic node grouping, and IaC proved pivotal in overcoming obstacles and ensuring a successful migration from Amazon EC2 Kafka to Amazon MSK. This post serves as a testament to the power of adaptability and innovation in migration challenges, offering insights for others navigating a similar path.

If you’re running self-managed Kafka on AWS, we encourage you to try the managed Kafka offering, Amazon MSK.


About the Authors

Rivlin Pereira is a Staff DevOps Engineer at the VMware Tanzu Division. He is very passionate about Kubernetes and works on the CloudHealth platform, building and operating cloud solutions that are scalable, reliable, and cost effective.

Vaibhav Pandey, a Staff Software Engineer at Broadcom, is a key contributor to the development of cloud computing solutions. Specializing in architecting and engineering data storage layers, he is passionate about building and scaling SaaS applications for optimal performance.

Raj Ramasubbu is a Senior Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 18 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Todd McGrath is a data streaming specialist at Amazon Web Services where he advises customers on their streaming strategies, integration, architecture, and solutions. On the personal side, he enjoys watching and supporting his 3 teenagers in their preferred activities as well as following his own pursuits such as fishing, pickleball, ice hockey, and happy hour with friends and family on pontoon boats. Connect with him on LinkedIn.

Satya Pattanaik is a Sr. Solutions Architect at AWS. He has been helping ISVs build scalable and resilient applications on the AWS Cloud. Prior to joining AWS, he played a significant role in the growth and success of Enterprise segments. Outside of work, he spends time learning “how to cook a flavorful BBQ” and trying out new recipes.

Best practices to implement near-real-time analytics using Amazon Redshift Streaming Ingestion with Amazon MSK

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/best-practices-to-implement-near-real-time-analytics-using-amazon-redshift-streaming-ingestion-with-amazon-msk/

Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, straightforward, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it the most widely used cloud data warehouse. You can run and scale analytics in seconds on all your data, without having to manage your data warehouse infrastructure.

You can use the Amazon Redshift Streaming Ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use Structured Query Language (SQL) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.

In this post, we discuss the best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK.

Overview of solution

We walk through an example pipeline to ingest data from an MSK topic into Amazon Redshift using Amazon Redshift streaming ingestion. We also show how to unnest JSON data using dot notation in Amazon Redshift. The following diagram illustrates our solution architecture.

The process flow consists of the following steps:

  1. Create a streaming materialized view in your Redshift cluster to consume live streaming data from the MSK topics.
  2. Use a stored procedure to implement change data capture (CDC) using the unique combination of Kafka Partition and Kafka Offset at the record level for the ingested MSK topic.
  3. Create a user-facing table in the Redshift cluster and use dot notation to unnest the JSON document from the streaming materialized view into data columns of the table. You can continuously load fresh data by calling the stored procedure at regular intervals.
  4. Establish connectivity between an Amazon QuickSight dashboard and Amazon Redshift to deliver visualization and insights.

As part of this post, we also discuss the following topics:

  • Steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift
  • Best practices to achieve optimized performance from streaming materialized views
  • Monitoring techniques to track failures in Amazon Redshift streaming ingestion

Prerequisites

You must have the following:

Considerations while setting up your MSK topic

Keep in mind the following considerations when configuring your MSK topic:

  • Make sure that the name of your MSK topic is no longer than 128 characters.
  • As of this writing, MSK records containing compressed data can’t be directly queried in Amazon Redshift. Amazon Redshift doesn’t support any native decompression methods for client-side compressed data in an MSK topic.
  • Follow best practices while setting up your MSK cluster.
  • Review the streaming ingestion limitations for any other considerations.

Set up streaming ingestion

To set up streaming ingestion, complete the following steps:

  1. Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Setting up IAM and performing streaming ingestion from Kafka.
  2. Make sure that data is flowing into your MSK topic using Amazon CloudWatch metrics (for example, BytesInPerSec).
  3. Launch the query editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Redshift cluster for the next steps. The following steps were run in query editor v2.
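  4. Create an external schema to map to the MSK cluster. In the following statement, replace the IAM role ARN and MSK cluster ARN placeholders, and specify either none or iam for AUTHENTICATION, depending on how your MSK cluster is configured: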
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. Optionally, if your topic names are case sensitive, you need to enable enable_case_sensitive_identifier to be able to access them in Amazon Redshift. To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session, user, or cluster level:
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. Create a materialized view to consume the stream data from the MSK topic:
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

The metadata column kafka_value that arrives from Amazon MSK is stored in VARBYTE format in Amazon Redshift. For this post, you use the JSON_PARSE function to convert kafka_value to a SUPER data type. You also use the CAN_JSON_PARSE function in the filter condition to skip invalid JSON records and guard against errors due to JSON parsing failures. We discuss how to store the invalid data for future debugging later in this post.

  7. Refresh the streaming materialized view, which triggers Amazon Redshift to read from the MSK topic and load data into the materialized view:
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions to create a materialized view with auto refresh.

Unnest the JSON document

The following is a sample of a JSON document that was ingested from the MSK topic to the Data column of SUPER type in the streaming materialized view Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

Use dot notation as shown in the following code to unnest your JSON payload:

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

The following screenshot shows what the result looks like after unnesting.

If you have arrays in your JSON document, consider unnesting your data using PartiQL statements in Amazon Redshift. For more information, refer to the section Unnest the JSON document in the post Near-real-time analytics using Amazon Redshift streaming ingestion with Amazon Kinesis Data Streams and Amazon DynamoDB.

Incremental data load strategy

Complete the following steps to implement an incremental data load:

  1. Create a table called Orders in Amazon Redshift, which end-users will use for visualization and business analysis:
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

Next, you create a stored procedure called SP_Orders_Load to implement CDC from a streaming materialized view and load into the final Orders table. You use the combination of Kafka_Partition and Kafka_Offset available in the streaming materialized view as system columns to implement CDC. The combination of these two columns will always be unique within an MSK topic, which makes sure that none of the records are missed during the process. The stored procedure contains the following components:

  • To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session, user, or cluster level.
  • Refresh the streaming materialized view manually if auto refresh is not enabled.
  • Create an audit table called Orders_Streaming_Audit if it doesn’t exist to keep track of the last offset for a partition that was loaded into Orders table during the last run of the stored procedure.
  • Unnest and insert only new or changed data into a staging table called Orders_Staging_Table, reading from the streaming materialized view Orders_Stream_MV, where Kafka_Offset is greater than the last processed Kafka_Offset recorded in the audit table Orders_Streaming_Audit for the Kafka_Partition being processed.
  • When loading for the first time using this stored procedure, there will be no data in the Orders_Streaming_Audit table and all the data from Orders_Stream_MV will get loaded into the Orders table.
  • Insert only business-relevant columns to the user-facing Orders table, selecting from the staging table Orders_Staging_Table.
  • Insert the max Kafka_Offset for every loaded Kafka_Partition into the audit table Orders_Streaming_Audit.

We have added the intermediate staging table Orders_Staging_Table in this solution to help with the debugging in case of unexpected failures and trackability. Skipping the staging step and directly loading into the final table from Orders_Stream_MV can provide lower latency depending on your use case.
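
To make the offset-based CDC logic easier to follow, here is a small Python simulation of the same idea outside of Amazon Redshift. It keeps a per-partition high-water mark (standing in for Orders_Streaming_Audit) and selects only the records whose offset is greater than that mark. The function names and sample records are assumptions made for illustration; the stored procedure itself is implemented in SQL.

```python
def select_new_records(stream_records, audit):
    """Return records whose offset exceeds the last loaded offset for their partition.

    stream_records: list of dicts with "kafka_partition", "kafka_offset", "data"
    audit:          {kafka_partition: max kafka_offset already loaded}
    """
    new_records = []
    for record in stream_records:
        last_loaded = audit.get(record["kafka_partition"])
        # A partition missing from the audit has never been loaded, so take everything.
        if last_loaded is None or record["kafka_offset"] > last_loaded:
            new_records.append(record)
    return new_records


def update_audit(audit, loaded_records):
    """Record the max offset per partition after a successful load."""
    for record in loaded_records:
        partition = record["kafka_partition"]
        audit[partition] = max(audit.get(partition, -1), record["kafka_offset"])
    return audit


stream = [
    {"kafka_partition": 0, "kafka_offset": 0, "data": {"OrderID": "101"}},
    {"kafka_partition": 0, "kafka_offset": 1, "data": {"OrderID": "102"}},
    {"kafka_partition": 1, "kafka_offset": 0, "data": {"OrderID": "103"}},
]

audit = {}
first_load = select_new_records(stream, audit)      # first run: everything is new
update_audit(audit, first_load)

stream.append({"kafka_partition": 1, "kafka_offset": 1, "data": {"OrderID": "104"}})
second_load = select_new_records(stream, audit)     # second run: only the new record
update_audit(audit, second_load)

print(len(first_load), len(second_load), audit)
```

The check for a missing audit entry plays the same role as the m."kafka_offset" IS NULL condition in the LEFT JOIN: a partition that has never been loaded contributes all of its records.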

  1. Create the stored procedure with the following code:
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"::INT4 as OrderID
        ,data."ProductID"::VARCHAR(36) as ProductID
        ,data."ProductName"::VARCHAR(36) as ProductName
        ,data."CustomerID"::VARCHAR(36) as CustomerID
        ,data."CustomerName"::VARCHAR(36) as CustomerName
        ,data."Store_Name"::VARCHAR(36) as Store_Name
        ,data."OrderDate"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"::INT4 as Quatity
        ,data."Price"::DOUBLE PRECISION as Price
        ,data."OrderStatus"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  2. Run the stored procedure to load data into the Orders table:
    call SP_Orders_Load();

  3. Validate data in the Orders table.
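
To make the incremental logic of the stored procedure easier to reason about, the following is a minimal sketch in plain Python (not Redshift SQL) of the same idea: keep only the records whose offset is greater than the audited maximum for their partition, then record the new maximum per partition. The function names and the sample rows are hypothetical and exist only for illustration.

```python
def select_new_records(stream_rows, audit_rows):
    """Mimic the LEFT JOIN filter: keep rows beyond the audited max offset."""
    max_offset = {}
    for partition, offset in audit_rows:
        max_offset[partition] = max(offset, max_offset.get(partition, offset))
    return [
        row for row in stream_rows
        # No audit entry for the partition means everything is new.
        if row["kafka_partition"] not in max_offset
        or row["kafka_offset"] > max_offset[row["kafka_partition"]]
    ]


def audit_entries(staged_rows):
    """Return (partition, max offset) pairs for the rows that were just staged."""
    latest = {}
    for row in staged_rows:
        partition, offset = row["kafka_partition"], row["kafka_offset"]
        latest[partition] = max(offset, latest.get(partition, offset))
    return sorted(latest.items())


# Hypothetical stream contents: two partitions, three records.
stream = [
    {"kafka_partition": 0, "kafka_offset": 0},
    {"kafka_partition": 0, "kafka_offset": 1},
    {"kafka_partition": 1, "kafka_offset": 0},
]
audit = []

# First call: the audit table is empty, so every record is loaded.
first_load = select_new_records(stream, audit)
audit += audit_entries(first_load)

# A new record arrives; the second call picks up only that record.
stream.append({"kafka_partition": 0, "kafka_offset": 2})
second_load = select_new_records(stream, audit)
audit += audit_entries(second_load)
```

Because the audit table is append-only, the filter always takes the maximum per partition instead of assuming one row per partition, which matches the GROUP BY in the stored procedure.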

Establish cross-account streaming ingestion

If your MSK cluster belongs to a different account, complete the following steps to create IAM roles to set up cross-account streaming ingestion. Let’s assume the Redshift cluster is in account A and the MSK cluster is in account B, as shown in the following diagram.

Complete the following steps:

  1. In account B, create an IAM role called MyRedshiftMSKRole that allows Amazon Redshift (account A) to communicate with the MSK cluster (account B) named MyTestCluster. Depending on whether your MSK cluster uses IAM authentication or unauthenticated access to connect, you need to create an IAM role with one of the following policies:
    • An IAM policy for Amazon MSK when using unauthenticated access:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • An IAM policy for Amazon MSK when using IAM authentication:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

The resource section in the preceding example gives access to all topics in the MyTestCluster MSK cluster. If you need to restrict the IAM role to specific topics, you need to replace the topic resource with a more restrictive resource policy.
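
As a rough illustration of a more restrictive resource list, the following Python sketch builds one topic ARN per allowed topic instead of using a wildcard. It assumes the topic ARN layout topic/cluster-name/cluster-uuid/topic-name; verify this against the Amazon MSK IAM access control documentation before relying on it. The helper name is hypothetical, and ORDERTOPIC is used only as a sample topic name.

```python
import json


def topic_resources(region, account_id, cluster_name, cluster_uuid, topics):
    """Build one topic ARN per allowed topic (ARN layout is an assumption)."""
    return [
        f"arn:aws:kafka:{region}:{account_id}:topic/{cluster_name}/{cluster_uuid}/{topic}"
        for topic in topics
    ]


# Placeholder values taken from the example policy above.
cluster_arn = (
    "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/"
    "abcd1234-0123-abcd-5678-1234abcd-1"
)
resources = topic_resources(
    "us-east-1",
    "0123456789",
    "MyTestCluster",
    "abcd1234-0123-abcd-5678-1234abcd-1",
    ["ORDERTOPIC"],
)

statement = {
    "Sid": "RedshiftMSKIAMpolicy",
    "Effect": "Allow",
    "Action": [
        "kafka-cluster:ReadData",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:Connect",
    ],
    # The cluster ARN stays; only the topic entries are narrowed.
    "Resource": [cluster_arn] + resources,
}
print(json.dumps(statement, indent=4))
```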

  2. After you create the IAM role in account B, take note of the IAM role ARN (for example, arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  3. In account A, create a customizable IAM role for Amazon Redshift called MyRedshiftRole, which Amazon Redshift will assume when connecting to Amazon MSK. The role should have a policy like the following, which allows the Amazon Redshift IAM role in account A to assume the Amazon MSK role in account B:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"
            }
        ]
    }
    

  4. Take note of the role ARN for the Amazon Redshift IAM role (for example, arn:aws:iam::9876543210:role/MyRedshiftRole).
  5. Go back to account B and add this role in the trust policy of the IAM role arn:aws:iam::0123456789:role/MyRedshiftMSKRole to allow account B to trust the IAM role from account A. The trust policy should look like the following code:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    }
    

  6. Sign in to the Amazon Redshift console as account A.
  7. Launch the query editor v2 or your preferred SQL client and run the following statements to access the MSK topic in account B. To map to the MSK cluster, create an external schema using role chaining by specifying IAM role ARNs, separated by a comma without any spaces around it. The role attached to the Redshift cluster comes first in the chain.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION iam --use none instead if your MSK cluster allows unauthenticated access
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
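
Because the role chain must be comma-separated without any surrounding spaces, it can help to generate the string instead of typing it. The following is a small Python sketch under that assumption; build_role_chain and external_schema_sql are hypothetical helper names, and the ARNs are the placeholder values used in this section.

```python
def build_role_chain(*role_arns):
    """Join role ARNs with commas and no spaces, in chaining order."""
    cleaned = [arn.strip() for arn in role_arns]
    if not cleaned or any(not arn or "," in arn or " " in arn for arn in cleaned):
        raise ValueError("each role ARN must be non-empty, without commas or spaces")
    return ",".join(cleaned)


def external_schema_sql(schema, role_chain, authentication, cluster_arn):
    """Render the CREATE EXTERNAL SCHEMA statement shown above."""
    if authentication not in ("none", "iam"):
        raise ValueError("authentication must be 'none' or 'iam'")
    return (
        f"CREATE EXTERNAL SCHEMA {schema}\n"
        "FROM MSK\n"
        f"IAM_ROLE '{role_chain}'\n"
        f"AUTHENTICATION {authentication}\n"
        f"CLUSTER_ARN '{cluster_arn}';"
    )


# The role attached to the Redshift cluster (account A) comes first.
chain = build_role_chain(
    " arn:aws:iam::9876543210:role/MyRedshiftRole",
    "arn:aws:iam::0123456789:role/MyRedshiftMSKRole ",
)
sql = external_schema_sql("custschema", chain, "iam", "msk-cluster-arn")
print(sql)
```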
    

Performance considerations

Keep in mind the following performance considerations:

  • Keep the streaming materialized view simple and move transformations like unnesting, aggregation, and case expressions to a later step—for example, by creating another materialized view on top of the streaming materialized view.
  • Consider creating only one streaming materialized view in a single Redshift cluster or workgroup for a given MSK topic. Creation of multiple materialized views per MSK topic can slow down the ingestion performance because each materialized view becomes a consumer for that topic and shares the Amazon MSK bandwidth for that topic. Live streaming data in a streaming materialized view can be shared across multiple Redshift clusters or Redshift Serverless workgroups using data sharing.
  • While defining your streaming materialized view, avoid using JSON_EXTRACT_PATH_TEXT to pre-shred data, because JSON_EXTRACT_PATH_TEXT operates on the data row by row, which significantly impacts ingestion throughput. It is preferable to land the data as is from the stream and then shred it later.
  • Where possible, consider skipping the sort key in the streaming materialized view to accelerate the ingestion speed. When a streaming materialized view has a sort key, a sort operation will occur with every batch of ingested data from the stream. Sorting has a performance overhead that depends on the sort key data type, the number of sort key columns, and the amount of data ingested in each batch. This sorting step can increase the latency before the streaming data is available to query. You should weigh which is more important: latency on ingestion or latency on querying the data.
  • For optimized performance of the streaming materialized view and to reduce storage usage, occasionally purge data from the materialized view using delete, truncate, or alter table append.
  • If you need to ingest multiple MSK topics in parallel into Amazon Redshift, start with a smaller number of streaming materialized views and keep adding more materialized views to evaluate the overall ingestion performance within a cluster or workgroup.
  • Increasing the number of nodes in a Redshift provisioned cluster or the base RPU of a Redshift Serverless workgroup can help boost the ingestion performance of a streaming materialized view. For optimal performance, you should aim to have as many slices in your Redshift provisioned cluster as there are partitions in your MSK topic, or 8 RPU for every four partitions in your MSK topic.
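
The sizing guidance in the last bullet can be expressed as simple arithmetic. The following Python sketch is only a starting point: rounding up to the next multiple of four partitions is our assumption, and the function names are hypothetical. Validate the result against your own ingestion tests.

```python
import math


def recommended_base_rpu(partitions):
    """8 RPU for every four partitions, rounded up (rounding is an assumption)."""
    if partitions < 1:
        raise ValueError("partitions must be a positive integer")
    return 8 * math.ceil(partitions / 4)


def has_enough_slices(slices, partitions):
    """Provisioned clusters: aim for at least as many slices as partitions."""
    return slices >= partitions


for partitions in (4, 10, 12):
    print(partitions, "partitions ->", recommended_base_rpu(partitions), "RPU")
```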

Monitoring techniques

Records in the topic that exceed the size of the target materialized view column at the time of ingestion will be skipped. Records that are skipped by the materialized view refresh will be logged in the SYS_STREAM_SCAN_ERRORS system table.

Errors that occur when processing a record because of a calculation, a data type conversion, or some other logic in the materialized view definition will cause the materialized view refresh to fail until the offending record has expired from the topic. To avoid these types of issues, test the logic of your materialized view definition carefully; otherwise, land the records into the default VARBYTE column and process them later.

The following are available monitoring views:

  • SYS_MV_REFRESH_HISTORY – Use this view to gather information about the refresh history of your streaming materialized views. The results include the refresh type, such as manual or auto, and the status of the most recent refresh. The following query shows the refresh history for a streaming materialized view:
    select mv_name, refresh_type, status, duration  from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales'

  • SYS_STREAM_SCAN_ERRORS – Use this view to check the reason why a record failed to load via streaming ingestion from an MSK topic. As of writing this post, when ingesting from Amazon MSK, this view only logs errors when the record is larger than the materialized view column size. This view will also show the unique identifier (offset) of the MSK record in the position column. The following query shows the error code and error reason when a record exceeded the maximum size limit:
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS where mv_name='test_mv' and external_schema_name='streaming_schema';
    

  • SYS_STREAM_SCAN_STATES – Use this view to monitor the number of records scanned at a given record_time. This view also tracks the offset of the last record read in the batch. The following query shows topic data for a specific materialized view:
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – Use this view to check the overall metrics for a streaming materialized view refresh. This will also log errors in the error_message column for errors that don’t show up in SYS_STREAM_SCAN_ERRORS. The following query shows the error causing the refresh failure of a streaming materialized view:
    select  query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc

Additional considerations for implementation

You can optionally create a materialized view on top of a streaming materialized view, allowing you to unnest and precompute results for end-users. This approach eliminates the need to store the results in a final table using a stored procedure.

In this post, you use the CAN_JSON_PARSE function to guard against parsing errors so that data is ingested more reliably. In this case, the streaming records that can’t be parsed are skipped by Amazon Redshift. However, if you want to keep track of your error records, consider storing them in a column using the following SQL when creating the streaming materialized view:

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
--Data holds the parsed record; Invalid_Data keeps the raw value when parsing is not possible
case when CAN_JSON_PARSE(kafka_value) = true then json_parse(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";
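
The two CASE expressions route each record into exactly one of two columns. As an analogy only, the following Python sketch shows the same routing with the standard json module; Python's parser is not guaranteed to accept exactly the same inputs as CAN_JSON_PARSE, and split_record is a hypothetical name.

```python
import json


def split_record(kafka_value: bytes):
    """Return (data, invalid_data); exactly one of the two is not None."""
    try:
        return json.loads(kafka_value), None
    except ValueError:
        # Covers malformed JSON and bytes that are not valid text.
        return None, kafka_value


records = [b'{"OrderID": 1}', b'{bad json']
rows = [split_record(value) for value in records]
for data, invalid_data in rows:
    print("Data:", data, "| Invalid_Data:", invalid_data)
```

Keeping the raw value of unparseable records makes it possible to inspect or reprocess them later instead of losing them silently.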

You can also consider unloading data from the view SYS_STREAM_SCAN_ERRORS into an Amazon Simple Storage Service (Amazon S3) bucket and get alerts by sending a report via email using Amazon Simple Notification Service (Amazon SNS) notifications whenever a new S3 object is created.

Lastly, based on your data freshness requirement, you can use Amazon EventBridge to schedule the jobs in your data warehouse to call the aforementioned SP_Orders_Load stored procedure on a regular basis. EventBridge does this at fixed intervals, and you may need to have a mechanism (for example, an AWS Step Functions state machine) to monitor if the previous call to the procedure completed. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. You can also refer to Accelerate orchestration of an ELT process using AWS Step Functions and Amazon Redshift Data API. Another option is to use Amazon Redshift query editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.
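
The guard against overlapping runs can be sketched as a small decision function. The following Python example is a simplified simulation, not an EventBridge or Step Functions implementation: the status names mirror those we would expect from the Amazon Redshift Data API, but treat the exact set as an assumption, and should_call_procedure is a hypothetical helper.

```python
# Statuses that mean the previous call is still running (assumed names).
IN_PROGRESS = {"SUBMITTED", "PICKED", "STARTED"}


def should_call_procedure(previous_status):
    """Decide whether a scheduled tick should call SP_Orders_Load.

    previous_status is None when the procedure has never been called,
    otherwise the last known status string of the previous call.
    """
    return previous_status is None or previous_status not in IN_PROGRESS


# Simulated schedule: the status of the previous call as seen by each tick.
ticks = [None, "STARTED", "FINISHED", "FAILED"]
decisions = [should_call_procedure(status) for status in ticks]
print(decisions)
```

In this sketch, a failed previous call does not block the next one; whether to retry automatically or raise an alert first is a design choice for your pipeline.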

Conclusion

In this post, we discussed best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK. We showed you an example pipeline to ingest data from an MSK topic into Amazon Redshift using streaming ingestion. We also showed a reliable strategy to perform incremental streaming data load into Amazon Redshift using Kafka Partition and Kafka Offset. Additionally, we demonstrated the steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift and discussed performance considerations for optimized ingestion rate. Lastly, we discussed monitoring techniques to track failures in Amazon Redshift streaming ingestion.

If you have any questions, leave them in the comments section.


About the Authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Adekunle Adedotun is a Sr. Database Engineer with the Amazon Redshift service. He has been working on MPP databases for 6 years with a focus on performance tuning. He also provides guidance to the development team for new and existing service features.

Simplify data streaming ingestion for analytics using Amazon MSK and Amazon Redshift

Post Syndicated from Sebastian Vlad original https://aws.amazon.com/blogs/big-data/simplify-data-streaming-ingestion-for-analytics-using-amazon-msk-and-amazon-redshift/

Towards the end of 2022, AWS announced the general availability of real-time streaming ingestion to Amazon Redshift for Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), eliminating the need to stage streaming data in Amazon Simple Storage Service (Amazon S3) before ingesting it into Amazon Redshift.

Streaming ingestion from Amazon MSK into Amazon Redshift represents a cutting-edge approach to real-time data processing and analysis. Amazon MSK serves as a highly scalable and fully managed service for Apache Kafka, allowing for seamless collection and processing of vast streams of data. Integrating streaming data into Amazon Redshift brings immense value by enabling organizations to harness the potential of real-time analytics and data-driven decision-making.

This integration enables you to achieve low latency, measured in seconds, while ingesting hundreds of megabytes of streaming data per second into Amazon Redshift. At the same time, this integration helps make sure that the most up-to-date information is readily available for analysis. Because the integration doesn’t require staging data in Amazon S3, Amazon Redshift can ingest streaming data at a lower latency and without intermediary storage cost.

You can configure Amazon Redshift streaming ingestion on a Redshift cluster using SQL statements to authenticate and connect to an MSK topic. This solution is an excellent option for data engineers who are looking to simplify data pipelines and reduce operational cost.

In this post, we provide a complete overview of how to configure Amazon Redshift streaming ingestion from Amazon MSK.

Solution overview

The following architecture diagram describes the AWS services and features you will be using.

architecture diagram describing the AWS services and features you will be using

The workflow includes the following steps:

  1. You start by configuring an Amazon MSK Connect source connector to create an MSK topic, generate mock data, and write it to the MSK topic. For this post, we work with mock customer data.
  2. The next step is to connect to a Redshift cluster using the Query Editor v2.
  3. Finally, you configure an external schema and create a materialized view in Amazon Redshift to consume the data from the MSK topic. This solution does not rely on an MSK Connect sink connector to export the data from Amazon MSK to Amazon Redshift.

The following solution architecture diagram describes in more detail the configuration and integration of the AWS services you will be using.

solution architecture diagram describing in more detail the configuration and integration of the AWS services you will be using

The workflow includes the following steps:

  1. You deploy an MSK Connect source connector, an MSK cluster, and a Redshift cluster within the private subnets on a VPC.
  2. The MSK Connect source connector uses granular permissions defined in an AWS Identity and Access Management (IAM) in-line policy attached to an IAM role, which allows the source connector to perform actions on the MSK cluster.
  3. The MSK Connect source connector logs are captured and sent to an Amazon CloudWatch log group.
  4. The MSK cluster uses a custom MSK cluster configuration, allowing the MSK Connect connector to create topics on the MSK cluster.
  5. The MSK cluster logs are captured and sent to an Amazon CloudWatch log group.
  6. The Redshift cluster uses granular permissions defined in an IAM in-line policy attached to an IAM role, which allows the Redshift cluster to perform actions on the MSK cluster.
  7. You can use the Query Editor v2 to connect to the Redshift cluster.

Prerequisites

To simplify the provisioning and configuration of the prerequisite resources, you can use the following AWS CloudFormation template:

Complete the following steps when launching the stack:

  1. For Stack name, enter a meaningful name for the stack, for example, prerequisites.
  2. Choose Next.
  3. Choose Next.
  4. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Submit.

The CloudFormation stack creates the following resources:

  • A VPC custom-vpc, created across three Availability Zones, with three public subnets and three private subnets:
    • The public subnets are associated with a public route table, and outbound traffic is directed to an internet gateway.
    • The private subnets are associated with a private route table, and outbound traffic is sent to a NAT gateway.
  • An internet gateway attached to the Amazon VPC.
  • A NAT gateway that is associated with an elastic IP and is deployed in one of the public subnets.
  • Three security groups:
    • msk-connect-sg, which will be later associated with the MSK Connect connector.
    • redshift-sg, which will be later associated with the Redshift cluster.
    • msk-cluster-sg, which will be later associated with the MSK cluster. It allows inbound traffic from msk-connect-sg and redshift-sg.
  • Two CloudWatch log groups:
    • msk-connect-logs, to be used for the MSK Connect logs.
    • msk-cluster-logs, to be used for the MSK cluster logs.
  • Two IAM roles:
    • msk-connect-role, which includes granular IAM permissions for MSK Connect.
    • redshift-role, which includes granular IAM permissions for Amazon Redshift.
  • A custom MSK cluster configuration, allowing the MSK Connect connector to create topics on the MSK cluster.
  • An MSK cluster, with three brokers deployed across the three private subnets of custom-vpc. The msk-cluster-sg security group and the custom-msk-cluster-configuration configuration are applied to the MSK cluster. The broker logs are delivered to the msk-cluster-logs CloudWatch log group.
  • A Redshift cluster subnet group, which is using the three private subnets of custom-vpc.
  • A Redshift cluster, with one single node deployed in a private subnet within the Redshift cluster subnet group. The redshift-sg security group and redshift-role IAM role are applied to the Redshift cluster.

Create an MSK Connect custom plugin

For this post, we use an Amazon MSK data generator deployed in MSK Connect to generate mock customer data and write it to an MSK topic.

Complete the following steps:

  1. Download the Amazon MSK data generator JAR file with dependencies from GitHub.
    awslabs github page for downloading the jar file of the amazon msk data generator
  2. Upload the JAR file into an S3 bucket in your AWS account.
    amazon s3 console image showing the uploaded jar file in an s3 bucket
  3. On the Amazon MSK console, choose Custom plugins under MSK Connect in the navigation pane.
  4. Choose Create custom plugin.
  5. Choose Browse S3, search for the Amazon MSK data generator JAR file you uploaded to Amazon S3, then choose Choose.
  6. For Custom plugin name, enter msk-datagen-plugin.
  7. Choose Create custom plugin.

When the custom plugin is created, you will see that its status is Active, and you can move to the next step.
amazon msk console showing the msk connect custom plugin being successfully created

Create an MSK Connect connector

Complete the following steps to create your connector:

  1. On the Amazon MSK console, choose Connectors under MSK Connect in the navigation pane.
  2. Choose Create connector.
  3. For Custom plugin type, choose Use existing plugin.
  4. Select msk-datagen-plugin, then choose Next.
  5. For Connector name, enter msk-datagen-connector.
  6. For Cluster type, choose Self-managed Apache Kafka cluster.
  7. For VPC, choose custom-vpc.
  8. For Subnet 1, choose the private subnet within your first Availability Zone.

For the custom-vpc created by the CloudFormation template, we are using odd CIDR ranges for public subnets, and even CIDR ranges for the private subnets:

    • The CIDRs for the public subnets are 10.10.1.0/24, 10.10.3.0/24, and 10.10.5.0/24
    • The CIDRs for the private subnets are 10.10.2.0/24, 10.10.4.0/24, and 10.10.6.0/24
  9. For Subnet 2, select the private subnet within your second Availability Zone.
  10. For Subnet 3, select the private subnet within your third Availability Zone.
  11. For Bootstrap servers, enter the list of bootstrap servers for TLS authentication of your MSK cluster.

To retrieve the bootstrap servers for your MSK cluster, navigate to the Amazon MSK console, choose Clusters, choose msk-cluster, then choose View client information. Copy the TLS values for the bootstrap servers.

  12. For Security groups, choose Use specific security groups with access to this cluster, and choose msk-connect-sg.
  13. For Connector configuration, replace the default settings with the following:
connector.class=com.amazonaws.mskdatagen.GeneratorSourceConnector
tasks.max=2
genkp.customer.with=#{Code.isbn10}
genv.customer.name.with=#{Name.full_name}
genv.customer.gender.with=#{Demographic.sex}
genv.customer.favorite_beer.with=#{Beer.name}
genv.customer.state.with=#{Address.state}
genkp.order.with=#{Code.isbn10}
genv.order.product_id.with=#{number.number_between '101','109'}
genv.order.quantity.with=#{number.number_between '1','5'}
genv.order.customer_id.matching=customer.key
global.throttle.ms=2000
global.history.records.max=1000
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
  14. For Connector capacity, choose Provisioned.
  15. For MCU count per worker, choose 1.
  16. For Number of workers, choose 1.
  17. For Worker configuration, choose Use the MSK default configuration.
  18. For Access permissions, choose msk-connect-role.
  19. Choose Next.
  20. For Encryption, select TLS encrypted traffic.
  21. Choose Next.
  22. For Log delivery, choose Deliver to Amazon CloudWatch Logs.
  23. Choose Browse, select msk-connect-logs, and choose Choose.
  24. Choose Next.
  25. Review and choose Create connector.

After the custom connector is created, you will see that its status is Running, and you can move to the next step.
amazon msk console showing the msk connect connector being successfully created

Configure Amazon Redshift streaming ingestion for Amazon MSK

Complete the following steps to set up streaming ingestion:

  1. Connect to your Redshift cluster using Query Editor v2, and authenticate with the database user name awsuser, and password Awsuser123.
  2. Create an external schema from Amazon MSK using the following SQL statement.

In the following code, enter the ARN of the redshift-role IAM role and the ARN of the msk-cluster cluster.

CREATE EXTERNAL SCHEMA msk_external_schema
FROM MSK
IAM_ROLE '<insert your redshift-role arn>'
AUTHENTICATION iam
CLUSTER_ARN '<insert your msk-cluster arn>';
  3. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to create an external schema from amazon msk

  4. Create a materialized view using the following SQL statement:
CREATE MATERIALIZED VIEW msk_mview AUTO REFRESH YES AS
SELECT
    "kafka_partition",
    "kafka_offset",
    "kafka_timestamp_type",
    "kafka_timestamp",
    "kafka_key",
    JSON_PARSE(kafka_value) as Data,
    "kafka_headers"
FROM
    "dev"."msk_external_schema"."customer";
  5. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to create a materialized view

  6. You can now query the materialized view using the following SQL statement:
select * from msk_mview LIMIT 100;
  7. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to query the materialized view

  8. To monitor the progress of records loaded via streaming ingestion, you can take advantage of the SYS_STREAM_SCAN_STATES monitoring view using the following SQL statement:
select * from SYS_STREAM_SCAN_STATES;
  9. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to query the sys stream scan states monitoring view

  10. To monitor errors encountered on records loaded via streaming ingestion, you can take advantage of the SYS_STREAM_SCAN_ERRORS monitoring view using the following SQL statement:
select * from SYS_STREAM_SCAN_ERRORS;
  11. Choose Run to run the SQL statement.

redshift query editor v2 showing the SQL statement used to query the sys stream scan errors monitoring view

Clean up

After following along, if you no longer need the resources you created, delete them in the following order to prevent incurring additional charges:

  1. Delete the MSK Connect connector msk-datagen-connector.
  2. Delete the MSK Connect plugin msk-datagen-plugin.
  3. Delete the Amazon MSK data generator JAR file you downloaded, and delete the S3 bucket you created.
  4. After you delete your MSK Connect connector, you can delete the CloudFormation stack. All the resources created by the CloudFormation stack will be automatically deleted from your AWS account.

Conclusion

In this post, we demonstrated how to configure Amazon Redshift streaming ingestion from Amazon MSK, with a focus on privacy and security.

The combination of the ability of Amazon MSK to handle high-throughput data streams with the robust analytical capabilities of Amazon Redshift empowers businesses to derive actionable insights promptly. This real-time data integration enhances the agility and responsiveness of organizations in understanding changing data trends, customer behaviors, and operational patterns. It allows for timely and informed decision-making, thereby gaining a competitive edge in today’s dynamic business landscape.

This solution is also applicable for customers that are looking to use Amazon MSK Serverless and Amazon Redshift Serverless.

We hope this post was a good opportunity to learn more about AWS service integration and configuration. Let us know your feedback in the comments section.


About the authors

Sebastian Vlad is a Senior Partner Solutions Architect with Amazon Web Services, with a passion for data and analytics solutions and customer success. Sebastian works with enterprise customers to help them design and build modern, secure, and scalable solutions to achieve their business outcomes.

Sharad Pai is a Lead Technical Consultant at AWS. He specializes in streaming analytics and helps customers build scalable solutions using Amazon MSK and Amazon Kinesis. He has over 16 years of industry experience and is currently working with media customers who are hosting live streaming platforms on AWS, managing peak concurrency of over 50 million. Prior to joining AWS, Sharad’s career as a lead software developer included 9 years of coding, working with open source technologies like JavaScript, Python, and PHP.

Secure connectivity patterns for Amazon MSK Serverless cross-account access

Post Syndicated from Tamer Soliman original https://aws.amazon.com/blogs/big-data/secure-connectivity-patterns-for-amazon-msk-serverless-cross-account-access/

Amazon MSK Serverless is a cluster type of Amazon Managed Streaming for Apache Kafka (Amazon MSK) that makes it straightforward for you to run Apache Kafka without having to manage and scale cluster capacity. MSK Serverless automatically provisions and scales compute and storage resources. With MSK Serverless, you can use Apache Kafka on demand and pay for the data you stream and retain on a usage basis.

Deploying infrastructure across multiple VPCs and multiple accounts is considered best practice, facilitating scalability while maintaining isolation boundaries. In a multi-account environment, Kafka producers and consumers can exist within the same VPC—however, they are often located in different VPCs, sometimes within the same account, in a different account, or even in multiple different accounts. There is a need for a solution that can extend access to MSK Serverless clusters to producers and consumers from multiple VPCs within the same AWS account and across multiple AWS accounts. The solution needs to be scalable and straightforward to maintain.

In this post, we walk you through multiple solution approaches that address the MSK Serverless cross-VPC and cross-account access connectivity options, and we discuss the advantages and limitations of each approach.

MSK Serverless connectivity and authentication

When an MSK Serverless cluster is created, AWS manages the cluster infrastructure on your behalf and extends private connectivity back to your VPCs through VPC endpoints powered by AWS PrivateLink. You bootstrap your connection to the cluster through a bootstrap server that holds a record of all your underlying brokers.

At creation, a fully qualified domain name (FQDN) is assigned to your cluster bootstrap server. The bootstrap server FQDN has the general format of boot-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, and your cluster brokers follow the format of bxxxx-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, where ClusterUniqueID.xx is unique to your cluster and bxxxx is a dynamic broker range (b0001, b0037, and b0523 can be some of your assigned brokers at a given point in time). It’s worth noting that the brokers assigned to your cluster are dynamic and change over time, but your bootstrap address remains the same for the cluster. All your communication with the cluster starts with the bootstrap server, which can respond with the list of active brokers when required. For proper Kafka communication, your MSK client needs to be able to resolve the domain names of your bootstrap server as well as all your brokers.
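
To make the two naming formats concrete, the following Python sketch classifies a hostname as a bootstrap server or a broker and extracts the cluster-specific part. The character classes in the patterns and the sample hostnames are assumptions made for illustration; they are not taken from a real cluster.

```python
import re

# ClusterUniqueID.xx is matched loosely as two lowercase alphanumeric labels (assumption).
CLUSTER = r"(?P<cluster>[a-z0-9]+\.[a-z0-9]+)"
SUFFIX = r"\.kafka-serverless\.(?P<region>[a-z0-9-]+)\.amazonaws\.com$"

BOOTSTRAP = re.compile(r"^boot-" + CLUSTER + SUFFIX)
BROKER = re.compile(r"^b(?P<broker>\d{4})-" + CLUSTER + SUFFIX)


def classify(hostname):
    """Return ('bootstrap' | 'broker', cluster part), or None if no format matches."""
    match = BOOTSTRAP.match(hostname)
    if match:
        return "bootstrap", match.group("cluster")
    match = BROKER.match(hostname)
    if match:
        return "broker", match.group("cluster")
    return None


# Made-up hostnames that follow the formats described above.
print(classify("boot-abc12345.c1.kafka-serverless.us-east-1.amazonaws.com"))
print(classify("b0037-abc12345.c1.kafka-serverless.us-east-1.amazonaws.com"))
print(classify("example.com"))
```

A check like this can be useful when validating that DNS resolution covers both the bootstrap server and every broker name a client receives.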

At cluster creation, you specify the VPCs that you would like the cluster to communicate with (up to five VPCs in the same account as your cluster). For each VPC specified during cluster creation, cluster VPC endpoints are created along with a private hosted zone that includes a list of your bootstrap server and all dynamic brokers kept up to date. The private hosted zones facilitate resolving the FQDNs of your bootstrap server and brokers, from within the associated VPCs defined during cluster creation, to the respective VPC endpoints for each.

Cross-account access

To be able to extend private connectivity of your Kafka producers and consumers to your MSK Serverless cluster, you need to consider three main aspects: private connectivity, authentication and authorization, and DNS resolution.

The following diagram highlights the possible connectivity options. Although the diagram shows them all here for demonstration purposes, in most cases, you would use one or more of these options depending on your architecture, not necessarily all in the same setup.

MSK cross account connectivity options

In this section, we discuss the different connectivity options along with their pros and cons. We also cover the authentication and DNS resolution aspects associated with the relevant connectivity options.

Private connectivity layer

This is the underlying private network connectivity. You can achieve this connectivity using VPC peering, AWS Transit Gateway, or PrivateLink, as indicated in the preceding diagram. VPC peering simplifies the setup, but it lacks support for transitive routing. In most cases, peering is used when you have a limited number of VPCs or if your VPCs generally communicate with a limited number of core services VPCs without the need for lateral connectivity or transitive routing. On the other hand, AWS Transit Gateway facilitates transitive routing and can simplify the architecture when you have a large number of VPCs, and especially when lateral connectivity is required. PrivateLink is more suited for extending connectivity to a specific resource unidirectionally across VPCs or accounts without exposing full VPC-to-VPC connectivity, thereby adding a layer of isolation. PrivateLink is useful if you have overlapping CIDRs, which is a case that is not supported by Transit Gateway or VPC peering. PrivateLink is also useful when your connected parties are administered separately, and when one-way connectivity and isolation are required.

If you choose PrivateLink as a connectivity option, you need to use a Network Load Balancer (NLB) with an IP type target group with its registered targets set as the IP addresses of the zonal endpoints of your MSK Serverless cluster.
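As a rough sketch of the NLB setup, the following Python function builds the two request payloads for an IP-type target group and its registered targets. The keys mirror the request parameters of the Elastic Load Balancing v2 `create_target_group` and `register_targets` API calls. The target group name, VPC ID, and endpoint IP addresses are placeholders, and port 9098 is assumed here because it is the port used for IAM-authenticated Kafka traffic.

```python
MSK_IAM_PORT = 9098  # assumption: IAM-authenticated Kafka listener port


def nlb_target_group_requests(vpc_id, endpoint_ips, name="msk-serverless-tg"):
    """Build request payloads for an IP-type target group and its targets.

    endpoint_ips are the IP addresses of the zonal endpoints of the
    MSK Serverless cluster (placeholders in this sketch).
    """
    create_target_group = {
        "Name": name,
        "Protocol": "TCP",
        "Port": MSK_IAM_PORT,
        "VpcId": vpc_id,
        "TargetType": "ip",
    }
    # The TargetGroupArn is only known after the target group exists,
    # so only the target list is prepared here.
    targets = [{"Id": ip, "Port": MSK_IAM_PORT} for ip in endpoint_ips]
    return create_target_group, targets


# Hypothetical values for illustration.
group_request, target_list = nlb_target_group_requests(
    "vpc-0123456789abcdef0", ["10.0.1.25", "10.0.2.31"]
)
```

Registering one target per zonal endpoint keeps the NLB aligned with the Availability Zones that the cluster endpoints live in.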

Cluster authentication and authorization

In addition to having private connectivity and being able to resolve the bootstrap server and broker domain names, for your producers and consumers to have access to your cluster, you need to configure your clients with proper credentials. MSK Serverless supports AWS Identity and Access Management (IAM) authentication and authorization. For cross-account access, your MSK client needs to assume a role that has proper credentials to access the cluster. This post focuses mainly on the cross-account connectivity and name resolution aspects. For more details on cross-account authentication and authorization, refer to the following GitHub repo.
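For orientation only, the following Python sketch assembles the client properties that a Java-based Kafka client would typically use with the aws-msk-iam-auth library, including the optional awsRoleArn setting for assuming a cross-account role. This is an assumption about the client stack, not a configuration taken from this post. The bootstrap address and role ARN are placeholders.

```python
def msk_iam_client_properties(bootstrap_server, role_arn=None):
    """Build Kafka client properties for IAM authentication.

    Assumes a Java client using the aws-msk-iam-auth library. When role_arn
    is given, the login module is asked to assume that role.
    """
    jaas = "software.amazon.msk.auth.iam.IAMLoginModule required"
    if role_arn:
        jaas += f' awsRoleArn="{role_arn}"'
    jaas += ";"
    return {
        "bootstrap.servers": bootstrap_server,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "AWS_MSK_IAM",
        "sasl.jaas.config": jaas,
        "sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }


# Hypothetical bootstrap address and cross-account role.
props = msk_iam_client_properties(
    "boot-abc123.c2.kafka-serverless.us-east-1.amazonaws.com:9098",
    role_arn="arn:aws:iam::111122223333:role/msk-cross-account-client",
)
```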

DNS resolution

For Kafka producers and consumers located in accounts across the organization to be able to produce and consume to and from the centralized MSK Serverless cluster, they need to be able to resolve the FQDNs of the cluster bootstrap server as well as each of the cluster brokers. Understanding the dynamic nature of broker allocation, the solution will have to accommodate such a requirement. In the next section, we address how we can satisfy this part of the requirements.

Cluster cross-account DNS resolution

Now that we have discussed how MSK Serverless works, how private connectivity is extended, and the authentication and authorization requirements, let’s discuss how DNS resolution works for your cluster.

For every VPC associated with your cluster during cluster creation, a VPC endpoint is created along with a private hosted zone. Private hosted zones enable name resolution of the FQDNs of the cluster bootstrap server and the dynamically allocated brokers, from within each respective VPC. This works well when requests come from within any of the VPCs that were added during cluster creation because they already have the required VPC endpoints and relevant private hosted zones.

Let’s discuss how you can extend name resolution to other VPCs within the same account that were not included during cluster creation, and to others that may be located in other accounts.

You’ve already made your choice of the private connectivity option that best fits your architecture requirements, be it VPC peering, PrivateLink, or Transit Gateway. Assuming that you have also configured your MSK clients to assume roles that have the right IAM credentials in order to facilitate cluster access, you now need to address the name resolution aspect of connectivity. It’s worth noting that, although we list different connectivity options using VPC peering, Transit Gateway, and PrivateLink, in most cases only one or two of these connectivity options are present. You don’t necessarily need to have them all; they are listed here to demonstrate your options, and you are free to choose the ones that best fit your architecture and requirements.

In the following sections, we describe two different methods to address DNS resolution. For each method, there are advantages and limitations.

Private hosted zones

The following diagram highlights the solution architecture and its components. Note that, to simplify the diagram, and to make room for more relevant details required in this section, we have eliminated some of the connectivity options.

Cross-account access using Private Hosted Zones

The solution starts with creating a private hosted zone, followed by creating a VPC association.

Create a private hosted zone

We start by creating a private hosted zone for name resolution. To make the solution scalable and straightforward to maintain, you can create this private hosted zone in the same account as the MSK Serverless cluster, which facilitates centralized management of the private hosted zone alongside the MSK cluster. You can then associate the centralized private hosted zone with VPCs within the same account or in other accounts. In some cases, creating the private hosted zone in a centralized networking account is preferred, and this is also a viable option to consider.

The purpose of the private hosted zone is to be able to resolve the FQDNs of the bootstrap server as well as all the dynamically assigned cluster-associated brokers. As discussed earlier, the bootstrap server FQDN format is boot-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, and the cluster brokers use the format bxxxx-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, with bxxxx being the broker ID. You need to create the new private hosted zone with the primary domain set as kafka-serverless.Region.amazonaws.com, with an A-Alias record called *.kafka-serverless.Region.amazonaws.com pointing to the Regional VPC endpoint of the MSK Serverless cluster in the MSK cluster VPC. This should be sufficient to direct all traffic targeting your cluster to the primary cluster VPC endpoints that you specified in your private hosted zone.
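The wildcard record described above can be sketched as a Route 53 change batch. The following Python function builds the payload in the shape expected by the Route 53 `ChangeResourceRecordSets` API. The VPC endpoint DNS name and the endpoint's hosted zone ID are placeholders that you would look up from your own cluster VPC endpoint.

```python
def wildcard_alias_change_batch(region, endpoint_dns_name, endpoint_zone_id):
    """Build a change batch with a wildcard A-Alias record for MSK Serverless.

    endpoint_dns_name and endpoint_zone_id describe the Regional VPC endpoint
    of the cluster (placeholders in this sketch).
    """
    record_name = f"*.kafka-serverless.{region}.amazonaws.com"
    return {
        "Comment": "Wildcard alias to the MSK Serverless VPC endpoint",
        "Changes": [
            {
                "Action": "UPSERT",
                "ResourceRecordSet": {
                    "Name": record_name,
                    "Type": "A",
                    "AliasTarget": {
                        "HostedZoneId": endpoint_zone_id,
                        "DNSName": endpoint_dns_name,
                        "EvaluateTargetHealth": False,
                    },
                },
            }
        ],
    }


# Hypothetical endpoint values for illustration.
batch = wildcard_alias_change_batch(
    "us-east-1",
    "vpce-0abc123-example.kafka-serverless.us-east-1.vpce.amazonaws.com",
    "Z0000000EXAMPLE",
)
```

Because the record is a wildcard, a single entry covers the bootstrap server and every broker name, which is what lets the zone keep working as brokers change.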

Now that you have created the private hosted zone, for name resolution to work, you need to associate the private hosted zone with every VPC where you have clients for the MSK cluster (producer or consumer).

Associate a private hosted zone with VPCs in the same account

For VPCs that are in the same account as the MSK cluster and weren’t included in the configuration during cluster creation, you can associate them to the private hosted zone created using the AWS Management Console by editing the private hosted zone settings and adding the respective VPCs. For more information, refer to Associating more VPCs with a private hosted zone.

Associate a private hosted zone in cross-account VPCs

For VPCs that are in an account other than the MSK cluster account, refer to Associating an Amazon VPC and a private hosted zone that you created with different AWS accounts. The key steps are as follows:

  1. Create a VPC association authorization in the account where the private hosted zone is created (in this case, it’s the same account as the MSK Serverless cluster account) to authorize the remote VPCs to be associated with the hosted zone:
aws route53 create-vpc-association-authorization --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID
  2. Associate the VPC with the private hosted zone in the account where you have the VPCs with the MSK clients (remote account), referencing the association authorization you created earlier:
aws route53 list-vpc-association-authorizations --hosted-zone-id HostedZoneID
aws route53 associate-vpc-with-hosted-zone --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID
  3. Delete the VPC authorization to associate the VPC with the hosted zone:
aws route53 delete-vpc-association-authorization --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID

Deleting the authorization doesn’t affect the association; it just prevents you from re-associating the VPC with the hosted zone in the future. If you want to re-associate the VPC with the hosted zone, you’ll need to repeat steps 1 and 2 of this procedure.
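The order of the calls and the account each one runs in are easy to mix up. The following Python sketch builds the three AWS CLI invocations as argument lists and tags each with a named profile. The profile names are hypothetical, and the hosted zone and VPC IDs are placeholders.

```python
def association_commands(hosted_zone_id, vpc_region, vpc_id,
                         zone_profile="msk-account",
                         vpc_profile="client-account"):
    """Return the authorize, associate, and cleanup commands in order.

    zone_profile: CLI profile for the account that owns the hosted zone.
    vpc_profile:  CLI profile for the account that owns the client VPC.
    Both profile names are hypothetical.
    """
    vpc_arg = f"VPCRegion={vpc_region},VPCId={vpc_id}"
    common = ["--hosted-zone-id", hosted_zone_id, "--vpc", vpc_arg]
    return [
        # Step 1: authorize, from the hosted zone owner's account.
        ["aws", "route53", "create-vpc-association-authorization",
         *common, "--profile", zone_profile],
        # Step 2: associate, from the client VPC owner's account.
        ["aws", "route53", "associate-vpc-with-hosted-zone",
         *common, "--profile", vpc_profile],
        # Step 3: remove the authorization, from the hosted zone owner's account.
        ["aws", "route53", "delete-vpc-association-authorization",
         *common, "--profile", zone_profile],
    ]


commands = association_commands("Z0000000EXAMPLE", "us-east-1", "vpc-0123456789abcdef0")
```

Each list can be passed to `subprocess.run` as-is, or joined with spaces to review the command before running it.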

Note that your VPC needs to have the enableDnsSupport and enableDnsHostnames DNS attributes enabled for this to work. These two settings can be configured under the VPC DNS settings. For more information, refer to DNS attributes in your VPC.

These procedures work well for all remote accounts when connectivity is extended using VPC peering or Transit Gateway. If your connectivity option uses PrivateLink, the private hosted zone needs to be created in the remote account instead (the account where the PrivateLink VPC endpoints are). In addition, an A-Alias record that resolves to the PrivateLink endpoint instead of the MSK cluster endpoint needs to be created as indicated in the earlier diagram. This will facilitate name resolution to the PrivateLink endpoint. If other VPCs need access to the cluster through that same PrivateLink setup, you need to follow the same private hosted zone association procedure as described earlier and associate your other VPCs with the private hosted zone created for your PrivateLink VPC.

Limitations

The private hosted zones solution has some key limitations.

Firstly, because you’re using kafka-serverless.Region.amazonaws.com as the primary domain for your private hosted zone, and your A-Alias record uses *.kafka-serverless.Region.amazonaws.com, all traffic to the MSK Serverless service originating from any VPC associated with this private hosted zone will be directed to the one specific cluster VPC Regional endpoint that you specified in the hosted zone A-Alias record.
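A small model makes this limitation visible. The following Python sketch is a deliberately simplified stand-in for wildcard resolution, not how Route 53 is implemented. The cluster names and the endpoint label are made up.

```python
def resolve(fqdn, wildcard_zones):
    """Resolve a name against zones that each hold a single '*' alias record.

    wildcard_zones maps a zone domain to the alias target of its wildcard
    record. This is a simplified model of wildcard matching.
    """
    name = fqdn.lower().rstrip(".")
    for domain, target in wildcard_zones.items():
        if name.endswith("." + domain):
            return target
    return None


# One private hosted zone whose wildcard record points at cluster A's endpoint.
zones = {"kafka-serverless.us-east-1.amazonaws.com": "vpce-cluster-a"}

cluster_a = resolve("boot-aaaa1111.c1.kafka-serverless.us-east-1.amazonaws.com", zones)
cluster_b = resolve("boot-bbbb2222.c3.kafka-serverless.us-east-1.amazonaws.com", zones)
```

In this model, both names resolve to the endpoint of cluster A, so a client in an associated VPC that tries to reach cluster B is sent to the wrong endpoint.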

This solution is valid if you have one MSK Serverless cluster in your centralized service VPC. If you need to provide access to multiple MSK Serverless clusters, you can use the same solution but adopt a distributed private hosted zone approach as opposed to a centralized approach. In a distributed private hosted zone approach, each private hosted zone can point to a specific cluster. The VPCs associated with that specific private hosted zone will communicate only with the respective cluster listed under the specific private hosted zone.

In addition, after you establish a VPC association with a private hosted zone resolving *.kafka-serverless.Region.amazonaws.com, the respective VPC will only be able to communicate with the cluster defined in that specific private hosted zone and no other cluster. An exception to this rule is if a local cluster is created within the same client VPC, in which case the clients within the VPC will be able to communicate with only the local cluster.

You can also use PrivateLink to accommodate multiple clusters by creating a PrivateLink plus private hosted zone per cluster, replicating the configuration steps described earlier.

Both solutions, using distributed private hosted zones or PrivateLink, are still subject to the limitation that for each client VPC, you can only communicate with the one MSK Serverless cluster that your associated private hosted zone is configured for.

In the next section, we discuss another possible solution.

Resolver rules and AWS Resource Access Manager

The following diagram shows a high-level overview of the solution using Amazon Route 53 resolver rules and AWS Resource Access Manager.

Cross-account access using Resolver Rules and Resolver Endpoints

The solution starts with creating Route 53 inbound and outbound resolver endpoints, which are associated with the MSK cluster VPC. Then you create a resolver forwarding rule in the MSK account that is not associated with any VPC. Next, you share the resolver rule across accounts using Resource Access Manager. In the remote account to which you need to extend name resolution (the account where the MSK clients are located), you accept the resource share and associate the resolver rules with your target VPCs.
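The forwarding rule at the center of this approach can be sketched as a request payload. The following Python function builds the parameters in the shape used by the Route 53 Resolver `CreateResolverRule` API. The endpoint ID, the inbound endpoint IP addresses, and the rule name are placeholders.

```python
import uuid


def forwarding_rule_request(region, outbound_endpoint_id, inbound_endpoint_ips,
                            name="msk-serverless-forward"):
    """Build a forwarding rule request for the MSK Serverless domain.

    The rule forwards queries through the outbound endpoint to the IP
    addresses of the inbound endpoint in the MSK cluster VPC.
    """
    return {
        "CreatorRequestId": str(uuid.uuid4()),  # idempotency token
        "Name": name,
        "RuleType": "FORWARD",
        "DomainName": f"kafka-serverless.{region}.amazonaws.com",
        "TargetIps": [{"Ip": ip, "Port": 53} for ip in inbound_endpoint_ips],
        "ResolverEndpointId": outbound_endpoint_id,
    }


# Hypothetical endpoint ID and inbound endpoint addresses.
rule = forwarding_rule_request(
    "us-east-1", "rslvr-out-0123456789abcdef0", ["10.0.1.10", "10.0.2.10"]
)
```

The rule itself carries no VPC association. Sharing it through Resource Access Manager and associating it in each client VPC is what extends resolution to the remote accounts.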

For more information about this approach, refer to the third use case in Simplify DNS management in a multi-account environment with Route 53 Resolver.

This solution accommodates multiple centralized MSK Serverless clusters in a more scalable and flexible approach, because it relies on directing DNS requests to be resolved by the VPC where the MSK clusters are. Multiple MSK Serverless clusters can coexist, where clients in a particular VPC can communicate with one or more of them at the same time. This option is not supported with the private hosted zone solution approach.

Limitations

Although this solution has its advantages, it also has a few limitations.

Firstly, for a particular target consumer or producer account, all your MSK Serverless clusters need to be in the same core service VPC in the MSK account. This is due to the fact that the resolver rule is set on an account level and uses kafka-serverless.Region.amazonaws.com as the primary domain, directing its resolution to one specific VPC resolver endpoint inbound/outbound pair within that service VPC. If you need to have separate clusters in different VPCs, consider creating separate accounts.

The second limitation is that all your client VPCs need to be in the same Region as your core MSK Serverless VPC. The reason behind this limitation is that resolver rules pointing to a resolver endpoint pair (in reality, they point to the outbound endpoint that loops into the inbound endpoints) need to be in the same Region as the resolver rules, and Resource Access Manager will extend the share only within the same Region. However, this solution is good when you have multiple MSK clusters in the same core VPC, and although your remote clients are in different VPCs and accounts, they are still within the same Region. A workaround for this limitation is to duplicate the creation of resolver rules and outbound resolver endpoint in a second Region, where the outbound endpoint loops back through the original first Region inbound resolver endpoint associated with the MSK Serverless cluster VPC (assuming IP connectivity is facilitated). This second Region resolver rule can then be shared using Resource Access Manager within the second Region.

Conclusion

You can configure MSK Serverless cross-VPC and cross-account access in multi-account environments using private hosted zones or Route 53 resolver rules. The solution discussed in this post allows you to centralize your configuration while extending cross-account access, making it a scalable and straightforward-to-maintain solution. You can create your MSK Serverless clusters with cross-account access for producers and consumers, keep your focus on your business outcomes, and gain insights from sources of data across your organization without having to right-size and manage a Kafka infrastructure.


About the Author

Tamer Soliman is a Senior Solutions Architect at AWS. He helps Independent Software Vendor (ISV) customers innovate, build, and scale on AWS. He has over two decades of industry experience, and is an inventor with three granted patents. His experience spans multiple technology domains including telecom, networking, application integration, data analytics, AI/ML, and cloud deployments. He specializes in AWS Networking and has a profound passion for machine learning, AI, and Generative AI.

How Gupshup built their multi-tenant messaging analytics platform on Amazon Redshift

Post Syndicated from Gaurav Singh original https://aws.amazon.com/blogs/big-data/how-gupshup-built-their-multi-tenant-messaging-analytics-platform-on-amazon-redshift/

Gupshup is a leading conversational messaging platform, powering over 10 billion messages per month. Across verticals, thousands of large and small businesses in emerging markets use Gupshup to build conversational experiences across marketing, sales, and support. Gupshup’s carrier-grade platform provides a single messaging API for 30+ channels, a rich conversational experience-building tool kit for any use case, and a network of emerging market partnerships across messaging channels, device manufacturers, ISVs, and operators.

Objective

Gupshup wanted to build a messaging analytics platform that would:

  • Provide detailed insights, data, and reports about WhatsApp/SMS campaigns and track the success of every text message sent by the end customers.
  • Make it easy to gain insight into trends, delivery rates, and speed.
  • Save time and eliminate unnecessary processes.

About Redshift and some relevant features for the use case

Amazon Redshift is a fully managed, petabyte-scale, massively parallel data warehouse that offers simple operations and high performance. It makes it fast, simple, and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Amazon Redshift extends beyond traditional data warehousing workloads, by integrating with the AWS cloud with features such as querying the data lake with Spectrum, semistructured data ingestion and querying with PartiQL, streaming ingestion from Amazon Kinesis and Amazon MSK, Redshift ML, federated queries to Amazon Aurora and Amazon RDS operational databases, and federated materialized views.

In this use case, Gupshup relies heavily on Amazon Redshift as their data warehouse to process billions of streaming events every month, performing intricate data-pipeline-like operations on such data and incrementally maintaining a hierarchy of aggregations on top of raw data. They have been enjoying the flexibility and convenience that Amazon Redshift has brought to their business. By leveraging Amazon Redshift materialized views, Gupshup has been able to dramatically improve query performance on recurring and predictable workloads, such as dashboard queries from Business Intelligence (BI) tools. Additionally, extract, load, and transform (ELT) data processing is sped up and made easier. Redshift materialized views store commonly used precomputations and seamlessly use them to reduce latency on subsequent analytical queries, and their incremental refresh capability enables Gupshup to be more agile while using less code. Without writing complicated code for incremental updates, they were able to deliver data latency of roughly 15 minutes for some use cases.

Overall architecture and implementation details with Redshift Materialized views

To meet these needs, Gupshup uses a CDC mechanism to extract data from their source systems and persist it in S3. The incremental data from S3 is loaded into Redshift, and a series of materialized view refreshes is then used to calculate metrics. This compiled data is then imported into Aurora PostgreSQL Serverless for operational reporting. Gupshup chose Redshift for three reasons: its ability to incrementally refresh materialized views, which lets it process massive amounts of data progressively; its capacity for scaling, which uses concurrency scaling and elastic resizing; and the RA3 architecture, which separates storage and compute so that one can be scaled without worrying about the other. Gupshup chose Aurora PostgreSQL as the operational reporting layer due to its anticipated increase in concurrency and cost-effectiveness for queries that retrieve only precalculated metrics.

Incremental analytics is the main reason for Gupshup to use Redshift. The diagram shows a simplified version of a typical data processing pipeline where data comes via multiple streams. The streams need to be joined together, then enriched by joining with master data tables. This is followed by a series of joins and aggregations. All this needs to be performed in an incremental manner, providing 30 minutes of latency.

Gupshup uses Redshift’s incremental materialized view feature to accomplish this. All of the join, enrich, and aggregation statements are written in SQL. The stream-to-stream joins are performed by ingesting both streams in a table sorted by the key fields. Then an incremental MV aggregates data by the key fields. Redshift then automatically takes care of keeping the MVs refreshed incrementally with incoming data. The incremental view maintenance feature works even for hierarchical aggregations with MVs based on other MVs. This allows Gupshup to build an entire processing pipeline incrementally. It has helped Gupshup reduce cycle time during the POC and prototyping phases. Moreover, no separate effort is required to process historical data versus live streaming data.
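The idea behind incremental maintenance can be shown with a toy model. The following Python sketch keeps per-key totals and, on each refresh, folds in only the rows that arrived since the previous refresh. It is a conceptual illustration with made-up data, and it does not represent how Redshift implements materialized views.

```python
from collections import defaultdict


class IncrementalTotals:
    """Toy model of an incrementally maintained aggregate by key."""

    def __init__(self):
        self.totals = defaultdict(int)
        self.rows_applied = 0  # how many base rows are already reflected

    def refresh(self, base_table):
        """Apply only the rows added since the last refresh.

        Returns the number of rows processed by this refresh.
        """
        new_rows = base_table[self.rows_applied:]
        for key, amount in new_rows:
            self.totals[key] += amount
        self.rows_applied = len(base_table)
        return len(new_rows)


# Made-up (campaign, messages_delivered) rows.
events = [("campaign-a", 1), ("campaign-b", 2)]
view = IncrementalTotals()
first = view.refresh(events)    # processes both existing rows

events.append(("campaign-a", 5))
second = view.refresh(events)   # processes only the newly arrived row
```

The second refresh touches one row rather than three. On a much larger scale, that is the saving that incremental refresh provides on append-heavy event data.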

Apart from incremental analytics, Redshift simplifies a lot of operational aspects. For example, Gupshup uses the snapshot-restore feature to quickly create a green experimental cluster from an existing blue serving cluster. In case the processing logic changes (which happens quite often in prototyping stages), they need to reprocess all historical data. Gupshup uses Redshift’s elastic scaling feature to temporarily scale the cluster up and then scale it down when done. They also use Redshift to directly power some of their high-concurrency dashboards. For such cases, the concurrency scaling feature of Redshift really comes in handy. Apart from this, they have a lot of in-house data analysts who need to run ad hoc queries on live production data. They use the workload management features of Redshift to make sure their analysts can run queries while ensuring that production queries do not get affected.

Benefits realized with Amazon Redshift

  • On-Demand Scaling
  • Ease of use and maintenance with less code
  • Performance benefits with an incremental MV refresh

Conclusion

Gupshup, an enterprise messaging company, needed a scalable data warehouse solution to analyze billions of events generated each month. They chose Amazon Redshift to build a cloud data warehouse that could handle this scale of data and enable fast analytics.

By combining Redshift’s scalability, snapshots, workload management, and low-operational approach, Gupshup provides data-driven insights with an analytics refresh rate of less than 15 minutes.

Overall, Redshift’s scalability, performance, ease of management, and cost effectiveness have allowed Gupshup to gain data-driven insights from billions of events in near real-time. A scalable and robust data foundation is enabling Gupshup to build innovative messaging products and a competitive advantage.

The incremental refresh of materialized views feature of Redshift allowed us to be more agile with less code:

  • For some use cases, we are able to provide data latency of about 15 minutes, without having to write complex code for incremental updates.
  • The incremental refresh feature is a main differentiating factor that gives Redshift an edge over some of its competitors. I request that you keep improving and enhancing it.

“The incremental refresh of materialized views feature of Redshift allowed us to be more agile with less code”

Pankaj Bisen, Director of AI and Analytics at Gupshup.


About the Authors

Shabi Abbas Sayed is a Senior Technical Account Manager at AWS. He is passionate about building scalable data warehouses and big data solutions, working closely with customers. He works with large ISV customers, helping them build and operate secure, resilient, scalable, and high-performance SaaS applications in the cloud.

Gaurav Singh is a Senior Solutions Architect at AWS, specializing in AI/ML and Generative AI. Based in Pune, India, he focuses on helping customers build, deploy, and migrate ML production workloads to SageMaker at scale. In his spare time, Gaurav loves to explore nature, read, and run.

Break data silos and stream your CDC data with Amazon Redshift streaming and Amazon MSK

Post Syndicated from Umesh Chaudhari original https://aws.amazon.com/blogs/big-data/break-data-silos-and-stream-your-cdc-data-with-amazon-redshift-streaming-and-amazon-msk/

Data loses value over time. We hear from our customers that they’d like to analyze business transactions in real time. Traditionally, customers used batch-based approaches for data movement from operational systems to analytical systems. A batch load can run once or several times a day. A batch-based approach can introduce latency in data movement and reduce the value of data for analytics. A change data capture (CDC)-based approach has emerged as an alternative to batch-based approaches. A CDC-based approach captures the data changes and makes them available in data warehouses for further analytics in real time.

CDC tracks changes made in a source database, such as inserts, updates, and deletes, and continually applies those changes to a target database. When the CDC is high-frequency, the source database is changing rapidly, and the target database (usually a data warehouse) needs to reflect those changes in near real time.

With the explosion of data, the number of data systems in organizations has grown. Data silos cause data to live in different sources, which makes it difficult to perform analytics.

To gain deeper and richer insights, you can bring all the changes from different data silos into one place, such as a data warehouse. This post showcases how to use streaming ingestion to bring data to Amazon Redshift.

Redshift streaming ingestion provides low-latency, high-throughput data ingestion, which enables customers to derive insights in seconds instead of minutes. It’s simple to set up, and directly ingests streaming data into your data warehouse from Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK) without the need to stage in Amazon Simple Storage Service (Amazon S3). You can create materialized views using SQL statements. After that, using materialized view refresh, you can ingest hundreds of megabytes of data per second.

Solution overview

In this post, we create low-latency data replication from Amazon Aurora MySQL to an Amazon Redshift data warehouse, using Redshift streaming ingestion from Amazon MSK. Using Amazon MSK, we securely stream data with a fully managed, highly available Apache Kafka service. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. We store CDC events in Amazon MSK for a set duration of time, which makes it possible to deliver CDC events to additional destinations such as an Amazon S3 data lake.

We deploy the Debezium MySQL source Kafka connector on Amazon MSK Connect. Amazon MSK Connect makes it easy to deploy, monitor, and automatically scale connectors that move data between Apache Kafka clusters and external systems such as databases, file systems, and search indices. Amazon MSK Connect is fully compatible with Apache Kafka Connect, which enables you to lift and shift your Apache Kafka Connect applications with zero code changes.

This solution uses Amazon Aurora MySQL hosting the example database salesdb. Users of the database can perform row-level INSERT, UPDATE, and DELETE operations to produce change events in the example salesdb database. The Debezium MySQL source Kafka connector reads these change events and emits them to the Kafka topics in Amazon MSK. Amazon Redshift then reads the messages from the Kafka topics in Amazon MSK using the Amazon Redshift streaming ingestion feature. Amazon Redshift stores these messages using materialized views and processes them as they arrive.

You can see how CDC represents a create event by looking at this example here. In our solution, we use the op field for processing. It is a mandatory string that describes the type of operation that caused the connector to generate the event. In this example, c indicates that the operation created a row. Valid values for the op field are:

  • c = create
  • u = update
  • d = delete
  • r = read (applies to only snapshots)
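To illustrate how the operation code can drive processing, the following Python sketch decodes a change event and returns the operation name together with the affected row. The sample event is a hand-written approximation of a Debezium create event, and its column names and values are hypothetical. The sketch accepts events with or without an enclosing payload field, because the envelope shape depends on the converter settings.

```python
import json

OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def describe_change(raw_event):
    """Return (operation name, row image) for a change event given as JSON."""
    event = json.loads(raw_event)
    payload = event.get("payload", event)  # unwrap the envelope if present
    op = payload["op"]
    # A delete carries the row in "before"; other operations carry it in "after".
    row = payload["before"] if op == "d" else payload["after"]
    return OPERATIONS[op], row


# Hypothetical create event for a customer row.
sample_event = json.dumps({
    "before": None,
    "after": {"CUST_ID": 1, "NAME": "Customer Name 1"},
    "op": "c",
    "ts_ms": 0,
})

operation, row = describe_change(sample_event)
```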

The following diagram illustrates the solution architecture:

This image shows the architecture of the solution. We read from Amazon Aurora using the Debezium connector for MySQL. The connector is deployed on Amazon MSK Connect and ingests the events into Amazon MSK, from which they are ingested into an Amazon Redshift materialized view.

The solution workflow consists of the following steps:

  • Amazon Aurora MySQL has a binary log (binlog) that records all operations (INSERT, UPDATE, DELETE) in the order in which they are committed to the database.
  • Amazon MSK Connect runs the source Kafka connector called Debezium connector for MySQL, which reads the binlog, produces change events for row-level INSERT, UPDATE, and DELETE operations, and emits the change events to Kafka topics in Amazon MSK.
  • An Amazon Redshift provisioned cluster is the stream consumer and can read messages from Kafka topics in Amazon MSK.
  • A materialized view in Amazon Redshift is the landing area for data read from the stream, which is processed as it arrives.
  • When the materialized view is refreshed, Amazon Redshift compute nodes allocate a group of Kafka partitions to a compute slice.
  • Each slice consumes data from the allocated partitions until the view reaches parity with the last offset for the Kafka topic.
  • Subsequent materialized view refreshes read data from the last offset of the previous refresh until the view reaches parity with the topic data.
  • Inside Amazon Redshift, we create a stored procedure to process CDC records and update the target table.
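The last step can be sketched in a few lines. The following Python function models what a procedure that processes CDC records needs to do: apply each change to the target table in commit order. It is written in Python rather than Redshift SQL, it is not the stored procedure used in this solution, and the key name and sample rows are hypothetical.

```python
def apply_cdc(target, events, key="id"):
    """Apply change events, in order, to a target table keyed by primary key.

    target is a dict of primary key -> row, and each event has an "op" code
    plus "before" and "after" row images.
    """
    for event in events:
        op = event["op"]
        if op in ("c", "r", "u"):
            # Create, snapshot read, and update all upsert the new row image.
            row = event["after"]
            target[row[key]] = row
        elif op == "d":
            # Delete removes the row identified by the old row image.
            target.pop(event["before"][key], None)
        else:
            raise ValueError(f"unknown operation code: {op!r}")
    return target


# Hypothetical change stream for two rows.
changes = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "first"}},
    {"op": "u", "before": {"id": 1, "name": "first"}, "after": {"id": 1, "name": "renamed"}},
    {"op": "c", "before": None, "after": {"id": 2, "name": "second"}},
    {"op": "d", "before": {"id": 2, "name": "second"}, "after": None},
]

table = apply_cdc({}, changes)
```

Order matters here. Applying the same events out of sequence could resurrect a deleted row or overwrite a newer value with an older one.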

Prerequisites

This post assumes you have a running Amazon MSK Connect stack in your environment with the following components:

  • Aurora MySQL hosting a database. In this post, you use the example database salesdb.
  • The Debezium MySQL connector running on Amazon MSK Connect, which connects to Amazon MSK in your Amazon Virtual Private Cloud (Amazon VPC).
  • An Amazon MSK cluster.

If you don’t have an Amazon MSK Connect stack, then follow the instructions in the MSK Connect lab setup and verify that your source connector replicates data changes to the Amazon MSK topics.

You should provision the Amazon Redshift cluster in the same VPC as the Amazon MSK cluster. If you haven’t deployed one, then follow the steps here in the AWS Documentation.

We use AWS Identity and Access Management (AWS IAM) authentication for communication between Amazon MSK and Amazon Redshift cluster. Please make sure you have created an AWS IAM role with a trust policy that allows your Amazon Redshift cluster to assume the role. For information about how to configure the trust policy for the AWS IAM role, see Authorizing Amazon Redshift to access other AWS services on your behalf. After it’s created, the role should have the following AWS IAM policy, which provides permission for communication with the Amazon MSK cluster.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "MSKIAMpolicy",
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:Connect"
            ],
            "Resource": [
                "arn:aws:kafka:*:0123456789:cluster/xxx/xxx",
                "arn:aws:kafka:*:0123456789:topic/*/*/*"
            ]
        },
        {
            "Sid": "MSKPolicy",
            "Effect": "Allow",
            "Action": [
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": "arn:aws:kafka:*:0123456789:cluster/xxx/xxx"
        }
    ]
}

In the preceding example policy, replace the ARNs containing xxx with your Amazon MSK cluster’s ARN.
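
If you prefer to generate the policy rather than edit it by hand, the following Python sketch fills in the ARNs from a single cluster ARN. The function name build_msk_read_policy and the example ARN are hypothetical. Note one deliberate difference from the policy above: instead of the wildcard topic ARN, the sketch scopes topic access to the given cluster, which is a design choice of this example and not something the original policy does.

```python
import json


def build_msk_read_policy(cluster_arn):
    """Return the read policy from this post for one MSK cluster ARN.

    Expects arn:aws:kafka:<region>:<account-id>:cluster/<name>/<uuid>.
    """
    prefix, resource = cluster_arn.rsplit(":", 1)
    parts = resource.split("/")
    if len(parts) != 3 or parts[0] != "cluster":
        raise ValueError(f"not an MSK cluster ARN: {cluster_arn}")
    _, name, uuid = parts
    # Assumption of this sketch: limit topic access to topics of this cluster.
    topic_arn = f"{prefix}:topic/{name}/{uuid}/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "MSKIAMpolicy",
                "Effect": "Allow",
                "Action": [
                    "kafka-cluster:ReadData",
                    "kafka-cluster:DescribeTopic",
                    "kafka-cluster:Connect",
                ],
                "Resource": [cluster_arn, topic_arn],
            },
            {
                "Sid": "MSKPolicy",
                "Effect": "Allow",
                "Action": ["kafka:GetBootstrapBrokers"],
                "Resource": cluster_arn,
            },
        ],
    }


if __name__ == "__main__":
    # Hypothetical ARN, used only to show the output shape.
    example = "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/abcd1234-1"
    print(json.dumps(build_msk_read_policy(example), indent=4))
```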

  • Also, verify that the Amazon Redshift cluster has access to the Amazon MSK cluster. In the Amazon Redshift cluster’s security group, add an inbound rule that allows port 9098 from the MSK security group. To learn how to manage the Redshift cluster security group, refer to Managing VPC security groups for a cluster.

Image shows how to add the inbound rule for the MSK security group allowing port 9098 in the Amazon Redshift cluster’s security group

  • In the Amazon MSK cluster’s security group, add an inbound rule that allows port 9098 from the leader node IP address of your Amazon Redshift cluster, as shown in the following image. You can find the leader node IP address on the Properties tab of the Amazon Redshift cluster in the AWS Management Console.

Image shows how to add the inbound rule allowing port 9098 for the leader node IP address of your Amazon Redshift cluster in the Amazon MSK cluster’s security group

Walkthrough

Navigate to the Amazon Redshift service from AWS Management Console, then set up Amazon Redshift streaming ingestion for Amazon MSK by performing the following steps:

  1. Set enable_case_sensitive_identifier to true – If you are using the default parameter group for the Amazon Redshift cluster, you won’t be able to set enable_case_sensitive_identifier to true. Instead, create a new parameter group with enable_case_sensitive_identifier set to true and attach it to the Amazon Redshift cluster. After you modify parameter values, you must reboot any clusters that are associated with the modified parameter group. It may take a few minutes for the Amazon Redshift cluster to reboot.

This configuration value determines whether name identifiers of databases, tables, and columns are case sensitive. Once done, open a new Amazon Redshift Query Editor v2 session so that the configuration changes are reflected, then follow the next steps.

  2. Create an external schema that maps to the streaming data source.
CREATE EXTERNAL SCHEMA MySchema
FROM MSK
IAM_ROLE 'arn:aws:iam::YourRole:role/msk-redshift-streaming'
AUTHENTICATION IAM
CLUSTER_ARN 'arn:aws:kafka:us-east-1:2073196*****:cluster/MSKCluster-msk-connect-lab/849b47a0-65f2-439e-b181-1038ea9d4493-10';
-- The ARN above is only an example; replace it with your cluster ARN.

Once done, verify that you see the following tables created from the MSK topics:

image shows tables created from MSK Topics

  3. Create a materialized view that references the external schema.
CREATE MATERIALIZED VIEW customer_debezium AUTO REFRESH YES AS
SELECT
*,
json_parse(kafka_value) as payload
from
"dev"."myschema"."salesdb.salesdb.CUSTOMER";
-- Replace myschema with the name you gave your external schema in step 2.

Now you can query the newly created materialized view customer_debezium using the following command.

SELECT * FROM "dev"."public"."customer_debezium" order by refresh_time desc;

Check that the materialized view is populated with the CDC records.

  4. REFRESH MATERIALIZED VIEW (optional). This step is optional because we already specified AUTO REFRESH YES when creating the materialized view (MV).
REFRESH MATERIALIZED VIEW "dev"."public"."customer_debezium";

NOTE: The materialized view is auto-refreshed, so if you don’t see the records immediately, wait a few seconds and rerun the SELECT statement. Amazon Redshift streaming ingestion also supports a manual refresh through REFRESH MATERIALIZED VIEW, which pulls streaming data into the Redshift object immediately. After a refresh, you can query the view again with the following statement.

SELECT * FROM "dev"."public"."customer_debezium" order by refresh_time desc;

Image shows records from the customer_debezium MV
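
To make the shape of the payload column more concrete, here is a small Python sketch that flattens a Debezium change event the same way the SQL in the next section does. The sample events are hypothetical; only the field names (before, after, op, ts_ms, CUST_ID, NAME, MKTSEGMENT) are taken from the queries in this post, and the function name flatten_change_event is made up for the example.

```python
import json

# Hypothetical change events, shaped like the value of one Kafka record.
UPDATE_EVENT = json.dumps({
    "before": {"CUST_ID": 1001, "NAME": "Customer A", "MKTSEGMENT": "retail"},
    "after": {"CUST_ID": 1001, "NAME": "Customer A", "MKTSEGMENT": "wholesale"},
    "op": "u",
    "ts_ms": 1700000000000,
})
DELETE_EVENT = json.dumps({
    "before": {"CUST_ID": 1002, "NAME": "Customer B", "MKTSEGMENT": "retail"},
    "after": None,
    "op": "d",
    "ts_ms": 1700000001000,
})


def flatten_change_event(kafka_value):
    """Extract the columns the staging table needs from one change event."""
    payload = json.loads(kafka_value)
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    # Same idea as coalesce(payload.after."CUST_ID", payload.before."CUST_ID"):
    # a delete has no "after" image, so fall back to the "before" image.
    customer_id = after.get("CUST_ID")
    if customer_id is None:
        customer_id = before.get("CUST_ID")
    return {
        "customer_id": None if customer_id is None else str(customer_id),
        "customer_name": after.get("NAME"),
        "market_segment": after.get("MKTSEGMENT"),
        "ts_ms": payload.get("ts_ms"),
        "op": payload.get("op"),
    }


if __name__ == "__main__":
    print(flatten_change_event(UPDATE_EVENT))
    print(flatten_change_event(DELETE_EVENT))
```

For the delete event, the key still resolves from the before image while the other columns are empty, which is why the key expression needs the coalesce.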

Process CDC records in Amazon Redshift

In the following steps, we create the staging table to hold the CDC data, the target table that holds the latest snapshot, and the stored procedure that processes CDC records and updates the target table.

  1. Create staging table: The staging table is a temporary table that holds all of the data that will be used to make changes to the target table, including both updates and inserts.
CREATE TABLE public.customer_stg (
customer_id character varying(256) ENCODE raw distkey,
customer_name character varying(256) ENCODE lzo,
market_segment character varying(256) ENCODE lzo,
ts_ms bigint ENCODE az64,
op character varying(2) ENCODE lzo,
record_rank smallint ENCODE az64,
refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE KEY
SORTKEY (customer_id);
-- This example uses LZO encoding, which works well for CHAR and VARCHAR columns that store very long character strings. You can use BYTEDICT instead if it matches your use case.
  2. Create target table

We use the customer_target table to load the processed CDC events.

CREATE TABLE public.customer_target (
customer_id character varying(256) ENCODE raw distkey,
customer_name character varying(256) ENCODE lzo,
market_segment character varying(256) ENCODE lzo,
refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE KEY
SORTKEY (customer_id);
  3. Create the debezium_last_extract table and insert a dummy value.

We need to store the timestamp of the last extracted CDC events. We use the debezium_last_extract table for this purpose. For the initial record, we insert a dummy value, which lets us compare the current and next CDC processing timestamps.

CREATE TABLE public.debezium_last_extract (
process_name character varying(256) ENCODE lzo,
latest_refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE AUTO;

Insert into public.debezium_last_extract VALUES ('customer','1983-01-01 00:00:00');

SELECT * FROM "dev"."public"."debezium_last_extract";
  4. Create stored procedure

This stored procedure processes the CDC records and updates the target table with the latest changes.

CREATE OR REPLACE PROCEDURE public.incremental_sync_customer()
LANGUAGE plpgsql
AS $$
DECLARE
sql VARCHAR(MAX) := '';
max_refresh_time TIMESTAMP;
staged_record_count BIGINT := 0;
BEGIN
-- Get the last processed refresh_time from the control table
sql := 'SELECT MAX(latest_refresh_time) FROM debezium_last_extract where process_name =''customer'';';
EXECUTE sql INTO max_refresh_time;

-- Truncate the staging table (replace customer_stg with your staging table name)
EXECUTE 'TRUNCATE customer_stg;';

-- Insert (and transform) the change records with a refresh_time later than the last processed refresh_time into the staging table, ranking each customer's records by ts_ms
EXECUTE 'INSERT INTO customer_stg ('||
'select coalesce(payload.after."CUST_ID",payload.before."CUST_ID")::varchar as customer_id,payload.after."NAME"::varchar as customer_name,payload.after."MKTSEGMENT"::varchar as market_segment, payload.ts_ms::bigint,payload."op"::varchar, rank() over (partition by coalesce(payload.after."CUST_ID",payload.before."CUST_ID")::varchar order by payload.ts_ms::bigint desc) as record_rank, refresh_time from customer_debezium where refresh_time > '''||max_refresh_time||''');';

sql := 'SELECT COUNT(*) FROM customer_stg;';
EXECUTE sql INTO staged_record_count;
RAISE INFO 'Staged customer records: %', staged_record_count;

-- Delete records from the target table that also exist in the staging table (updated/deleted records); replace customer_target with your target table name
EXECUTE 'DELETE FROM customer_target using customer_stg WHERE customer_target.customer_id = customer_stg.customer_id';

-- Insert the latest record for each customer from the staging table into the target table, skipping deletes
EXECUTE 'INSERT INTO customer_target SELECT customer_id,customer_name, market_segment, refresh_time FROM customer_stg where record_rank =1 and op <> ''d''';

-- Insert the max refresh time into the control table
EXECUTE 'INSERT INTO debezium_last_extract SELECT ''customer'', max(refresh_time) from customer_target ';
END;
$$;

Image shows the stored procedure incremental_sync_customer created in the preceding step
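
The staging and merge logic of the stored procedure can be easier to follow outside the database. The following Python sketch replays the same steps on in-memory data; it is an illustration and not a replacement for the procedure. The function name incremental_sync, the dictionary-based target, and the integer timestamps are assumptions of the example. Unlike rank(), the sketch keeps a single row when two changes share the same ts_ms.

```python
def incremental_sync(cdc_rows, target, last_refresh_time):
    """Apply CDC rows to `target` (a dict keyed by customer_id).

    Returns the new watermark to store in the control table.
    """
    # 1. Stage only the rows that arrived after the stored watermark.
    staged = [r for r in cdc_rows if r["refresh_time"] > last_refresh_time]

    # 2. Keep the most recent change per customer (record_rank = 1).
    latest = {}
    for row in staged:
        current = latest.get(row["customer_id"])
        if current is None or row["ts_ms"] > current["ts_ms"]:
            latest[row["customer_id"]] = row

    # 3. Delete every staged customer from the target (updates and deletes).
    for customer_id in latest:
        target.pop(customer_id, None)

    # 4. Re-insert the latest version unless the last operation was a delete.
    for customer_id, row in latest.items():
        if row["op"] != "d":
            target[customer_id] = {
                "customer_name": row["customer_name"],
                "market_segment": row["market_segment"],
                "refresh_time": row["refresh_time"],
            }

    # 5. New watermark: the max refresh_time in the target, as in the procedure.
    return max((r["refresh_time"] for r in target.values()),
               default=last_refresh_time)


if __name__ == "__main__":
    rows = [
        {"customer_id": "1", "customer_name": "A", "market_segment": "retail",
         "ts_ms": 1, "op": "c", "refresh_time": 10},
        {"customer_id": "1", "customer_name": "A", "market_segment": "wholesale",
         "ts_ms": 2, "op": "u", "refresh_time": 10},
    ]
    target = {}
    watermark = incremental_sync(rows, target, 0)
    print(watermark, target)
```

Deleting and re-inserting, rather than updating in place, lets one code path handle inserts, updates, and deletes.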

Test the solution

Update example salesdb hosted on Amazon Aurora

  1. Connect to your Amazon Aurora database from the Amazon Elastic Compute Cloud (Amazon EC2) instance named KafkaClientInstance.
  2. Replace the Amazon Aurora endpoint in the following command with your own endpoint, run the command, and then run use salesdb to switch to the salesdb database.
mysql -f -u master -h mask-lab-salesdb.xxxx.us-east-1.rds.amazonaws.com --password=S3cretPwd99

image shows the details of the RDS for MySQL
  3. Run an update, an insert, and a delete on any of the tables created. You can also run more than one update on the same record to check the last updated record later in Amazon Redshift.

image shows the insert, updates and delete operations performed on RDS for MySQL

  4. Invoke the stored procedure incremental_sync_customer created in the preceding steps from Amazon Redshift Query Editor v2. You can run the procedure manually using the following command or schedule it.
call incremental_sync_customer();
  5. Check the target table for the latest changes. The inserts and updates that you made in the source table appear at the top of the results, because the results are ordered by refresh_time; records that you deleted in the source no longer appear in the target table.
SELECT * FROM "dev"."public"."customer_target" order by refresh_time desc;

image shows the records from the customer_target table in descending order

Extending the solution

In this solution, we showed CDC processing for the customer table. You can use the same approach to extend it to other tables in the example salesdb database, or add more databases to the MSK Connect configuration property database.include.list.

Our proposed approach can work with any MySQL source supported by the Debezium MySQL source Kafka connector. Similarly, to extend this example to your workloads and use cases, you need to create the staging and target tables according to the schema of the source table. Then you need to update the coalesce(payload.after."CUST_ID",payload.before."CUST_ID")::varchar as customer_id statements with the column names and types in your source and target tables. As in the example in this post, we used LZO encoding, which works well for CHAR and VARCHAR columns that store very long character strings. You can use BYTEDICT as well if it matches your use case. Another consideration when creating the target and staging tables is choosing a distribution style and key based on the data in the source database. Here we chose the KEY distribution style with customer_id as the distribution key, based on the source data and schema, following the best practices mentioned here.
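
When you adapt the procedure to several tables, writing each SELECT by hand gets repetitive. As a rough sketch, the following Python helper generates the staging SELECT from a column mapping. The function name build_staging_select and its parameters are hypothetical, and the helper assumes trusted, hard-coded identifiers (it does no quoting or validation of its inputs).

```python
def build_staging_select(view_name, key_column, key_alias, columns):
    """Build the staging SELECT for one Debezium-backed materialized view.

    columns: list of (source_column, target_column, sql_type) tuples.
    """
    # A delete has no "after" image, so the key falls back to "before".
    key_expr = (
        f'coalesce(payload.after."{key_column}",'
        f'payload.before."{key_column}")::varchar'
    )
    select_list = [f"{key_expr} as {key_alias}"]
    for source_column, target_column, sql_type in columns:
        select_list.append(
            f'payload.after."{source_column}"::{sql_type} as {target_column}'
        )
    select_list += [
        "payload.ts_ms::bigint as ts_ms",
        'payload."op"::varchar as op',
        f"rank() over (partition by {key_expr} "
        "order by payload.ts_ms::bigint desc) as record_rank",
        "refresh_time",
    ]
    return "select " + ",\n       ".join(select_list) + f"\nfrom {view_name}"


if __name__ == "__main__":
    print(build_staging_select(
        "customer_debezium",
        "CUST_ID",
        "customer_id",
        [("NAME", "customer_name", "varchar"),
         ("MKTSEGMENT", "market_segment", "varchar")],
    ))
```

The generated text still needs the refresh_time filter appended before it is used in the stored procedure, and the staging table columns must match the generated column order.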

Cleaning up

  1. Delete all the Amazon Redshift clusters.
  2. Delete the Amazon MSK cluster and the MSK Connect cluster.
  3. If you don’t want to delete the Amazon Redshift clusters, you can manually drop the MV and tables created during this post using the following commands:
drop MATERIALIZED VIEW customer_debezium;
drop TABLE public.customer_stg;
drop TABLE public.customer_target;
drop TABLE public.debezium_last_extract;

Also, remove the inbound security rules added to your Amazon Redshift and Amazon MSK clusters, along with the AWS IAM roles created in the Prerequisites section.

Conclusion

In this post, we showed you how Amazon Redshift streaming ingestion provides high-throughput, low-latency ingestion of streaming data from Amazon Kinesis Data Streams and Amazon MSK into an Amazon Redshift materialized view. We increased the speed and reduced the cost of streaming data into Amazon Redshift by eliminating the need for any intermediary services.

Furthermore, we showed how CDC data can be processed rapidly after generation, using a simple SQL interface that enables customers to perform near real-time analytics on a variety of data sources (for example, Internet of Things [IoT] devices, system telemetry data, or clickstream data from a busy website or application).

As you explore the options to simplify and enable near real-time analytics for your CDC data, we hope this post provides you with valuable guidance. We welcome any thoughts or questions in the comments section.


About the Authors

Umesh Chaudhari is a Streaming Solutions Architect at AWS. He works with AWS customers to design and build real time data processing systems. He has 13 years of working experience in software engineering including architecting, designing, and developing data analytics systems.

Vishal Khatri is a Sr. Technical Account Manager and Analytics specialist at AWS. Vishal works with State and Local Government helping educate and share best practices with customers by leading and owning the development and delivery of technical content while designing end-to-end customer solutions.

Amazon MSK now provides up to 29% more throughput and up to 24% lower costs with AWS Graviton3 support

Post Syndicated from Sai Maddali original https://aws.amazon.com/blogs/big-data/amazon-msk-now-provides-up-to-29-more-throughput-and-up-to-24-lower-costs-with-aws-graviton3-support/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data.

Today, we’re excited to bring the benefits of Graviton3 to Kafka workloads, with Amazon MSK now offering M7g instances for new MSK provisioned clusters. AWS Graviton processors are custom Arm-based processors built by AWS to deliver the best price-performance for your cloud workloads. For example, when running an MSK provisioned cluster using M7g.4xlarge instances, you can achieve up to 27% reduction in CPU usage and up to 29% higher write and read throughput compared to M5.4xlarge instances. These performance improvements, along with M7g’s lower prices, provide up to 24% in compute cost savings over M5 instances.

In February 2023, AWS launched new Graviton3-based M7g instances. M7g instances are equipped with DDR5 memory, which provides up to 50% higher memory bandwidth than the DDR4 memory used in previous generations. M7g instances also deliver up to 25% higher storage throughput and up to 88% increase in network throughput compared to similar sized M5 instances to deliver price-performance benefits for Kafka workloads. You can read more about M7g features in New Graviton3-Based General Purpose (m7g) and Memory-Optimized (r7g) Amazon EC2 Instances.

Here are the specs for the M7g instances on MSK:

Name         | vCPUs | Memory  | Network Bandwidth | Storage Bandwidth
M7g.large    | 2     | 8 GiB   | up to 12.5 Gbps   | up to 10 Gbps
M7g.xlarge   | 4     | 16 GiB  | up to 12.5 Gbps   | up to 10 Gbps
M7g.2xlarge  | 8     | 32 GiB  | up to 15 Gbps     | up to 10 Gbps
M7g.4xlarge  | 16    | 64 GiB  | up to 15 Gbps     | up to 10 Gbps
M7g.8xlarge  | 32    | 128 GiB | 15 Gbps           | 10 Gbps
M7g.12xlarge | 48    | 192 GiB | 22.5 Gbps         | 15 Gbps
M7g.16xlarge | 64    | 256 GiB | 30 Gbps           | 20 Gbps

M7g instances on Amazon MSK

Organizations are adopting Amazon MSK to capture and analyze data in real time, run machine learning (ML) workflows, and build event-driven architectures. Amazon MSK enables you to reduce operational overhead and run your applications with higher availability and durability. It also offers consistent improvements in price-performance with capabilities such as Tiered Storage. With compute making up a large portion of Kafka costs, customers wanted a way to optimize these costs further and see Graviton instances as the quickest path. Amazon MSK has fully tested and validated M7g on all Kafka versions starting with version 2.8.2, making it ready to run critical workloads and benefit from Graviton3 cost savings.

You can get started by provisioning new clusters with the Graviton3-based M7g instances as the broker type using the AWS Management Console, APIs via the AWS SDK, and the AWS Command Line Interface (AWS CLI). M7g instances support all Amazon MSK and Kafka features, making it straightforward for you to run all your existing Kafka workloads with minimal changes. Amazon MSK supports Graviton3-based M7g instances from large through 16xlarge sizes to run all Kafka workloads.
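
As a minimal sketch of the SDK route, the following Python code builds a request for the boto3 kafka client's create_cluster call with an M7g broker type. The helper names, the default Kafka version, the broker count, and the volume size are assumptions chosen for illustration; the subnet and security group IDs are values you would supply. The sketch only builds the request unless you call create_cluster yourself, which requires boto3 and AWS credentials.

```python
# Sizes listed for M7g on Amazon MSK in this post.
M7G_SIZES = {"large", "xlarge", "2xlarge", "4xlarge",
             "8xlarge", "12xlarge", "16xlarge"}


def m7g_cluster_request(cluster_name, subnet_ids, security_group_ids,
                        size="large", kafka_version="3.5.1",
                        brokers=3, volume_gib=100):
    """Build keyword arguments for the kafka client's create_cluster call."""
    if size not in M7G_SIZES:
        raise ValueError(f"unsupported M7g size: {size}")
    return {
        "ClusterName": cluster_name,
        "KafkaVersion": kafka_version,  # assumed default, pick your own
        "NumberOfBrokerNodes": brokers,
        "BrokerNodeGroupInfo": {
            "InstanceType": f"kafka.m7g.{size}",
            "ClientSubnets": list(subnet_ids),
            "SecurityGroups": list(security_group_ids),
            "StorageInfo": {"EbsStorageInfo": {"VolumeSize": volume_gib}},
        },
    }


def create_cluster(request, region):
    """Send the request to Amazon MSK (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the builder works without boto3
    return boto3.client("kafka", region_name=region).create_cluster(**request)


if __name__ == "__main__":
    # Placeholder IDs; nothing is created by printing the request.
    print(m7g_cluster_request(
        "demo-m7g",
        ["subnet-aaa", "subnet-bbb", "subnet-ccc"],
        ["sg-123"],
        size="4xlarge",
    ))
```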

Let’s take the M7g instances on MSK provisioned clusters for a test drive and see how they compare with Amazon MSK M5 instances.

M7g instances in action

Customers run a wide variety of workloads on Amazon MSK; some are latency sensitive, and some are throughput bound. In this post, we focus on M7g performance impact on throughput-bound workloads. M7g comes with an increase in network and storage throughput, providing a higher throughput per broker compared to an M5-based cluster.

To understand the implications, let’s look at how Kafka uses available throughput for writing or reading data. Every broker in the MSK cluster comes with a bounded storage and network throughput entitlement. Predominantly, writes in Kafka consume both storage and network throughput, whereas reads consume mostly network throughput. This is because a Kafka consumer is typically reading real-time data from a page cache and occasionally goes to disk to process old data. Therefore, the overall throughput gains also change based on the workload’s write to read throughput ratios.

Let’s look at the throughput gains based on an example. Our setup includes an MSK cluster with M7g.4xlarge instances and another with M5.4xlarge instances, with three nodes in three different Availability Zones. We also enabled TLS encryption, AWS Identity and Access Management (IAM) authentication, and a replication factor of 3 across both M7g and M5 MSK clusters. We also applied Amazon MSK best practices for broker configurations, including num.network.threads = 8 and num.io.threads = 16. On the client side for writes, we optimized the batch size with appropriate linger.ms and batch.size configurations. For the workload, we assumed 6 topics each with 64 partitions (384 per broker). For ingestion, we generated load with an average message size of 512 bytes and with one consumer group per topic. The amount of load sent to the clusters was identical.
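
The "384 per broker" figure follows from the setup above once replication is taken into account. The following Python sketch shows the arithmetic, assuming replicas are spread evenly across brokers; the function name replicas_per_broker is made up for the example.

```python
def replicas_per_broker(topics, partitions_per_topic, replication_factor, brokers):
    """Average number of partition replicas hosted by each broker."""
    total_partitions = topics * partitions_per_topic
    total_replicas = total_partitions * replication_factor
    return total_replicas / brokers


if __name__ == "__main__":
    # 6 topics x 64 partitions, replication factor 3, three brokers.
    print(replicas_per_broker(6, 64, 3, 3))
```

With a replication factor equal to the number of brokers, every broker hosts a replica of every partition, so the per-broker count equals the total partition count.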

As we ingest more data into the MSK cluster, the M7g.4xlarge instance supports higher throughput per broker, as shown in the following graph. After an hour of consistent writes, M7g.4xlarge brokers support up to 54 MB/s of write throughput vs. 40 MB/s with M5-based brokers, which represents a 29% increase.

We also make another important observation: M7g-based brokers consume far fewer CPU resources than M5-based brokers, even though they support 29% higher throughput. As seen in the following chart, CPU utilization of an M7g-based broker is on average 40%, whereas on an M5-based broker, it’s 47%.

As covered previously, customers may see different performance improvements based on the number of consumer groups, batch sizes, and instance size. We recommend referring to MSK Sizing and Pricing to calculate M7g performance gains for your use case, or creating a cluster based on M7g instances and benchmarking the gains on your own.

Lower costs, less operational burden, and higher resiliency

Since its launch, Amazon MSK has made it cost-effective to run your Kafka workloads, while still improving overall resiliency. Since day 1, you have been able to run brokers in multiple Availability Zones without worrying about additional networking costs. In October 2022, we launched Tiered Storage, which provides virtually unlimited storage at up to 50% lower costs. When you use Tiered Storage, you not only save on overall storage cost but also improve the overall availability and elasticity of your cluster.

Continuing down this path, we are now reducing compute costs for customers while still providing performance improvements. With M7g instances, Amazon MSK provides 24% savings on compute costs compared to similar sized M5 instances. When you move to Amazon MSK, you can not only lower your operational overhead using features such as Amazon MSK Connect, Amazon MSK Replicator, and automatic Kafka version upgrades, but also improve overall resiliency and reduce your infrastructure costs.

Pricing and Regions

M7g instances on Amazon MSK are available today in the US (Ohio, N. Virginia, N. California, Oregon), Asia Pacific (Hyderabad, Mumbai, Seoul, Singapore, Sydney, Tokyo), Canada (Central), and EU (Ireland, London, Spain, Stockholm) Regions.

Refer to Amazon MSK pricing to learn about pricing for Graviton3-based instances with Amazon MSK.

Summary

In this post, we discussed the performance gains achieved while using Graviton-based M7g instances. These instances can provide significant improvement in read and write throughput compared to similar sized M5 instances for Amazon MSK workloads. To get started, create a new cluster with M7g brokers using the AWS Management Console, and read our documentation for more information.


About the Authors

Sai Maddali is a Senior Manager Product Management at AWS who leads the product team for Amazon MSK. He is passionate about understanding customer needs, and using technology to deliver services that empower customers to build innovative applications. Outside of work, he enjoys traveling, cooking, and running.

Umesh is a Streaming Solutions Architect at AWS. He works with AWS customers to design and build real time data processing systems. He has 13 years of working experience in software engineering including architecting, designing, and developing data analytics systems.

Lanre Afod is a Solutions Architect focused on Global Financial Services at AWS, passionate about helping customers deploy secure, scalable, highly available, and resilient architectures within the AWS Cloud.

AWS Weekly Roundup – EC2 DL2q instances, PartyRock, Amplify’s 6th birthday, and more – November 20, 2023

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-ec2-dl2q-instances-partyrock-amplifys-6th-birthday-and-more-november-20-2023/

Last week I saw an astonishing 160+ new service launches. There were so many updates that we decided to publish a weekly roundup again. This continues the same innovative pace of the previous week as we are getting closer to AWS re:Invent 2023.

Our News Blog team is also finalizing new blog posts for re:Invent to introduce awesome launches with service teams for your reading pleasure. Jeff Barr shared The Road to AWS re:Invent 2023 to explain our blogging journey and process. Please stay tuned next week!

Last week’s launches
Here are some of the launches that caught my attention last week:

Amazon EC2 DL2q instances – New DL2q instances are powered by Qualcomm AI 100 Standard accelerators and are the first to feature Qualcomm’s AI technology in the public cloud. With eight Qualcomm AI 100 Standard accelerators and 128 GiB of total accelerator memory, you can run popular generative artificial intelligence (AI) applications and extend to edge devices across smartphones, autonomous driving, personal compute, and extended reality headsets to develop and validate these AI workloads before deploying.

PartyRock for Amazon Bedrock – We introduced PartyRock, a fun and intuitive hands-on, generative AI app-building playground powered by Amazon Bedrock. You can experiment, learn all about prompt engineering, build mini-apps, and share them with your friends—all without writing any code or creating an AWS account.

You also can now access the Meta Llama 2 Chat 13B foundation model and Cohere Command Light, Embed English, and multilingual models for Amazon Bedrock.

AWS Amplify celebrates its sixth birthday – We announced six new launches: a new documentation site, support for Next.js 14 with our hosting and JavaScript library, custom token providers and an automatic React Native social sign-in update for Amplify Auth, new ChangePassword and DeleteUser account settings components, and updated Amplify UI packages that use the new Amplify JavaScript v6. You can also use wildcard subdomains when using a custom domain with your Amplify application deployed to AWS Amplify Hosting.

Amplify docs site UI

Also check out other News Blog posts about major launches published in the past week:

Other AWS service launches
Here are some other bundled feature launches per AWS service:

Amazon Athena – You can use a new cost-based optimizer (CBO) to enhance query performance based on table and column statistics collected by AWS Glue Data Catalog, and the Athena JDBC 3.x driver, a new alternative that supports almost all authentication plugins. You can also use Amazon EMR Studio to develop and run interactive queries on Amazon Athena.

Amazon CloudWatch – You can use a new CloudWatch metric called EBS Stalled I/O Check to monitor the health of your Amazon EBS volumes, the regular expression for Amazon CloudWatch Logs Live Tail filter pattern syntax to search and match relevant log events, observability of SAP Sybase ASE database in CloudWatch Application Insights, and up to two stats commands in a Log Insights query to perform aggregations on the results.

Amazon CodeCatalyst – You can connect to an Amazon Virtual Private Cloud (Amazon VPC) from CodeCatalyst Workflows, provision infrastructure using Terraform within CodeCatalyst Workflows, access CodeCatalyst with your workforce identities configured in IAM Identity Center, and create teams made up of members of the CodeCatalyst space.

Amazon Connect – You can use a pre-built queue performance dashboard and Contact Lens conversational analytics dashboard to view and compare real-time and historical aggregated queue performance. You can use quick responses for chats, previously written formats such as typing in ‘/#greet’ to insert a personalized response, and scanning attachments to detect malware or other unwanted content.

AWS Glue – AWS Glue for Apache Spark added six new database connectors: Teradata, SAP HANA, Azure SQL, Azure Cosmos DB, Vertica, and MongoDB, as well as native connectivity to Amazon OpenSearch Service.

AWS Lambda – You can see a single pane view of metrics, logs, and traces in the AWS Lambda console and use advanced logging controls to natively capture logs in JSON structured format. You can view the SAM template on the Lambda console and export the function’s configuration to AWS Application Composer. AWS Lambda also supports Java 21 and NodeJS 20 versions built on the new Amazon Linux 2023 runtime.

AWS Local Zones in Dallas – You can enable the new Local Zone in Dallas, Texas, us-east-1-dfw-2a, with Amazon EC2 C6i, M6i, R6i, C6gn, and M6g instances and Amazon EBS volume types gp2, gp3, io1, sc1, and st1. You can also access Amazon ECS, Amazon EKS, Application Load Balancer, and AWS Direct Connect in this new Local Zone to support a broad set of workloads at the edge.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) – You can standardize access control to Kafka resources using AWS Identity and Access Management (IAM) and build Kafka clients for Amazon MSK Serverless written in all programming languages. These are open source client helper libraries and code samples for popular languages, including Java, Python, Go, and JavaScript. Also, Amazon MSK now supports an enhanced version of Apache Kafka 3.6.0 that offers generally available Tiered Storage and automatically sends you storage capacity alerts when you are at risk of exhausting your storage.

Amazon OpenSearch Service Ingestion – You can migrate your data from Elasticsearch version 7.x clusters to the latest versions of Amazon OpenSearch Service and use persistent buffering to protect the durability of incoming data.

Amazon RDS – Amazon RDS for MySQL now supports creating active-active clusters using the Group Replication plugin, upgrading MySQL 5.7 snapshots to MySQL 8.0, and the Innovation Release version of MySQL 8.1.

Amazon RDS Custom for SQL Server extends point-in-time recovery support for up to 1,000 databases, supports Service Master Key Retention to use transparent data encryption (TDE), table- and column-level encryption, DBMail and linked servers, and use SQL Server Developer edition with the bring your own media (BYOM).

Additionally, Amazon RDS Multi-AZ deployments with two readable standbys now support minor version upgrades and system maintenance updates with typically less than one second of downtime when using Amazon RDS Proxy.

AWS Partner Central – You can use an improved user experience in AWS Partner Central to build and promote your offerings and the new Investments tab in the Partner Analytics Dashboard to gain actionable insights. You can now link accounts and associated users between Partner Central and AWS Marketplace and use an enhanced co-sell experience with APN Customer Engagements (ACE) manager.

Amazon QuickSight – You can programmatically manage user access and custom permissions support for roles to restrict QuickSight functionality to the QuickSight account for IAM Identity Center and Active Directory using APIs. You can also use shared restricted folders, a Contributor role and support for data source asset types in folders and the Custom Week Start feature, an addition designed to enhance the data analysis experience for customers across diverse industries and social contexts.

AWS Trusted Advisor – You can use new APIs to programmatically access Trusted Advisor best practices checks, recommendations, and prioritized recommendations and 37 new Amazon RDS checks that provide best practices guidance by analyzing DB instance configuration, usage, and performance data.

There’s a lot more launch news that I haven’t covered. See AWS What’s New for more details.

See you virtually in AWS re:Invent
Next week we’ll hear the latest from AWS, learn from experts, and connect with the global cloud community in Las Vegas. If you come, check out the agenda, session catalog, and attendee guides before your departure.

If you’re not able to attend re:Invent in person this year, we’re offering the option to livestream our Keynotes and Innovation Talks. With the registration for online pass, you will have access to on-demand keynote, Innovation Talks, and selected breakout sessions after the event.

Channy

Triggering AWS Lambda function from a cross-account Amazon Managed Streaming for Apache Kafka

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/compute/triggering-aws-lambda-function-from-a-cross-account-amazon-managed-streaming-for-apache-kafka/

This post is written by Subham Rakshit, Senior Specialist Solutions Architect, and Ismail Makhlouf, Senior Specialist Solutions Architect.

Many organizations use a multi-account strategy for stream processing applications. This involves decomposing the overall architecture into a single producer account and many consumer accounts. Within AWS, in the producer account, you can use Amazon Managed Streaming for Apache Kafka (Amazon MSK), and in the consumer accounts, you can use AWS Lambda functions for event consumption. This blog post explains how you can trigger Lambda functions from a cross-account Amazon MSK cluster.

The Lambda event source mapping (ESM) for Amazon MSK continuously polls for new events from the Amazon MSK cluster, aggregates them into batches, and then triggers the target Lambda function. The ESM for Amazon MSK functions as a serverless set of Kafka consumers that ensures that each event is processed at least once. Events are processed in the same order they are received within each Kafka partition. In addition, the ESM batches the stream of data and filters the events based on configured logic.
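
To show what a batch looks like from the function's side, here is a minimal Python handler sketch. It relies on the event shape that Lambda uses for Amazon MSK sources, where records are grouped under keys of the form topic-partition and each value is base64-encoded. The processing step (collecting decoded values) is a placeholder assumption, not the logic of the example application in this post.

```python
import base64


def handler(event, context):
    """Decode one batch delivered by the Lambda ESM for Amazon MSK."""
    decoded = []
    # "records" maps "<topic>-<partition>" to an ordered list of records.
    for topic_partition, records in event.get("records", {}).items():
        for record in records:
            value = base64.b64decode(record["value"]).decode("utf-8")
            decoded.append({
                "topic": record["topic"],
                "partition": record["partition"],
                "offset": record["offset"],
                "value": value,
            })
    # Placeholder: a real function would process or store the values here.
    return {"processed": len(decoded), "records": decoded}


if __name__ == "__main__":
    sample_event = {
        "eventSource": "aws:kafka",
        "records": {
            "orders-0": [{
                "topic": "orders",
                "partition": 0,
                "offset": 15,
                "value": base64.b64encode(b"hello").decode("ascii"),
            }]
        },
    }
    print(handler(sample_event, None))
```

Because delivery is at least once, a production handler should tolerate seeing the same topic, partition, and offset more than once.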

Overview

Amazon MSK supports two different deployment types: provisioned and serverless. Triggering a Lambda function from a cross-account Amazon MSK cluster is only supported with a provisioned cluster deployed within the same Region. To facilitate this functionality, Amazon MSK uses multi-VPC private connectivity, powered by AWS PrivateLink, which simplifies connecting Kafka consumers hosted in different AWS accounts to an Amazon MSK cluster.

The following diagram illustrates the architecture of this example:

Architecture Diagram

The architecture is divided into two parts: the producer and the consumer.

In the producer account, you have the Amazon MSK cluster with multi-VPC connectivity enabled. Multi-VPC connectivity is only available for authenticated Amazon MSK clusters. Cluster policies are required to grant permissions to other AWS accounts, allowing them to establish private connectivity to the Amazon MSK cluster. You can delegate permissions to relevant roles or users. When combined with AWS Identity and Access Management (IAM) client authentication, cluster policies offer fine-grained control over Kafka data plane permissions for connecting applications.

In the consumer account, you have the Lambda ESM for Amazon MSK and the managed VPC connection deployed within the same VPC. The managed VPC connection allows private connectivity from the consumer application VPC to the Amazon MSK cluster. The Lambda ESM for Amazon MSK connects to the cross-account Amazon MSK cluster via IAM authentication. It also supports SASL/SCRAM and mutual TLS (mTLS) authenticated clusters. The ESM receives events from the Kafka topic and invokes the Lambda function to process them.

Deploying the example application

To set up the Lambda function trigger from a cross-account Amazon MSK cluster as the event source, follow these steps. The AWS CloudFormation templates for deploying the example are accessible in the GitHub repository.

In this example, you publish sample data using the Kafka console producer. Lambda then processes these events and writes them to Amazon S3.

Prerequisites

For this example, you need two AWS accounts. This post uses the following naming conventions:

  • Producer (for example, account No: 1111 1111 1111): Account that hosts the Amazon MSK cluster and Kafka client instance.
  • Consumer (for example, account No: 2222 2222 2222): Account that hosts the Lambda function and consumes events from Amazon MSK.

To get started:

  1. Clone the repository locally:
    git clone https://github.com/aws-samples/lambda-cross-account-msk.git
  2. Set up the producer account: you must configure the VPC networking and deploy the Amazon MSK cluster and a Kafka client instance to publish data. To do this, deploy the CloudFormation template producer-account.yaml from the AWS console and take note of the MSKClusterARN from the CloudFormation outputs tab.
  3. Set up the consumer account: you need the Lambda function, the IAM role used by the Lambda function, and the S3 bucket that receives the data. To do this, deploy the CloudFormation template consumer-account.yaml from the AWS console with the input parameter MSKAccountId, which is the producer AWS account ID (for example, account Id: 1111 1111 1111). Note the LambdaRoleArn from the CloudFormation outputs tab.

Setting up multi-VPC connectivity in the Amazon MSK cluster

Once the accounts are created, you must enable connectivity between them. By enabling multi-VPC private connectivity in the Amazon MSK cluster, you set up the network connection to allow the cross-account consumers to connect to the cluster.

  1. In the producer account, navigate to the Amazon MSK console.
  2. Choose producer-cluster, and go to the Properties tab.
  3. Scroll to Networking settings, choose Edit, and select Turn on multi-VPC connectivity. This takes some time. When it completes, the settings appear as follows.
    Networking settings
  4. Add the necessary cluster policy to allow cross-account consumers to connect to Amazon MSK. In the producer account, deploy the CloudFormation template producer-msk-cluster-policy.yaml from the AWS console with the following input parameters:
    • MSKClusterArn – Amazon Resource Name (ARN) of the Amazon MSK cluster in the producer account. Find this information in the CloudFormation output of producer-account.yaml.
    • LambdaRoleArn – ARN of the IAM role attached to the Lambda function in the consumer account. Find this information in the CloudFormation output of consumer-account.yaml.
    • LambdaAccountId – Consumer AWS account ID (for example, account Id: 2222 2222 2222).
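As a rough illustration of what such a cluster policy can look like, the sketch below builds a policy document from the same three inputs. The action lists and the topic and group ARN patterns are assumptions based on the Amazon MSK multi-VPC connectivity and IAM access control documentation. The function build_cluster_policy is hypothetical, and the template in the repository remains the authoritative version.

```python
import json


def build_cluster_policy(cluster_arn, lambda_role_arn, lambda_account_id):
    """Return a cluster policy document as a Python dict (illustrative only)."""
    # Assumption: topic and group ARNs share the cluster ARN suffix,
    # with "cluster" swapped for "topic" or "group" and a name appended.
    topic_arn = cluster_arn.replace(":cluster/", ":topic/", 1) + "/*"
    group_arn = cluster_arn.replace(":cluster/", ":group/", 1) + "/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Lets the consumer account create the managed VPC connection.
                "Effect": "Allow",
                "Principal": {"AWS": lambda_account_id},
                "Action": [
                    "kafka:CreateVpcConnection",
                    "kafka:GetBootstrapBrokers",
                    "kafka:DescribeCluster",
                    "kafka:DescribeClusterV2",
                ],
                "Resource": cluster_arn,
            },
            {
                # Data plane permissions for the Lambda execution role.
                "Effect": "Allow",
                "Principal": {"AWS": lambda_role_arn},
                "Action": [
                    "kafka-cluster:Connect",
                    "kafka-cluster:DescribeTopic",
                    "kafka-cluster:ReadData",
                    "kafka-cluster:DescribeGroup",
                    "kafka-cluster:AlterGroup",
                ],
                "Resource": [cluster_arn, topic_arn, group_arn],
            },
        ],
    }


if __name__ == "__main__":
    # Placeholder ARNs that follow the account naming used in this post.
    policy = build_cluster_policy(
        "arn:aws:kafka:us-east-1:111111111111:cluster/producer-cluster/example-uuid",
        "arn:aws:iam::222222222222:role/example-lambda-role",
        "222222222222",
    )
    print(json.dumps(policy, indent=2))
```

Splitting the policy into two statements keeps the connectivity permissions for the whole consumer account separate from the read permissions granted to one specific role.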

Creating a Kafka topic in Amazon MSK and publishing events

In the producer account, navigate to the Amazon MSK console. Choose the Amazon MSK cluster named producer-cluster. Choose View client information to show the bootstrap server.

Client information

The CloudFormation template also deploys a Kafka client instance to create topics and publish events.

To access the client, go to the Amazon EC2 console and choose the instance producer-KafkaClientInstance1. Connect to the EC2 instance with Session Manager and run:

sudo su - ec2-user
#Set MSK Broker IAM endpoint
export BS=<<Provide IAM bootstrap address here>>

You must use the single-VPC private endpoint for the Amazon MSK cluster and not the multi-VPC private endpoint, because you publish events with a Kafka console producer that runs inside the producer account.

Run these scripts to create the customer topic and publish sample events in the topic:

./kafka_create_topic.sh
./kafka_produce_events.sh

Creating a managed VPC connection in the consumer account

To establish a connection to the Amazon MSK cluster in the producer account, you must create a managed VPC connection in the consumer account. Lambda communicates with cross-account Amazon MSK through this managed VPC connection.

For detailed setup steps, read the Amazon MSK managed VPC connection documentation.

Configuring the Lambda ESM for Amazon MSK

The final step is to set up the Lambda ESM for Amazon MSK. Setting up the ESM enables you to connect to the Amazon MSK cluster in the producer account via the managed VPC connection. This allows you to trigger the Lambda function to process the data published to the Kafka topic:

  1. In the consumer account, go to the Lambda console.
  2. Open the Lambda function msk-lambda-cross-account-iam.
  3. Go to the Configuration tab, select Triggers, and choose Add Trigger.
  4. For Trigger configuration, select Amazon MSK.

Lambda trigger

To configure this trigger:

  1. Select the shared Amazon MSK cluster. This automatically defaults to the IAM authentication that is used to connect to the cluster.
    MSK Lambda trigger
  2. By default, the Active trigger check box is enabled. This ensures that the trigger is in the active state after creation. For the other values:
    1. Keep the default Batch size of 100.
    2. Change the Starting Position to Trim horizon.
    3. Set the Topic name as customer.
    4. Set the Consumer Group ID as msk-lambda-iam.

Trigger configuration

Scroll to the bottom and choose Add. This starts creating the Amazon MSK trigger, which takes several minutes. After creation, the state of the trigger shows as Enabled.
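The same trigger settings can also be expressed programmatically. The sketch below only assembles the request parameters for the Lambda CreateEventSourceMapping API using the values chosen in the console steps. The helper build_esm_request is hypothetical, and which ARN to pass for a cross-account cluster (the cluster ARN or the managed VPC connection ARN) is an assumption you should confirm in the Lambda documentation.

```python
def build_esm_request(event_source_arn, function_name):
    """Assemble CreateEventSourceMapping parameters for the example trigger."""
    return {
        # Assumption: confirm which ARN applies to cross-account clusters.
        "EventSourceArn": event_source_arn,
        "FunctionName": function_name,
        "Topics": ["customer"],              # Topic name from the console steps
        "StartingPosition": "TRIM_HORIZON",  # "Trim horizon" in the console
        "BatchSize": 100,                    # console default
        "Enabled": True,                     # matches the Active trigger check box
        "AmazonManagedKafkaEventSourceConfig": {
            "ConsumerGroupId": "msk-lambda-iam",
        },
    }


# Possible usage with boto3 (not executed here, requires AWS credentials):
#   import boto3
#   request = build_esm_request("<<event source ARN>>", "msk-lambda-cross-account-iam")
#   boto3.client("lambda").create_event_source_mapping(**request)
```

Keeping the parameters in a plain dictionary makes it straightforward to review them before any call is made against the account.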

Verifying the output on the consumer side

The Lambda function receives the events and writes them to an S3 bucket.

To validate that the function is working, go to the consumer account and navigate to the S3 console. Search for the cross-account-lambda-consumer-data-<<REGION>>-<<AWS Account Id>> bucket. In the bucket, you see the customer-data-<<datetime>>.csv files.

S3 bucket objects
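If you prefer to script this check, the following sketch derives the expected bucket name and filters a list of object keys down to the output files. The naming patterns come from the description above. The helper names are hypothetical, and the list of keys is assumed to come from a listing call such as the boto3 S3 client's list_objects_v2.

```python
import re

# Matches the customer-data-<<datetime>>.csv naming described above.
OUTPUT_KEY = re.compile(r"customer-data-.+\.csv$")


def expected_bucket_name(region, account_id):
    """Build the bucket name used by the example for a Region and account."""
    return f"cross-account-lambda-consumer-data-{region}-{account_id}"


def output_files(keys):
    """Keep only the keys that look like files written by the function."""
    return [key for key in keys if OUTPUT_KEY.search(key)]
```

An empty result from output_files a few minutes after publishing events suggests that the trigger is not yet enabled or that the function failed, which the function's Amazon CloudWatch logs can help confirm.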

Cleaning up

You must manually empty and delete the S3 bucket, and delete the managed VPC connection and the Lambda ESM for Amazon MSK, in the consumer account. Next, delete the CloudFormation stacks in both the producer and consumer accounts from the AWS console to remove all other resources created as a part of the example.

Conclusion

With Lambda and Amazon MSK, you can now build a decentralized application distributed across multiple AWS accounts. This post shows how you can set up Amazon MSK as an event source for cross-account Lambda functions and also walks you through the configuration required in both producer and consumer accounts.

For further reading on AWS Lambda with Amazon MSK as an event source, visit the documentation.

For more serverless learning resources, visit Serverless Land.