All posts by Ali Alemi

Exploring real-time streaming for generative AI Applications

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/exploring-real-time-streaming-for-generative-ai-applications/

Foundation models (FMs) are large machine learning (ML) models trained on a broad spectrum of unlabeled and generalized datasets. FMs, as the name suggests, provide the foundation to build more specialized downstream applications, and are unique in their adaptability. They can perform a wide range of different tasks, such as natural language processing, classifying images, forecasting trends, analyzing sentiment, and answering questions. This scale and general-purpose adaptability are what makes FMs different from traditional ML models. FMs are multimodal; they work with different data types such as text, video, audio, and images. Large language models (LLMs) are a type of FM and are pre-trained on vast amounts of text data and typically have application uses such as text generation, intelligent chatbots, or summarization.

Streaming data facilitates the constant flow of diverse and up-to-date information, enhancing the models’ ability to adapt and generate more accurate, contextually relevant outputs. This dynamic integration of streaming data enables generative AI applications to respond promptly to changing conditions, improving their adaptability and overall performance in various tasks.

To better understand this, imagine a chatbot that helps travelers book their travel. In this scenario, the chatbot needs real-time access to airline inventory, flight status, hotel inventory, latest price changes, and more. This data usually comes from third parties, and developers need to find a way to ingest this data and process the data changes as they happen.

Batch processing is not the best fit in this scenario. When data changes rapidly, processing it in a batch may result in stale data being used by the chatbot, providing inaccurate information to the customer, which impacts the overall customer experience. Stream processing, however, can enable the chatbot to access real-time data and adapt to changes in availability and price, providing the best guidance to the customer and enhancing the customer experience.

Another example is an AI-driven observability and monitoring solution where FMs monitor real-time internal metrics of a system and produces alerts. When the model finds an anomaly or abnormal metric value, it should immediately produce an alert and notify the operator. However, the value of such important data diminishes significantly over time. These notifications should ideally be received within seconds or even while it’s happening. If operators receive these notifications minutes or hours after they happened, such an insight is not actionable and has potentially lost its value. You can find similar use cases in other industries such as retail, car manufacturing, energy, and the financial industry.

In this post, we discuss why data streaming is a crucial component of generative AI applications due to its real-time nature. We discuss the value of AWS data streaming services such as Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Firehose in building generative AI applications.

In-context learning

LLMs are trained with point-in-time data and have no inherent ability to access fresh data at inference time. As new data appears, you will have to continuously fine-tune or further train the model. This is not only an expensive operation, but also very limiting in practice because the rate of new data generation far supersedes the speed of fine-tuning. Additionally, LLMs lack contextual understanding and rely solely on their training data, and are therefore prone to hallucinations. This means they can generate a fluent, coherent, and syntactically sound but factually incorrect response. They are also devoid of relevance, personalization, and context.

LLMs, however, have the capacity to learn from the data they receive from the context to more accurately respond without modifying the model weights. This is called in-context learning, and can be used to produce personalized answers or provide an accurate response in the context of organization policies.

For example, in a chatbot, data events could pertain to an inventory of flights and hotels or price changes that are constantly ingested to a streaming storage engine. Furthermore, data events are filtered, enriched, and transformed to a consumable format using a stream processor. The result is made available to the application by querying the latest snapshot. The snapshot constantly updates through stream processing; therefore, the up-to-date data is provided in the context of a user prompt to the model. This allows the model to adapt to the latest changes in price and availability. The following diagram illustrates a basic in-context learning workflow.

A commonly used in-context learning approach is to use a technique called Retrieval Augmented Generation (RAG). In RAG, you provide the relevant information such as most relevant policy and customer records along with the user question to the prompt. This way, the LLM generates an answer to the user question using additional information provided as context. To learn more about RAG, refer to Question answering using Retrieval Augmented Generation with foundation models in Amazon SageMaker JumpStart.

A RAG-based generative AI application can only produce generic responses based on its training data and the relevant documents in the knowledge base. This solution falls short when a near-real-time personalized response is expected from the application. For example, a travel chatbot is expected to consider the user’s current bookings, available hotel and flight inventory, and more. Moreover, the relevant customer personal data (commonly known as the unified customer profile) is usually subject to change. If a batch process is employed to update the generative AI’s user profile database, the customer may receive dissatisfying responses based on old data.

In this post, we discuss the application of stream processing to enhance a RAG solution used for building question answering agents with context from real-time access to unified customer profiles and organizational knowledge base.

Near-real-time customer profile updates

Customer records are typically distributed across data stores within an organization. For your generative AI application to provide a relevant, accurate, and up-to-date customer profile, it is vital to build streaming data pipelines that can perform identity resolution and profile aggregation across the distributed data stores. Streaming jobs constantly ingest new data to synchronize across systems and can perform enrichment, transformations, joins, and aggregations across windows of time more efficiently. Change data capture (CDC) events contain information about the source record, updates, and metadata such as time, source, classification (insert, update, or delete), and the initiator of the change.

The following diagram illustrates an example workflow for CDC streaming ingestion and processing for unified customer profiles.

In this section, we discuss the main components of a CDC streaming pattern required to support RAG-based generative AI applications.

CDC streaming ingestion

A CDC replicator is a process that collects data changes from a source system (usually by reading transaction logs or binlogs) and writes CDC events with the exact same order they occurred in a streaming data stream or topic. This involves a log-based capture with tools such as AWS Database Migration Service (AWS DMS) or open source connectors such as Debezium for Apache Kafka connect. Apache Kafka Connect is part of the Apache Kafka environment, allowing data to be ingested from various sources and delivered to variety of destinations. You can run your Apache Kafka connector on Amazon MSK Connect within minutes without worrying about configuration, setup, and operating an Apache Kafka cluster. You only need to upload your connector’s compiled code to Amazon Simple Storage Service (Amazon S3) and set up your connector with your workload’s specific configuration.

There are also other methods for capturing data changes. For example, Amazon DynamoDB provides a feature for streaming CDC data to Amazon DynamoDB Streams or Kinesis Data Streams. Amazon S3 provides a trigger to invoke an AWS Lambda function when a new document is stored.

Streaming storage

Streaming storage functions as an intermediate buffer to store CDC events before they get processed. Streaming storage provides reliable storage for streaming data. By design, it is highly available and resilient to hardware or node failures and maintains the order of the events as they are written. Streaming storage can store data events either permanently or for a set period of time. This allows stream processors to read from part of the stream if there is a failure or a need for re-processing. Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at scale. Amazon MSK is a fully managed, highly available, and secure service provided by AWS for running Apache Kafka.

Stream processing

Stream processing systems should be designed for parallelism to handle high data throughput. They should partition the input stream between multiple tasks running on multiple compute nodes. Tasks should be able to send the result of one operation to the next one over the network, making it possible for processing data in parallel while performing operations such as joins, filtering, enrichment, and aggregations. Stream processing applications should be able to process events with regards to the event time for use cases where events could arrive late or correct computation relies on the time events occur rather than the system time. For more information, refer to Notions of Time: Event Time and Processing Time.

Stream processes continuously produce results in the form of data events that need to be output to a target system. A target system could be any system that can integrate directly with the process or via streaming storage as in intermediary. Depending on the framework you choose for stream processing, you will have different options for target systems depending on available sink connectors. If you decide to write the results to an intermediary streaming storage, you can build a separate process that reads events and applies changes to the target system, such as running an Apache Kafka sink connector. Regardless of which option you choose, CDC data needs extra handling due to its nature. Because CDC events carry information about updates or deletes, it’s important that they merge in the target system in the right order. If changes are applied in the wrong order, the target system will be out of sync with its source.

Apache Flink is a powerful stream processing framework known for its low latency and high throughput capabilities. It supports event time processing, exactly-once processing semantics, and high fault tolerance. Additionally, it provides native support for CDC data via a special structure called dynamic tables. Dynamic tables mimic the source database tables and provide a columnar representation of the streaming data. The data in dynamic tables changes with every event that is processed. New records can be appended, updated, or deleted at any time. Dynamic tables abstract away the extra logic you need to implement for each record operation (insert, update, delete) separately. For more information, refer to Dynamic Tables.

With Amazon Managed Service for Apache Flink, you can run Apache Flink jobs and integrate with other AWS services. There are no servers and clusters to manage, and there is no compute and storage infrastructure to set up.

AWS Glue is a fully managed extract, transform, and load (ETL) service, which means AWS handles the infrastructure provisioning, scaling, and maintenance for you. Although it’s primarily known for its ETL capabilities, AWS Glue can also be used for Spark streaming applications. AWS Glue can interact with streaming data services such as Kinesis Data Streams and Amazon MSK for processing and transforming CDC data. AWS Glue can also seamlessly integrate with other AWS services such as Lambda, AWS Step Functions, and DynamoDB, providing you with a comprehensive ecosystem for building and managing data processing pipelines.

Unified customer profile

Overcoming the unification of the customer profile across a variety of source systems requires the development of robust data pipelines. You need data pipelines that can bring and synchronize all records into one data store. This data store provides your organization with the holistic customer records view that is needed for operational efficiency of RAG-based generative AI applications. For building such a data store, an unstructured data store would be best.

An identity graph is a useful structure for creating a unified customer profile because it consolidates and integrates customer data from various sources, ensures data accuracy and deduplication, offers real-time updates, connects cross-systems insights, enables personalization, enhances customer experience, and supports regulatory compliance. This unified customer profile empowers the generative AI application to understand and engage with customers effectively, and adhere to data privacy regulations, ultimately enhancing customer experiences and driving business growth. You can build your identity graph solution using Amazon Neptune, a fast, reliable, fully managed graph database service.

AWS provides a few other managed and serverless NoSQL storage service offerings for unstructured key-value objects. Amazon DocumentDB (with MongoDB compatibility) is a fast, scalable, highly available, and fully managed enterprise document database service that supports native JSON workloads. DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability.

Near-real-time organizational knowledge base updates

Similar to customer records, internal knowledge repositories such as company policies and organizational documents are siloed across storage systems. This is typically unstructured data and is updated in a non-incremental fashion. The use of unstructured data for AI applications is effective using vector embeddings, which is a technique of representing high dimensional data such as text files, images, and audio files as multi-dimensional numeric.

AWS provides several vector engine services, such as Amazon OpenSearch Serverless, Amazon Kendra, and Amazon Aurora PostgreSQL-Compatible Edition with the pgvector extension for storing vector embeddings. Generative AI applications can enhance the user experience by transforming the user prompt into a vector and use it to query the vector engine to retrieve contextually relevant information. Both the prompt and the vector data retrieved are then passed to the LLM to receive a more precise and personalized response.

The following diagram illustrates an example stream-processing workflow for vector embeddings.

Knowledge base contents need to be converted to vector embeddings before being written to the vector data store. Amazon Bedrock or Amazon SageMaker can help you access the model of your choice and expose a private endpoint for this conversion. Furthermore, you can use libraries such as LangChain to integrate with these endpoints. Building a batch process can help you convert your knowledge base content to vector data and store it in a vector database initially. However, you need to rely on an interval to reprocess the documents to synchronize your vector database with changes in your knowledge base content. With a large number of documents, this process can be inefficient. Between these intervals, your generative AI application users will receive answers according to the old content, or will receive an inaccurate answer because the new content is not vectorized yet.

Stream processing is an ideal solution for these challenges. It produces events as per existing documents initially and further monitors the source system and creates a document change event as soon as they occur. These events can be stored in streaming storage and wait to be processed by a streaming job. A streaming job reads these events, loads the content of the document, and transforms the contents to an array of related tokens of words. Each token further transforms into vector data via an API call to an embedding FM. Results are sent for storage to the vector storage via a sink operator.

If you’re using Amazon S3 for storing your documents, you can build an event-source architecture based on S3 object change triggers for Lambda. A Lambda function can create an event in the desired format and write that to your streaming storage.

You can also use Apache Flink to run as a streaming job. Apache Flink provides the native FileSystem source connector, which can discover existing files and read their contents initially. After that, it can continuously monitor your file system for new files and capture their content. The connector supports reading a set of files from distributed file systems such as Amazon S3 or HDFS with a format of plain text, Avro, CSV, Parquet, and more, and produces a streaming record. As a fully managed service, Managed Service for Apache Flink removes the operational overhead of deploying and maintaining Flink jobs, allowing you to focus on building and scaling your streaming applications. With seamless integration into the AWS streaming services such as Amazon MSK or Kinesis Data Streams, it provides features like automatic scaling, security, and resiliency, providing reliable and efficient Flink applications for handling real-time streaming data.

Based on your DevOps preference, you can choose between Kinesis Data Streams or Amazon MSK for storing the streaming records. Kinesis Data Streams simplifies the complexities of building and managing custom streaming data applications, allowing you to focus on deriving insights from your data rather than infrastructure maintenance. Customers using Apache Kafka often opt for Amazon MSK due to its straightforwardness, scalability, and dependability in overseeing Apache Kafka clusters within the AWS environment. As a fully managed service, Amazon MSK takes on the operational complexities associated with deploying and maintaining Apache Kafka clusters, enabling you to concentrate on constructing and expanding your streaming applications.

Because a RESTful API integration suits the nature of this process, you need a framework that supports a stateful enrichment pattern via RESTful API calls to track for failures and retry for the failed request. Apache Flink again is a framework that can do stateful operations in at-memory speed. To understand the best ways to make API calls via Apache Flink, refer to Common streaming data enrichment patterns in Amazon Kinesis Data Analytics for Apache Flink.

Apache Flink provides native sink connectors for writing data to vector datastores such as Amazon Aurora for PostgreSQL with pgvector or Amazon OpenSearch Service with VectorDB. Alternatively, you can stage the Flink job’s output (vectorized data) in an MSK topic or a Kinesis data stream. OpenSearch Service provides support for native ingestion from Kinesis data streams or MSK topics. For more information, refer to Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion and Loading streaming data from Amazon Kinesis Data Streams.

Feedback analytics and fine-tuning

It’s important for data operation managers and AI/ML developers to get insight about the performance of the generative AI application and the FMs in use. To achieve that, you need to build data pipelines that calculate important key performance indicator (KPI) data based on the user feedback and variety of application logs and metrics. This information is useful for stakeholders to gain real-time insight about the performance of the FM, the application, and overall user satisfaction about the quality of support they receive from your application. You also need to collect and store the conversation history for further fine-tuning your FMs to improve their ability in performing domain-specific tasks.

This use case fits very well in the streaming analytics domain. Your application should store each conversation in streaming storage. Your application can prompt users about their rating of each answer’s accuracy and their overall satisfaction. This data can be in a format of a binary choice or a free form text. This data can be stored in a Kinesis data stream or MSK topic, and get processed to generate KPIs in real time. You can put FMs to work for users’ sentiment analysis. FMs can analyze each answer and assign a category of user satisfaction.

Apache Flink’s architecture allows for complex data aggregation over windows of time. It also provides support for SQL querying over stream of data events. Therefore, by using Apache Flink, you can quickly analyze raw user inputs and generate KPIs in real time by writing familiar SQL queries. For more information, refer to Table API & SQL.

With Amazon Managed Service for Apache Flink Studio, you can build and run Apache Flink stream processing applications using standard SQL, Python, and Scala in an interactive notebook. Studio notebooks are powered by Apache Zeppelin and use Apache Flink as the stream processing engine. Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. With support for user-defined functions (UDFs), Apache Flink allows for building custom operators to integrate with external resources such as FMs for performing complex tasks such as sentiment analysis. You can use UDFs to compute various metrics or enrich user feedback raw data with additional insights such as user sentiment. To learn more about this pattern, refer to Proactively addressing customer concern in real-time with GenAI, Flink, Apache Kafka, and Kinesis.

With Managed Service for Apache Flink Studio, you can deploy your Studio notebook as a streaming job with one click. You can use native sink connectors provided by Apache Flink to send the output to your storage of choice or stage it in a Kinesis data stream or MSK topic. Amazon Redshift and OpenSearch Service are both ideal for storing analytical data. Both engines provide native ingestion support from Kinesis Data Streams and Amazon MSK via a separate streaming pipeline to a data lake or data warehouse for analysis.

Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses and data lakes, using AWS-designed hardware and machine learning to deliver the best price-performance at scale. OpenSearch Service offers visualization capabilities powered by OpenSearch Dashboards and Kibana (1.5 to 7.10 versions).

You can use the outcome of such analysis combined with user prompt data for fine-tuning the FM when is needed. SageMaker is the most straightforward way to fine-tune your FMs. Using Amazon S3 with SageMaker provides a powerful and seamless integration for fine-tuning your models. Amazon S3 serves as a scalable and durable object storage solution, enabling straightforward storage and retrieval of large datasets, training data, and model artifacts. SageMaker is a fully managed ML service that simplifies the entire ML lifecycle. By using Amazon S3 as the storage backend for SageMaker, you can benefit from the scalability, reliability, and cost-effectiveness of Amazon S3, while seamlessly integrating it with SageMaker training and deployment capabilities. This combination enables efficient data management, facilitates collaborative model development, and makes sure that ML workflows are streamlined and scalable, ultimately enhancing the overall agility and performance of the ML process. For more information, refer to Fine-tune Falcon 7B and other LLMs on Amazon SageMaker with @remote decorator.

With a file system sink connector, Apache Flink jobs can deliver data to Amazon S3 in open format (such as JSON, Avro, Parquet, and more) files as data objects. If you prefer to manage your data lake using a transactional data lake framework (such as Apache Hudi, Apache Iceberg, or Delta Lake), all of these frameworks provide a custom connector for Apache Flink. For more details, refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi.

Summary

For a generative AI application based on a RAG model, you need to consider building two data storage systems, and you need to build data operations that keep them up to date with all the source systems. Traditional batch jobs are not sufficient to process the size and diversity of the data you need to integrate with your generative AI application. Delays in processing the changes in source systems result in an inaccurate response and reduce the efficiency of your generative AI application. Data streaming enables you to ingest data from a variety of databases across various systems. It also allows you to transform, enrich, join, and aggregate data across many sources efficiently in near-real time. Data streaming provides a simplified data architecture to collect and transform users’ real-time reactions or comments on the application responses, helping you deliver and store the results in a data lake for model fine-tuning. Data streaming also helps you optimize data pipelines by processing only the change events, allowing you to respond to data changes more quickly and efficiently.

Learn more about AWS data streaming services and get started building your own data streaming solution.


About the Authors

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems which are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

Imtiaz (Taz) Sayed is the World-Wide Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Amazon MSK Serverless now supports Kafka clients written in all programming languages

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/amazon-msk-serverless-now-supports-kafka-clients-written-in-all-programming-languages/

Amazon MSK Serverless is a cluster type for Amazon Managed Streaming for Apache Kafka (Amazon MSK) that is the most straightforward way to run Apache Kafka clusters without having to manage compute and storage capacity. With MSK Serverless, you can run your applications without having to provision, configure, or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring and moving them to even load across a cluster.

With today’s launch, MSK Serverless now supports writes and reads from Kafka clients written in all programming languages. Administrators can simplify and standardize access control to Kafka resources using AWS Identity and Access Management (IAM). This support for IAM in Amazon MSK is based on SASL/OUATHBEARER, an open standard for authorization and authentication.

In this post, we show how you can connect your applications to MSK Serverless with minimal code changes using the open-sourced client helper libraries and code samples for popular languages, including Java, Python, Go, JavaScript, and .NET. Using IAM authentication and authorization is the preferred choice of many customers because you can secure Kafka resources just like you do with all other AWS services. Additionally, you get all the other benefits of IAM, such as temporary role-based credentials, precisely scoped permission policies, and more. Now you can use MSK Serverless with IAM-based authentication more broadly with the support for multiple languages.

Solution overview

You can get started by using IAM principals as identities for your Apache Kafka clients and define identity policies to provide them precisely scoped access permissions. For example, you can create an IAM user and a policy that allows the user to write to a specific Kafka topic but restricts access to other resources without worrying about managing Kafka ACLs. After you provide the identity policies with the necessary permissions, you can configure client applications to use the IAM authentication with minimal code changes.

The code changes allow your clients to use SASL/OAUTHBEARER, a Kafka supported token-based access mechanism, to pass the credentials required for IAM authentication. With OAUTHBEARER support, you can build clients that can work across both Amazon MSK and other Kafka environments. In this post, we show how you can make these code changes by using the provided code libraries and examples.

With this launch, Amazon MSK provides new code libraries for the following programming languages in the AWS GitHub repo:

The following diagram shows the conceptual process flow of using SASL/OAUTHBEARER with IAM access control for non-Java clients.

The workflow contains the following steps:

  1. The client generates an OAUTHBEARER token with the help of the provided library. The token contains a signed base64 encoded transformation of your IAM identity credentials.
  2. The client sends this to Amazon MSK using the bootstrap address along with its request to access Apache Kafka resources.
  3. The MSK Serverless cluster decodes the OATHBEARER token, validates the credentials, and checks if the client is authorized to perform the requested action according to the policy attached to the IAM identity.
  4. When the token expires, the client Kafka library automatically refreshes the token by making another call to the specified token provider.

Create IAM identities and policies

IAM access control for non-Java clients is supported for MSK Serverless clusters with no additional cost. Before you start, you need to configure the IAM identities and policies that define the client’s permissions to access resources on the cluster. The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the action and resource elements, see Semantics of actions and resources.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}

Configure the client

You should make code changes to your application that allow the clients to use SASL/OAUTHBEARER to pass the credentials required for IAM authentication. You also need to make sure the security group associated with your MSK Serverless cluster has an inbound rule allowing the traffic from the client applications in the associated VPCs to the TCP port number 9098.

You must use a Kafka client library that provides support for SASL with OAUTHBRARER authentication.

For this post, we use the Python programming language. We also use https://github.com/dpkp/kafka-python as our Kafka client library.

Amazon MSK provides you with a new code library per each language that generates the OAUTHBEARER token.

  1. To get started working with the Amazon MSK IAM SASL signer for Python with your Kafka client library, run the following command:
    $ pip install aws-msk-iam-sasl-signer-python

  2. Import the installed Amazon MSK IAM SASL signer library in your code:
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import socket
    import time
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

  3. Next, your application code needs to define a token provider that wraps the function that generates new tokens:
    class MSKTokenProvider():
            def token(self):
                token, _ = MSKAuthTokenProvider.generate_auth_token('<your aws region>')
                return token

  4. Specify security_protocol as SASL_SSL and sasl_mechanism as oauthbearer in your Python Kafka client properties, and pass the token provider in the configuration object:
    tp = MSKTokenProvider()
    
        producer = KafkaProducer(
            bootstrap_servers='<Amazon MSK Serverless bootstrap string>',
            security_protocol='SASL_SSL',
            sasl_mechanism='OAUTHBEARER',
            sasl_oauth_token_provider=tp,
            client_id=”my.kafka.client.unique.id”,
        )

You are now finished with all the code changes. For more examples of generating auth tokens or for more troubleshooting tips, refer to the following GitHub repo.

Conclusion

MSK Serverless now supports writes and reads from Kafka clients written in all programming languages. You can run your applications without having to configure and manage the infrastructure or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring, and ensures an even balance of partition distribution across brokers in the cluster (auto-balancing).

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

Amazon MSK IAM authentication now supports all programming languages

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/amazon-msk-iam-authentication-now-supports-all-programming-languages/

The AWS Identity and Access Management (IAM) authentication feature in Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports all programming languages. Administrators can simplify and standardize access control to Kafka resources using IAM. This support is based on SASL/OUATHBEARER, an open standard for authorization and authentication. Both Amazon MSK provisioned and serverless cluster types support the new Amazon MSK IAM expansion to all programming languages.

In this post, we show how you can connect your applications to MSK clusters with minimal code changes using the open-sourced client helper libraries and code samples for popular languages, including JavaPythonGoJavaScript, and .NET. You can also use standard IAM access controls such as temporary role-based credentials and precisely scoped permission policies more broadly with the multiple language support on Amazon MSK.

For clients that need to connect from other VPCs to an MSK cluster, whether in a same or a different AWS account, you can enable multi-VPC private connectivity and cluster policy support. IAM access control via cluster policy helps you manage all access to the cluster and topics in one place. For example, you can control which IAM principals have write access to certain topics, and which principals can only read from them. Users who are using IAM client authentication can also add permissions for required kafka-cluster actions in the cluster resource policy.

Solution overview

You can get started by using IAM principals as identities for your Apache Kafka clients and define identity policies to provide them precisely scoped access permissions. After IAM authentication is enabled for your cluster, you can configure client applications to use the IAM authentication with minimal code changes.

The code changes allow your clients to use SASL/OAUTHBEARER, a Kafka supported token-based access mechanism, to pass the credentials required for IAM authentication. In this post, we show how you can make these code changes by using the provided code libraries and examples.

With this launch, new code libraries for the following programming languages are available in the AWS GitHub repo:

The following diagram shows the conceptual process flow of using SASL/OAUTHBEARER with IAM access control for non-Java clients.

The workflow contains the following steps:

  1. The client generates an OAUTHBEARER token with the help of the provided library. The token contains a signed base64 encoded transformation of your IAM identity credentials.
  2. The client sends this to Amazon MSK using the IAM bootstrap broker addresses along with its request to access Apache Kafka resources.
  3. The MSK broker decodes the OATHBEARER token, validates the credentials, and checks if the client is authorized to perform the requested action according to the policy attached to the IAM identity.
  4. When the token expires, the client Kafka library automatically refreshes the token by making another call to the specified token provider.

Create IAM identities and policies

IAM access control for non-Java clients is supported for MSK clusters with Kafka version 2.7.1 and above. Before you start, you need to configure the IAM identities and policies that define the client’s permissions to access resources on the cluster. The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the action and resource elements, see Semantics of actions and resources.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}

Set up the MSK cluster

You need to enable the IAM access control authentication scheme for your MSK provisioned cluster and wait until the cluster finishes updating and turns to the Active state. This is because SASL/OAUTHBEARER uses the same broker addresses for IAM authentication.

Configure the client

You should make code changes to your application that allow the clients to use SASL/OAUTHBEARER to pass the credentials required for IAM authentication. Next, update your application to use the bootstrap server addresses for IAM authentication. You also need to make sure the security group associated with your MSK cluster has an inbound rule allowing the traffic from the client applications in the same VPC as the cluster to the TCP port 9098.

You must use a Kafka client library that provides support for SASL with OAUTHBRARER authentication.

In this post, we use the JavaScript programming language. We also use https://github.com/tulios/kafkajs as our Kafka client library.

Amazon MSK provides you with a new code library per each language that generates the OAUTHBEARER token.

  1. To get started working with the Amazon MSK IAM SASL signer for JavaScript with your Kafka client library, run the following command:
    npm install https://github.com/aws/aws-msk-iam-sasl-signer-js

  2. You need to import the installed Amazon MSK IAM SASL signer library in your code:
    const { Kafka } = require('kafkajs')
    
    const { generateAuthToken } = require('aws-msk-iam-sasl-signer-js')

  3. Next, your application code needs to define a token provider that wraps the function that generates new tokens:
    async function oauthBearerTokenProvider(region) {
        // Uses AWS Default Credentials Provider Chain to fetch credentials
        const authTokenResponse = await generateAuthToken({ region });
        return {
            value: authTokenResponse.token
        }
    }

  4. Specify security_protocol as SASL_SSL and sasl_mechanism as oauthbearer in your JavaScript Kafka client properties, and pass the token provider in the configuration object:
    const run = async () => {
        const kafka = new Kafka({
            clientId: 'my-app',
            brokers: [bootstrap server addresses for IAM],
            ssl: true,
            sasl: {
                mechanism: 'oauthbearer',
                oauthBearerProvider: () => oauthBearerTokenProvider('us-east-1')
            }
        })
    
        const producer = kafka.producer()
        const consumer = kafka.consumer({ groupId: 'test-group' })
    
        // Producing
        await producer.connect()
        await producer.send({
            topic: 'test-topic',
            messages: [
                { value: 'Hello KafkaJS user!' },
            ],
        })
    
        // Consuming
        await consumer.connect()
        await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
    
        await consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                console.log({
                    partition,
                    offset: message.offset,
                    value: message.value.toString(),
                })
            },
        })
    }
    
    run().catch(console.error)

You are now finished with all the code changes. For more examples of generating auth tokens or for more troubleshooting tips, refer to the following GitHub repo.

Conclusion

IAM access control for Amazon MSK enables you to handle both authentication and authorization for your MSK cluster. This eliminates the need to use one mechanism for authentication and another for authorization. For example, when a client tries to write to your cluster, Amazon MSK uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster.

With today’s launch, Amazon MSK IAM authentication now supports all programming languages. This means you can connect your applications in all languages without worrying about implementing separate authentication and authorization mechanisms. For workloads that require Amazon MSK multi-VPC private connectivity and cluster policy support, you can now simplify connectivity to your MSK brokers and manage all access to the cluster and topics in one place that is your cluster policy.

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Connect Kafka client applications securely to your Amazon MSK cluster from different VPCs and AWS accounts

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/connect-kafka-client-applications-securely-to-your-amazon-msk-cluster-from-different-vpcs-and-aws-accounts/

You can now use Amazon Managed Streaming for Apache Kafka (Amazon MSK) multi-VPC private connectivity (powered by AWS PrivateLink) and cluster policy support for MSK clusters to simplify connectivity of your Kafka clients to your brokers. Amazon MSK is a fully managed service that makes it easy for you to build and run applications that use Kafka to process streaming data. When you create an MSK cluster, the cluster resources are available to clients within the same Amazon VPC. This allows you to launch the cluster within specific subnets of the VPC, associate it with security groups, and attach IP addresses from your VPC’s address space through elastic network interfaces (ENIs). Network traffic between clients and the cluster stays within the AWS network, with internet access to the cluster not possible by default.

If you have workloads segmented across several VPCs and AWS accounts, there may be scenarios in which you need to make your MSK brokers accessible to Kafka clients across VPCs. With the launch of Amazon MSK multi-VPC private connectivity, you can now privately access your MSK brokers from your client applications in another VPC within the same AWS account or another AWS account without enabling public access or creating and managing your own networking infrastructure for private connectivity. A cluster policy is an AWS Identity and Access Management (IAM) resource-based policy, which is defined for your MSK cluster to provide cross-account IAM principals permissions to set up private connectivity to the cluster.

This post introduces Amazon MSK multi-VPC connectivity and how you can privately access your MSK clusters from your clients in other VPCs. It also shows how to define a cluster policy for your MSK clusters. These new two capabilities simplify configuring cross-VPC network access and setting up permissions needed for Kafka clients to privately connect to MSK brokers in a different account.

Before Amazon MSK multi-VPC connectivity

Before Amazon MSK multi-VPC connectivity, the network admin needed to choose one of the following secure connectivity patterns. Admins had to repeat certain steps for each broker in the cluster.

  • Amazon VPC peering is the simplest networking construct that enables bidirectional connectivity between two VPCs. In this approach, the network admin had to update each VPC with the IP addresses of each broker in the routing tables of all subnets. You can’t use this connectivity pattern when there are overlapping IPv4 or IPv6 CIDR blocks in the VPCs.
  • AWS Transit Gateway provides a highly available and scalable design for connecting VPCs. In this approach, the network admin constantly had to update the routing tables attached to each transit gateway. Unlike VPC peering that can go cross-Region, AWS Transit Gateway is a regional service, but you can use inter-Region peering between transit gateways to route traffic across regions. AWS Transit Gateway has the maximum bandwidth (burst) per Availability Zone per VPC connection (50 Gbps). This could become a challenge for some workloads.
  • AWS PrivateLink is an AWS networking service that provides private access to a specific service instead of all resources within a VPC and without traversing the public internet. It also eliminates the need to expose the entire VPC or subnet, and prevents issues like having to deal with overlapping CIDR blocks between the VPC that hosts the MSK cluster ENIs and the Kafka client VPC. AWS PrivateLink can scale to an unlimited number of VPCs and unlike the other options, traffic here is unidirectional. Because of these benefits, AWS PrivateLink is a popular choice to manage private connectivity. However, this connectivity pattern comes with additional complexity. It requires creating multiple Network Load Balancers (NLBs) per cluster and creating private service endpoints per NLB in the service account. Furthermore, admins had to create private endpoints per private service endpoint, and an Amazon Route 53 alias record per private endpoint in every client account.

The following diagram illustrates the architecture of customer-managed VPC endpoints between different VPCs in different AWS accounts with IAM authentication.

Before multi-vpc connectivity

After Amazon MSK multi-VPC connectivity and cluster policy

You can now enable multi-VPC and cross-account connectivity for your MSK clusters in a few simple steps and pay for what you use. This eliminates the overhead of creating and managing AWS PrivateLink infrastructure. When new brokers are added to a cluster, private connectivity is maintained without the need to make configuration changes, saving you from the overhead and complexity of managing the underlying network infrastructure.

The following diagram illustrates this updated architecture of using Amazon MSK multi-VPC connectivity to connect a client from a different AWS account.

after multi-vpc connectivity

Solution overview

Establishing multi-VPC private connectivity involves turning on this feature for the cluster and configuring the Kafka clients to connect privately to the cluster.

The following are the high-level steps to configure the cluster:

  1. Enable the multi-VPC private connectivity feature for a subset of authentication schemes that are enabled for your MSK cluster.
  2. If a Kafka client is in an AWS account that is different than the cluster, attach a resource-based policy to the MSK cluster to authorize IAM principals for creating cross-account connectivity.
  3. Share the cluster ARN with the IAM principal associated with the Kafka client that needs to create the cross-account access to MSK cluster.

The following are the high-level steps to configure the clients:

  1. Create a managed VPC endpoint for the client VPC that needs to connect privately to the MSK cluster.
  2. Update the VPC endpoint’s security group settings to enable outbound connectivity to the MSK cluster.
  3. Set up the client to use the cluster’s connection string to connect privately to the cluster.

Cluster setup

In this post, we only show the steps for enabling Amazon MSK multi-VPC connectivity for a provisioned cluster.

  1. To enable Amazon MSK multi-VPC connectivity on your existing cluster, choose Turn on multi-VPC connectivity on the Amazon MSK console.
    turn on multi-vpc connectivity
    Note that multi-VPC connectivity cannot be turned on with a cluster that allows unauthenticated access. This is to prevent unauthenticated access from different VPCs.
  2. Select the authentication methods that you allow clients in other VPCs to use.
    The list of authentication methods is populated based on your cluster’s security configuration.
  3. Review the settings and choose Turn on selection. After the multi-VPC connectivity is enabled on your cluster, Amazon MSK will create the NLB and VPC endpoint service infrastructure required for private connectivity. Amazon MSK will vend a new set of bootstrap broker strings that can be used for private connectivity. These can be accessed using the View client info option on the Amazon MSK console. The next step is to provide the IAM principals associated with your clients the permissions to connect privately to your cluster. To do this, you need to attach a cluster policy to the cluster. Turn on selection
  4. Choose Edit cluster policy in the Security section of the cluster details page on the Amazon MSK console.
    The new cluster policy allows for defining a Basic or Advanced cluster policy. With the Basic option, you can simply enter AWS account IDs of your client’s VPCs. This policy allows all allowed principals in those AWS accounts to perform CreateVPCConnection, GetBootstrapBrokers, DescribeCluster, and DescribeClusterV2 actions that are required for creating the cross-VPC connectivity to your cluster. However, in other cases, you may need a more complex policy that allows for additional actions, or principals other than AWS accounts, such as IAM roles, role sessions, IAM users, and more. You can author a cluster policy according to IAM JSON policy guidance and provide that to the cluster in Advanced mode.
  5. Define your cluster policy and choose Save changes.cluster policy

Client setup

On the client side, first you need to attach an identity policy to the IAM principal who wants to create a managed VPC connection. The identity policy must provide permission for creating a managed VPC connection. The necessary permissions are part of the AWS managed policy AmazonMSKFullAccess.

  1. In the other AWS account with the IAM principal you configured, use the new Managed VPC connection page on the Amazon MSK console to create Amazon MSK managed VPC connections.
    A managed VPC connection maps to an AWS PrivateLink endpoint under the hood, and Amazon MSK uses the managed VPC connection to orchestrate private connectivity to the cluster. You simply need to create the managed VPC connection and pay standard AWS PrivateLink charges for the underlying endpoint.Create a connection
  2. Enter the AWS Resource Name (ARN) of the cluster that you want to connect to.
  3. Choose Verify to verify the cluster information and its minimum requirements for cross-connectivity.
  4. Select an authentication method from the provided values.
  5. Choose the VPC ID where your Kafka clients are located, and choose their subnet IDs. You can add more subnets using the Add subnet option.
    The specified client subnet must have Availability Zone IDs that match the cluster’s Availability Zone IDs. This makes sure the clients are located in a same physical Availability Zone as the cluster brokers. Amazon MSK uses the port range 14001:14100 for all authentication methods. You need to select a security group that allows outbound traffic to this port. The following screenshot shows an example.
  6. Review the settings and choose Create connection.Review and create a connection
    The process will take a few minutes.
  7. When it’s complete, you can obtain the clients’ connection string from the details page of your connection.
  8. The next step is to update the outbound rules for the VPC endpoint security group to allow communication to the port range 14001:14100.client setup review

Use the Amazon MSK-managed VPC connection

After you create the managed VPC connection, connecting privately to the cluster is easy. Simply use the new connection string to connect to the cluster. For example, you may connect from an Amazon Elastic Compute Cloud (Amazon EC2) instance on your client VPC. Then run the following command to verify if you can connect and perform actions against the topics in the MSK cluster:

export MSK_VPC=<YOUR CLIENT CONNECTION STRING GOES HERE>
bin/kafka-topics.sh --bootstrap-server $MSK_VPC -command-config /home/ec2-user/kafka/config/client-config.properties –list

console results

IAM authentication

Before the launch of Amazon MSK multi-VPC connectivity, Kafka clients in other AWS accounts who opted in IAM authentication, needed to assume another IAM role in the cluster’s account. To facilitate this, admins had to create multiple IAM roles and write a trust policy that allows authenticated principals from the client’s accounts to assume corresponding roles through the sts:AssumeRole API call. This approach was challenging to scale when the number of VPCs or AWS accounts grew. With the launch of this cluster policy, cross-account access control is now simplified because you can attach a cluster policy to your clusters to specify which cross-account clients have what permissions on resources within the cluster.

This capability allows you to manage all access to the cluster and topics in one place. For example, you can control which IAM principals have write access to certain topics, and which principals can only read from them. Users who are using IAM client authentication can also add permissions for required kafka-cluster actions in the cluster resource policy.

Availability and pricing

You can now use Amazon MSK multi-VPC connectivity in all commercial Regions where Amazon MSK is offered, including China and GovCloud (US) Regions.

You pay $0.006 per GB data processed for private connectivity and $0.0225 per private connectivity hour per authentication scheme in US East (Ohio). Refer to our Pricing page for more details.

Conclusion

With Amazon MSK multi-VPC private connectivity, you can now privately access your MSK brokers from your client applications in another VPC within the same AWS account or another AWS account, with minimal configuration. You no longer have to create, manage, and update multiple networking resources in multiple VPCs, or make Amazon MSK configuration changes to connect your Kafka clients across VPCs and accounts. Amazon MSK creates and manages the resources for you. With Cluster policy support, you can easily provide your cross-account client principals permissions to connect privately to your MSK cluster. Further, if you are using IAM client authentication, you can also leverage the cluster policy to centrally control clients’ permissions to perform operations on the cluster. Use the Amazon MSK multi-VPC connectivity and the cluster policy feature today to simplify your secure connectivity infrastructure.

For further reading on Amazon MSK, visit the official product page and our AWS Documentation.


About the authors

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Rajeev Chakrabarti is a Kinesis specialist solutions architect.

How to choose the right Amazon MSK cluster type for you

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/how-to-choose-the-right-amazon-msk-cluster-type-for-you/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is an AWS streaming data service that manages Apache Kafka infrastructure and operations, making it easy for developers and DevOps managers to run Apache Kafka applications and Kafka Connect connectors on AWS, without the need to become experts in operating Apache Kafka. Amazon MSK operates, maintains, and scales Apache Kafka clusters, provides enterprise-grade security features out of the box, and has built-in AWS integrations that accelerate development of streaming data applications. You can easily get started by creating an MSK cluster using the AWS Management Console with a few clicks.

When creating a cluster, you must choose a cluster type from two options: provisioned or serverless. Choosing the best cluster type for each workload depends on the type of workload and your DevOps preferences. Amazon MSK provisioned clusters offer more flexibility in how you scale, configure, and optimize your cluster. Amazon MSK Serverless, on the other hand, makes scaling, load management, and operation of the cluster easier for you. With MSK Serverless, you can run your applications without having to configure, manage the infrastructure, or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring as well as ensuring an even balance of partition distribution across brokers in the cluster (auto-balancing).

In this post, I examine a use case with the fictitious company AnyCompany, who plans to use Amazon MSK for two applications. They must decide between provisioned or serverless cluster types. I describe a process by which they work backward from the applications’ requirements to find the best MSK cluster type for their workloads, including how the organizational structure and application requirements are relevant in finding the best offering. Lastly, I examine the requirements and their relationship to Amazon MSK features.

Use case

AnyCompany is an enterprise organization that is ready to move two of their Kafka applications to Amazon MSK.

The first is a large ecommerce platform, which is a legacy application that currently uses a self-managed Apache Kafka cluster run in their data centers. AnyCompany wants to migrate this application to the AWS Cloud and use Amazon MSK to reduce maintenance and operations overhead. AnyCompany has a DataOps team that has been operating self-managed Kafka clusters in their data centers for years. AnyCompany wants to continue using the DataOps team to manage the MSK cluster on behalf of the development team. There is very little flexibility for code changes. For example, a few modules of the application require plaintext communication and access to the Apache ZooKeeper cluster that comes with an MSK cluster. The ingress throughput for this application doesn’t fluctuate often. The ecommerce platform only experiences a surge in user activity during special sales events. The DataOps team has a good understanding of this application’s traffic pattern, and are confident that they can optimize an MSK cluster by setting some custom broker-level configurations.

The second application is a new cloud-native gaming application currently in development. AnyCompany hopes to launch this gaming application soon followed by a marketing campaign. Throughput needs for this application are unknown. The application is expected to receive high traffic initially, then user activity should decline gradually. Because the application is going to launch first in the US, traffic during the day is expected to be higher than at night. This application offers a lot of flexibility in terms of Kafka client version, encryption in transit, and authentication. Because this is a cloud-native application, AnyCompany hopes they can delegate full ownership of its infrastructure to the development team.

Solution overview

Let’s examine a process that helps AnyCompany decide between the two Amazon MSK offerings. The following diagram shows this process at a high level.

In the following sections, I explain each step in detail and the relevant information that AnyCompany needs to collect before they make a decision.

Competency in Apache Kafka

AWS recommends a list of best practices to follow when using the Amazon MSK provisioned offering. Amazon MSK provisioned, offers more flexibility so you make scaling decisions based on what’s best for your workloads. For example, you can save on cost by consolidating a group of workloads into a single cluster. You can decide which metrics are important to monitor and optimize your cluster through applying custom configurations to your brokers. You can choose your Apache Kafka version, among different supported versions, and decide when to upgrade to a new version. Amazon MSK takes care of applying your configuration and upgrading each broker in a rolling fashion.

With more flexibility, you have more responsibilities. You need to make sure your cluster is right-sized at any time. You can achieve this by monitoring a set of cluster-level, broker-level, and topic-level metrics to ensure you have enough resources that are needed for your throughput. You also need to make sure the number of partitions assigned to each broker doesn’t exceed the numbers suggested by Amazon MSK. If partitions are unbalanced, you need to even-load them across all brokers. If you have more partitions than recommended, you need to either upgrade brokers to a larger size or increase the number of brokers in your cluster. There are also best practices for the number of TCP connections when using AWS Identity and Access Management (IAM) authentication.

An MSK Serverless cluster takes away the complexity of right-sizing clusters and balancing partitions across brokers. This makes it easy for developers to focus on writing application code.

AnyCompany has an experienced DataOps team who are familiar with scaling operations and best practices for the MSK provisioned cluster type. AnyCompany can use their DataOps team’s Kafka expertise for building automations and easy-to-follow standard procedures on behalf of the ecommerce application team. The gaming development team is an exception, because they are expected to take the full ownership of the infrastructure.

In the following sections, I discuss other steps in the process before deciding which cluster type is right for each application.

Custom configuration

In certain use cases, you need to configure your MSK cluster differently from its default settings. This could be due to your application requirements. For example, AnyCompany’s ecommerce platform requires setting up brokers such that the default retention period for all topics is set to 72 hours. Also, topics should be or auto-created when they are requested and don’t exist.

The Amazon MSK provisioned offering provides a default configuration for brokers, topics, and Apache ZooKeeper nodes. It also allows you to create custom configurations and use them to create new MSK clusters or update existing clusters. An MSK cluster configuration consists of a set of properties and their corresponding values.

MSK Serverless doesn’t allow applying broker-level configuration. This is because AWS takes care of configuring and managing the backend nodes. It takes away the heavy lifting of configuring the broker nodes. You only need to manage your applications’ topics. To learn more, refer to the list of topic-level configurations that MSK Serverless allows you to change.

Unlike the ecommerce platform, AnyCompany’s gaming application doesn’t need broker-level custom configuration. The developers want to set the retention.ms and max.message.bytes per each topic only.

Application requirements

Apache Kafka applications differ in terms of their security; the way they connect, write, or read data; data retention period; and scaling patterns. For example, some applications can only scale vertically, whereas other applications can scale only horizontally. Although a flexible application can work with encryption in transit, a legacy application may only be able to communicate in plaintext format.

Cluster-level quotas

Amazon MSK enforces some quotas to ensure the performance, reliability, and availability of the service for all customers. These quotas are subject to change at any time. To access the latest values for each dimension, refer to Amazon MSK quota. Note that some of the quotas are soft limits and can be increased using a support ticket.

When choosing a cluster type in Amazon MSK, it’s important to understand your application requirements and compare those against quotas in relation with each offering. This makes sure you choose the best cluster type that meets your goals and application’s needs. Let’s examine how you can calculate the throughput you need and other important dimensions you need to compare with Amazon MSK quotas:

  • Number of clusters per account – Amazon MSK may have quotas for how many clusters you can create in a single AWS account. If this is limiting your ability to create more clusters, you can consider creating those in multiple AWS accounts and using secure connectivity patterns to provide access to your applications.
  • Message size – You need to make sure the maximum message size that your producer writes for a single message is lower than the configured size in the MSK cluster. MSK provisioned clusters allow you to change the default value in a custom configuration. If you choose MSK Serverless, check this value in Amazon MSK quota. The average message size is helpful when calculating the total ingress or egress throughput of the cluster, which I demonstrate later in this post.
  • Message rate per second – This directly influences total ingress and egress throughput of the cluster. Total ingress throughput equals the message rate per second multiplied by message size. You need to make sure your producer is configured for optimal throughput by adjusting batch.size and linger.ms properties. If you’re choosing MSK Serverless, you need to make sure you configure your producer to optimal batches with the rate that is lower than its request rate quota.
  • Number of consumer groups – This directly influences the total egress throughput of the cluster. Total egress throughput equals the ingress throughput multiplied by the number of consumer groups. If you’re choosing MSK Serverless, you need to make sure your application can work with these quotas.
  • Maximum number of partitions – Amazon MSK provisioned recommends not exceeding certain limits per broker (depending the broker size). If the number of partitions per broker exceeds the maximum value specified in the previous table, you can’t perform certain upgrade or update operations. MSK Serverless also has a quota of maximum number of partitions per cluster. You can request to increase the quota by creating a support case.

Partition-level quotas

Apache Kafka organizes data in structures called topics. Each topic consists of a single or many partitions. Partitions are the degree of parallelism in Apache Kafka. The data is distributed across brokers using data partitioning. Let’s examine a few important Amazon MSK requirements, and how you can make sure which cluster type works better for your application:

  • Maximum throughput per partition – MSK Serverless automatically balances the partitions of your topic between the backend nodes. It instantly scales when your ingress throughput increases. However, each partition has a quota of how much data it accepts. This is to ensure the data is distributed evenly across all partitions and backend nodes. In an MSK Serverless cluster, you need to create your topic with enough partitions such that the aggregated throughput is equal to the maximum throughput your application requires. You also need to make sure your consumers read data with a rate that is below the maximum egress throughput per partition quota. If you’re using Amazon MSK provisioned, there is no partition-level quota for write and read operations. However, AWS recommends you monitor and detect hot partitions and control how partitions should balance among the broker nodes.
  • Data storage – The amount of time each message is kept in a particular topic directly influences the total amount of storage needed for your cluster. Amazon MSK allows you to manage the retention period at the topic level. MSK provisioned clusters allow broker-level configuration to set the default data retention period. MSK Serverless clusters allow unlimited data retention, but there is a separate quota for the maximum data that can be stored in each partition.

Security

Amazon MSK recommends that you secure your data in the following ways. Availability of the security features varies depending on the cluster type. Before making a decision about your cluster type, check if your preferred security options are supported by your choice of cluster type.

  • Encryption at rest – Amazon MSK integrates with AWS Key Management Service (AWS KMS) to offer transparent server-side encryption. Amazon MSK always encrypts your data at rest. When you create an MSK cluster, you can specify the KMS key that you want Amazon MSK to use to encrypt your data at rest.
  • Encryption in transit – Amazon MSK uses TLS 1.2. By default, it encrypts data in transit between the brokers of your MSK cluster. You can override this default when you create the cluster. For communication between clients and brokers, you must specify one of the following settings:
    • Only allow TLS encrypted data. This is the default setting.
    • Allow both plaintext and TLS encrypted data.
    • Only allow plaintext data.
  • Authentication and authorization – Use IAM to authenticate clients and allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients, and Apache Kafka ACLs to allow or deny actions.

Cost of ownership

Amazon MSK helps you avoid spending countless hours and significant resources just managing your Apache Kafka cluster, adding little or no value to your business. With a few clicks on the Amazon MSK console, you can create highly available Apache Kafka clusters with settings and configuration based on Apache Kafka’s deployment best practices. Amazon MSK automatically provisions and runs Apache Kafka clusters. Amazon MSK continuously monitors cluster health and automatically replaces unhealthy nodes with no application downtime. In addition, Amazon MSK secures Apache Kafka clusters by encrypting data at rest and in transit. These capabilities can significantly reduce your Total Cost of Ownership (TCO).

With MSK provisioned clusters, you can specify and then scale cluster capacity to meet your needs. With MSK Serverless clusters, you don’t need to specify or scale cluster capacity. MSK Serverless automatically scales the cluster capacity based on the throughput, and you only pay per GB of data that your producers write to and your consumers read from the topics in your cluster. Additionally, you pay an hourly rate for your serverless clusters and an hourly rate for each partition that you create. The MSK Serverless cluster type generally offers a lower cost of ownership by taking away the cost of engineering resources needed for monitoring, capacity planning, and scaling MSK clusters. However, if your organization has a DataOps team with Kafka competency, you can use this competency to operate optimized MSK provisioned clusters. This allows you to save on Amazon MSK costs by consolidating several Kafka applications into a single cluster. There are a few critical considerations to decide when and how to split your workloads between multiple MSK clusters.

Apache ZooKeeper

Apache ZooKeeper is a service included in Amazon MSK when you create a cluster. It manages the Apache Kafka metadata and acts as a quorum controller for leader elections. Although interacting with ZooKeeper is not a recommended pattern, some Kafka applications have a dependency to connect directly to ZooKeeper. During the migration to Amazon MSK, you may find a few of these applications in your organization. This could be because they use an older version of the Kafka client library or other reasons. For example, applications that help with Apache Kafka admin operations or visibility such as Cruise Control usually need this kind of access.

Before you choose your cluster type, you first need to check which offering provides direct access to the ZooKeeper cluster. As of writing this post, only Amazon MSK provisioned provides direct access to ZooKeeper.

How AnyCompany chooses their cluster types

AnyCompany first needs to collect some important requirements about each of their applications. The following table shows these requirements. The rows marked with an asterisk (*) are calculated based on the values in previous rows.

Dimension Ecommerce Platform Gaming Application
Message rate per second 150,000 1,000
Maximum message size 15 MB 1 MB
Average message size 30 KB 15 KB
* Ingress throughput (average message size * message rate per second) 4.5GBps 15MBps
Number of consumer groups 2 1
* Outgress throughput (ingress throughput * number of consumer groups) 9 GBps 15 MBps
Number of topics 100 10
Average partition per topic 100 5
* Total number of partitions (number of topics * average partition per topic) 10,000 50
* Ingress per partition (ingress throughput / total number of partitions) 450 KBps 300 KBps
* Outgress per partition (outgress throughput / total number of partitions) 900 KBps 300 KBps
Data retention 72 hours 168 hours
* Total storage needed (ingress throughput * retention period in seconds) 1,139.06 TB 1.3 TB
Authentication Plaintext and SASL/SCRAM IAM
Need ZooKeeper access Yes No

For the gaming application, AnyCompany doesn’t want to use their in-house Kafka competency to support an MSK provisioned cluster. Also, the gaming application doesn’t need custom configuration, and its throughput needs are below the quotas set by the MSK Serverless cluster type. In this scenario, an MSK Serverless cluster makes more sense.

For the e-commerce platform, AnyCompany wants to use their Kafka competency. Moreover, their throughput needs exceed the MSK Serverless quotas, and the application requires some broker-level custom configuration. The ecommerce platform also can’t split between multiple clusters. Because of these reasons, AnyCompany chooses the MSK provisioned cluster type in this scenario. Additionally, AnyCompany can save more on cost with the Amazon MSK provisioned pricing model. Their throughput is consistent at most times and AnyCompany wants to use their DataOps team to optimize a provisioned MSK cluster and make scaling decisions based on their own expertise.

Conclusion

Choosing the best cluster type for your applications may seem complicated at first. In this post, I showed a process that helps you work backward from your application’s requirement and the resources available to you. MSK provisioned clusters offer more flexibility in how you scale, configure, and optimize your cluster. MSK Serverless, on the other hand, is a cluster type that makes it easier for you to run Apache Kafka clusters without having to manage compute and storage capacity. I generally recommend you begin with MSK Serverless if your application doesn’t require broker-level custom configurations, and your application throughput needs don’t exceed the quotas for the MSK Serverless cluster type. Sometimes it’s best to split your workloads between multiple MSK Serverless clusters, but if that isn’t possible, you may need to consider an MSK provisioned cluster. To operate an optimized MSK provisioned cluster, you need to have Kafka competency within your organization.

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Split your monolithic Apache Kafka clusters using Amazon MSK Serverless

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/split-your-monolithic-apache-kafka-clusters-using-amazon-msk-serverless/

Today, many companies are building real-time applications to improve their customer experience and get immediate insights from their data before it loses its value. As the result, companies have been facing increasing demand to provide data streaming services such as Apache Kafka for developers. To meet this demand, companies typically start with a small- or medium-sized, centralized Apache Kafka cluster to build a global streaming service. Over time, they scale the capacity of the cluster to match the demand for streaming. They choose to keep a monolithic cluster to simplify staffing and training by bringing all technical expertise in a single place. This approach also has cost benefits because it reduces the technical debt, overall operational costs, and complexity. In a monolithic cluster, the extra capacity is shared among all applications, therefore it usually reduces the overall streaming infrastructure cost.

In this post, I explain a few challenges with a centralized approach, and introduce two strategies for implementing a decentralized approach, using Amazon MSK Serverless. A decentralized strategy enables you to provision multiple Apache Kafka clusters instead of a monolithic one. I discuss how this strategy helps you optimize clusters per your application’s security, storage, and performance needs. I also discuss the benefits of a decentralized model and how to migrate from a monolithic cluster to a multi-cluster deployment model.

MSK Serverless can reduce the overhead and cost of managing Apache Kafka clusters. It automatically provisions and scales compute and storage resources for Apache Kafka clusters and automatically manages cluster capacity. It monitors how the partitions are distributed across the backend nodes and reassigns the partitions automatically when necessary. It integrates with other AWS services such as Amazon CloudWatch, where you can monitor the health of the cluster. The choice of MSK Serverless in this post is deliberate, even though the concepts can be applied to the Amazon MSK provisioned offering as well.

Overview of solution

Apache Kafka is an open-source, high-performance, fault-tolerant, and scalable platform for building real-time streaming data pipelines and applications. Apache Kafka simplifies producing and consuming streaming data by decoupling producers from the consumers. Producers simply interact with a single data store (Apache Kafka) to send their data. Consumers read the continuously flowing data, independent from the architecture or the programming language of the producers.

Apache Kafka is a popular choice for many use cases, such as:

  • Real-time web and log analytics
  • Transaction and event sourcing
  • Messaging
  • Decoupling microservices
  • Streaming ETL (extract, transform, and load)
  • Metrics and log aggregation

Challenges with a monolithic Apache Kafka cluster

Monolithic Apache Kafka saves companies from having to install and maintain multiple clusters in their data centers. However, this approach comes with common disadvantages:

  • The entire streaming capacity is consolidated in one place, making capacity planning difficult and complicated. You typically need more time to plan and reconfigure the cluster. For example, when preparing for sales or large campaign events, it’s hard to predict and calculate an aggregation of needed capacity across all applications. This can also inhibit the growth of your company because reconfiguring a large cluster for a new workload often takes longer than a small cluster.
  • Organizational conflicts may occur regarding the ownership and maintenance of the Apache Kafka cluster, because a monolithic cluster is a shared resource.
  • The Apache Kafka cluster becomes a single point of failure. Any downtime means the outage of all related applications.
  • If you choose to increase Apache Kafka’s resiliency with a multi-datacenter deployment, then you typically must have a cluster with the same size (as large) in the other data center, which is expensive.
  • Maintenance and operation activities, such as version upgrades or installing OS patches, take significantly longer for larger clusters due to the distributed nature of Apache Kafka architecture.
  • A faulty application can impact the reliability of the whole cluster and other applications.
  • Version upgrades have to wait until all applications are tested with the new Apache Kafka version. This limits any application from experimenting with Apache Kafka features quickly.
  • This model makes it difficult to attribute the cost of running the cluster to the applications for chargeback purposes.

The following diagram shows a monolithic Apache Kafka architecture.

diagram shows a monolithic Apache Kafka architecture.

Decentralized model

A decentralized Apache Kafka deployment model involves provisioning, configuring, and maintaining multiple clusters. This strategy generally isn’t preferred because managing multiple clusters requires heavy investments in operational excellence, advanced monitoring, infrastructure as code, security, and hardware procurement in on-premises environments.

However, provisioning decentralized Apache Kafka clusters using MSK Serverless doesn’t require those investments. It can scale the capacity and storage up and down instantly based on the application requirement, adding new workloads or scaling operations without the need for complex capacity planning. It also provides a throughput-based pricing model, and you pay for the storage and throughput that your applications use. Moreover, with MSK Serverless, you no longer need to perform standard maintenance tasks such as Apache Kafka version upgrade, partition reassignments, or OS patching.

With MSK Serverless, you benefit from a decentralized deployment without the operational burden that usually comes with a self-managed Apache Kafka deployment. In this strategy, the DevOps managers don’t have to spend time provisioning, configuring, monitoring, and operating multiple clusters. Instead, they invest more in building more operational tools to onboard more real-time applications.

In the remainder of this post, I discuss different strategies for implementing a decentralized model. Furthermore, I highlight the benefits and challenges of each strategy so you can decide what works best for your organization.

Write clusters and read clusters

In this strategy, write clusters are responsible for ingesting data from the producers. You can add new workloads by creating new topics or creating new MSK Serverless clusters. If you need to scale the size of current workloads, you simply increase the number of partitions of your topics if the ordering isn’t important. MSK Serverless manages the capacity instantly as per the new configuration.

Each MSK Serverless cluster provides up to 200 MBps of write throughput and 400 MBps of read throughput. It also allocates up to 5 MBps of write throughput and 10 MBps of read throughput per partition.

Data consumers within any organization can usually be divided to two main categories:

  • Time-sensitive workloads, which need data with very low latency (such as millisecond or subsecond) and can only tolerate a very short Recovery Time Objective (RTO)
  • Time-insensitive workloads, which can tolerate higher latency (sub-10 seconds to minute-level latency) and longer RTO

Each of these categories also can be further divided into subcategories based on certain conditions, such as data classification, regulatory compliance, or service level agreements (SLAs). Read clusters can be set up according to your business or technical requirements, or even organizational boundaries, which can be used by the specific group of consumers. Finally, the consumers are configured to run against the associated read cluster.

To connect the write clusters to read clusters, a data replication pipeline is necessary. You can build a data replication pipeline many ways. Because MSK Serverless supports the standard Apache Kafka APIs, you can use the standard Apache Kafka tools such as MirrorMaker 2 to set up replications between Apache Kafka clusters.

The following diagram shows the architecture for this strategy.

diagram shows the architecture for this strategy.

This approach has the following benefits:

  • Producers are isolated from the consumers; therefore, your write throughput can scale independently from your read throughput. For example, if you have reached your max read throughput with existing clusters and need to add a new consumer group, you can simply provision a new MSK Serverless cluster and set up replication between the write cluster and the new read cluster.
  • It helps enforce security and regulatory compliance. You can build streaming jobs that can mask or remove the sensitive fields of data events, such as personally identifiable information (PII), while replicating the data.
  • Different clusters can be configured differently in terms of retention. For example, read clusters can be configured with different maximum retention periods, to save on storage cost depending on their requirements.
  • You can prioritize your response time for outages for certain clusters over the others.
  • For implementing increased resiliency, you can have fewer clusters in the backup Region by only replicating the data from the write clusters. Other clusters such as read clusters can be provisioned when the workload failover is invoked. In this model, with the MSK Serverless pricing model, you pay additionally for what you use (lighter replica) in the backup Region.

There are a few important notes to keep in mind when choosing this strategy:

  • It requires setting up multiple replications between clusters, which comes with additional operational and maintenance complexity.
  • Replication tools such as MirrorMaker 2 only support at-least-once processing semantics. This means that during failures and restarts, data events can be duplicated. If you have consumers that can’t tolerate data duplication, I suggest building data pipelines that support the exactly-once processing semantic for replicating the data, such as Apache Flink, instead of using MirrorMaker 2.
  • Because consumers don’t consume data directly from the write clusters, the latency is increased between the writers and the readers.
  • In this strategy, even though there are multiple Apache Kafka clusters, ownership and control still reside with one team, and the resources are in a single AWS account.

Segregating clusters

For some companies, providing access to Apache Kafka through a central data platform can create scaling, ownership, and accountability challenges. Infrastructure teams may not understand the specific business needs of an application, such as data freshness or latency requirements, security, data schemas, or a specific method needed for data ingestion.

You can often reduce these challenges by giving ownership and autonomy to the team who owns the application. You allow them to build and manage their application and needed infrastructure, rather than only being able to use a common central platform. For instance, development teams are responsible for provisioning, configuring, maintaining, and operating their own Apache Kafka clusters. They’re the domain experts of their application requirements, and they can manage their cluster according to their application needs. This reduces overall friction and puts application teams accountable to their advertised SLAs.

As mentioned before, MSK Serverless minimizes the operation and maintenance work associated with Apache Kafka clusters. This enables the autonomous application teams to manage their clusters according to industry best practices, without needing to be an expert in running highly available Apache Kafka clusters on AWS. If the MSK Serverless cluster is provisioned within their AWS account, they also own all the costs associated with operating their applications and the data streaming services.

The following diagram shows the architecture for this strategy.

diagram shows the architecture for this strategy.

This approach has the following benefits:

  • MSK Serverless clusters are managed by different teams; therefore, the overall management work is minimized.
  • Applications are isolated from each other. A faulty application or downtime of a cluster doesn’t impact other applications.
  • Consumers read data directly with low latency from the same cluster where the data is written.
  • Each MSK Serverless cluster scales differently per its write and read throughput.
  • Simple cost attribution means that application teams own their infrastructure and its cost.
  • Total ownership of the streaming infrastructure allows developers to adopt streaming faster and deliver more functionalities. It may also help shorten their response time to failures and outages.

Compared to the previous strategy, this approach has the following disadvantages:

  • It’s difficult to enforce a unified security or regulatory compliance across many teams.
  • Duplicate copies of the same data may be ingested in multiple clusters. This increases the overall cost.
  • To increase resiliency, each team individually needs to set up replications between MSK Serverless clusters.

Moving from a centralized to decentralized strategy

MSK Serverless provides AWS Command Line Interface (AWS CLI) tools and support for AWS CloudFormation templates for provisioning clusters in minutes. You can implement any of the strategies that I mentioned earlier via the methods AWS provides, and migrate your producers and consumers when the new clusters are ready.

The following steps provide further guidance on implementation of these strategies:

  1. Begin by focusing on the current issues with the monolithic Apache Kafka. Next, compare the challenges with the benefits and disadvantages, as listed under each strategy. This helps you decide which strategy serves your company the best.
  2. Identify and document each application’s performance, resiliency, SLA, and ownership requirements separately.
  3. Attempt grouping applications that have similar requirements. For example, you may find a few applications that run batch analytics; therefore, they’re not sensitive to data freshness and also don’t need access to sensitive (or PII) data. If you decide segregating clusters is the right strategy for your company, you may choose to group applications by the team who owns them.
  4. Compare each group of applications’ storage and streaming throughput requirements against the MSK Serverless quotas. This helps you determine whether one MSK Serverless cluster can provide the needed aggregated streaming capacity. Otherwise, further divide larger groups to smaller ones.
  5. Create MSK Serverless clusters per each group you identified earlier via the AWS Management Console, AWS CLI, or CloudFormation templates.
  6. Identify the topics that correspond to each new MSK Serverless cluster.
  7. Choose the best migration pattern to Amazon MSK according to the replication requirements. For example, when you don’t need data transformation, and duplicate data events can be tolerated by applications, you can use Apache Kafka migration tools such as MirrorMaker 2.0.
  8. After you have verified the data is replicating correctly to the new clusters, first restart the consumers against the new cluster. This ensures no data will be lost as the result of the migration.
  9. After the consumers resume processing data, restart the producers against the new cluster, and shut down the replication pipeline you created earlier.

As of this writing, MSK Serverless only supports AWS Identity and Access Management (IAM) for authentication and access control. For more information, refer to Securing Apache Kafka is easy and familiar with IAM Access Control for Amazon MSK. If your applications use other methods supported by Apache Kafka, you need to modify your application code to use IAM Access Control instead or use the Amazon MSK provisioned offering.

Summary

MSK Serverless eliminates operational overhead, including the provisioning, configuration, and maintenance of highly available Apache Kafka. In this post, I showed how splitting Apache Kafka clusters helps improve the security, performance, scalability, and reliability of your overall data streaming services and applications. I also described two main strategies for splitting a monolithic Apache Kafka cluster using MSK Serverless. If you’re using Amazon MSK provisioned offering, these strategies are still relevant when considering moving from a centralized to a decentralized model. You can decide the right strategy depending on your company’s specific needs.

For further reading on Amazon MSK, visit the official product page.


About the Author

About the author Ali AlemiAli Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Common streaming data enrichment patterns in Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/common-streaming-data-enrichment-patterns-in-amazon-kinesis-data-analytics-for-apache-flink/

Stream data processing allows you to act on data in real time. Real-time data analytics can help you have on-time and optimized responses while improving overall customer experience.

Apache Flink is a distributed computation framework that allows for stateful real-time data processing. It provides a single set of APIs for building batch and streaming jobs, making it easy for developers to work with bounded and unbounded data. Apache Flink provides different levels of abstraction to cover a variety of event processing use cases.

Amazon Kinesis Data Analytics is an AWS service that provides a serverless infrastructure for running Apache Flink applications. This makes it easy for developers to build highly available, fault tolerant, and scalable Apache Flink applications without needing to become an expert in building, configuring, and maintaining Apache Flink clusters on AWS.

Data streaming workloads often require data in the stream to be enriched via external sources (such as databases or other data streams). For example, assume you are receiving coordinates data from a GPS device and need to understand how these coordinates map with physical geographic locations; you need to enrich it with geolocation data. You can use several approaches to enrich your real-time data in Kinesis Data Analytics depending on your use case and Apache Flink abstraction level. Each method has different effects on the throughput, network traffic, and CPU (or memory) utilization. In this post, we cover these approaches and discuss their benefits and drawbacks.

Data enrichment patterns

Data enrichment is a process that appends additional context and enhances the collected data. The additional data often is collected from a variety of sources. The format and the frequency of the data updates could range from once in a month to many times in a second. The following table shows a few examples of different sources, formats, and update frequency.

Data Format Update Frequency
IP address ranges by country CSV Once a month
Company organization chart JSON Twice a year
Machine names by ID CSV Once a day
Employee information Table (Relational database) A few times a day
Customer information Table (Non-relational database) A few times an hour
Customer orders Table (Relational database) Many times a second

Based on the use case, your data enrichment application may have different requirements in terms of latency, throughput, or other factors. The remainder of the post dives deeper into different patterns of data enrichment in Kinesis Data Analytics, which are listed in the following table with their key characteristics. You can choose the best pattern based on the trade-off of these characteristics.

Enrichment Pattern Latency Throughput Accuracy if Reference Data Changes Memory Utilization Complexity
Pre-load reference data in Apache Flink Task Manager memory Low High Low High Low
Partitioned pre-loading of reference data in Apache Flink state Low High Low Low Low
Periodic Partitioned pre-loading of reference data in Apache Flink state Low High Medium Low Medium
Per-record asynchronous lookup with unordered map Medium Medium High Low Low
Per-record asynchronous lookup from an external cache system Low or Medium (Depending on Cache storage and implementation) Medium High Low Medium
Enriching streams using the Table API Low High High Low – Medium (depending on the selected join operator) Low

Enrich streaming data by pre-loading the reference data

When the reference data is small in size and static in nature (for example, country data including country code and country name), it’s recommended to enrich your streaming data by pre-loading the reference data, which you can do in several ways.

To see the code implementation for pre-loading reference data in various ways, refer to the GitHub repo. Follow the instructions in the GitHub repository to run the code and understand the data model.

Pre-loading of reference data in Apache Flink Task Manager memory

The simplest and also fastest enrichment method is to load the enrichment data into each of the Apache Flink task managers’ on-heap memory. To implement this method, you create a new class by extending the RichFlatMapFunction abstract class. You define a global static variable in your class definition. The variable could be of any type, the only limitation is that it should extend java.io.Serializable—for example, java.util.HashMap. Within the open() method, you define a logic that loads the static data into your defined variable. The open() method is always called first, during the initialization of each task in Apache Flink’s task managers, which makes sure the whole reference data is loaded before the processing begins. You implement your processing logic by overriding the processElement() method. You implement your processing logic and access the reference data by its key from the defined global variable.

The following architecture diagram shows the full reference data load in each task slot of the task manager.

diagram shows the full reference data load in each task slot of the task manager.

This method has the following benefits:

  • Easy to implement
  • Low latency
  • Can support high throughput

However, it has the following disadvantages:

  • If the reference data is large in size, the Apache Flink task manager may run out of memory.
  • Reference data can become stale over a period of time.
  • Multiple copies of the same reference data are loaded in each task slot of the task manager.
  • Reference data should be small to fit in the memory allocated to a single task slot. In Kinesis Data Analytics, each Kinesis Processing Unit (KPU) has 4 GB of memory, out of which 3 GB can be used for heap memory. If ParallelismPerKPU in Kinesis Data Analytics is set to 1, one task slot runs in each task manager, and the task slot can use the whole 3 GB of heap memory. If ParallelismPerKPU is set to a value greater than 1, the 3 GB of heap memory is distributed across multiple task slots in the task manager. If you’re deploying Apache Flink in Amazon EMR or in a self-managed mode, you can tune taskmanager.memory.task.heap.size to increase the heap memory of a task manager.

Partitioned pre-loading of reference data in Apache Flink State

In this approach, the reference data is loaded and kept in the Apache Flink state store at the start of the Apache Flink application. To optimize the memory utilization, first the main data stream is divided by a specified field via the keyBy() operator across all task slots. Furthermore, only the portion of the reference data that corresponds to each task slot is loaded in the state store.

This is achieved in Apache Flink by creating the class PartitionPreLoadEnrichmentData, extending the RichFlatMapFunction abstract class. Within the open method, you override the ValueStateDescriptor method to create a state handle. In the referenced example, the descriptor is named locationRefData, the state key type is String, and the value type is Location. In this code, we use ValueState compared to MapState because we only hold the location reference data for a particular key. For example, when we query Amazon S3 to get the location reference data, we query for the specific role and get a particular location as a value.

In Apache Flink, ValueState is used to hold a specific value for a key, whereas MapState is used to hold a combination of key-value pairs.

This technique is useful when you have a large static dataset that is difficult to fit in memory as a whole for each partition.

The following architecture diagram shows the load of reference data for the specific key for each partition of the stream.

diagram shows the load of reference data for the specific key for each partition of the stream.

For example, our reference data in the sample GitHub code has roles which are mapped to each building. Because the stream is partitioned by roles, only the specific building information per role is required to be loaded for each partition as the reference data.

This method has the following benefits:

  • Low latency.
  • Can support high throughput.
  • Reference data for specific partition is loaded in the keyed state.
  • In Kinesis Data Analytics, the default state store configured is RocksDB. RocksDB can utilize a significant portion of 1 GB of managed memory and 50 GB of disk space provided by each KPU. This provides enough room for the reference data to grow.

However, it has the following disadvantages:

  • Reference data can become stale over a period of time

Periodic partitioned pre-loading of reference data in Apache Flink State

This approach is a fine-tune of the previous technique, where each partitioned reference data is reloaded on a periodic basis to refresh the reference data. This is useful if your reference data changes occasionally.

The following architecture diagram shows the periodic load of reference data for the specific key for each partition of the stream.

diagram shows the periodic load of reference data for the specific key for each partition of the stream.

In this approach, the class PeriodicPerPartitionLoadEnrichmentData is created, extending the KeyedProcessFunction class. Similar to the previous pattern, in the context of the GitHub example, ValueState is recommended here because each partition only loads a single value for the key. In the same way as mentioned earlier, in the open method, you define the ValueStateDescriptor to handle the value state and define a runtime context to access the state.

Within the processElement method, load the value state and attach the reference data (in the referenced GitHub example, buildingNo to the customer data). Also register a timer service to be invoked when the processing time passes the given time. In the sample code, the timer service is scheduled to be invoked periodically (for example, every 60 seconds). In the onTimer method, update the state by making a call to reload the reference data for the specific role.

This method has the following benefits:

  • Low latency.
  • Can support high throughput.
  • Reference data for specific partitions is loaded in the keyed state.
  • Reference data is refreshed periodically.
  • In Kinesis Data Analytics, the default state store configured is RocksDB. Also, 50 GB of disk space provided by each KPU. This provides enough room for the reference data to grow.

However, it has the following disadvantages:

  • If the reference data changes frequently, the application still has stale data depending on how frequently the state is reloaded
  • The application can face load spikes during reload of reference data

Enrich streaming data using per-record lookup

Although pre-loading of reference data provides low latency and high throughput, it may not be suitable for certain types of workloads, such as the following:

  • Reference data updates with high frequency
  • Apache Flink needs to make an external call to compute the business logic
  • Accuracy of the output is important and the application shouldn’t use stale data

Normally, for these types of use cases, developers trade-off high throughput and low latency for data accuracy. In this section, you learn about a few of common implementations for per-record data enrichment and their benefits and disadvantages.

Per-record asynchronous lookup with unordered map

In a synchronous per-record lookup implementation, the Apache Flink application has to wait until it receives the response after sending every request. This causes the processor to stay idle for a significant period of processing time. Instead, the application can send a request for other elements in the stream while it waits for the response for the first element. This way, the wait time is amortized across multiple requests and therefore it increases the process throughput. Apache Flink provides asynchronous I/O for external data access. While using this pattern, you have to decide between unorderedWait (where it emits the result to the next operator as soon as the response is received, disregarding the order of the element on the stream) and orderedWait (where it waits until all inflight I/O operations complete, then sends the results to the next operator in the same order as original elements were placed on the stream). Usually, when downstream consumers disregard the order of the elements in the stream, unorderedWait provides better throughput and less idle time. Visit Enrich your data stream asynchronously using Kinesis Data Analytics for Apache Flink to learn more about this pattern.

The following architecture diagram shows how an Apache Flink application on Kinesis Data Analytics does asynchronous calls to an external database engine (for example Amazon DynamoDB) for every event in the main stream.

diagram shows how an Apache Flink application on Kinesis Data Analytics does asynchronous calls to an external database engine (for example Amazon DynamoDB) for every event in the main stream.

This method has the following benefits:

  • Still reasonably simple and easy to implement
  • Reads the most up-to-date reference data

However, it has the following disadvantages:

  • It generates a heavy read load for the external system (for example, a database engine or an external API) that hosts the reference data
  • Overall, it might not be suitable for systems that require high throughput with low latency

Per-record asynchronous lookup from an external cache system

A way to enhance the previous pattern is to use a cache system to enhance the read time for every lookup I/O call. You can use Amazon ElastiCache for caching, which accelerates application and database performance, or as a primary data store for use cases that don’t require durability like session stores, gaming leaderboards, streaming, and analytics. ElastiCache is compatible with Redis and Memcached.

For this pattern to work, you must implement a caching pattern for populating data in the cache storage. You can choose between a proactive or reactive approach depending your application objectives and latency requirements. For more information, refer to Caching patterns.

The following architecture diagram shows how an Apache Flink application calls to read the reference data from an external cache storage (for example, Amazon ElastiCache for Redis). Data changes must be replicated from the main database (for example, Amazon Aurora) to the cache storage by implementing one of the caching patterns.

diagram shows how an Apache Flink application calls to read the reference data from an external cache storage (for example, Amazon ElastiCache for Redis). Data changes must be replicated from the main database (for example, Amazon Aurora) to the cache storage by implementing one of the caching patterns.

Implementation for this data enrichment pattern is similar to the per-record asynchronous lookup pattern; the only difference is that the Apache Flink application makes a connection to the cache storage, instead of connecting to the primary database.

This method has the following benefits:

  • Better throughput because caching can accelerate application and database performance
  • Protects the primary data source from the read traffic created by the stream processing application
  • Can provide lower read latency for every lookup call
  • Overall, might not be suitable for medium to high throughput systems that want to improve data freshness

However, it has the following disadvantages:

  • Additional complexity of implementing a cache pattern for populating and syncing the data between the primary database and the cache storage
  • There is a chance for the Apache Flink stream processing application to read stale reference data depending on what caching pattern is implemented
  • Depending on the chosen cache pattern (proactive or reactive), the response time for each enrichment I/O may differ, therefore the overall processing time of the stream could be unpredictable

Alternatively, you can avoid these complexities by using the Apache Flink JDBC connector for Flink SQL APIs. We discuss enrichment stream data via Flink SQL APIs in more detail later in this post.

Enrich stream data via another stream

In this pattern, the data in the main stream is enriched with the reference data in another data stream. This pattern is good for use cases in which the reference data is updated frequently and it’s possible to perform change data capture (CDC) and publish the events to a data streaming service such as Apache Kafka or Amazon Kinesis Data Streams. This pattern is useful in the following use cases, for example:

  • Customer purchase orders are published to a Kinesis data stream, and then join with customer billing information in a DynamoDB stream
  • Data events captured from IoT devices should enrich with reference data in a table in Amazon Relational Database Service (Amazon RDS)
  • Network log events should enrich with the machine name on the source (and the destination) IP addresses

The following architecture diagram shows how an Apache Flink application on Kinesis Data Analytics joins data in the main stream with the CDC data in a DynamoDB stream.

diagram shows how an Apache Flink application on Kinesis Data Analytics joins data in the main stream with the CDC data in a DynamoDB stream.

To enrich streaming data from another stream, we use a common stream to stream join patterns, which we explain in the following sections.

Enrich streams using the Table API

Apache Flink Table APIs provide higher abstraction for working with data events. With Table APIs, you can define your data stream as a table and attach the data schema to it.

In this pattern, you define tables for each data stream and then join those tables to achieve the data enrichment goals. Apache Flink Table APIs support different types of join conditions, like inner join and outer join. However, you want to avoid those if you’re dealing with unbounded streams because those are resource intensive. To limit the resource utilization and run joins effectively, you should use either interval or temporal joins. An interval join requires one equi-join predicate and a join condition that bounds the time on both sides. To better understand how to implement an interval join, refer to Get started with Apache Flink SQL APIs in Kinesis Data Analytics Studio.

Compared to interval joins, temporal table joins don’t work with a time period within which different versions of a record are kept. Records from the main stream are always joined with the corresponding version of the reference data at the time specified by the watermark. Therefore, fewer versions of the reference data remain in the state.

Note that the reference data may or may not have a time element associated with it. If it doesn’t, you may need to add a processing time element for the join with the time-based stream.

In the following example code snippet, the update_time column is added to the currency_rates reference table from the change data capture metadata such as Debezium. Furthermore, it’s used to define a watermark strategy for the table.

CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
        WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
   'connector' = 'kafka',
   'value.format' = 'debezium-json',
   /* ... */
);

This method has the following benefits:
  • Easy to implement
  • Low latency
  • Can support high throughput when reference data is a data stream

SQL APIs provide higher abstractions over how the data is processed. For more complex logic around how the join operator should process, we recommend you always start with SQL APIs first and use DataStream APIs if you really need to.

Conclusion

In this post, we demonstrated different data enrichment patterns in Kinesis Data Analytics. You can use these patterns and find the one that addresses your needs and quickly develop a stream processing application.

For further reading on Kinesis Data Analytics, visit the official product page.


About the Authors

About the author Ali AlemiAli Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

About the author Subham RakshitSubham Rakshit is a Streaming Specialist Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build search and streaming data platforms that help them achieve their business objective. Outside of work, he enjoys spending time solving jigsaw puzzles with his daughter.

About the author Dr. Sam MokhtariDr. Sam Mokhtari is a Senior Solutions Architect in AWS. His main area of depth is data and analytics, and he has published more than 30 influential articles in this field. He is also a respected data and analytics advisor who led several large-scale implementation projects across different industries, including energy, health, telecom, and transport.

Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/create-a-low-latency-source-to-data-lake-pipeline-using-amazon-msk-connect-apache-flink-and-apache-hudi/

During the recent years, there has been a shift from monolithic to the microservices architecture. The microservices architecture makes applications easier to scale and quicker to develop, enabling innovation and accelerating time to market for new features. However, this approach causes data to live in different silos, which makes it difficult to perform analytics. To gain deeper and richer insights, you should bring all your data from different silos into one place.

AWS offers replication tools such as AWS Database Migration Service (AWS DMS) to replicate data changes from a variety of source databases to various destinations including Amazon Simple Storage Service (Amazon S3). But customers who need to sync the data in a data lake with updates and deletes on the source systems still face a few challenges:

  • It’s difficult to apply record-level updates or deletes when records are stored in open data format files (such as JSON, ORC, or Parquet) on Amazon S3.
  • In streaming use cases, where jobs need to write data with low latency, row-based formats such as JSON and Avro are best suited. However, scanning many small files with these formats degrades the read query performance.
  • In use cases where the schema of the source data changes frequently, maintaining the schema of the target datasets via custom code is difficult and error-prone.

Apache Hudi provides a good way to solve these challenges. Hudi builds indexes when it writes the records for the first time. Hudi uses these indexes to locate the files to which an update (or delete) belongs. This enables Hudi to perform fast upsert (or delete) operations by avoiding the need to scan the whole dataset. Hudi provides two table types, each optimized for certain scenarios:

  • Copy-On-Write (COW) – These tables are common for batch processing. In this type, data is stored in a columnar format (Parquet), and each update (or delete) creates a new version of files during the write.
  • Merge-On-Read (MOR) – Stores Data using a combination of columnar (for example Parquet) and row-based (for example Avro) file formats and is intended to expose near-real time data.

Hudi datasets stored in Amazon S3 provide native integration with other AWS services. For example, you can write Apache Hudi tables using AWS Glue (see Writing to Apache Hudi tables using AWS Glue Custom Connector) or Amazon EMR (see New features from Apache Hudi available in Amazon EMR). Those approaches require having a deep understanding of Hudi’s Spark APIs and programming skills to build and maintain data pipelines.

In this post, I show you a different way of working with streaming data with minimum coding. The steps in this post demonstrate how to build fully scalable pipelines using SQL language without prior knowledge of Flink or Hudi. You can query and explore your data in multiple data streams by writing familiar SELECT queries. You can join the data from multiple streams and materialize the result to a Hudi dataset on Amazon S3.

Solution overview

The following diagram provides an overall architecture of the solution described in this post. I describe the components and steps fully in the sections that follow.

You use an Amazon Aurora MySQL database as the source and a Debezium MySQL connector with the setup described in the MSK Connect lab as the change data capture (CDC) replicator. This lab walks you through the steps to set up the stack for replicating an Aurora database salesdb to an Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster, using Amazon MSK Connect with a MySql Debezium source Kafka connector.

In September 2021, AWS announced MSK Connect for running fully managed Kafka Connect clusters. With a few clicks, MSK Connect allows you to easily deploy, monitor, and scale connectors that move data in and out of Apache Kafka and MSK clusters from external systems such as databases, file systems, and search indexes. You can now use MSK Connect for building a full CDC pipeline from many database sources to your MSK cluster.

Amazon MSK is a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data. When you use Apache Kafka, you capture real-time data from sources such as database change events or website clickstreams. Then you build pipelines (using stream processing frameworks such as Apache Flink) to deliver them to destinations such as a persistent storage or Amazon S3.

Apache Flink is a popular framework for building stateful streaming and batch pipelines. Flink comes with different levels of abstractions to cover a broad range of use cases. See Flink Concepts for more information.

Flink also offers different deployment modes depending on which resource provider you choose (Hadoop YARN, Kubernetes, or standalone). See Deployment for more information.

In this post, you use the SQL Client tool as an interactive way of authoring Flink jobs in SQL syntax. sql-client.sh compiles and submits jobs to a long-running Flink cluster (session mode) on Amazon EMR. Depending on the script, sql-client.sh either shows the tabular formatted output of the job in real time, or returns a job ID for long-running jobs.

You implement the solution with the following high-level steps:

  1. Create an EMR cluster.
  2. Configure Flink with Kafka and Hudi table connectors.
  3. Develop your real-time extract, transform, and load (ETL) job.
  4. Deploy your pipeline to production.

Prerequisites

This post assumes you have a running MSK Connect stack in your environment with the following components:

  • Aurora MySQL hosting a database. In this post, you use the example database salesdb.
  • The Debezium MySQL connector running on MSK Connect, ending in Amazon MSK in your Amazon Virtual Private Cloud (Amazon VPC).
  • An MSK cluster running within in a VPC.

If you don’t have an MSK Connect stack, follow the instructions in the MSK Connect lab setup and verify that your source connector replicates data changes to the MSK topics.

You also need the ability to connect directly to the EMR leader node. Session Manager is a feature of AWS Systems Manager that provides you with an interactive one-click browser-based shell window. Session Manager also allows you to comply with corporate policies that require controlled access to managed nodes. See Setting up Session Manager to learn how to connect to your managed nodes in your account via this method.

If Session Manager is not an option, you can also use Amazon Elastic Compute Cloud (Amazon EC2) private key pairs, but you’ll need to launch the cluster in a public subnet and provide inbound SSH access. See Connect to the master node using SSH for more information.

Create an EMR cluster

The latest released version of Apache Hudi is 0.10.0, at the time of writing. Hudi release version 0.10.0 is compatible with Flink release version 1.13. You need Amazon EMR release version emr-6.4.0 and later, which comes with Flink release version 1.13. To launch a cluster with Flink installed using the AWS Command Line Interface (AWS CLI), complete the following steps:

  1. Create a file, configurations.json, with the following content:
    [
        {
          "Classification": "flink-conf",
          "Properties": {
            "taskmanager.numberOfTaskSlots":"4"
          }
        }
    ]

  2. Create an EMR cluster in a private subnet (recommended) or in a public subnet of the same VPC as where you host your MSK cluster. Enter a name for your cluster with the --name option, and specify the name of your EC2 key pair as well as the subnet ID with the --ec2-attributes option. See the following code:
    aws emr create-cluster --release-label emr-6.4.0 \
    --applications Name=Flink \
    --name FlinkHudiCluster \
    --configurations file://./configurations.json \
    --region us-east-1 \
    --log-uri s3://yourLogUri \
    --instance-type m5.xlarge \
    --instance-count 2 \
    --service-role EMR_DefaultRole \ 
    --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole, SubnetId=A SubnetID of Amazon MSK VPC 

  3. Wait until the cluster state changes to Running.
  4. Retrieve the DNS name of the leader node using either the Amazon EMR console or the AWS CLI.
  5. Connect to the leader node via Session Manager or using SSH and an EC2 private key on Linux, Unix, and Mac OS X.
  6. When connecting using SSH, port 22 must be allowed by the leader node’s security group.
  7. Make sure the MSK cluster’s security group has an inbound rules that accepts traffic from the EMR cluster’s security groups.

Configure Flink with Kafka and Hudi table connectors

Flink table connectors allow you to connect to external systems when programming your stream operations using Table APIs. Source connectors provide access to streaming services including Kinesis or Apache Kafka as a data source. Sink connectors allow Flink to emit stream processing results to external systems or storage services like Amazon S3.

On your Amazon EMR leader node, download the following connectors and save them in the /lib/flink/lib directory:

  • Source connector – Download flink-connector-kafka_2.11-1.13.1.jar from the Apache repository. The Apache Kafka SQL connector allows Flink to read data from Kafka topics.
  • Sink connector – Amazon EMR release version emr-6.4.0 comes with Hudi release version 0.8.0. However, in this post you need Hudi Flink bundle connector release version 0.10.0, which is compatible with Flink release version 1.13. Download hudi-flink-bundle_2.11-0.10.0.jar from the Apache repository. It also contains multiple file system clients, including S3A for integrating with Amazon S3.

Develop your real-time ETL job

In this post, you use the Debezium source Kafka connector to stream data changes of a sample database, salesdb, to your MSK cluster. Your connector produces data changes in JSON. See Debezium Event Deserialization for more details. The Flink Kafka connector can deserialize events in JSON format by setting value.format with debezium-json in the table options. This configuration provides the full support for data updates and deletes, in addition to inserts.

You build a new job using Flink SQL APIs. These APIs allow you to work with the streaming data, similar to tables in relational databases. SQL queries specified in this method run continuously over the data events in the source stream. Because the Flink application consumes unbounded data from a stream, the output constantly changes. To send the output to another system, Flink emits update or delete events to the downstream sink operators. Therefore, when you work with CDC data or write SQL queries where the output rows need to update or delete, you must provide a sink connector that supports these actions. Otherwise, the Flink job ends with an error with the following message:

Target Table doesn't support consuming update or delete changes which is produced by {your query statement} …

Launch the Flink SQL client

Start a Flink YARN application on your EMR cluster with the configurations you previously specified in the configurations.json file:

cd /lib/flink && ./bin/yarn-session.sh --detached

After the command runs successfully, you’re ready to write your first job. Run the following command to launch sql-client:

./bin/sql-client.sh

Your terminal window looks like the following screenshot.

Set the job parameters

Run the following command to set the checkpointing interval for this session:

SET execution.checkpointing.interval = 1min;

Define your source tables

Conceptually, processing streams using SQL queries requires interpreting the events as logical records in a table. Therefore, the first step before reading or writing the data with SQL APIs is to create source and target tables. The table definition includes the connection settings and configuration along with a schema that defines the structure and the serialization format of the objects in the stream.

In this post, you create three source tables. Each corresponds to a topic in Amazon MSK. You also create a single target table that writes the output data records to a Hudi dataset stored on Amazon S3.

Replace BOOTSTRAP SERVERS ADDRESSES with your own Amazon MSK cluster information in the 'properties.bootstrap.servers' option and run the following commands in your sql-client terminal:

CREATE TABLE CustomerKafka (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `CUST_ID` BIGINT,
      `NAME` STRING,
      `MKTSEGMENT` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.CUSTOMER', -- created by debezium connector, corresponds to CUSTOMER table in Amazon Aurora database. 
      'properties.bootstrap.servers' = '<PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup1',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

CREATE TABLE CustomerSiteKafka (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `CUST_ID` BIGINT,
      `SITE_ID` BIGINT,
      `STATE` STRING,
      `CITY` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.CUSTOMER_SITE',
      'properties.bootstrap.servers' = '< PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup2',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

CREATE TABLE SalesOrderAllKafka (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `ORDER_ID` BIGINT,
      `SITE_ID` BIGINT,
      `ORDER_DATE` BIGINT,
      `SHIP_MODE` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.SALES_ORDER_ALL',
      'properties.bootstrap.servers' = '< PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup3',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

By default, sql-client stores these tables in memory. They only live for the duration of the active session. Anytime your sql-client session expires, or you exit, you need to recreate your tables.

Define the sink table

The following command creates the target table. You specify 'hudi' as the connector in this table. The rest of the Hudi configurations are set in the with(…) section of the CREATE TABLE statement. See the full list of Flink SQL configs to learn more. Replace S3URI OF HUDI DATASET LOCATION with your Hudi dataset location in Amazon S3 and run the following code:

CREATE TABLE CustomerHudi (
      `order_count` BIGINT,
      `customer_id` BIGINT,
      `name` STRING,
      `mktsegment` STRING,
      `ts` TIMESTAMP(3),
      PRIMARY KEY (`customer_id`) NOT Enforced
    )
    PARTITIONED BY (`mktsegment`)
    WITH (
      'connector' = 'hudi',
      'write.tasks' = '4',
      'path' = '<S3URI OF HUDI DATASET LOCATION>',
      'table.type' = 'MERGE_ON_READ' --  MERGE_ON_READ table or, by default is COPY_ON_WRITE
    );

Verify the Flink job’s results from multiple topics

For select queries, sql-client submits the job to a Flink cluster, then displays the results on the screen in real time. Run the following select query to view your Amazon MSK data:

SELECT Count(O.order_id) AS order_count,
       C.cust_id,
       C.NAME,
       C.mktsegment
FROM   customerkafka C
       JOIN customersitekafka CS
         ON C.cust_id = CS.cust_id
       JOIN salesorderallkafka O
         ON O.site_id = CS.site_id
GROUP  BY C.cust_id,
          C.NAME,
          C.mktsegment; 

This query joins three streams and aggregates the count of customer orders, grouped by each customer record. After a few seconds, you should see the result in your terminal. Note how the terminal output changes as the Flink job consumes more events from the source streams.

Sink the result to a Hudi dataset

To have a complete pipeline, you need to send the result to a Hudi dataset on Amazon S3. To do that, add an insert into CustomerHudi statement in front of the select query:

INSERT INTO customerhudi
SELECT Count(O.order_id),
       C.cust_id,
       C.NAME,
       C.mktsegment,
       Proctime()
FROM   customerkafka C
       JOIN customersitekafka CS
         ON C.cust_id = CS.cust_id
       JOIN salesorderallkafka O
         ON O.site_id = CS.site_id
GROUP  BY C.cust_id,
          C.NAME,
          C.mktsegment;

This time, the sql-client disconnects from the cluster after submitting the job. The client terminal doesn’t have to wait for the results of the job as it sinks its results to a Hudi dataset. The job continues to run on your Flink cluster even after you stop the sql-client session.

Wait a few minutes until the job generates Hudi commit log files to Amazon S3. Then navigate to the location in Amazon S3 you specified for your CustomerHudi table, which contains a Hudi dataset partitioned by MKTSEGMENT column. Within each partition you also find Hudi commit log files. This is because you defined the table type as MERGE_ON_READ. In this mode with the default configurations, Hudi merges commit logs to larger Parquet files after five delta commit logs occur. Refer to Table & Query Types for more information. You can change this setup by changing the table type to COPY_ON_WRITE or specifying your custom compaction configurations.

Query the Hudi dataset

You may also use a Hudi Flink connector as a source connector to read from a Hudi dataset stored on Amazon S3. You do that by running a select statement against the CustomerHudi table, or create a new table with hudi specified for connector. The path must point to an existing Hudi dataset’s location on Amazon S3. Replace S3URI OF HUDI DATASET LOCATION with your location and run the following command to create a new table:

CREATE TABLE `CustomerHudiReadonly` (
      `_hoodie_commit_time` string,
      `_hoodie_commit_seqno` string,
      `_hoodie_record_key` string,
      `order_count` BIGINT,
      `customer_id` BIGINT,
      `name` STRING,
      `mktsegment` STRING,
      `ts` TIMESTAMP(3),
      PRIMARY KEY (`customer_id`) NOT Enforced
    )
    PARTITIONED BY (`mktsegment`)
    WITH (
      'connector' = 'hudi',
      'hoodie.datasource.query.type' = 'snapshot',
      'path' = '<S3URI OF HUDI DATASET LOCATION>',
     'table.type' = 'MERGE_ON_READ' --  MERGE_ON_READ table or, by default is COPY_ON_WRITE
    );

Note the additional column names prefixed with _hoodie_. These columns are added by Hudi during the write to maintain the metadata of each record. Also note the extra 'hoodie.datasource.query.type'  read configuration passed in the WITH portion of the table definition. This makes sure you read from the real-time view of your Hudi dataset. Run the following command:

Select * from CustomerHudiReadonly where customer_id <= 5;

The terminal displays the result within 30 seconds. Navigate to the Flink web interface, where you can observe a new Flink job started by the select query (See below for how to find the Flink web interface). It scans the committed files in the Hudi dataset and returns the result to the Flink SQL client.

Use a mysql CLI or your preferred IDE to connect to your salesdb database, which is hosted on Aurora MySQL. Run a few insert statements against the SALES_ORDER_ALL table:

insert into SALES_ORDER_ALL values (29001, 2, now(), 'STANDARD');
insert into SALES_ORDER_ALL values (29002, 2, now(), 'TWO-DAY');
insert into SALES_ORDER_ALL values (29003, 2, now(), 'STANDARD');
insert into SALES_ORDER_ALL values (29004, 2, now(), 'TWO-DAY');
insert into SALES_ORDER_ALL values (29005, 2, now(), 'STANDARD');

After a few seconds, a new commit log file appears in your Hudi dataset on Amazon S3. The Debezium for MySQL Kafka connector captures the changes and produces events to the MSK topic. The Flink application consumes the new events from the topic and updates the customer_count column accordingly. It then sends the changed records to the Hudi connector for merging with the Hudi dataset.

Hudi supports different write operation types. The default operation is upsert, where it initially inserts the records in the dataset. When a record with an existing key arrives in a process, it’s treated as an update. This operation is useful here where you expect to sync your dataset with the source database, and duplicate records are not expected.

Find the Flink web interface

The Flink web interface helps you view a Flink job’s configuration, graph, status, exception errors, resource utilization, and more. To access it, first you need to set up an SSH tunnel and activate a proxy in your browser, to connect to the YARN Resource Manager. After you connect to the Resource Manager, you choose the YARN application that’s hosting your Flink session. Choose the link under the Tracking UI column to navigate to the Flink web interface. For more information, see Finding the Flink web interface.

Deploy your pipeline to production

I recommend using Flink sql-client for quickly building data pipelines in an interactive way. It’s a good choice for experiments, development, or testing your data pipelines. For production environments, however, I recommend embedding your SQL scripts in a Flink Java application and running it on Amazon Kinesis Data Analytics. Kinesis Data Analytics is a fully managed service for running Flink applications; it has built-in auto scaling and fault tolerance features to provide your production applications the availability and scalability they need. A Flink Hudi application with the scripts from this this post is available on GitHub. I encourage you to visit this repo, and compare the differences between running in sql-client and Kinesis Data Analytics.

Clean up

To avoid incurring ongoing charges, complete the following cleanup steps:

  1. Stop the EMR cluster.
  2. Delete the AWS CloudFormation stack you created using the MSK Connect Lab setup.

Conclusion

Building a data lake is the first step to break down data silos and running analytics to gain insights from all your data. Syncing the data between the transactional databases and data files on a data lake isn’t trivial and involves significant effort. Before Hudi added support for Flink SQL APIs, Hudi customers had to have the necessary skills for writing Apache Spark code and running it on AWS Glue or Amazon EMR. In this post, I showed you a new way in which you can interactively explore your data in streaming services using SQL queries, and accelerate the development process for your data pipelines.

To learn more, visit Hudi on Amazon EMR documentation.


About the Author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems which are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.