Tag Archives: Amazon Managed Streaming for Apache Kafka (Amazon MSK)

AWS Weekly Roundup: New AWS Heroes, Amazon API Gateway, Amazon Q and more (June 10, 2024)

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-new-aws-heroes-amazon-api-gateway-amazon-q-and-more-june-10-2024/

In the last AWS Weekly Roundup, Channy reminded us that life has ups and downs. It’s just how life is. But that doesn’t mean we should face it alone. Farouq Mousa, AWS Community Builder, is fighting brain cancer, and the daughter of Allen Helton, AWS Serverless Hero, is fighting leukemia.

If you have a moment, please visit their campaign pages and give your support.

Meanwhile, we’ve just finished a few AWS Summits in India, Korea, and Thailand. As always, I had so much fun working at the Developer Lounge with AWS Heroes, AWS Community Builders, and AWS User Group leaders. Here’s a photo with everyone.

Last Week’s Launches
Here are some launches that caught my attention last week:

Welcome, new AWS Heroes! — Last week, we announced a new cohort of AWS Heroes, a worldwide group of AWS experts who go above and beyond to share knowledge and empower their communities.

Amazon API Gateway increased integration timeout limit — If you’re using Regional REST APIs or private REST APIs in Amazon API Gateway, you can now increase the integration timeout beyond the previous limit of 29 seconds. This lets you run workloads that require longer timeouts.

Amazon Q offers inline completion in the command line — Now, Amazon Q Developer provides real-time AI-generated code suggestions as you type in your command line. As a regular command line interface (CLI) user, I’m really excited about this.

New common control library in AWS Audit Manager — This launch helps you save time when mapping enterprise controls into AWS Audit Manager. Check out Danilo’s post, where he elaborates on how you can simplify risk and compliance assessment with the new common control library.

Amazon Inspector container image scanning for Amazon CodeCatalyst and GitHub Actions — If you need to integrate software vulnerability checks into your CI/CD pipeline, you can use Amazon Inspector. This native integration with GitHub Actions and Amazon CodeCatalyst streamlines your development pipeline.

Ingest streaming data with Amazon OpenSearch Ingestion and Amazon Managed Streaming for Apache Kafka — With this new capability, you can build more efficient data pipelines for complex analytics use cases and seamlessly index data from your Amazon MSK Serverless clusters in Amazon OpenSearch Service.

Amazon Titan Text Embeddings V2 now available in Knowledge Bases for Amazon Bedrock — You can now embed your data into a vector database using Amazon Titan Text Embeddings V2, which helps you retrieve relevant information for various tasks.

Max tokens: 8,192
Languages: 100+ in pre-training
Fine-tuning supported: No
Normalization supported: Yes
Vector size: 256, 512, 1,024 (default)
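As a minimal sketch of how those model parameters map onto a Bedrock InvokeModel request: the model ID and body fields below follow the Bedrock convention at the time of writing, but treat them as assumptions and verify against the current documentation.

```python
import json

# Model ID for Titan Text Embeddings V2 (assumption; check the Bedrock docs).
MODEL_ID = "amazon.titan-embed-text-v2:0"

def build_embedding_request(text, dimensions=1024, normalize=True):
    # The vector size must be one of the supported output dimensions.
    if dimensions not in (256, 512, 1024):
        raise ValueError("supported vector sizes are 256, 512, and 1,024")
    return json.dumps({
        "inputText": text,
        "dimensions": dimensions,
        "normalize": normalize,
    })

# With boto3 and AWS credentials configured, the call would look like:
#   client = boto3.client("bedrock-runtime")
#   resp = client.invoke_model(modelId=MODEL_ID,
#                              body=build_embedding_request("hello"))
#   embedding = json.loads(resp["body"].read())["embedding"]
```

The `normalize` flag corresponds to the normalization support noted in the table above.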

From Community.aws
Here are my three personal favorite posts from community.aws:

Upcoming AWS events
Check your calendars and sign up for these AWS and AWS Community events:

  • AWS Summits — Join free online and in-person events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Register in your nearest city: Japan (June 20), Washington, DC (June 26–27), and New York (July 10).

  • AWS re:Inforce — Join us for AWS re:Inforce (June 10–12) in Philadelphia, PA. AWS re:Inforce is a learning conference focused on AWS security solutions, cloud security, compliance, and identity. Connect with the AWS teams that build the security tools and meet AWS customers to learn about their security journeys.

  • AWS Community Days — Join community-led conferences that feature technical discussions, workshops, and hands-on labs led by expert AWS users and industry leaders from around the world: Midwest | Columbus (June 13), Sri Lanka (June 27), Cameroon (July 13), New Zealand (August 15), Nigeria (August 24), and New York (August 28).

You can browse all upcoming in-person and virtual events.

That’s all for this week. Check back next Monday for another Weekly Roundup!

Donnie

This post is part of our Weekly Roundup series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Introducing support for Apache Kafka on Raft mode (KRaft) with Amazon MSK clusters

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/introducing-support-for-apache-kafka-on-raft-mode-kraft-with-amazon-msk-clusters/

Organizations are adopting Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK) to capture and analyze data in real time. Amazon MSK helps you build and run production applications on Apache Kafka without needing Kafka infrastructure management expertise or having to deal with the complex overhead associated with setting up and running Apache Kafka on your own. Since its inception, Apache Kafka has depended on Apache Zookeeper for storing and replicating the metadata of Kafka brokers and topics. Starting from Apache Kafka version 3.3, the Kafka community has adopted KRaft (Apache Kafka on Raft), a consensus protocol, to replace Kafka’s dependency on ZooKeeper for metadata management. In the future, the Apache Kafka community plans to remove the ZooKeeper mode entirely.

Today, we’re excited to launch support for KRaft on new clusters on Amazon MSK starting from version 3.7. In this post, we walk you through how KRaft mode improves on the ZooKeeper approach. We also guide you through the process of creating MSK clusters with KRaft mode and how to connect your applications to them.

Why was ZooKeeper replaced with KRaft mode?

The traditional Kafka architecture relies on ZooKeeper as the authoritative source for cluster metadata. Read and write access to metadata in ZooKeeper is funneled through a single Kafka controller. For clusters with a large number of partitions, this architecture can create a bottleneck during scenarios such as an uncontrolled broker shutdown or controller failover, due to a single-controller approach.

KRaft mode addresses these limitations by managing metadata within the Kafka cluster itself. Instead of relying on a separate ZooKeeper cluster, KRaft mode stores and replicates the cluster metadata across multiple Kafka controller nodes, forming a metadata quorum. The KRaft controller nodes comprise a Raft quorum that manages the Kafka metadata log. By distributing the metadata management responsibilities across multiple controller nodes, KRaft mode improves recovery time for scenarios such as uncontrolled broker shutdown or controller failover. For more details on KRaft mode and its implementation, refer to the KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum.
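Amazon MSK provisions and manages these controller nodes for you, but for context, in a self-managed Kafka deployment the metadata quorum described above is configured in each controller’s server.properties roughly as follows (node IDs, hostnames, and port are illustrative):

```properties
# KRaft controller configuration (self-managed Kafka only;
# Amazon MSK handles all of this for you).
process.roles=controller
node.id=1
controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
controller.listener.names=CONTROLLER
listeners=CONTROLLER://controller1:9093
```

Each voter entry pairs a node ID with the host and port of one member of the Raft quorum.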

The following figure compares the three-node MSK cluster architecture with ZooKeeper vs. KRaft mode.

Amazon MSK with KRaft mode

Until now, Amazon MSK has supported Kafka clusters that rely on ZooKeeper for metadata management. One of the key benefits of Amazon MSK is that it handles the complexity of setting up and managing the ZooKeeper cluster at no additional cost. Many organizations use Amazon MSK to run large, business-critical streaming applications that require splitting their traffic across thousands of partitions. As the size of a Kafka cluster grows, the amount of metadata generated within the cluster increases proportionally to the number of partitions.

Two key properties govern the number of partitions a Kafka cluster can support: the per-node partition count limit and the cluster-wide partition limit. As mentioned earlier, ZooKeeper-based metadata management imposed a bottleneck on the cluster-wide partition limit in Apache Kafka. With the introduction of KRaft mode in Amazon MSK starting with version 3.7, Amazon MSK now enables the creation of clusters with up to 60 brokers, vs. the default quota of 30 brokers in ZooKeeper mode. Kafka’s scalability still fundamentally relies on adding more nodes to increase overall capacity, so the cluster-wide partition limit continues to define the upper bound of scalability within the Kafka system, because it determines the maximum number of partitions that can be distributed across the available nodes. Amazon MSK manages the KRaft controller nodes at no additional cost.

Create and access an MSK cluster with KRaft mode

Complete the following steps to configure an MSK cluster with KRaft mode:

  1. On the Amazon MSK console, choose Clusters in the navigation pane.
  2. Choose Create cluster.
  3. For Cluster creation method, select Custom create.
  4. For Cluster name, enter a name.
  5. For Cluster type, select Provisioned.
  6. For Apache Kafka version, choose 3.7.x.
  7. For Metadata mode, select KRaft.
  8. Leave the other settings as default and choose Create cluster.

When the cluster creation is successful, you can navigate to the cluster and choose View client integration information, which will provide details about the cluster bootstrap servers.
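If you prefer the AWS CLI, the same details are available there as well; a sketch, with a placeholder cluster ARN:

```shell
# Fetch the bootstrap broker string for the new cluster (ARN is a placeholder).
aws kafka get-bootstrap-brokers \
  --cluster-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-kraft-cluster/abc123

# Check that the cluster has reached the ACTIVE state.
aws kafka describe-cluster-v2 \
  --cluster-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-kraft-cluster/abc123 \
  --query 'ClusterInfo.State'
```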

Adapt your client applications and tools for accessing MSK clusters with KRaft mode

With the adoption of KRaft mode in Amazon MSK, customers using client applications and tools that connect to ZooKeeper to interact with MSK clusters will need to update them to reflect the removal of ZooKeeper from the architecture. Starting with version 1.0, Kafka introduced the ability for admin tools to take the bootstrap servers (brokers) as input parameters instead of a ZooKeeper connection string, and it began deprecating ZooKeeper connection strings with version 2.5. This change was part of the effort to decouple Kafka from ZooKeeper and pave the way for its eventual replacement with KRaft mode for metadata management. Instead of specifying the ZooKeeper connection string, clients need to use the bootstrap.servers configuration option to connect directly to the Kafka brokers. The following table summarizes these changes.

Clients and services
  With ZooKeeper: bootstrap.servers=broker:<port>, or zookeeper.connect=zookeeper:2181 (deprecated)
  With KRaft: bootstrap.servers=broker:<port>

Admin tools
  With ZooKeeper: kafka-topics --zookeeper zookeeper:2181 (deprecated), or kafka-topics --bootstrap-server broker:<port> ... --command-config
  With KRaft: kafka-topics --bootstrap-server broker:<port> ... --command-config
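As a concrete example of the admin-tool change, listing topics before and after the migration might look like this (the broker hostname, port, and client.properties file are placeholders; on Amazon MSK, the port depends on the authentication method you configured):

```shell
# ZooKeeper mode (deprecated):
kafka-topics.sh --zookeeper zookeeper:2181 --list

# KRaft mode (also works on newer ZooKeeper-based clusters):
kafka-topics.sh --bootstrap-server broker:9098 \
  --command-config client.properties --list
```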

Summary

In this post, we discussed how Amazon MSK has launched support for KRaft mode for metadata management. We also described how KRaft works and how it’s different from ZooKeeper.

To get started, create a new cluster with KRaft mode using the AWS Management Console, and refer to the Amazon MSK Developer Guide for more information.


About the author

Kalyan Janaki is Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Safely remove Kafka brokers from Amazon MSK provisioned clusters

Post Syndicated from Vidhi Taneja original https://aws.amazon.com/blogs/big-data/safely-remove-kafka-brokers-from-amazon-msk-provisioned-clusters/

Today, we are announcing broker removal capability for Amazon Managed Streaming for Apache Kafka (Amazon MSK) provisioned clusters, which lets you remove multiple brokers from your provisioned clusters. You can now reduce your cluster’s storage and compute capacity by removing sets of brokers, with no availability impact, data durability risk, or disruption to your data streaming applications. Amazon MSK is a fully managed Apache Kafka service that makes it easy for developers to build and run highly available, secure, and scalable streaming applications. Administrators can optimize the costs of their Amazon MSK clusters by reducing broker count and adapting the cluster capacity to the changes in the streaming data demand, without affecting their clusters’ performance, availability, or data durability.

You can use Amazon MSK as a core foundation to build a variety of real-time streaming applications and high-performance event-driven architectures. As business needs and traffic patterns change, cluster capacity is often adjusted to optimize costs. Amazon MSK provides flexibility and elasticity for administrators to right-size MSK clusters. You can increase the broker count or broker size to handle a surge in traffic during peak events, or decrease the instance size of the cluster’s brokers to reduce capacity. Until now, however, reducing the broker count required an effort-intensive migration to another cluster.

With the broker removal capability, you can now remove multiple brokers from your provisioned clusters to meet the varying needs of your streaming workloads. During and after broker removal, the cluster continues to handle read and write requests from client applications. Amazon MSK performs the necessary validations to safeguard against data durability risks and gracefully removes the brokers from the cluster. By using the broker removal capability, you can precisely adjust MSK cluster capacity, eliminating the need to change the instance size of every broker in the cluster or migrate to another cluster to reduce the broker count.

How the broker removal feature works

Before you execute the broker removal operation, you must make some brokers eligible for removal by moving all partitions off of them. You can use Kafka admin APIs or Cruise Control to move partitions to other brokers that you intend to retain in the cluster.
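As a sketch of that preparation step, the JSON plan consumed by kafka-reassign-partitions.sh can be generated along these lines. The topic layout and broker IDs are illustrative; in practice you would read the current assignment from kafka-topics.sh --describe, or let Cruise Control handle the move entirely.

```python
import json

def drain_brokers(assignment, brokers_to_remove, retained_brokers):
    """Rewrite each partition's replica list so it avoids the brokers
    being removed. `assignment` is a list of dicts shaped like the
    "partitions" section of kafka-reassign-partitions.sh JSON."""
    plan = []
    for p in assignment:
        # Candidate substitutes: retained brokers not already holding a replica.
        pool = [b for b in retained_brokers
                if b not in p["replicas"] and b not in brokers_to_remove]
        replicas = [b if b not in brokers_to_remove else pool.pop(0)
                    for b in p["replicas"]]
        plan.append({"topic": p["topic"], "partition": p["partition"],
                     "replicas": replicas})
    return {"version": 1, "partitions": plan}

# Example (illustrative): move everything off broker 3 onto broker 4.
current = [
    {"topic": "orders", "partition": 0, "replicas": [1, 2, 3]},
    {"topic": "orders", "partition": 1, "replicas": [3, 1, 2]},
]
print(json.dumps(drain_brokers(current, {3}, [1, 2, 4]), indent=2))
```

Saving the output as a file and passing it to kafka-reassign-partitions.sh with --execute performs the actual move.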

You choose which brokers to remove and use Kafka tools to move the partitions from those brokers to the brokers you intend to retain. Alternatively, you may already have brokers that aren’t hosting any partitions. Then use the Edit number of brokers feature on the AWS Management Console, or the Amazon MSK UpdateBrokerCount API. Here are details on how you can use this new feature:

  • You can remove a maximum of one broker per Availability Zone (AZ) in a single broker removal operation. To remove more brokers, you can run additional broker removal operations consecutively, each after the prior operation has completed. You must retain at least one broker per AZ in your MSK cluster.
  • The target number of broker nodes in the cluster must be a multiple of the number of availability zones (AZs) in the client subnets parameter. For example, a cluster with subnets in two AZs must have a target number of nodes that is a multiple of two.
  • If the brokers you removed were present in the bootstrap broker string, MSK will perform the necessary routing so that the clients’ connectivity to the cluster is not disrupted. You don’t need to make any client-side changes to your bootstrap strings.
  • You can add brokers back to your cluster anytime using the AWS Management Console, or the UpdateBrokerCount API.
  • Broker removal is supported on Kafka versions 2.8.1 and above. If you have clusters in lower versions, you must first upgrade to version 2.8.1 or above and then remove brokers.
  • Broker removal doesn’t support the t3.small instance type.
  • You will stop incurring costs for the removed brokers once the broker removal operation is completed successfully.
  • When brokers are removed from a cluster, their associated local storage is removed as well.
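The capacity rules above can be summarized as a quick validity check. This helper is illustrative, not an MSK API:

```python
import math

def plan_broker_removal(current_brokers, target_brokers, num_azs):
    """Apply the constraints listed above; returns how many consecutive
    removal operations are needed (at most one broker per AZ each)."""
    if target_brokers % num_azs != 0:
        raise ValueError("target broker count must be a multiple of the number of AZs")
    if target_brokers < num_azs:
        raise ValueError("must retain at least one broker per AZ")
    if target_brokers > current_brokers:
        raise ValueError("target exceeds current size; this is an add, not a removal")
    removed = current_brokers - target_brokers
    return {"brokers_removed": removed,
            "operations_needed": math.ceil(removed / num_azs)}
```

For example, shrinking a 12-broker cluster spread across 3 AZs down to 6 brokers removes 6 brokers over 2 consecutive operations.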

Considerations before removing brokers

Removing brokers from an existing Apache Kafka cluster is a critical operation that needs careful planning to avoid service disruption. When deciding how many brokers you should remove from the cluster, determine your cluster’s minimum broker count by considering your requirements around availability, durability, local data retention, and partition count. Here are a few things you should consider:

  • Check the Amazon CloudWatch BytesInPerSec and BytesOutPerSec metrics for your cluster and look for the peak load over a period of 1 month. Use this data with the MSK sizing Excel file to identify how many brokers you need to handle your peak load. If the number of brokers listed in the Excel file is higher than the number of brokers that would remain after the removal, do not proceed with this operation. It indicates that removing brokers would leave too few brokers for the cluster, which can lead to availability impact for your cluster or applications.
  • Check the UserPartitionExists metric to verify that you have at least one empty broker per AZ in your cluster. If not, make sure to remove partitions from at least one broker per AZ before invoking the operation.
  • If you have more than one broker per AZ with no user partitions on them, MSK will randomly pick one of those during the removal operation.
  • Check the PartitionCount metric to know the number of partitions that exist on your cluster, and check the per-broker partition limit. The broker removal feature will not allow the removal of brokers if the service detects that any brokers in the cluster have breached the partition limit. In that case, check if any unused topics could be removed instead to free up broker resources.
  • Check if the estimated storage in the Excel file exceeds the currently provisioned storage for the cluster. In that case, first provision additional storage on that cluster. If you are hitting per-broker storage limits, consider approaches like using MSK tiered storage or removing unused topics. Otherwise, avoid moving partitions to just a few brokers as that may lead to a disk full issue.
  • If the brokers you are planning to remove host partitions, make sure those partitions are reassigned to other brokers in the cluster. Use the kafka-reassign-partitions.sh tool or Cruise Control to initiate partition reassignment, and monitor the reassignment through to completion. Disregard the __amazon_msk_canary and __amazon_msk_canary_state internal topics, because they are managed by the service and will be removed automatically by MSK while executing the operation.
  • Verify the cluster status is Active before starting the removal process.
  • Check the performance of the workload on your production environment after you move those partitions. We recommend monitoring this for a week before you remove the brokers to make sure that the other brokers in your cluster can safely handle your traffic patterns.
  • If you experience any impact on your applications or cluster availability after removing brokers, you can add the same number of brokers that you removed earlier by using the UpdateBrokerCount API, and then reassign partitions to the newly added brokers.
  • We recommend you test the entire process in a non-production environment, to identify and resolve any issues before making changes in the production environment.
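For the partition reassignment called out above, the stock Kafka tooling is typically run in two passes; a sketch, with placeholder endpoint, properties file, and plan file names:

```shell
# Apply a prepared reassignment plan.
kafka-reassign-partitions.sh --bootstrap-server broker:9098 \
  --command-config client.properties \
  --reassignment-json-file plan.json --execute

# Poll until every reassignment reports complete before removing brokers.
kafka-reassign-partitions.sh --bootstrap-server broker:9098 \
  --command-config client.properties \
  --reassignment-json-file plan.json --verify
```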

Conclusion

Amazon MSK’s new broker removal capability provides a safe way to reduce the capacity of your provisioned Apache Kafka clusters. By allowing you to remove brokers without impacting availability, data durability, or disrupting your streaming applications, this feature enables you to optimize costs and right-size your MSK clusters based on changing business needs and traffic patterns. With careful planning and by following the recommended best practices, you can confidently use this capability to manage your MSK resources more efficiently.

Start taking advantage of the broker removal feature in Amazon MSK today. Review the documentation and follow the step-by-step guide to test the process in a non-production environment. Once you are comfortable with the workflow, plan and execute broker removal in your production MSK clusters to optimize costs and align your streaming infrastructure with your evolving workload requirements.


About the Authors


Vidhi Taneja is a Principal Product Manager for Amazon Managed Streaming for Apache Kafka (Amazon MSK) at AWS. She is passionate about helping customers build streaming applications at scale and derive value from real-time data. Before joining AWS, Vidhi worked at Apple, Goldman Sachs and Nutanix in product management and engineering roles. She holds an MS degree from Carnegie Mellon University.


Anusha Dasarakothapalli is a Principal Software Engineer for Amazon Managed Streaming for Apache Kafka (Amazon MSK) at AWS. She started her software engineering career with Amazon in 2015 and worked on products such as S3-Glacier and S3 Glacier Deep Archive, before transitioning to MSK in 2022. Her primary areas of focus lie in streaming technology, distributed systems, and storage.


Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Nexthink scales to trillions of events per day with Amazon MSK

Post Syndicated from Moe Haidar original https://aws.amazon.com/blogs/big-data/nexthink-scales-to-trillions-of-events-per-day-with-amazon-msk/

Real-time data streaming and event processing present scalability and management challenges. AWS offers a broad selection of managed real-time data streaming services to effortlessly run these workloads at any scale.

In this post, Nexthink shares how Amazon Managed Streaming for Apache Kafka (Amazon MSK) empowered them to achieve massive scale in event processing. Experiencing business hyper-growth, Nexthink migrated to AWS to overcome the scaling limitations of on-premises solutions. With Amazon MSK, Nexthink now seamlessly processes trillions of events per day, reaching over 5 GB per second of aggregated throughput.

In the following sections, Nexthink introduces their product and the need for scalability. They then highlight the challenges of their legacy on-premises application and present their transition to a cloud-centered software as a service (SaaS) architecture powered by Amazon MSK. Finally, Nexthink details the benefits achieved by adopting Amazon MSK.

Nexthink’s need to scale

Nexthink is the leader in digital employee experience (DeX). The company is shaping the future of work by providing IT leaders and C-levels with insights into employees’ daily technology experiences at the device and application level. This allows IT to evolve from reactive problem-solving to proactive optimization.

The Nexthink Infinity platform combines analytics, monitoring, automation, and more to manage the employee digital experience. By collecting device and application events, processing them in real time, and storing them, our platform analyzes data to solve problems and boost experiences for over 15 million employees across five continents.

In just 3 years, Nexthink’s business grew tenfold, and with the introduction of more real-time data our application had to scale from processing 200 MB per second to 5 GB per second and trillions of events daily. To enable this growth, we modernized our application from an on-premises single-tenant monolith to a cloud-based scalable SaaS solution powered by Amazon MSK.

The next sections detail our modernization journey, including the challenges we faced and the benefits we realized with our new cloud-centered, AWS-based architecture.

The on-premises solution and its challenges

Let’s first explore our previous on-premises solution, Nexthink V6, before examining how Amazon MSK addressed its challenges. The following diagram illustrates its architecture.

Nexthink v6

V6 was made up of two monolithic, single-tenant Java and C++ applications that were tightly coupled. The portal was a backend-for-frontend Java application, and the core engine was an in-house C++ in-memory database application that was also handling device connections, data ingestion, aggregation, and querying. By bundling all these functions together, the engine became difficult to manage and improve.

V6 also lacked scalability. It was initially designed to support 10,000 devices, but some new tenants had over 300,000 devices. We reacted by deploying multiple V6 engines per tenant, which increased complexity and cost, hampered user experience, and delayed time to market. This also led to longer proof of concept and onboarding cycles, which hurt the business.

Furthermore, the absence of a streaming platform like Kafka created dependencies between teams through tight HTTP/gRPC coupling. Additionally, teams couldn’t access real-time events before ingestion into the database, limiting feature development. We also lacked a data buffer, risking potential data loss during outages. Such constraints impeded innovation and increased risks.

In summary, although the V6 system served its initial purpose, reinventing it with cloud-centered technologies became imperative to enhance scalability, reliability, and foster innovation by our engineering and product teams.

Transitioning to a cloud-centered architecture with Amazon MSK

To achieve our modernization goals, after thorough research and iterations, we implemented an event-driven microservices design on Amazon Elastic Kubernetes Service (Amazon EKS), using Kafka on Amazon MSK for distributed event storage and streaming.

Our transition from the V6 on-premises solution to the cloud-centered platform was phased over four iterations:

  • Phase 1 – We lifted and shifted from on premises to virtual machines in the cloud, reducing operational complexities and accelerating proof of concept cycles while transparently migrating customers.
  • Phase 2 – We extended the cloud architecture by implementing new product features with microservices and self-managed Kafka on Kubernetes. However, operating Kafka clusters ourselves proved overly difficult, leading us to Phase 3.
  • Phase 3 – We switched from self-managed Kafka to Amazon MSK, improving stability and reducing operational costs. We realized that managing Kafka wasn’t our core competency or differentiator, and the overhead was high. Amazon MSK enabled us to focus on our core application, freeing us from the burden of undifferentiated Kafka management.
  • Phase 4 – Finally, we eliminated all legacy components, completing the transition to a fully cloud-centered SaaS platform. This journey of learning and transformation took 3 years.

Today, after our successful transition, we use Amazon MSK for two key functions:

  • Real-time data ingestion and processing of trillions of daily events from over 15 million devices worldwide, as illustrated in the following figure.

Nexthink Architecture Ingestion

  • Enabling an event-driven system that decouples data producers and consumers, as depicted in the following figure.

Nexthink Architecture Event Driven

To further enhance our scalability and resilience, we adopted a cell-based architecture using the wide availability of Amazon MSK across AWS Regions. We currently operate over 10 cells, each representing an independent regional deployment of our SaaS solution. This cell-based approach minimizes the area of impact in case of issues, addresses data residency requirements, and enables horizontal scaling across AWS Regions, as illustrated in the following figure.

Nexthink Architecture Cells

Benefits of Amazon MSK

Amazon MSK has been critical in enabling our event-driven design. In this section, we outline the main benefits we gained from its adoption.

Improved data resilience

In our new architecture, data from devices is pushed directly to Kafka topics in Amazon MSK, which provides high availability and resilience. This makes sure that events can be safely received and stored at any time. Our services consuming this data inherit the same resilience from Amazon MSK. If our backend ingestion services face disruptions, no event is lost, because Kafka retains all published messages. When our services resume, they seamlessly continue processing from where they left off, thanks to Kafka’s committed consumer offsets and configurable delivery semantics, which allow processing messages exactly-once, at-least-once, or at-most-once based on application needs.

Amazon MSK enables us to tailor the data retention duration to our specific requirements, ranging from seconds to unlimited duration. This flexibility grants uninterrupted data availability to our application, which wasn’t possible with our previous architecture. Furthermore, to safeguard data integrity in the event of processing errors or corruption, Kafka enabled us to implement a data replay mechanism, ensuring data consistency and reliability.

Organizational scaling

By adopting an event-driven architecture with Amazon MSK, we decomposed our monolithic application into loosely coupled, stateless microservices communicating asynchronously via Kafka topics. This approach enabled our engineering organization to scale rapidly from just 4–5 teams in 2019 to over 40 teams and approximately 350 engineers today.

The loose coupling between event publishers and subscribers empowered teams to focus on distinct domains, such as data ingestion, identification services, and data lakes. Teams could develop solutions independently within their domains, communicating through Kafka topics without tight coupling. This architecture accelerated feature development by minimizing the risk of new features impacting existing ones. Teams could efficiently consume events published by others, offering new capabilities more rapidly while reducing cross-team dependencies.

The following figure illustrates the seamless workflow of adding new domains to our system.

Adding domains

Furthermore, the event-driven design allowed teams to build stateless services that could seamlessly auto scale based on MSK metrics like messages per second. This event-driven scalability eliminated the need for extensive capacity planning and manual scaling efforts, freeing up development time.

By using an event-driven microservices architecture on Amazon MSK, we achieved organizational agility, enhanced scalability, and accelerated innovation while minimizing operational overhead.

Seamless infrastructure scaling

Nexthink’s business grew tenfold in 3 years, and many new capabilities were added to the product, leading to a substantial increase in traffic from 200 MB per second to 5 GB per second. This exponential data growth was enabled by the robust scalability of Amazon MSK. Achieving such scale with an on-premises solution would have been challenging and expensive, if not infeasible.

Attempting to self-manage Kafka imposed unnecessary operational overhead without providing business value. Running it with just 5% of today’s traffic was already complex and required two engineers. At today’s volumes, we estimated needing 6–10 dedicated staff, increasing costs and diverting resources away from core priorities.

Real-time capabilities

By channeling all our data through Amazon MSK, we enabled real-time processing of events. This unlocked capabilities like real-time alerts, event-driven triggers, and webhooks that were previously unattainable. As such, Amazon MSK was instrumental in facilitating our event-driven architecture and powering impactful innovations.

Secure data access

Transitioning to our new architecture, we met our security and data integrity goals. With Kafka ACLs, we enforced strict access controls, allowing consumers and producers to only interact with authorized topics. We based these granular data access controls on criteria like data type, domain, and team.
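The post doesn’t show Nexthink’s exact setup, but a topic-scoped ACL of the kind described, expressed with the standard Kafka tooling, looks roughly like this (principal, topic, and group names are illustrative):

```shell
# Allow one service to consume a single topic, and nothing else by default.
kafka-acls.sh --bootstrap-server broker:9098 \
  --command-config client.properties \
  --add --allow-principal User:ingestion-service \
  --operation Read --topic device-events --group ingestion-service
```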

To securely scale decentralized management of topics, we introduced proprietary Kubernetes Custom Resource Definitions (CRDs). These CRDs enabled teams to independently manage their own topics, settings, and ACLs without compromising security.

Amazon MSK encryption made sure that the data remained encrypted at rest and in transit. We also introduced a Bring Your Own Key (BYOK) option, allowing application-level encryption with customer keys for all single-tenant and multi-tenant topics.

Enhanced observability

Amazon MSK gave us great visibility into our data flows. The out-of-the-box Amazon CloudWatch metrics let us see the amount and types of data flowing through each topic and cluster. This helped us quantify the usage of our product features by tracking data volumes at the topic level. The Amazon MSK operational metrics enabled effortless monitoring and right-sizing of clusters and brokers. Overall, the rich observability of Amazon MSK facilitated data-driven decisions about architecture and product features.

Conclusion

Nexthink’s journey from an on-premises monolith to a cloud SaaS was streamlined by using Amazon MSK, a fully managed Kafka service. Amazon MSK allowed us to scale seamlessly while benefiting from enterprise-grade reliability and security. By offloading Kafka management to AWS, we could stay focused on our core business and innovate faster.

Going forward, we plan to further improve performance, costs, and scalability by adopting Amazon MSK capabilities such as tiered storage and AWS Graviton-based EC2 instance types.

We are also working closely with the Amazon MSK team to prepare for upcoming service features. Rapidly adopting new capabilities will help us remain at the forefront of innovation while continuing to grow our business.

To learn more about how Nexthink uses AWS to serve its global customer base, explore the Nexthink on AWS case study. Additionally, discover other customer success stories with Amazon MSK by visiting the Amazon MSK blog category.


About the Authors

Moe Haidar is a principal engineer and special projects lead at the CTO office of Nexthink. He has been involved with AWS since 2018 and is a key contributor to the cloud transformation of the Nexthink platform to AWS. His focus is on product and technology incubation and architecture, but he also loves doing hands-on activities to keep his knowledge of technologies sharp and up to date. He still contributes heavily to the code base and loves to tackle complex problems.
Simone Pomata is a Senior Solutions Architect at AWS. He has worked enthusiastically in the tech industry for more than 10 years. At AWS, he helps customers succeed in building new technologies every day.
Magdalena Gargas is a Solutions Architect passionate about technology and solving customer challenges. At AWS, she works mostly with software companies, helping them innovate in the cloud. She participates in industry events, sharing insights and contributing to the advancement of the containerization field.

Exploring real-time streaming for generative AI applications

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/exploring-real-time-streaming-for-generative-ai-applications/

Foundation models (FMs) are large machine learning (ML) models trained on a broad spectrum of unlabeled and generalized datasets. FMs, as the name suggests, provide the foundation to build more specialized downstream applications, and are unique in their adaptability. They can perform a wide range of different tasks, such as natural language processing, classifying images, forecasting trends, analyzing sentiment, and answering questions. This scale and general-purpose adaptability are what makes FMs different from traditional ML models. FMs are multimodal; they work with different data types such as text, video, audio, and images. Large language models (LLMs) are a type of FM and are pre-trained on vast amounts of text data and typically have application uses such as text generation, intelligent chatbots, or summarization.

Streaming data facilitates the constant flow of diverse and up-to-date information, enhancing the models’ ability to adapt and generate more accurate, contextually relevant outputs. This dynamic integration of streaming data enables generative AI applications to respond promptly to changing conditions, improving their adaptability and overall performance in various tasks.

To better understand this, imagine a chatbot that helps travelers book their travel. In this scenario, the chatbot needs real-time access to airline inventory, flight status, hotel inventory, latest price changes, and more. This data usually comes from third parties, and developers need to find a way to ingest this data and process the data changes as they happen.

Batch processing is not the best fit in this scenario. When data changes rapidly, processing it in a batch may result in stale data being used by the chatbot, providing inaccurate information to the customer, which impacts the overall customer experience. Stream processing, however, can enable the chatbot to access real-time data and adapt to changes in availability and price, providing the best guidance to the customer and enhancing the customer experience.

Another example is an AI-driven observability and monitoring solution where FMs monitor real-time internal metrics of a system and produce alerts. When the model finds an anomaly or abnormal metric value, it should immediately produce an alert and notify the operator. However, the value of such important data diminishes significantly over time. These notifications should ideally be received within seconds, or even as the anomaly occurs. If operators receive these notifications minutes or hours after the fact, the insight is no longer actionable and has potentially lost its value. You can find similar use cases in other industries such as retail, car manufacturing, energy, and finance.

In this post, we discuss why data streaming is a crucial component of generative AI applications due to its real-time nature. We discuss the value of AWS data streaming services such as Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Firehose in building generative AI applications.

In-context learning

LLMs are trained with point-in-time data and have no inherent ability to access fresh data at inference time. As new data appears, you will have to continuously fine-tune or further train the model. This is not only an expensive operation, but also very limiting in practice because the rate of new data generation far exceeds the speed of fine-tuning. Additionally, LLMs lack contextual understanding, rely solely on their training data, and are therefore prone to hallucinations. This means they can generate a fluent, coherent, and syntactically sound but factually incorrect response. They are also devoid of relevance, personalization, and context.

LLMs, however, have the capacity to learn from the data they receive from the context to more accurately respond without modifying the model weights. This is called in-context learning, and can be used to produce personalized answers or provide an accurate response in the context of organization policies.

For example, in a chatbot, data events could pertain to an inventory of flights and hotels or price changes that are constantly ingested to a streaming storage engine. Furthermore, data events are filtered, enriched, and transformed to a consumable format using a stream processor. The result is made available to the application by querying the latest snapshot. The snapshot constantly updates through stream processing; therefore, the up-to-date data is provided in the context of a user prompt to the model. This allows the model to adapt to the latest changes in price and availability. The following diagram illustrates a basic in-context learning workflow.
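To make this concrete, here is a minimal, self-contained sketch of the snapshot-and-query pattern. All names (FlightEvent, Snapshot, build_prompt_context) and the sample flight data are hypothetical:

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical flight-inventory snapshot kept current by a stream processor.
@dataclass
class FlightEvent:
    flight: str
    price: float
    seats_left: int

class Snapshot:
    """Latest state per flight; each incoming event overwrites the previous one."""
    def __init__(self):
        self._state = {}

    def apply(self, event: FlightEvent) -> None:
        self._state[event.flight] = event

    def lookup(self, flight: str) -> Optional[FlightEvent]:
        return self._state.get(flight)

def build_prompt_context(snapshot: Snapshot, flight: str) -> str:
    """Inject the up-to-date snapshot into the model prompt as context."""
    e = snapshot.lookup(flight)
    if e is None:
        return f"No current data for flight {flight}."
    return f"Flight {flight}: price ${e.price:.2f}, {e.seats_left} seats left."

snap = Snapshot()
snap.apply(FlightEvent("LX123", 420.0, 12))
snap.apply(FlightEvent("LX123", 455.0, 9))   # a price change arrives on the stream
print(build_prompt_context(snap, "LX123"))   # reflects the latest event
```

The same pattern scales up when the snapshot is an external store fed by a streaming job and the lookup happens at prompt-assembly time.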

A commonly used in-context learning approach is to use a technique called Retrieval Augmented Generation (RAG). In RAG, you provide the relevant information such as most relevant policy and customer records along with the user question to the prompt. This way, the LLM generates an answer to the user question using additional information provided as context. To learn more about RAG, refer to Question answering using Retrieval Augmented Generation with foundation models in Amazon SageMaker JumpStart.
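A minimal sketch of the prompt-assembly step in RAG; the template wording and the sample documents are illustrative:

```python
# Illustrative only: stitching retrieved documents into a RAG prompt.
def build_rag_prompt(question, retrieved_docs):
    context = "\n".join(f"- {d}" for d in retrieved_docs)
    return (
        "Answer the question using only the context below.\n"
        f"Context:\n{context}\n"
        f"Question: {question}\n"
    )

docs = ["Refunds are issued within 7 days.", "Gold members get free rebooking."]
prompt = build_rag_prompt("Can I rebook for free?", docs)
print(prompt)
```

The assembled prompt, not the bare question, is what gets sent to the LLM.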

A RAG-based generative AI application can only produce generic responses based on its training data and the relevant documents in the knowledge base. This solution falls short when a near-real-time personalized response is expected from the application. For example, a travel chatbot is expected to consider the user’s current bookings, available hotel and flight inventory, and more. Moreover, the relevant customer personal data (commonly known as the unified customer profile) is usually subject to change. If a batch process is employed to update the generative AI’s user profile database, the customer may receive dissatisfying responses based on old data.

In this post, we discuss the application of stream processing to enhance a RAG solution used for building question answering agents with context from real-time access to unified customer profiles and organizational knowledge base.

Near-real-time customer profile updates

Customer records are typically distributed across data stores within an organization. For your generative AI application to provide a relevant, accurate, and up-to-date customer profile, it is vital to build streaming data pipelines that can perform identity resolution and profile aggregation across the distributed data stores. Streaming jobs constantly ingest new data to synchronize across systems and can perform enrichment, transformations, joins, and aggregations across windows of time more efficiently. Change data capture (CDC) events contain information about the source record, updates, and metadata such as time, source, classification (insert, update, or delete), and the initiator of the change.
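As an illustration, the following sketch shows a plausible CDC event shape and how it might be merged into an aggregated profile store. The field names are loosely modeled on log-based capture output and are not tied to any specific tool:

```python
import json

# Hypothetical CDC event; field names are illustrative.
cdc_event = {
    "op": "update",                       # insert | update | delete
    "ts_ms": 1718000000000,               # time of the change at the source
    "source": {"db": "crm", "table": "customers"},
    "before": {"id": 42, "email": "old@example.com"},
    "after":  {"id": 42, "email": "new@example.com"},
}

def apply_to_profile(profiles, event):
    """Merge a single CDC event into an aggregated customer-profile store."""
    key = (event["after"] or event["before"])["id"]
    if event["op"] == "delete":
        profiles.pop(key, None)
    else:
        profiles.setdefault(key, {}).update(event["after"])
    return profiles

profiles = apply_to_profile({}, cdc_event)
print(json.dumps(profiles[42]))
```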

The following diagram illustrates an example workflow for CDC streaming ingestion and processing for unified customer profiles.

In this section, we discuss the main components of a CDC streaming pattern required to support RAG-based generative AI applications.

CDC streaming ingestion

A CDC replicator is a process that collects data changes from a source system (usually by reading transaction logs or binlogs) and writes CDC events, in the exact order in which they occurred, to a data stream or topic. This involves log-based capture with tools such as AWS Database Migration Service (AWS DMS) or open source connectors such as Debezium for Apache Kafka Connect. Apache Kafka Connect is part of the Apache Kafka environment, allowing data to be ingested from various sources and delivered to a variety of destinations. You can run your Apache Kafka connector on Amazon MSK Connect within minutes without worrying about configuration, setup, and operating an Apache Kafka cluster. You only need to upload your connector’s compiled code to Amazon Simple Storage Service (Amazon S3) and set up your connector with your workload’s specific configuration.
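For illustration, a log-based connector such as Debezium is configured with the source connection details and the tables to capture. The values below are placeholders, and exact property names vary between Debezium versions:

```json
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.hostname": "source-db.example.internal",
  "database.port": "3306",
  "database.user": "cdc_user",
  "database.password": "<secret>",
  "database.server.id": "184054",
  "topic.prefix": "crm",
  "table.include.list": "crm.customers,crm.bookings"
}
```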

There are also other methods for capturing data changes. For example, Amazon DynamoDB provides a feature for streaming CDC data to Amazon DynamoDB Streams or Kinesis Data Streams. Amazon S3 provides a trigger to invoke an AWS Lambda function when a new document is stored.

Streaming storage

Streaming storage functions as an intermediate buffer to store CDC events before they get processed. Streaming storage provides reliable storage for streaming data. By design, it is highly available and resilient to hardware or node failures and maintains the order of the events as they are written. Streaming storage can store data events either permanently or for a set period of time. This allows stream processors to read from part of the stream if there is a failure or a need for re-processing. Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at scale. Amazon MSK is a fully managed, highly available, and secure service provided by AWS for running Apache Kafka.

Stream processing

Stream processing systems should be designed for parallelism to handle high data throughput. They should partition the input stream between multiple tasks running on multiple compute nodes. Tasks should be able to send the result of one operation to the next one over the network, making it possible to process data in parallel while performing operations such as joins, filtering, enrichment, and aggregations. Stream processing applications should be able to process events with regard to event time, for use cases where events could arrive late or where correct computation relies on the time events occur rather than the system time. For more information, refer to Notions of Time: Event Time and Processing Time.

Stream processing jobs continuously produce results in the form of data events that need to be output to a target system. A target system could be any system that integrates directly with the process, or via streaming storage as an intermediary. Depending on the framework you choose for stream processing, you will have different options for target systems depending on available sink connectors. If you decide to write the results to an intermediary streaming storage, you can build a separate process that reads events and applies changes to the target system, such as running an Apache Kafka sink connector. Regardless of which option you choose, CDC data needs extra handling due to its nature. Because CDC events carry information about updates or deletes, it’s important that they are merged into the target system in the right order. If changes are applied in the wrong order, the target system will be out of sync with its source.
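The following sketch shows why ordering matters when merging CDC events; the event shape is illustrative:

```python
# Sketch: merging CDC events into a target table in source order vs. out of order.
events = [
    {"seq": 1, "op": "insert", "id": 7, "row": {"id": 7, "status": "booked"}},
    {"seq": 2, "op": "update", "id": 7, "row": {"id": 7, "status": "cancelled"}},
    {"seq": 3, "op": "delete", "id": 7, "row": None},
]

def apply_events(events):
    """Apply CDC events to a target table in the order given."""
    target = {}
    for e in events:
        if e["op"] == "delete":
            target.pop(e["id"], None)
        else:
            target[e["id"]] = e["row"]
    return target

in_order = apply_events(sorted(events, key=lambda e: e["seq"]))
out_of_order = apply_events(list(reversed(events)))
print(in_order)        # the row was inserted, updated, then deleted: empty table
print(out_of_order)    # delete applied first is a no-op; a stale row survives
```

Applied in order, the table ends up empty as the source intended; applied in reverse, a deleted row is resurrected and the target drifts out of sync.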

Apache Flink is a powerful stream processing framework known for its low latency and high throughput capabilities. It supports event time processing, exactly-once processing semantics, and high fault tolerance. Additionally, it provides native support for CDC data via a special structure called dynamic tables. Dynamic tables mimic the source database tables and provide a columnar representation of the streaming data. The data in dynamic tables changes with every event that is processed. New records can be appended, updated, or deleted at any time. Dynamic tables abstract away the extra logic you need to implement for each record operation (insert, update, delete) separately. For more information, refer to Dynamic Tables.

With Amazon Managed Service for Apache Flink, you can run Apache Flink jobs and integrate with other AWS services. There are no servers and clusters to manage, and there is no compute and storage infrastructure to set up.

AWS Glue is a fully managed extract, transform, and load (ETL) service, which means AWS handles the infrastructure provisioning, scaling, and maintenance for you. Although it’s primarily known for its ETL capabilities, AWS Glue can also be used for Spark streaming applications. AWS Glue can interact with streaming data services such as Kinesis Data Streams and Amazon MSK for processing and transforming CDC data. AWS Glue can also seamlessly integrate with other AWS services such as Lambda, AWS Step Functions, and DynamoDB, providing you with a comprehensive ecosystem for building and managing data processing pipelines.

Unified customer profile

Unifying the customer profile across a variety of source systems requires the development of robust data pipelines. You need data pipelines that can bring and synchronize all records into one data store. This data store provides your organization with the holistic customer records view that is needed for operational efficiency of RAG-based generative AI applications. For building such a data store, an unstructured data store is the best fit.

An identity graph is a useful structure for creating a unified customer profile because it consolidates and integrates customer data from various sources, ensures data accuracy and deduplication, offers real-time updates, connects cross-systems insights, enables personalization, enhances customer experience, and supports regulatory compliance. This unified customer profile empowers the generative AI application to understand and engage with customers effectively, and adhere to data privacy regulations, ultimately enhancing customer experiences and driving business growth. You can build your identity graph solution using Amazon Neptune, a fast, reliable, fully managed graph database service.

AWS provides a few other managed and serverless NoSQL storage service offerings for unstructured key-value objects. Amazon DocumentDB (with MongoDB compatibility) is a fast, scalable, highly available, and fully managed enterprise document database service that supports native JSON workloads. DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability.

Near-real-time organizational knowledge base updates

Similar to customer records, internal knowledge repositories such as company policies and organizational documents are siloed across storage systems. This is typically unstructured data that is updated in a non-incremental fashion. Unstructured data is made usable for AI applications through vector embeddings, a technique for representing high-dimensional data such as text files, images, and audio files as multi-dimensional numeric vectors.

AWS provides several vector engine services, such as Amazon OpenSearch Serverless, Amazon Kendra, and Amazon Aurora PostgreSQL-Compatible Edition with the pgvector extension, for storing vector embeddings. Generative AI applications can enhance the user experience by transforming the user prompt into a vector and using it to query the vector engine to retrieve contextually relevant information. Both the prompt and the vector data retrieved are then passed to the LLM to receive a more precise and personalized response.

The following diagram illustrates an example stream-processing workflow for vector embeddings.

Knowledge base contents need to be converted to vector embeddings before being written to the vector data store. Amazon Bedrock or Amazon SageMaker can help you access the model of your choice and expose a private endpoint for this conversion. Furthermore, you can use libraries such as LangChain to integrate with these endpoints. Building a batch process can help you convert your knowledge base content to vector data and store it in a vector database initially. However, you need to rely on an interval to reprocess the documents to synchronize your vector database with changes in your knowledge base content. With a large number of documents, this process can be inefficient. Between these intervals, your generative AI application users will receive answers according to the old content, or will receive an inaccurate answer because the new content is not vectorized yet.
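A sketch of the conversion step, with a stub standing in for the call to an embedding FM endpoint (in practice an API call to a model, for example via Amazon Bedrock); the chunk size and identifiers are arbitrary:

```python
import hashlib

# Stub embedding function: in a real pipeline this would call an embedding FM.
def embed(text, dim=4):
    h = hashlib.sha256(text.encode()).digest()
    return [b / 255 for b in h[:dim]]

# Split a document into fixed-size chunks for vectorization.
def chunk(document, size=40):
    return [document[i:i + size] for i in range(0, len(document), size)]

vector_store = {}
doc = "Expense reports must be filed within 30 days of travel."
for i, piece in enumerate(chunk(doc)):
    vector_store[f"policy-1#{i}"] = {"text": piece, "vector": embed(piece)}

print(len(vector_store), "chunks vectorized")
```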

Stream processing is an ideal solution for these challenges. It initially produces events for the existing documents, then monitors the source system and creates a document change event as soon as one occurs. These events can be stored in streaming storage and await processing by a streaming job. A streaming job reads these events, loads the content of the document, and transforms the content into an array of related tokens of words. Each token is then transformed into vector data via an API call to an embedding FM. Results are sent to the vector storage via a sink operator.

If you’re using Amazon S3 for storing your documents, you can build an event-source architecture based on S3 object change triggers for Lambda. A Lambda function can create an event in the desired format and write that to your streaming storage.
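A sketch of such a Lambda function is shown below. The handler signature is adapted so the Kinesis client can be injected for testing; in a real function you would create the client once with boto3.client("kinesis") and use the standard (event, context) signature. The stream name is a placeholder:

```python
import json

# Sketch of a Lambda handler for S3 object-change notifications.
def handler(event, kinesis_client, stream_name="document-change-events"):
    for record in event["Records"]:
        # Build a compact change event from the S3 notification record.
        change = {
            "bucket": record["s3"]["bucket"]["name"],
            "key": record["s3"]["object"]["key"],
            "eventName": record["eventName"],
        }
        # Write it to streaming storage for downstream vectorization.
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(change),
            PartitionKey=change["key"],
        )
```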

You can also use Apache Flink to run as a streaming job. Apache Flink provides the native FileSystem source connector, which can discover existing files and read their contents initially. After that, it can continuously monitor your file system for new files and capture their content. The connector supports reading a set of files from distributed file systems such as Amazon S3 or HDFS in formats such as plain text, Avro, CSV, Parquet, and more, and produces a streaming record. As a fully managed service, Managed Service for Apache Flink removes the operational overhead of deploying and maintaining Flink jobs, allowing you to focus on building and scaling your streaming applications. With seamless integration into AWS streaming services such as Amazon MSK or Kinesis Data Streams, it provides features like automatic scaling, security, and resiliency for reliable and efficient Flink applications that handle real-time streaming data.

Based on your DevOps preference, you can choose between Kinesis Data Streams or Amazon MSK for storing the streaming records. Kinesis Data Streams simplifies the complexities of building and managing custom streaming data applications, allowing you to focus on deriving insights from your data rather than infrastructure maintenance. Customers using Apache Kafka often opt for Amazon MSK due to its straightforwardness, scalability, and dependability in overseeing Apache Kafka clusters within the AWS environment. As a fully managed service, Amazon MSK takes on the operational complexities associated with deploying and maintaining Apache Kafka clusters, enabling you to concentrate on constructing and expanding your streaming applications.

Because a RESTful API integration suits the nature of this process, you need a framework that supports a stateful enrichment pattern via RESTful API calls, tracking failures and retrying failed requests. Apache Flink, again, is a framework that can perform stateful operations at in-memory speed. To understand the best ways to make API calls via Apache Flink, refer to Common streaming data enrichment patterns in Amazon Kinesis Data Analytics for Apache Flink.

Apache Flink provides native sink connectors for writing data to vector datastores such as Amazon Aurora for PostgreSQL with pgvector or Amazon OpenSearch Service with VectorDB. Alternatively, you can stage the Flink job’s output (vectorized data) in an MSK topic or a Kinesis data stream. OpenSearch Service provides support for native ingestion from Kinesis data streams or MSK topics. For more information, refer to Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion and Loading streaming data from Amazon Kinesis Data Streams.

Feedback analytics and fine-tuning

It’s important for data operation managers and AI/ML developers to get insight about the performance of the generative AI application and the FMs in use. To achieve that, you need to build data pipelines that calculate important key performance indicator (KPI) data based on the user feedback and variety of application logs and metrics. This information is useful for stakeholders to gain real-time insight about the performance of the FM, the application, and overall user satisfaction about the quality of support they receive from your application. You also need to collect and store the conversation history for further fine-tuning your FMs to improve their ability in performing domain-specific tasks.

This use case fits very well in the streaming analytics domain. Your application should store each conversation in streaming storage. Your application can prompt users to rate each answer’s accuracy and their overall satisfaction. This data can take the form of a binary choice or free-form text. It can be stored in a Kinesis data stream or MSK topic and processed to generate KPIs in real time. You can also put FMs to work on user sentiment analysis: FMs can analyze each answer and assign a category of user satisfaction.
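As a minimal illustration, a satisfaction KPI over binary feedback events might be computed like this (in practice a Flink job would do this over time windows; the event shape is illustrative):

```python
from collections import Counter

# Hypothetical feedback events collected from the application.
feedback = [
    {"conversation": "c1", "helpful": True},
    {"conversation": "c2", "helpful": False},
    {"conversation": "c3", "helpful": True},
]

# Count helpful vs. unhelpful answers and derive a satisfaction rate.
counts = Counter(e["helpful"] for e in feedback)
satisfaction = counts[True] / len(feedback)
print(f"satisfaction rate: {satisfaction:.0%}")
```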

Apache Flink’s architecture allows for complex data aggregation over windows of time. It also provides support for SQL querying over streams of data events. Therefore, by using Apache Flink, you can quickly analyze raw user inputs and generate KPIs in real time by writing familiar SQL queries. For more information, refer to Table API & SQL.

With Amazon Managed Service for Apache Flink Studio, you can build and run Apache Flink stream processing applications using standard SQL, Python, and Scala in an interactive notebook. Studio notebooks are powered by Apache Zeppelin and use Apache Flink as the stream processing engine. Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. With support for user-defined functions (UDFs), Apache Flink allows for building custom operators to integrate with external resources such as FMs for performing complex tasks such as sentiment analysis. You can use UDFs to compute various metrics or enrich user feedback raw data with additional insights such as user sentiment. To learn more about this pattern, refer to Proactively addressing customer concern in real-time with GenAI, Flink, Apache Kafka, and Kinesis.

With Managed Service for Apache Flink Studio, you can deploy your Studio notebook as a streaming job with one click. You can use native sink connectors provided by Apache Flink to send the output to your storage of choice or stage it in a Kinesis data stream or MSK topic. Amazon Redshift and OpenSearch Service are both ideal for storing analytical data. Both engines provide native ingestion support from Kinesis Data Streams and Amazon MSK via a separate streaming pipeline to a data lake or data warehouse for analysis.

Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses and data lakes, using AWS-designed hardware and machine learning to deliver the best price-performance at scale. OpenSearch Service offers visualization capabilities powered by OpenSearch Dashboards and Kibana (1.5 to 7.10 versions).

You can use the outcome of such analysis combined with user prompt data for fine-tuning the FM when needed. SageMaker is the most straightforward way to fine-tune your FMs. Using Amazon S3 with SageMaker provides a powerful and seamless integration for fine-tuning your models. Amazon S3 serves as a scalable and durable object storage solution, enabling straightforward storage and retrieval of large datasets, training data, and model artifacts. SageMaker is a fully managed ML service that simplifies the entire ML lifecycle. By using Amazon S3 as the storage backend for SageMaker, you can benefit from the scalability, reliability, and cost-effectiveness of Amazon S3, while seamlessly integrating it with SageMaker training and deployment capabilities. This combination enables efficient data management, facilitates collaborative model development, and makes sure that ML workflows are streamlined and scalable, ultimately enhancing the overall agility and performance of the ML process. For more information, refer to Fine-tune Falcon 7B and other LLMs on Amazon SageMaker with @remote decorator.

With a file system sink connector, Apache Flink jobs can deliver data to Amazon S3 in open format (such as JSON, Avro, Parquet, and more) files as data objects. If you prefer to manage your data lake using a transactional data lake framework (such as Apache Hudi, Apache Iceberg, or Delta Lake), all of these frameworks provide a custom connector for Apache Flink. For more details, refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi.

Summary

For a generative AI application based on a RAG model, you need to consider building two data storage systems, and you need to build data operations that keep them up to date with all the source systems. Traditional batch jobs are not sufficient to process the size and diversity of the data you need to integrate with your generative AI application. Delays in processing the changes in source systems result in an inaccurate response and reduce the efficiency of your generative AI application. Data streaming enables you to ingest data from a variety of databases across various systems. It also allows you to transform, enrich, join, and aggregate data across many sources efficiently in near-real time. Data streaming provides a simplified data architecture to collect and transform users’ real-time reactions or comments on the application responses, helping you deliver and store the results in a data lake for model fine-tuning. Data streaming also helps you optimize data pipelines by processing only the change events, allowing you to respond to data changes more quickly and efficiently.

Learn more about AWS data streaming services and get started building your own data streaming solution.


About the Authors

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems which are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

Imtiaz (Taz) Sayed is the World-Wide Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Introducing enhanced functionality for worker configuration management in Amazon MSK Connect

Post Syndicated from Chinmayi Narasimhadevara original https://aws.amazon.com/blogs/big-data/introducing-enhanced-functionality-for-worker-configuration-management-in-amazon-msk-connect/

Amazon MSK Connect is a fully managed service for Apache Kafka Connect. With a few clicks, MSK Connect allows you to deploy connectors that move data between Apache Kafka and external systems.

MSK Connect now supports the ability to delete MSK Connect worker configurations, tag resources, and manage worker configurations and custom plugins using AWS CloudFormation. Together, these new capabilities make it straightforward to manage your MSK Connect resources and automate deployments through CI/CD pipelines.

MSK Connect makes it effortless to stream data to and from Apache Kafka over a private connection without requiring infrastructure management expertise. With a few clicks, you can deploy connectors like an Amazon S3 sink connector for loading streaming data to Amazon Simple Storage Service (Amazon S3), deploy connectors developed by third parties like Debezium for streaming change logs from databases into Apache Kafka, or deploy your own connector customized for your use case.

MSK Connect integrates external systems or AWS services with Apache Kafka by continuously copying streaming data from a data source into your Apache Kafka cluster, or continuously copying data from your Apache Kafka cluster into a data sink. The connector can also perform lightweight tasks such as transformation, format conversion, or filtering data before delivering the data to a destination. You can use a plugin to create the connector; these custom plugins are resources that contain the code that defines connector logic.

The primary components of MSK Connect are workers. Each worker is a Java virtual machine (JVM) process that runs the connector logic based on the worker configuration provided. Worker configurations are resources that contain your connector configuration properties and can be reused across multiple connectors. Each worker comprises a set of tasks that copy the data in parallel.

Today, we are announcing three new capabilities in MSK Connect:

  • The ability to delete worker configurations
  • Support for resource tags for enabling resource grouping, cost allocation and reporting, and access control with tag-based policies
  • Support in AWS CloudFormation to manage worker configurations and custom plugins

In the following sections, we look at the new functionalities in more detail.

Delete worker configurations

Connectors for integrating Amazon Managed Streaming for Apache Kafka (Amazon MSK) with other AWS and partner services are usually created using a worker configuration (default or custom). The number of these configurations can grow with the creation and deletion of connectors, potentially creating configuration management issues.

You can now use the new delete worker configuration API to delete unused configurations. The service checks that the worker configuration is not in use by any connectors before deleting the configuration. Additionally, you can now use a prefix filter to list worker configurations and custom plugins using the ListWorkerConfigurations and ListCustomPlugins API calls. The prefix filter allows you to list the selective resources with names starting with the prefix so you can perform quick selective deletes.
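The cleanup flow described above can be sketched with the boto3 `kafkaconnect` client. This is a minimal sketch, not the blog's own code; it assumes the `namePrefix` parameter on `ListWorkerConfigurations` and that an in-use configuration is rejected with a `BadRequestException` (hypothetical error choice; verify against the API reference before relying on it):

```python
def delete_unused_worker_configs(client, prefix):
    """Delete every worker configuration whose name starts with `prefix`
    and that is not attached to a connector. The service refuses to delete
    in-use configurations, so those are skipped rather than raised."""
    deleted = []
    paginator = client.get_paginator("list_worker_configurations")
    for page in paginator.paginate(namePrefix=prefix):
        for config in page["workerConfigurations"]:
            try:
                client.delete_worker_configuration(
                    workerConfigurationArn=config["workerConfigurationArn"]
                )
                deleted.append(config["name"])
            except client.exceptions.BadRequestException:
                # Still referenced by a connector; leave it alone
                pass
    return deleted

# Usage (requires AWS credentials):
# import boto3
# client = boto3.client("kafkaconnect")
# print(delete_unused_worker_configs(client, "test-"))
```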

To test the new delete API, complete the following steps:

  1. On the Amazon MSK console, create a new worker configuration.
  2. Provide a name and optional description.
  3. In the Worker configuration section, enter your configuration code.

MSK Connect Worker Configuration

After you create the configuration, a Delete option is available on the configuration detail page (see the following screenshot) if the configuration is not being used in any connector.

To support this new API, an additional workerConfigurationState has been added, so you can more easily track the state of the worker configuration. This new state will be returned in the API call responses for CreateWorkerConfiguration, DescribeWorkerConfiguration, and ListWorkerConfigurations.

MSK Connect Worker Configuration

  1. Choose Delete to delete the worker configuration.
  2. In the confirmation pop-up, enter the name of the worker configuration, then choose Delete.

Delete MSKC Worker Configuration

If the worker configuration is being used with any connector, the Delete option is disabled, as shown in the following screenshot.

Resource tags

MSK Connect now also has support for resource tags. Tags are key-value metadata that can be associated with AWS service resources. You can add tags to connectors, custom plugins, and worker configurations to organize and find resources used across AWS services. In the following screenshots, our example MSK Connect connector, plugin, and worker configuration have been tagged with the resource tag key project and value demo-tags.

You can now tag your Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3 resources with the same project name, for example. Then you can use the tag to search for all resources linked to this particular project for cost allocation, reporting, resource grouping, or access control. MSK Connect supports adding tags when creating resources, applying tags to an existing resource, removing tags from a resource, and querying tags associated with a resource.
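The tagging operations above can be sketched with boto3. This is a hedged example, assuming the lowercase `resourceArn`/`tags` parameter names of the `kafkaconnect` API (the helper name and ARNs are hypothetical):

```python
def tag_for_project(client, resource_arns, project):
    """Apply a shared `project` tag to a set of MSK Connect resources
    (connectors, custom plugins, worker configurations) so they can be
    grouped for cost allocation, reporting, and tag-based access control."""
    for arn in resource_arns:
        client.tag_resource(resourceArn=arn, tags={"project": project})
    # Read the tags back so the caller can confirm what is now applied
    return {
        arn: client.list_tags_for_resource(resourceArn=arn)["tags"]
        for arn in resource_arns
    }

# Usage (requires AWS credentials):
# import boto3
# client = boto3.client("kafkaconnect")
# tag_for_project(client, [connector_arn, plugin_arn], "demo-tags")
```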

AWS CloudFormation support

Previously, you were only able to provision an MSK Connect connector with AWS CloudFormation by using an existing worker configuration. With this new feature, you can now perform CREATE, READ, UPDATE, DELETE, and LIST operations on connectors, and create and add new worker configurations using AWS CloudFormation.

The following code is an example of creating a worker configuration:

{
  "Type": "AWS::KafkaConnect::WorkerConfiguration",
  "Properties": {
    "Name": "WorkerConfigurationName",
    "Description": "WorkerConfigurationDescription",
    "PropertiesFileContent": String,
    "Tags": [Tag, ...]
  }
}

The return values are as follows:

  • ARN of the newly created worker configuration
  • State of the new worker configuration
  • Creation time of new worker configuration
  • Latest revision of the new worker configuration
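These return values can be surfaced from a template with Fn::GetAtt. The following is a sketch of an Outputs section, assuming the attribute names WorkerConfigurationArn, Revision, and WorkerConfigurationState and a hypothetical logical ID MyWorkerConfig (check the resource reference for the exact attributes):

```json
"Outputs": {
  "WorkerConfigArn": {
    "Value": { "Fn::GetAtt": ["MyWorkerConfig", "WorkerConfigurationArn"] }
  },
  "WorkerConfigRevision": {
    "Value": { "Fn::GetAtt": ["MyWorkerConfig", "Revision"] }
  },
  "WorkerConfigState": {
    "Value": { "Fn::GetAtt": ["MyWorkerConfig", "WorkerConfigurationState"] }
  }
}
```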

Conclusion

MSK Connect is a fully managed service that provisions the required resources, monitors the health and delivery state of connectors, maintains the underlying hardware, and auto scales connectors to balance the workloads. In this post, we discussed the new features that were added to MSK Connect, which streamline connector and worker management with the introduction of APIs for deleting worker configurations, tagging MSK Connect resources, and support in AWS CloudFormation to create non-default worker configurations.

These capabilities are available in all AWS Regions where Amazon MSK Connect is available. For a list of Region availability, refer to AWS Services by Region. To learn more about MSK Connect, visit the Amazon MSK Connect Developer Guide.


About the Authors

Chinmayi Narasimhadevara is a Solutions Architect focused on Big Data and Analytics at Amazon Web Services. Chinmayi has over 20 years of experience in information technology. She helps AWS customers build advanced, highly scalable, and performant solutions.

Harita Pappu is a Technical Account Manager based in California. She has over 18 years of experience in the software industry building and scaling applications. She is passionate about new technologies and is focused on helping customers achieve cost optimization and operational excellence.

Build an end-to-end serverless streaming pipeline with Apache Kafka on Amazon MSK using Python

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-serverless-streaming-pipeline-with-apache-kafka-on-amazon-msk-using-python/

The volume of data generated globally continues to surge, from gaming, retail, and finance, to manufacturing, healthcare, and travel. Organizations are looking for more ways to quickly use the constant inflow of data to innovate for their businesses and customers. They have to reliably capture, process, analyze, and load the data into a myriad of data stores, all in real time.

Apache Kafka is a popular choice for these real-time streaming needs. However, it can be challenging to set up a Kafka cluster along with other data processing components that scale automatically depending on your application’s needs. You risk under-provisioning for peak traffic, which can lead to downtime, or over-provisioning for base load, leading to wastage. AWS offers multiple serverless services like Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Data Firehose, Amazon DynamoDB, and AWS Lambda that scale automatically depending on your needs.

In this post, we explain how you can use some of these services, including MSK Serverless, to build a serverless data platform to meet your real-time needs.

Solution overview

Let’s imagine a scenario. You’re responsible for managing thousands of modems for an internet service provider deployed across multiple geographies. You want to monitor the modem connectivity quality that has a significant impact on customer productivity and satisfaction. Your deployment includes different modems that need to be monitored and maintained to ensure minimal downtime. Each device transmits thousands of 1 KB records every second, such as CPU usage, memory usage, alarm, and connection status. You want real-time access to this data so you can monitor performance in real time, and detect and mitigate issues quickly. You also need longer-term access to this data for machine learning (ML) models to run predictive maintenance assessments, find optimization opportunities, and forecast demand.

Your clients that gather the data onsite are written in Python, and they can send all the data as Apache Kafka topics to Amazon MSK. For your application's low-latency and real-time data access, you can use Lambda and DynamoDB. For longer-term data storage, you can use Amazon Data Firehose, a managed serverless delivery service, to send data to your data lake.

The following diagram shows how you can build this end-to-end serverless application.

end-to-end serverless application

Let’s follow the steps in the following sections to implement this architecture.

Create a serverless Kafka cluster on Amazon MSK

We use Amazon MSK to ingest real-time telemetry data from modems. Creating a serverless Kafka cluster is straightforward on Amazon MSK. It only takes a few minutes using the AWS Management Console or AWS SDK. To use the console, refer to Getting started using MSK Serverless clusters. You create a serverless cluster, AWS Identity and Access Management (IAM) role, and client machine.
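For the SDK route, the cluster creation can be sketched with the boto3 `kafka` client's `CreateClusterV2` operation. This is a minimal sketch, assuming IAM client authentication and placeholder subnet/security group IDs (the helper name is hypothetical; the console walkthrough linked above remains the authoritative path):

```python
def create_serverless_cluster(client, name, subnet_ids, security_group_ids):
    """Create an MSK Serverless cluster with IAM client authentication,
    returning the new cluster's ARN."""
    response = client.create_cluster_v2(
        ClusterName=name,
        Serverless={
            "VpcConfigs": [{
                "SubnetIds": subnet_ids,
                "SecurityGroupIds": security_group_ids,
            }],
            "ClientAuthentication": {"Sasl": {"Iam": {"Enabled": True}}},
        },
    )
    return response["ClusterArn"]

# Usage (requires AWS credentials, a VPC, and subnets):
# import boto3
# client = boto3.client("kafka")
# arn = create_serverless_cluster(client, "myCluster", ["subnet-a", "subnet-b"], ["sg-x"])
```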

Create a Kafka topic using Python

When your cluster and client machine are ready, SSH to your client machine and install Kafka Python and the MSK IAM library for Python.

  • Run the following commands to install Kafka Python and the MSK IAM library:
pip install kafka-python

pip install aws-msk-iam-sasl-signer-python
  • Create a new file called createTopic.py.
  • Copy the following code into this file, replacing the bootstrap_servers and region information with the details for your cluster. For instructions on retrieving the bootstrap_servers information for your MSK cluster, see Getting the bootstrap brokers for an Amazon MSK cluster.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("Topic already exists. List of topics: " + str(existing_topics))
  • Run the createTopic.py script to create a new Kafka topic called mytopic on your serverless cluster:
python createTopic.py

Produce records using Python

Let’s generate some sample modem telemetry data.

  • Create a new file called kafkaDataGen.py.
  • Copy the following code into this file, updating the BROKERS and region information with the details for your cluster:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e)
  • Run the kafkaDataGen.py to continuously generate random data and publish it to the specified Kafka topic:
python kafkaDataGen.py

Store events in Amazon S3

Now you store all the raw event data in an Amazon Simple Storage Service (Amazon S3) data lake for analytics. You can use the same data to train ML models. The integration with Amazon Data Firehose allows Amazon MSK to seamlessly load data from your Apache Kafka clusters into an S3 data lake. Complete the following steps to continuously stream data from Kafka to Amazon S3, eliminating the need to build or manage your own connector applications:

  • On the Amazon S3 console, create a new bucket. You can also use an existing bucket.
  • Create a new folder in your S3 bucket called streamingDataLake.
  • On the Amazon MSK console, choose your MSK Serverless cluster.
  • On the Actions menu, choose Edit cluster policy.

cluster policy

  • Select Include Firehose service principal and choose Save changes.

firehose service principal

  • On the S3 delivery tab, choose Create delivery stream.

delivery stream

  • For Source, choose Amazon MSK.
  • For Destination, choose Amazon S3.

source and destination

  • For Amazon MSK cluster connectivity, select Private bootstrap brokers.
  • For Topic, enter a topic name (for this post, mytopic).

source settings

  • For S3 bucket, choose Browse and choose your S3 bucket.
  • Enter streamingDataLake as your S3 bucket prefix.
  • Enter streamingDataLakeErr as your S3 bucket error output prefix.

destination settings

  • Choose Create delivery stream.

create delivery stream

You can verify that the data was written to your S3 bucket. You should see that the streamingDataLake directory was created and that the files are stored in partitions.
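As an alternative to the console, the delivered objects can be listed with boto3. This is a hedged sketch (the bucket name is a placeholder, and the exact partition layout depends on your Firehose prefix configuration):

```python
def list_delivered_objects(client, bucket, prefix="streamingDataLake"):
    """Return the keys Firehose has delivered under the prefix. Keys land
    in time-based partitions, e.g. streamingDataLake/2024/06/10/12/<file>."""
    keys = []
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys

# Usage (requires AWS credentials):
# import boto3
# print(list_delivered_objects(boto3.client("s3"), "my-streaming-bucket"))
```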

amazon s3

Store events in DynamoDB

For the last step, you store the most recent modem data in DynamoDB. This allows the client application to access the modem status and interact with the modem remotely from anywhere, with low latency and high availability. Lambda seamlessly works with Amazon MSK. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.

Let's first create a table in DynamoDB. Refer to DynamoDB API permissions: Actions, resources, and conditions reference to verify that your client machine has the necessary permissions.

  • Create a new file called createTable.py.
  • Copy the following code into the file, updating the region information:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")
  • Run the createTable.py script to create a table called device_status in DynamoDB:
python createTable.py

Now let’s configure the Lambda function.

  • On the Lambda console, choose Functions in the navigation pane.
  • Choose Create function.
  • Select Author from scratch.
  • For Function name, enter a name (for example, my-notification-kafka).
  • For Runtime, choose Python 3.11.
  • For Permissions, select Use an existing role and choose a role with permissions to read from your cluster.
  • Create the function.

On the Lambda function configuration page, you can now configure sources, destinations, and your application code.

  • Choose Add trigger.
  • For Trigger configuration, enter MSK to configure Amazon MSK as a trigger for the Lambda source function.
  • For MSK cluster, enter myCluster.
  • Deselect Activate trigger, because you haven’t configured your Lambda function yet.
  • For Batch size, enter 100.
  • For Starting position, choose Latest.
  • For Topic name, enter a name (for example, mytopic).
  • Choose Add.
  • On the Lambda function details page, on the Code tab, enter the following code:
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except:
        return 'err'

def lambda_handler(event, context):
    base64records = event['records']['mytopic-0']
    
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        dynamodb = boto3.client('dynamodb')
        table_name = 'device_status'
        item = {
            'deviceid': {'S': deviceid},
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print("Item written to DynamoDB")
  • Deploy the Lambda function.
  • On the Configuration tab, choose Edit to edit the trigger.

edit trigger

  • Select the trigger, then choose Save.
  • On the DynamoDB console, choose Explore items in the navigation pane.
  • Select the table device_status.

You will see that Lambda is writing events generated in the Kafka topic to DynamoDB.
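A client application can then read a modem's latest status with a point lookup on the partition key. This is a minimal sketch (the device ID is a placeholder, and it assumes the string-typed attributes written by the Lambda function above):

```python
def get_modem_status(client, deviceid, table_name="device_status"):
    """Point read of the latest record for one modem. Returns a plain dict,
    or None if the device has not reported yet."""
    response = client.get_item(
        TableName=table_name,
        Key={"deviceid": {"S": deviceid}},
    )
    item = response.get("Item")
    if item is None:
        return None
    # Flatten the DynamoDB attribute-value format ({'S': ...}) to strings
    return {k: v["S"] for k, v in item.items()}

# Usage (requires AWS credentials):
# import boto3
# print(get_modem_status(boto3.client("dynamodb"), "dvc1234"))
```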

ddb table

Summary

Streaming data pipelines are critical for building real-time applications. However, setting up and managing the infrastructure can be daunting. In this post, we walked through how to build a serverless streaming pipeline on AWS using Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose, and other services. The key benefits are no servers to manage, automatic scalability of the infrastructure, and a pay-as-you-go model using fully managed services.

Ready to build your own real-time pipeline? Get started today with a free AWS account. With the power of serverless, you can focus on your application logic while AWS handles the undifferentiated heavy lifting. Let’s build something awesome on AWS!


About the Authors

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Michael Oguike is a Product Manager for Amazon MSK. He is passionate about using data to uncover insights that drive action. He enjoys helping customers from a wide range of industries improve their businesses using data streaming. Michael also loves learning about behavioral science and psychology from books and podcasts.

How VMware Tanzu CloudHealth migrated from self-managed Kafka to Amazon MSK

Post Syndicated from Rivlin Pereira original https://aws.amazon.com/blogs/big-data/how-vmware-tanzu-cloudhealth-migrated-from-self-managed-kafka-to-amazon-msk/

This is a post co-written with Rivlin Pereira & Vaibhav Pandey from Tanzu CloudHealth (VMware by Broadcom).

VMware Tanzu CloudHealth is the cloud cost management platform of choice for more than 20,000 organizations worldwide, who rely on it to optimize and govern their largest and most complex multi-cloud environments. In this post, we discuss how the VMware Tanzu CloudHealth DevOps team migrated their self-managed Apache Kafka workloads (running version 2.0) to Amazon Managed Streaming for Apache Kafka (Amazon MSK) running version 2.6.2. We discuss the system architectures, deployment pipelines, topic creation, observability, access control, topic migration, and all the issues we faced with the existing infrastructure, along with how and why we migrated to the new Kafka setup and some lessons learned.

Kafka cluster overview

In the fast-evolving landscape of distributed systems, VMware Tanzu CloudHealth’s next-generation microservices platform relies on Kafka as its messaging backbone. For us, Kafka’s high-performance distributed log system excels in handling massive data streams, making it indispensable for seamless communication. Serving as a distributed log system, Kafka efficiently captures and stores diverse logs, from HTTP server access logs to security event audit logs.

Kafka’s versatility shines in supporting key messaging patterns, treating messages as basic logs or structured key-value stores. Dynamic partitioning and consistent ordering ensure efficient message organization. The unwavering reliability of Kafka aligns with our commitment to data integrity.

The integration of Ruby services with Kafka is streamlined through the Karafka library, acting as a higher-level wrapper. Our other language stack services use similar wrappers. Kafka’s robust debugging features and administrative commands play a pivotal role in ensuring smooth operations and infrastructure health.

Kafka as an architectural pillar

In VMware Tanzu CloudHealth’s next-generation microservices platform, Kafka emerges as a critical architectural pillar. Its ability to handle high data rates, support diverse messaging patterns, and guarantee message delivery aligns seamlessly with our operational needs. As we continue to innovate and scale, Kafka remains a steadfast companion, enabling us to build a resilient and efficient infrastructure.

Why we migrated to Amazon MSK

For us, migrating to Amazon MSK came down to three key decision points:

  • Simplified technical operations – Running Kafka on a self-managed infrastructure was an operational overhead for us. We hadn’t upgraded from Kafka version 2.0.0 in a while, and Kafka brokers were going down in production, causing issues with topics going offline. We also had to run scripts manually to increase replication factors and rebalance leaders, which added operational effort.
  • Deprecated legacy pipelines and simplified permissions – We were looking to move away from our existing pipelines written in Ansible to create Kafka topics on the cluster. We also had a cumbersome process of giving team members access to Kafka machines in staging and production, and we wanted to simplify this.
  • Cost, patching, and support – Because Apache Zookeeper is completely managed and patched by AWS, moving to Amazon MSK was going to save us time and money. In addition, we discovered that running Amazon MSK with the same type of brokers on Amazon Elastic Compute Cloud (Amazon EC2) was cheaper to run on Amazon MSK. Combined with the fact that we get security patches applied on brokers by AWS, migrating to Amazon MSK was an easy decision. This also meant that the team was freed up to work on other important things. Finally, getting enterprise support from AWS was also critical in our final decision to move to a managed solution.

How we migrated to Amazon MSK

With the key drivers identified, we moved ahead with a proposed design to migrate existing self-managed Kafka to Amazon MSK. We conducted the following pre-migration steps before the actual implementation:

  • Assessment:
    • Conducted a meticulous assessment of the existing EC2 Kafka cluster, understanding its configurations and dependencies
    • Verified Kafka version compatibility with Amazon MSK
  • Amazon MSK setup with Terraform
  • Network configuration:
    • Ensured seamless network connectivity between the EC2 Kafka and MSK clusters, fine-tuning security groups and firewall settings

After the pre-migration steps, we implemented the following for the new design:

  • Automated deployment, upgrade, and topic creation pipelines for MSK clusters:
    • In the new setup, we wanted to have automated deployments and upgrades of the MSK clusters in a repeatable fashion using an IaC tool. Therefore, we created custom Terraform modules for MSK cluster deployments as well as upgrades. These modules were called from a Jenkins pipeline to deploy and upgrade the clusters automatically. For Kafka topic creation, we were using an Ansible-based home-grown pipeline, which wasn’t stable and led to a lot of complaints from dev teams. As a result, we evaluated options for deployments to Kubernetes clusters and used the Strimzi Topic Operator to create topics on MSK clusters. Topic creation was automated using Jenkins pipelines, which dev teams could self-service.
  • Better observability for clusters:
    • The old Kafka clusters didn’t have good observability. We only had alerts on Kafka broker disk size. With Amazon MSK, we took advantage of open monitoring using Prometheus. We stood up a standalone Prometheus server that scraped metrics from MSK clusters and sent them to our internal observability tool. As a result of improved observability, we were able to set up robust alerting for Amazon MSK, which wasn’t possible with our old setup.
  • Improved COGS and better compute infrastructure:
    • For our old Kafka infrastructure, we had to pay for managing Kafka, Zookeeper instances, plus any additional broker storage costs and data transfer costs. With the move to Amazon MSK, because Zookeeper is completely managed by AWS, we only have to pay for Kafka nodes, broker storage, and data transfer costs. As a result, in final Amazon MSK setup for production, we saved not only on infrastructure costs but also operational costs.
  • Simplified operations and enhanced security:
    • With the move to Amazon MSK, we didn’t have to manage any Zookeeper instances. Broker security patching was also taken care by AWS for us.
    • Cluster upgrades became simpler with the move to Amazon MSK; it’s a straightforward process to initiate from the Amazon MSK console.
    • With Amazon MSK, we got broker automatic scaling out of the box. As a result, we didn’t have to worry about brokers running out of disk space, thereby leading to additional stability of the MSK cluster.
    • We also got additional security for the cluster because Amazon MSK supports encryption at rest by default, and various options for encryption in transit are also available. For more information, refer to Data protection in Amazon Managed Streaming for Apache Kafka.

During our pre-migration steps, we validated the setup on the staging environment before moving ahead with production.

Kafka topic migration strategy

With the MSK cluster setup complete, we performed a data migration of Kafka topics from the old cluster running on Amazon EC2 to the new MSK cluster. To achieve this, we performed the following steps:

  • Set up MirrorMaker with Terraform – We used Terraform to orchestrate the deployment of a MirrorMaker cluster consisting of 15 nodes. This demonstrated scalability and flexibility, because we could adjust the number of nodes based on the migration’s concurrent replication needs.
  • Implement a concurrent replication strategy – We implemented a concurrent replication strategy with 15 MirrorMaker nodes to expedite the migration process. Our Terraform-driven approach contributed to cost optimization by efficiently managing resources during the migration and ensured the reliability and consistency of the MSK and MirrorMaker clusters. It also showcased how the chosen setup accelerates data transfer, optimizing both time and resources.
  • Migrate data – We successfully migrated 2 TB of data in a remarkably short timeframe, minimizing downtime and showcasing the efficiency of the concurrent replication strategy.
  • Set up post-migration monitoring – We implemented robust monitoring and alerting during the migration, contributing to a smooth process by identifying and addressing issues promptly.

The following diagram illustrates the architecture after the topic migration was complete.
Mirror-maker setup

Challenges and lessons learned

Embarking on a migration journey, especially with large datasets, is often accompanied by unforeseen challenges. In this section, we delve into the challenges encountered during the migration of topics from EC2 Kafka to Amazon MSK using MirrorMaker, and share valuable insights and solutions that shaped the success of our migration.

Challenge 1: Offset discrepancies

One of the challenges we encountered was the mismatch in topic offsets between the source and destination clusters, even with offset synchronization enabled in MirrorMaker. The lesson learned here was that offset values don’t necessarily need to be identical, as long as offset sync is enabled, which makes sure the topics have the correct position to read the data from.

We addressed this problem by using a custom tool to run tests on consumer groups, confirming that the translated offsets were either smaller or caught up, indicating synchronization as per MirrorMaker.
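The core invariant that tool checked can be sketched as a simple comparison. This is a hedged illustration, not the team's actual tool; it assumes consumer-group offsets have already been fetched from each cluster (for example, with an admin client) into `{(topic, partition): offset}` dicts:

```python
def verify_offset_sync(source_offsets, dest_offsets):
    """Check MirrorMaker offset translation: for every (topic, partition),
    the destination consumer-group offset must exist and be less than or
    equal to the source offset ("smaller or caught up"). Returns the
    partitions that violate the invariant; an empty list means in sync."""
    lagging = []
    for tp, src_offset in source_offsets.items():
        dst_offset = dest_offsets.get(tp)
        if dst_offset is None or dst_offset > src_offset:
            lagging.append(tp)
    return lagging
```

A translated offset larger than the source would mean the destination group skipped data, which is why only the less-than-or-equal case counts as synchronized.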

Challenge 2: Slow data migration

The migration process faced a bottleneck—data transfer was slower than anticipated, especially with a substantial 2 TB dataset. Despite a 20-node MirrorMaker cluster, the speed was insufficient.

To overcome this, the team strategically grouped MirrorMaker nodes based on unique port numbers. Clusters of five MirrorMaker nodes, each with a distinct port, significantly boosted throughput, allowing us to migrate data within hours instead of days.

Challenge 3: Lack of detailed process documentation

Navigating the uncharted territory of migrating large datasets using MirrorMaker highlighted the absence of detailed documentation for such scenarios.

Through trial and error, the team crafted an IaC module using Terraform. This module streamlined the entire cluster creation process with optimized settings, enabling a seamless start to the migration within minutes.

Final setup and next steps

As a result of the move to Amazon MSK, our final setup after topic migration looked like the following diagram.
MSK Blog
We’re considering the following future improvements:

Conclusion

In this post, we discussed how VMware Tanzu CloudHealth migrated their existing Amazon EC2-based Kafka infrastructure to Amazon MSK. We walked you through the new architecture, deployment and topic creation pipelines, improvements to observability and access control, topic migration challenges, and the issues we faced with the existing infrastructure, along with how and why we migrated to the new Amazon MSK setup. We also talked about all the advantages that Amazon MSK gave us, the final architecture we achieved with this migration, and lessons learned.

For us, the interplay of offset synchronization, strategic node grouping, and IaC proved pivotal in overcoming obstacles and ensuring a successful migration from Amazon EC2 Kafka to Amazon MSK. This post serves as a testament to the power of adaptability and innovation in migration challenges, offering insights for others navigating a similar path.

If you’re running self-managed Kafka on AWS, we encourage you to try the managed Kafka offering, Amazon MSK.


About the Authors

Rivlin Pereira is Staff DevOps Engineer at VMware Tanzu Division. He is very passionate about Kubernetes and works on CloudHealth Platform building and operating cloud solutions that are scalable, reliable and cost effective.

Vaibhav Pandey, a Staff Software Engineer at Broadcom, is a key contributor to the development of cloud computing solutions. Specializing in architecting and engineering data storage layers, he is passionate about building and scaling SaaS applications for optimal performance.

Raj Ramasubbu is a Senior Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 18 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Todd McGrath is a data streaming specialist at Amazon Web Services where he advises customers on their streaming strategies, integration, architecture, and solutions. On the personal side, he enjoys watching and supporting his 3 teenagers in their preferred activities as well as following his own pursuits such as fishing, pickleball, ice hockey, and happy hour with friends and family on pontoon boats. Connect with him on LinkedIn.

Satya Pattanaik is a Sr. Solutions Architect at AWS. He has been helping ISVs build scalable and resilient applications on the AWS Cloud. Prior to joining AWS, he played a significant role in helping enterprise customers with their growth and success. Outside of work, he spends time learning “how to cook a flavorful BBQ” and trying out new recipes.

Best practices to implement near-real-time analytics using Amazon Redshift Streaming Ingestion with Amazon MSK

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/best-practices-to-implement-near-real-time-analytics-using-amazon-redshift-streaming-ingestion-with-amazon-msk/

Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, straightforward, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it the most widely used cloud data warehouse. You can run and scale analytics in seconds on all your data, without having to manage your data warehouse infrastructure.

You can use the Amazon Redshift Streaming Ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use Structured Query Language (SQL) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.

In this post, we discuss the best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK.

Overview of solution

We walk through an example pipeline to ingest data from an MSK topic into Amazon Redshift using Amazon Redshift streaming ingestion. We also show how to unnest JSON data using dot notation in Amazon Redshift. The following diagram illustrates our solution architecture.

The process flow consists of the following steps:

  1. Create a streaming materialized view in your Redshift cluster to consume live streaming data from the MSK topics.
  2. Use a stored procedure to implement change data capture (CDC) using the unique combination of Kafka Partition and Kafka Offset at the record level for the ingested MSK topic.
  3. Create a user-facing table in the Redshift cluster and use dot notation to unnest the JSON document from the streaming materialized view into data columns of the table. You can continuously load fresh data by calling the stored procedure at regular intervals.
  4. Establish connectivity between an Amazon QuickSight dashboard and Amazon Redshift to deliver visualization and insights.

As part of this post, we also discuss the following topics:

  • Steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift
  • Best practices to achieve optimized performance from streaming materialized views
  • Monitoring techniques to track failures in Amazon Redshift streaming ingestion

Prerequisites

You must have the following:

Considerations while setting up your MSK topic

Keep in mind the following considerations when configuring your MSK topic:

  • Make sure that the name of your MSK topic is no longer than 128 characters.
  • As of this writing, MSK records containing compressed data can’t be directly queried in Amazon Redshift. Amazon Redshift doesn’t support any native decompression methods for client-side compressed data in an MSK topic.
  • Follow best practices while setting up your MSK cluster.
  • Review the streaming ingestion limitations for any other considerations.

Set up streaming ingestion

To set up streaming ingestion, complete the following steps:

  1. Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Setting up IAM and performing streaming ingestion from Kafka.
  2. Make sure that data is flowing into your MSK topic using Amazon CloudWatch metrics (for example, BytesOutPerSec).
  3. Launch the query editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Redshift cluster for the next steps. The following steps were run in query editor v2.
  4. Create an external schema to map to the MSK cluster. Replace your IAM role ARN and the MSK cluster ARN in the following statement:
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. Optionally, if your topic names are case sensitive, you need to set enable_case_sensitive_identifier to true to be able to access them in Amazon Redshift. You can set it at either the session, user, or cluster level:
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. Create a materialized view to consume the stream data from the MSK topic:
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

The metadata column kafka_value that arrives from Amazon MSK is stored in VARBYTE format in Amazon Redshift. For this post, you use the JSON_PARSE function to convert kafka_value to a SUPER data type. You also use the CAN_JSON_PARSE function in the filter condition to skip invalid JSON records and guard against errors due to JSON parsing failures. We discuss how to store the invalid data for future debugging later in this post.
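The effect of this filter can be sketched outside of SQL. The following Python snippet (illustrative only; the function and sample records are hypothetical, not part of the Redshift pipeline) mirrors how the CAN_JSON_PARSE guard skips malformed records instead of failing the whole batch:

```python
import json

def can_json_parse(raw: bytes) -> bool:
    """Return True if the payload is valid JSON, mirroring CAN_JSON_PARSE."""
    try:
        json.loads(raw)
        return True
    except (ValueError, UnicodeDecodeError):
        return False

def ingest(records):
    """Keep only records whose value parses as JSON, as the
    WHERE CAN_JSON_PARSE(kafka_value) filter does in the materialized view."""
    return [json.loads(r) for r in records if can_json_parse(r)]

records = [b'{"OrderID": "103"}', b'not-json', b'{"OrderID": "104"}']
parsed = ingest(records)  # the malformed middle record is skipped
```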

  7. Refresh the streaming materialized view, which triggers Amazon Redshift to read from the MSK topic and load data into the materialized view:
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions to create a materialized view with auto refresh.

Unnest the JSON document

The following is a sample of a JSON document that was ingested from the MSK topic to the Data column of SUPER type in the streaming materialized view Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

Use dot notation as shown in the following code to unnest your JSON payload:

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

The following screenshot shows what the result looks like after unnesting.

If you have arrays in your JSON document, consider unnesting your data using PartiQL statements in Amazon Redshift. For more information, refer to the section Unnest the JSON document in the post Near-real-time analytics using Amazon Redshift streaming ingestion with Amazon Kinesis Data Streams and Amazon DynamoDB.

Incremental data load strategy

Complete the following steps to implement an incremental data load:

  1. Create a table called Orders in Amazon Redshift, which end-users will use for visualization and business analysis:
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

Next, you create a stored procedure called SP_Orders_Load to implement CDC from a streaming materialized view and load into the final Orders table. You use the combination of Kafka_Partition and Kafka_Offset available in the streaming materialized view as system columns to implement CDC. The combination of these two columns will always be unique within an MSK topic, which makes sure that none of the records are missed during the process. The stored procedure contains the following components:

  • To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session, user, or cluster level.
  • Refresh the streaming materialized view manually if auto refresh is not enabled.
  • Create an audit table called Orders_Streaming_Audit if it doesn’t exist to keep track of the last offset for a partition that was loaded into Orders table during the last run of the stored procedure.
  • Unnest and insert only new or changed data into a staging table called Orders_Staging_Table, reading from the streaming materialized view Orders_Stream_MV, where Kafka_Offset is greater than the last processed Kafka_Offset recorded in the audit table Orders_Streaming_Audit for the Kafka_Partition being processed.
  • When loading for the first time using this stored procedure, there will be no data in the Orders_Streaming_Audit table and all the data from Orders_Stream_MV will get loaded into the Orders table.
  • Insert only business-relevant columns to the user-facing Orders table, selecting from the staging table Orders_Staging_Table.
  • Insert the max Kafka_Offset for every loaded Kafka_Partition into the audit table Orders_Streaming_Audit.

We have added the intermediate staging table Orders_Staging_Table in this solution to help with the debugging in case of unexpected failures and trackability. Skipping the staging step and directly loading into the final table from Orders_Stream_MV can provide lower latency depending on your use case.
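The offset-based CDC logic that the stored procedure implements can be sketched in isolation. The following Python sketch (hypothetical names; illustrative only, not part of the actual solution) shows the same filter: a record is loaded only if its Kafka offset exceeds the audited maximum for its partition, and the audit state is then advanced:

```python
def incremental_records(stream, audit):
    """stream: list of (partition, offset, payload) tuples from the
    streaming materialized view; audit: {partition: max_loaded_offset}.
    Keep a record only if its offset is beyond the audited high-water mark,
    or its partition has never been loaded before."""
    out = []
    for partition, offset, payload in stream:
        last = audit.get(partition)
        if last is None or offset > last:
            out.append((partition, offset, payload))
    return out

def update_audit(audit, loaded):
    """Record the max loaded offset per partition, like the audit-table insert."""
    for partition, offset, _ in loaded:
        audit[partition] = max(audit.get(partition, -1), offset)
    return audit

audit = {0: 10}  # partition 0 already loaded through offset 10
stream = [(0, 10, "old"), (0, 11, "new"), (1, 0, "first")]
loaded = incremental_records(stream, audit)   # only the two unseen records
audit = update_audit(audit, loaded)
```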

  2. Create the stored procedure with the following code:
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"."N"::INT4 as OrderID
        ,data."ProductID"."S"::VARCHAR(36) as ProductID
        ,data."ProductName"."S"::VARCHAR(36) as ProductName
        ,data."CustomerID"."S"::VARCHAR(36) as CustomerID
        ,data."CustomerName"."S"::VARCHAR(36) as CustomerName
        ,data."Store_Name"."S"::VARCHAR(36) as Store_Name
        ,data."OrderDate"."S"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"."N"::INT4 as Quatity
        ,data."Price"."N"::DOUBLE PRECISION as Price
        ,data."OrderStatus"."S"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  3. Run the stored procedure to load data into the Orders table:
    call SP_Orders_Load();

  4. Validate data in the Orders table.

Establish cross-account streaming ingestion

If your MSK cluster belongs to a different account, complete the following steps to create IAM roles to set up cross-account streaming ingestion. Let’s assume the Redshift cluster is in account A and the MSK cluster is in account B, as shown in the following diagram.

Complete the following steps:

  1. In account B, create an IAM role called MyRedshiftMSKRole that allows Amazon Redshift (account A) to communicate with the MSK cluster (account B) named MyTestCluster. Depending on whether your MSK cluster uses IAM authentication or unauthenticated access to connect, you need to create an IAM role with one of the following policies:
    • An IAM policy for Amazon MSK when using unauthenticated access:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • An IAM policy for Amazon MSK when using IAM authentication:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

The resource section in the preceding example gives access to all topics in the MyTestCluster MSK cluster. If you need to restrict the IAM role to specific topics, you need to replace the topic resource with a more restrictive resource policy.

  2. After you create the IAM role in account B, take note of the IAM role ARN (for example, arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  3. In account A, create a Redshift customizable IAM role called MyRedshiftRole that Amazon Redshift will assume when connecting to Amazon MSK. The role should have a policy like the following, which allows the Amazon Redshift IAM role in account A to assume the Amazon MSK role in account B:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  4. Take note of the role ARN for the Amazon Redshift IAM role (for example, arn:aws:iam::9876543210:role/MyRedshiftRole).
  5. Go back to account B and add this role in the trust policy of the IAM role arn:aws:iam::0123456789:role/MyRedshiftMSKRole to allow account B to trust the IAM role from account A. The trust policy should look like the following code:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  6. Sign in to the Amazon Redshift console as account A.
  7. Launch the query editor v2 or your preferred SQL client and run the following statements to access the MSK topic in account B. To map to the MSK cluster, create an external schema using role chaining by specifying IAM role ARNs, separated by a comma without any spaces around it. The role attached to the Redshift cluster comes first in the chain.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
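As a small illustrative helper (hypothetical, not an AWS API), the chained IAM_ROLE value can be assembled programmatically, keeping the cluster-attached role first and no spaces around the comma:

```python
def chained_iam_role(*role_arns: str) -> str:
    """Build the IAM_ROLE value for role chaining: ARNs joined by a comma
    with no surrounding spaces. The role attached to the Redshift cluster
    must come first in the chain."""
    return ",".join(role_arns)

# Example ARNs from this post (account A role first, then account B role)
iam_role = chained_iam_role(
    "arn:aws:iam::9876543210:role/MyRedshiftRole",
    "arn:aws:iam::0123456789:role/MyRedshiftMSKRole",
)
```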
    

Performance considerations

Keep in mind the following performance considerations:

  • Keep the streaming materialized view simple and move transformations like unnesting, aggregation, and case expressions to a later step—for example, by creating another materialized view on top of the streaming materialized view.
  • Consider creating only one streaming materialized view in a single Redshift cluster or workgroup for a given MSK topic. Creation of multiple materialized views per MSK topic can slow down the ingestion performance because each materialized view becomes a consumer for that topic and shares the Amazon MSK bandwidth for that topic. Live streaming data in a streaming materialized view can be shared across multiple Redshift clusters or Redshift Serverless workgroups using data sharing.
  • While defining your streaming materialized view, avoid using JSON_EXTRACT_PATH_TEXT to pre-shred data, because JSON_EXTRACT_PATH_TEXT operates on the data row by row, which significantly impacts ingestion throughput. It is preferable to land the data as is from the stream and then shred it later.
  • Where possible, consider skipping the sort key in the streaming materialized view to accelerate the ingestion speed. When a streaming materialized view has a sort key, a sort operation will occur with every batch of ingested data from the stream. Sorting has a performance overhead depending on the sort key data type, number of sort key columns, and amount of data ingested in each batch. This sorting step can increase the latency before the streaming data is available to query. You should weigh which is more important: latency on ingestion or latency on querying the data.
  • For optimized performance of the streaming materialized view and to reduce storage usage, occasionally purge data from the materialized view using delete, truncate, or alter table append.
  • If you need to ingest multiple MSK topics in parallel into Amazon Redshift, start with a smaller number of streaming materialized views and keep adding more materialized views to evaluate the overall ingestion performance within a cluster or workgroup.
  • Increasing the number of nodes in a Redshift provisioned cluster or the base RPU of a Redshift Serverless workgroup can help boost the ingestion performance of a streaming materialized view. For optimal performance, you should aim to have as many slices in your Redshift provisioned cluster as there are partitions in your MSK topic, or 8 RPU for every four partitions in your MSK topic.
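As a rough arithmetic illustration of the last guideline (a hypothetical helper based on the rule of thumb above, not an official AWS sizing formula), the 8 RPU per four partitions guidance works out as follows:

```python
import math

def recommended_rpu(msk_partitions: int) -> int:
    """Rule of thumb from the guideline above: 8 RPU for every
    four partitions in the MSK topic, rounded up to a full group."""
    return math.ceil(msk_partitions / 4) * 8

# e.g., a 16-partition topic suggests a 32 RPU Redshift Serverless workgroup
baseline = recommended_rpu(16)
```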

Monitoring techniques

Records in the topic that exceed the size of the target materialized view column at the time of ingestion will be skipped. Records that are skipped by the materialized view refresh will be logged in the SYS_STREAM_SCAN_ERRORS system table.

Errors that occur when processing a record due to a calculation, a data type conversion, or some other logic in the materialized view definition will cause the materialized view refresh to fail until the offending record has expired from the topic. To avoid these types of issues, test the logic of your materialized view definition carefully; otherwise, land the records into the default VARBYTE column and process them later.

The following are available monitoring views:

  • SYS_MV_REFRESH_HISTORY – Use this view to gather information about the refresh history of your streaming materialized views. The results include the refresh type, such as manual or auto, and the status of the most recent refresh. The following query shows the refresh history for a streaming materialized view:
    select mv_name, refresh_type, status, duration from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales';

  • SYS_STREAM_SCAN_ERRORS – Use this view to check the reason why a record failed to load via streaming ingestion from an MSK topic. As of this writing, when ingesting from Amazon MSK, this view only logs errors when the record is larger than the materialized view column size. This view also shows the unique identifier (offset) of the MSK record in the position column. The following query shows the error code and error reason when a record exceeded the maximum size limit:
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS where mv_name='test_mv' and external_schema_name ='streaming_schema';
    

  • SYS_STREAM_SCAN_STATES – Use this view to monitor the number of records scanned at a given record_time. This view also tracks the offset of the last record read in the batch. The following query shows topic data for a specific materialized view:
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – Use this view to check the overall metrics for a streaming materialized view refresh. This will also log errors in the error_message column for errors that don’t show up in SYS_STREAM_SCAN_ERRORS. The following query shows the error causing the refresh failure of a streaming materialized view:
    select query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc;

Additional considerations for implementation

You can optionally create another materialized view on top of the streaming materialized view, allowing you to unnest and precompute results for end-users. This approach eliminates the need to store the results in a final table using a stored procedure.

In this post, you use the CAN_JSON_PARSE function to guard against errors during ingestion; streaming records that can't be parsed are skipped by Amazon Redshift. However, if you want to keep track of your error records, consider storing them in a separate column using the following SQL when creating the streaming materialized view:

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
case when CAN_JSON_PARSE(kafka_value) = true then JSON_PARSE(kafka_value) end as Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end as Invalid_Data
FROM custschema."ORDERTOPIC";

You can also consider unloading data from the view SYS_STREAM_SCAN_ERRORS into an Amazon Simple Storage Service (Amazon S3) bucket and get alerts by sending a report via email using Amazon Simple Notification Service (Amazon SNS) notifications whenever a new S3 object is created.

Lastly, based on your data freshness requirement, you can use Amazon EventBridge to schedule the jobs in your data warehouse to call the aforementioned SP_Orders_Load stored procedure on a regular basis. EventBridge does this at fixed intervals, and you may need to have a mechanism (for example, an AWS Step Functions state machine) to monitor if the previous call to the procedure completed. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. You can also refer to Accelerate orchestration of an ELT process using AWS Step Functions and Amazon Redshift Data API. Another option is to use Amazon Redshift query editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.

Conclusion

In this post, we discussed best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK. We showed you an example pipeline to ingest data from an MSK topic into Amazon Redshift using streaming ingestion. We also showed a reliable strategy to perform incremental streaming data load into Amazon Redshift using Kafka Partition and Kafka Offset. Additionally, we demonstrated the steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift and discussed performance considerations for optimized ingestion rate. Lastly, we discussed monitoring techniques to track failures in Amazon Redshift streaming ingestion.

If you have any questions, leave them in the comments section.


About the Authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Adekunle Adedotun is a Sr. Database Engineer with Amazon Redshift service. He has been working on MPP databases for 6 years with a focus on performance tuning. He also provides guidance to the development team for new and existing service features.

Simplify data streaming ingestion for analytics using Amazon MSK and Amazon Redshift

Post Syndicated from Sebastian Vlad original https://aws.amazon.com/blogs/big-data/simplify-data-streaming-ingestion-for-analytics-using-amazon-msk-and-amazon-redshift/

Towards the end of 2022, AWS announced the general availability of real-time streaming ingestion to Amazon Redshift for Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), eliminating the need to stage streaming data in Amazon Simple Storage Service (Amazon S3) before ingesting it into Amazon Redshift.

Streaming ingestion from Amazon MSK into Amazon Redshift represents a cutting-edge approach to real-time data processing and analysis. Amazon MSK serves as a highly scalable and fully managed service for Apache Kafka, allowing for seamless collection and processing of vast streams of data. Integrating streaming data into Amazon Redshift brings immense value by enabling organizations to harness the potential of real-time analytics and data-driven decision-making.

This integration enables you to achieve low latency, measured in seconds, while ingesting hundreds of megabytes of streaming data per second into Amazon Redshift. At the same time, this integration helps make sure that the most up-to-date information is readily available for analysis. Because the integration doesn’t require staging data in Amazon S3, Amazon Redshift can ingest streaming data at a lower latency and without intermediary storage cost.

You can configure Amazon Redshift streaming ingestion on a Redshift cluster using SQL statements to authenticate and connect to an MSK topic. This solution is an excellent option for data engineers who are looking to simplify data pipelines and reduce operational costs.

In this post, we provide a complete overview of how to configure Amazon Redshift streaming ingestion from Amazon MSK.

Solution overview

The following architecture diagram describes the AWS services and features you will be using.


The workflow includes the following steps:

  1. You start with configuring an Amazon MSK Connect source connector to create an MSK topic, generate mock data, and write it to the MSK topic. For this post, we work with mock customer data.
  2. The next step is to connect to a Redshift cluster using the Query Editor v2.
  3. Finally, you configure an external schema and create a materialized view in Amazon Redshift to consume the data from the MSK topic. This solution does not rely on an MSK Connect sink connector to export the data from Amazon MSK to Amazon Redshift.

The following solution architecture diagram describes in more detail the configuration and integration of the AWS services you will be using.
The workflow includes the following steps:

  1. You deploy an MSK Connect source connector, an MSK cluster, and a Redshift cluster within the private subnets on a VPC.
  2. The MSK Connect source connector uses granular permissions defined in an AWS Identity and Access Management (IAM) in-line policy attached to an IAM role, which allows the source connector to perform actions on the MSK cluster.
  3. The MSK Connect source connector logs are captured and sent to an Amazon CloudWatch log group.
  4. The MSK cluster uses a custom MSK cluster configuration, allowing the MSK Connect connector to create topics on the MSK cluster.
  5. The MSK cluster logs are captured and sent to an Amazon CloudWatch log group.
  6. The Redshift cluster uses granular permissions defined in an IAM in-line policy attached to an IAM role, which allows the Redshift cluster to perform actions on the MSK cluster.
  7. You can use the Query Editor v2 to connect to the Redshift cluster.

Prerequisites

To simplify the provisioning and configuration of the prerequisite resources, you can use the following AWS CloudFormation template:

Complete the following steps when launching the stack:

  1. For Stack name, enter a meaningful name for the stack, for example, prerequisites.
  2. Choose Next.
  3. Choose Next.
  4. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Submit.
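If you prefer the AWS CLI, the stack can also be launched with a command along these lines (a sketch; the stack name and the local template file name are placeholders to substitute with your own values):

```shell
# Launch the prerequisites stack from a local copy of the template.
# CAPABILITY_NAMED_IAM is required because the template creates IAM
# resources with custom names.
aws cloudformation create-stack \
  --stack-name prerequisites \
  --template-body file://prerequisites.yaml \
  --capabilities CAPABILITY_NAMED_IAM

# Block until the stack has finished creating before moving on.
aws cloudformation wait stack-create-complete --stack-name prerequisites
```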

The CloudFormation stack creates the following resources:

  • A VPC custom-vpc, created across three Availability Zones, with three public subnets and three private subnets:
    • The public subnets are associated with a public route table, and outbound traffic is directed to an internet gateway.
    • The private subnets are associated with a private route table, and outbound traffic is sent to a NAT gateway.
  • An internet gateway attached to the Amazon VPC.
  • A NAT gateway that is associated with an elastic IP and is deployed in one of the public subnets.
  • Three security groups:
    • msk-connect-sg, which will later be associated with the MSK Connect connector.
    • redshift-sg, which will later be associated with the Redshift cluster.
    • msk-cluster-sg, which will later be associated with the MSK cluster. It allows inbound traffic from msk-connect-sg and redshift-sg.
  • Two CloudWatch log groups:
    • msk-connect-logs, to be used for the MSK Connect logs.
    • msk-cluster-logs, to be used for the MSK cluster logs.
  • Two IAM Roles:
    • msk-connect-role, which includes granular IAM permissions for MSK Connect.
    • redshift-role, which includes granular IAM permissions for Amazon Redshift.
  • A custom MSK cluster configuration, allowing the MSK Connect connector to create topics on the MSK cluster.
  • An MSK cluster, with three brokers deployed across the three private subnets of custom-vpc. The msk-cluster-sg security group and the custom-msk-cluster-configuration configuration are applied to the MSK cluster. The broker logs are delivered to the msk-cluster-logs CloudWatch log group.
  • A Redshift cluster subnet group, which is using the three private subnets of custom-vpc.
  • A Redshift cluster, with one single node deployed in a private subnet within the Redshift cluster subnet group. The redshift-sg security group and redshift-role IAM role are applied to the Redshift cluster.

Create an MSK Connect custom plugin

For this post, we use an Amazon MSK data generator deployed in MSK Connect to generate mock customer data and write it to an MSK topic.

Complete the following steps:

  1. Download the Amazon MSK data generator JAR file with dependencies from GitHub.
  2. Upload the JAR file into an S3 bucket in your AWS account.
  3. On the Amazon MSK console, choose Custom plugins under MSK Connect in the navigation pane.
  4. Choose Create custom plugin.
  5. Choose Browse S3, search for the Amazon MSK data generator JAR file you uploaded to Amazon S3, then choose Choose.
  6. For Custom plugin name, enter msk-datagen-plugin.
  7. Choose Create custom plugin.

When the custom plugin is created, you will see that its status is Active, and you can move to the next step.

Create an MSK Connect connector

Complete the following steps to create your connector:

  1. On the Amazon MSK console, choose Connectors under MSK Connect in the navigation pane.
  2. Choose Create connector.
  3. For Custom plugin type, choose Use existing plugin.
  4. Select msk-datagen-plugin, then choose Next.
  5. For Connector name, enter msk-datagen-connector.
  6. For Cluster type, choose Self-managed Apache Kafka cluster.
  7. For VPC, choose custom-vpc.
  8. For Subnet 1, choose the private subnet within your first Availability Zone.

For the custom-vpc created by the CloudFormation template, we use odd CIDR ranges for the public subnets and even CIDR ranges for the private subnets:

    • The CIDRs for the public subnets are 10.10.1.0/24, 10.10.3.0/24, and 10.10.5.0/24
    • The CIDRs for the private subnets are 10.10.2.0/24, 10.10.4.0/24, and 10.10.6.0/24
  9. For Subnet 2, select the private subnet within your second Availability Zone.
  10. For Subnet 3, select the private subnet within your third Availability Zone.
  11. For Bootstrap servers, enter the list of bootstrap servers for TLS authentication of your MSK cluster.

To retrieve the bootstrap servers for your MSK cluster, navigate to the Amazon MSK console, choose Clusters, choose msk-cluster, then choose View client information. Copy the TLS values for the bootstrap servers.
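Alternatively, you can retrieve the bootstrap servers with the AWS CLI (a sketch; substitute your own cluster ARN):

```shell
# Returns the bootstrap broker strings for the cluster; copy the
# TLS value (BootstrapBrokerStringTls) for this walkthrough.
aws kafka get-bootstrap-brokers --cluster-arn <your msk-cluster ARN>
```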

  12. For Security groups, choose Use specific security groups with access to this cluster, and choose msk-connect-sg.
  13. For Connector configuration, replace the default settings with the following:
connector.class=com.amazonaws.mskdatagen.GeneratorSourceConnector
tasks.max=2
genkp.customer.with=#{Code.isbn10}
genv.customer.name.with=#{Name.full_name}
genv.customer.gender.with=#{Demographic.sex}
genv.customer.favorite_beer.with=#{Beer.name}
genv.customer.state.with=#{Address.state}
genkp.order.with=#{Code.isbn10}
genv.order.product_id.with=#{number.number_between '101','109'}
genv.order.quantity.with=#{number.number_between '1','5'}
genv.order.customer_id.matching=customer.key
global.throttle.ms=2000
global.history.records.max=1000
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
  14. For Connector capacity, choose Provisioned.
  15. For MCU count per worker, choose 1.
  16. For Number of workers, choose 1.
  17. For Worker configuration, choose Use the MSK default configuration.
  18. For Access permissions, choose msk-connect-role.
  19. Choose Next.
  20. For Encryption, select TLS encrypted traffic.
  21. Choose Next.
  22. For Log delivery, choose Deliver to Amazon CloudWatch Logs.
  23. Choose Browse, select msk-connect-logs, and choose Choose.
  24. Choose Next.
  25. Review and choose Create connector.

After the custom connector is created, you will see that its status is Running, and you can move to the next step.

Configure Amazon Redshift streaming ingestion for Amazon MSK

Complete the following steps to set up streaming ingestion:

  1. Connect to your Redshift cluster using Query Editor v2, and authenticate with the database user name awsuser, and password Awsuser123.
  2. Create an external schema from Amazon MSK using the following SQL statement.

In the following code, enter the values for the redshift-role IAM role, and the msk-cluster cluster ARN.

CREATE EXTERNAL SCHEMA msk_external_schema
FROM MSK
IAM_ROLE '<insert your redshift-role arn>'
AUTHENTICATION iam
CLUSTER_ARN '<insert your msk-cluster arn>';
  3. Choose Run to run the SQL statement.

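As an optional sanity check (not required for the walkthrough), you can confirm the external schema was created by querying the SVV_EXTERNAL_SCHEMAS system view:

```sql
SELECT schemaname, eskind, esoptions
FROM svv_external_schemas
WHERE schemaname = 'msk_external_schema';
```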

  4. Create a materialized view using the following SQL statement:
CREATE MATERIALIZED VIEW msk_mview AUTO REFRESH YES AS
SELECT
    "kafka_partition",
    "kafka_offset",
    "kafka_timestamp_type",
    "kafka_timestamp",
    "kafka_key",
    JSON_PARSE(kafka_value) as Data,
    "kafka_headers"
FROM
    "dev"."msk_external_schema"."customer"
  5. Choose Run to run the SQL statement.


  6. You can now query the materialized view using the following SQL statement:
select * from msk_mview LIMIT 100;
  7. Choose Run to run the SQL statement.

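Because the materialized view parses kafka_value into a SUPER column (aliased data), you can navigate the JSON attributes directly with dot notation. For example, the following sketch extracts a few fields from the mock customer records generated earlier (attribute names follow the connector configuration above):

```sql
SELECT
    data."name"::varchar          AS customer_name,
    data."state"::varchar         AS customer_state,
    data."favorite_beer"::varchar AS favorite_beer
FROM msk_mview
LIMIT 10;
```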

  8. To monitor the progress of records loaded via streaming ingestion, you can take advantage of the SYS_STREAM_SCAN_STATES monitoring view using the following SQL statement:
select * from SYS_STREAM_SCAN_STATES;
  9. Choose Run to run the SQL statement.


  10. To monitor errors encountered on records loaded via streaming ingestion, you can take advantage of the SYS_STREAM_SCAN_ERRORS monitoring view using the following SQL statement:
select * from SYS_STREAM_SCAN_ERRORS;
  11. Choose Run to run the SQL statement.

Clean up

After following along, if you no longer need the resources you created, delete them in the following order to prevent incurring additional charges:

  1. Delete the MSK Connect connector msk-datagen-connector.
  2. Delete the MSK Connect plugin msk-datagen-plugin.
  3. Delete the Amazon MSK data generator JAR file you downloaded, and delete the S3 bucket you created.
  4. After you delete your MSK Connect connector, you can delete the CloudFormation stack. All the resources created by the CloudFormation template will be automatically deleted from your AWS account.

Conclusion

In this post, we demonstrated how to configure Amazon Redshift streaming ingestion from Amazon MSK, with a focus on privacy and security.

Combining the ability of Amazon MSK to handle high-throughput data streams with the robust analytical capabilities of Amazon Redshift empowers businesses to derive actionable insights promptly. This real-time data integration enhances the agility and responsiveness of organizations in understanding changing data trends, customer behaviors, and operational patterns. It allows for timely and informed decision-making, thereby gaining a competitive edge in today’s dynamic business landscape.

This solution is also applicable to customers who are looking to use Amazon MSK Serverless and Amazon Redshift Serverless.

We hope this post was a good opportunity to learn more about AWS service integration and configuration. Let us know your feedback in the comments section.


About the authors

Sebastian Vlad is a Senior Partner Solutions Architect with Amazon Web Services, with a passion for data and analytics solutions and customer success. Sebastian works with enterprise customers to help them design and build modern, secure, and scalable solutions to achieve their business outcomes.

Sharad Pai is a Lead Technical Consultant at AWS. He specializes in streaming analytics and helps customers build scalable solutions using Amazon MSK and Amazon Kinesis. He has over 16 years of industry experience and is currently working with media customers who are hosting live streaming platforms on AWS, managing peak concurrency of over 50 million. Prior to joining AWS, Sharad’s career as a lead software developer included 9 years of coding, working with open source technologies like JavaScript, Python, and PHP.

Secure connectivity patterns for Amazon MSK Serverless cross-account access

Post Syndicated from Tamer Soliman original https://aws.amazon.com/blogs/big-data/secure-connectivity-patterns-for-amazon-msk-serverless-cross-account-access/

Amazon MSK Serverless is a cluster type of Amazon Managed Streaming for Apache Kafka (Amazon MSK) that makes it straightforward for you to run Apache Kafka without having to manage and scale cluster capacity. MSK Serverless automatically provisions and scales compute and storage resources. With MSK Serverless, you can use Apache Kafka on demand and pay for the data you stream and retain on a usage basis.

Deploying infrastructure across multiple VPCs and multiple accounts is considered best practice, facilitating scalability while maintaining isolation boundaries. In a multi-account environment, Kafka producers and consumers can exist within the same VPC—however, they are often located in different VPCs, sometimes within the same account, in a different account, or even in multiple different accounts. There is a need for a solution that can extend access to MSK Serverless clusters to producers and consumers from multiple VPCs within the same AWS account and across multiple AWS accounts. The solution needs to be scalable and straightforward to maintain.

In this post, we walk you through multiple solution approaches that address the MSK Serverless cross-VPC and cross-account access connectivity options, and we discuss the advantages and limitations of each approach.

MSK Serverless connectivity and authentication

When an MSK Serverless cluster is created, AWS manages the cluster infrastructure on your behalf and extends private connectivity back to your VPCs through VPC endpoints powered by AWS PrivateLink. You bootstrap your connection to the cluster through a bootstrap server that holds a record of all your underlying brokers.

At creation, a fully qualified domain name (FQDN) is assigned to your cluster bootstrap server. The bootstrap server FQDN has the general format of boot-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, and your cluster brokers follow the format of bxxxx-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, where ClusterUniqueID.xx is unique to your cluster and bxxxx is a dynamic broker ID (for example, b0001, b0037, and b0523 might be some of your assigned brokers at a given point in time). It’s worth noting that the brokers assigned to your cluster are dynamic and change over time, but your bootstrap address remains the same for the cluster. All your communication with the cluster starts with the bootstrap server, which can respond with the list of active brokers when required. For proper Kafka communication, your MSK client needs to be able to resolve the domain names of your bootstrap server as well as all your brokers.
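As a quick check, you can verify from within an associated VPC that the bootstrap name resolves to the cluster’s VPC endpoint (a sketch; the ClusterUniqueID and Region are placeholders for your cluster’s values):

```shell
# Should return the private IP addresses of the cluster VPC endpoint
# when run from a client inside an associated VPC.
dig +short boot-<ClusterUniqueID>.xx.kafka-serverless.<Region>.amazonaws.com
```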

At cluster creation, you specify the VPCs that you would like the cluster to communicate with (up to five VPCs in the same account as your cluster). For each VPC specified during cluster creation, cluster VPC endpoints are created along with a private hosted zone that includes a list of your bootstrap server and all dynamic brokers kept up to date. The private hosted zones facilitate resolving the FQDNs of your bootstrap server and brokers, from within the associated VPCs defined during cluster creation, to the respective VPC endpoints for each.

Cross-account access

To be able to extend private connectivity of your Kafka producers and consumers to your MSK Serverless cluster, you need to consider three main aspects: private connectivity, authentication and authorization, and DNS resolution.

The following diagram highlights the possible connectivity options. Although the diagram shows them all together for demonstration purposes, in most cases you would use one or more of these options depending on your architecture, not necessarily all in the same setup.

MSK cross account connectivity options

In this section, we discuss the different connectivity options along with their pros and cons. We also cover the authentication and DNS resolution aspects associated with the relevant connectivity options.

Private connectivity layer

This is the underlying private network connectivity. You can achieve this connectivity using VPC peering, AWS Transit Gateway, or PrivateLink, as indicated in the preceding diagram. VPC peering simplifies the setup, but it lacks the support for transitive routing. In most cases, peering is used when you have a limited number of VPCs or if your VPCs generally communicate with some limited number of core services VPCs without the need of lateral connectivity or transitive routing. On the other hand, AWS Transit Gateway facilitates transitive routing and can simplify the architecture when you have a large number of VPCs, and especially when lateral connectivity is required. PrivateLink is more suited for extending connectivity to a specific resource unidirectionally across VPCs or accounts without exposing full VPC-to-VPC connectivity, thereby adding a layer of isolation. PrivateLink is useful if you have overlapping CIDRs, which is a case that is not supported by Transit Gateway or VPC peering. PrivateLink is also useful when your connected parties are administrated separately, and when one-way connectivity and isolation are required.

If you choose PrivateLink as a connectivity option, you need to use a Network Load Balancer (NLB) with an IP type target group with its registered targets set as the IP addresses of the zonal endpoints of your MSK Serverless cluster.
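As a sketch, such a target group could be created with the AWS CLI as follows (the VPC ID, target group ARN, and endpoint IPs are placeholders; port 9098 is the port MSK Serverless uses for IAM-authenticated clients):

```shell
# Create an IP-type target group on the MSK Serverless IAM port (9098).
aws elbv2 create-target-group \
  --name msk-serverless-tg \
  --protocol TCP \
  --port 9098 \
  --target-type ip \
  --vpc-id <vpc-id>

# Register the IP addresses of the cluster's zonal VPC endpoints.
aws elbv2 register-targets \
  --target-group-arn <target-group-arn> \
  --targets Id=<endpoint-ip-az1> Id=<endpoint-ip-az2> Id=<endpoint-ip-az3>
```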

Cluster authentication and authorization

In addition to having private connectivity and being able to resolve the bootstrap server and brokers domain names, for your producers and consumers to have access to your cluster, you need to configure your clients with proper credentials. MSK Serverless supports AWS Identity and Access Management (IAM) authentication and authorization. For cross-account access, your MSK client needs to assume a role that has proper credentials to access the cluster. This post focuses mainly on the cross-account connectivity and name resolution aspects. For more details on cross-account authentication and authorization, refer to the following GitHub repo.

DNS resolution

For Kafka producers and consumers located in accounts across the organization to be able to produce and consume to and from the centralized MSK Serverless cluster, they need to be able to resolve the FQDNs of the cluster bootstrap server as well as each of the cluster brokers. Given the dynamic nature of broker allocation, the solution has to accommodate this requirement. In the next section, we address how to satisfy this part of the requirements.

Cluster cross-account DNS resolution

Now that we have discussed how MSK Serverless works, how private connectivity is extended, and the authentication and authorization requirements, let’s discuss how DNS resolution works for your cluster.

For every VPC associated with your cluster during cluster creation, a VPC endpoint is created along with a private hosted zone. Private hosted zones enable name resolution of the FQDNs of the cluster bootstrap server and the dynamically allocated brokers from within each respective VPC. This works well when requests come from within any of the VPCs that were added during cluster creation because they already have the required VPC endpoints and relevant private hosted zones.

Let’s discuss how you can extend name resolution to other VPCs within the same account that were not included during cluster creation, and to others that may be located in other accounts.

You’ve already made your choice of the private connectivity option that best fits your architecture requirements, be it VPC peering, PrivateLink, or Transit Gateway. Assuming that you have also configured your MSK clients to assume roles that have the right IAM credentials in order to facilitate cluster access, you now need to address the name resolution aspect of connectivity. It’s worth noting that, although we list different connectivity options using VPC peering, Transit Gateway, and PrivateLink, in most cases only one or two of these connectivity options are present. You don’t necessarily need to have them all; they are listed here to demonstrate your options, and you are free to choose the ones that best fit your architecture and requirements.

In the following sections, we describe two different methods to address DNS resolution. For each method, there are advantages and limitations.

Private hosted zones

The following diagram highlights the solution architecture and its components. Note that, to simplify the diagram, and to make room for more relevant details required in this section, we have eliminated some of the connectivity options.

Cross-account access using Private Hosted Zones

The solution starts with creating a private hosted zone, followed by creating a VPC association.

Create a private hosted zone

We start by creating a private hosted zone for name resolution. To make the solution scalable and straightforward to maintain, you can create this private hosted zone in the same account as the MSK Serverless cluster, which facilitates managing the hosted zone alongside the cluster; alternatively, centralizing your private hosted zones in a dedicated networking account is also a viable option. You can then associate the private hosted zone with VPCs within the same account or in other accounts.

The purpose of the private hosted zone is to be able to resolve the FQDNs of the bootstrap server as well as all the dynamically assigned cluster-associated brokers. As discussed earlier, the bootstrap server FQDN format is boot-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, and the cluster brokers use the format bxxxx-ClusterUniqueID.xx.kafka-serverless.Region.amazonaws.com, with bxxxx being the broker ID. You need to create the new private hosted zone with the primary domain set as kafka-serverless.Region.amazonaws.com, with an A-Alias record called *.kafka-serverless.Region.amazonaws.com pointing to the Regional VPC endpoint of the MSK Serverless cluster in the MSK cluster VPC. This should be sufficient to direct all traffic targeting your cluster to the primary cluster VPC endpoints that you specified in your private hosted zone.
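For example, the wildcard alias record could be created with the AWS CLI along these lines (a sketch; the hosted zone IDs, Region, and VPC endpoint DNS name are placeholders to substitute with your own values):

```shell
# Create a wildcard A-Alias record in the private hosted zone that
# points all kafka-serverless names at the cluster's Regional VPC endpoint.
aws route53 change-resource-record-sets \
  --hosted-zone-id <private-hosted-zone-id> \
  --change-batch '{
    "Changes": [{
      "Action": "CREATE",
      "ResourceRecordSet": {
        "Name": "*.kafka-serverless.<Region>.amazonaws.com",
        "Type": "A",
        "AliasTarget": {
          "HostedZoneId": "<vpc-endpoint-hosted-zone-id>",
          "DNSName": "<regional-vpc-endpoint-dns-name>",
          "EvaluateTargetHealth": false
        }
      }
    }]
  }'
```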

Now that you have created the private hosted zone, for name resolution to work, you need to associate the private hosted zone with every VPC where you have clients for the MSK cluster (producer or consumer).

Associate a private hosted zone with VPCs in the same account

For VPCs that are in the same account as the MSK cluster and weren’t included in the configuration during cluster creation, you can associate them to the private hosted zone created using the AWS Management Console by editing the private hosted zone settings and adding the respective VPCs. For more information, refer to Associating more VPCs with a private hosted zone.

Associate a private hosted zone in cross-account VPCs

For VPCs that are in a different account other than the MSK cluster account, refer to Associating an Amazon VPC and a private hosted zone that you created with different AWS accounts. The key steps are as follows:

  1. Create a VPC association authorization in the account where the private hosted zone is created (in this case, it’s the same account as the MSK Serverless cluster account) to authorize the remote VPCs to be associated with the hosted zone:
aws route53 create-vpc-association-authorization --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID
  2. Associate the VPC with the private hosted zone in the account where you have the VPCs with the MSK clients (remote account), referencing the association authorization you created earlier:
aws route53 list-vpc-association-authorizations --hosted-zone-id HostedZoneID
aws route53 associate-vpc-with-hosted-zone --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID
  3. Delete the VPC association authorization for the hosted zone:
aws route53 delete-vpc-association-authorization --hosted-zone-id HostedZoneID --vpc VPCRegion=Region,VPCId=vpc-ID

Deleting the authorization doesn’t affect the association; it just prevents you from re-associating the VPC with the hosted zone in the future. If you want to re-associate the VPC with the hosted zone, repeat steps 1 and 2 of this procedure.

Note that your VPC needs to have the enableDnsSupport and enableDnsHostnames DNS attributes enabled for this to work. These two settings can be configured under the VPC DNS settings. For more information, refer to DNS attributes in your VPC.
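Both attributes can be enabled with the AWS CLI; note that they must be set in separate calls:

```shell
# Enable DNS resolution and DNS hostnames for the client VPC.
aws ec2 modify-vpc-attribute --vpc-id vpc-ID --enable-dns-support "{\"Value\":true}"
aws ec2 modify-vpc-attribute --vpc-id vpc-ID --enable-dns-hostnames "{\"Value\":true}"
```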

These procedures work well for all remote accounts when connectivity is extended using VPC peering or Transit Gateway. If your connectivity option uses PrivateLink, the private hosted zone needs to be created in the remote account instead (the account where the PrivateLink VPC endpoints are). In addition, an A-Alias record that resolves to the PrivateLink endpoint instead of the MSK cluster endpoint needs to be created as indicated in the earlier diagram. This will facilitate name resolution to the PrivateLink endpoint. If other VPCs need access to the cluster through that same PrivateLink setup, you need to follow the same private hosted zone association procedure as described earlier and associate your other VPCs with the private hosted zone created for your PrivateLink VPC.

Limitations

The private hosted zones solution has some key limitations.

Firstly, because you’re using kafka-serverless.Region.amazonaws.com as the primary domain for our private hosted zone, and your A-Alias record uses *.kafka-serverless.Region.amazonaws.com, all traffic to the MSK Serverless service originating from any VPC associated with this private hosted zone will be directed to the one specific cluster VPC Regional endpoint that you specified in the hosted zone A-Alias record.

This solution is valid if you have one MSK Serverless cluster in your centralized service VPC. If you need to provide access to multiple MSK Serverless clusters, you can use the same solution but adopt a distributed private hosted zone approach as opposed to a centralized approach. In a distributed private hosted zone approach, each private hosted zone can point to a specific cluster. The VPCs associated with that specific private hosted zone will communicate only with the respective cluster listed under the specific private hosted zone.

In addition, after you establish a VPC association with a private hosted zone resolving *.kafka-serverless.Region.amazonaws.com, the respective VPC will only be able to communicate with the cluster defined in that specific private hosted zone and no other cluster. An exception to this rule is if a local cluster is created within the same client VPC, in which case the clients within the VPC will only be able to communicate with the local cluster.

You can also use PrivateLink to accommodate multiple clusters by creating a PrivateLink plus private hosted zone per cluster, replicating the configuration steps described earlier.

Both solutions, using distributed private hosted zones or PrivateLink, are still subject to the limitation that for each client VPC, you can only communicate with the one MSK Serverless cluster that your associated private hosted zone is configured for.

In the next section, we discuss another possible solution.

Resolver rules and AWS Resource Access Manager

The following diagram shows a high-level overview of the solution using Amazon Route 53 resolver rules and AWS Resource Access Manager.

Cross-account access using Resolver Rules and Resolver Endpoints

The solution starts with creating Route 53 inbound and outbound resolver endpoints, which are associated with the MSK cluster VPC. Then you create a resolver forwarding rule in the MSK account that is not associated with any VPC. Next, you share the resolver rule across accounts using Resource Access Manager. At the remote account where you need to extend name resolution to, you need to accept the resource share and associate the resolver rules with your target VPCs located in the remote account (the account where the MSK clients are located).

For more information about this approach, refer to the third use case in Simplify DNS management in a multi-account environment with Route 53 Resolver.

This solution accommodates multiple centralized MSK Serverless clusters in a more scalable and flexible way. It relies on directing DNS requests to the VPC where the MSK clusters reside for resolution. Multiple MSK Serverless clusters can coexist, and clients in a particular VPC can communicate with one or more of them at the same time, which is not supported with the private hosted zone solution approach.

Limitations

Although this solution has its advantages, it also has a few limitations.

Firstly, for a particular target consumer or producer account, all your MSK Serverless clusters need to be in the same core service VPC in the MSK account. This is due to the fact that the resolver rule is set at an account level and uses kafka-serverless.Region.amazonaws.com as the primary domain, directing its resolution to one specific VPC resolver endpoint inbound/outbound pair within that service VPC. If you need to have separate clusters in different VPCs, consider creating separate accounts.

The second limitation is that all your client VPCs need to be in the same Region as your core MSK Serverless VPC. The reason behind this limitation is that resolver rules pointing to a resolver endpoint pair (in reality, they point to the outbound endpoint, which loops back through the inbound endpoint) must be in the same Region as those endpoints, and Resource Access Manager extends the share only within the same Region. However, this solution works well when you have multiple MSK clusters in the same core VPC and your remote clients, although in different VPCs and accounts, are still within the same Region. A workaround for this limitation is to duplicate the resolver rules and outbound resolver endpoint in a second Region, where the outbound endpoint loops back through the original first Region’s inbound resolver endpoint associated with the MSK Serverless cluster VPC (assuming IP connectivity is facilitated). This second Region’s resolver rule can then be shared using Resource Access Manager within the second Region.

Conclusion

You can configure MSK Serverless cross-VPC and cross-account access in multi-account environments using private hosted zones or Route 53 resolver rules. The solution discussed in this post allows you to centralize your configuration while extending cross-account access, making it a scalable and straightforward-to-maintain solution. You can create your MSK Serverless clusters with cross-account access for producers and consumers, keep your focus on your business outcomes, and gain insights from sources of data across your organization without having to right-size and manage a Kafka infrastructure.


About the Author

Tamer Soliman is a Senior Solutions Architect at AWS. He helps Independent Software Vendor (ISV) customers innovate, build, and scale on AWS. He has over two decades of industry experience, and is an inventor with three granted patents. His experience spans multiple technology domains including telecom, networking, application integration, data analytics, AI/ML, and cloud deployments. He specializes in AWS Networking and has a profound passion for machine learning, AI, and generative AI.

How Gupshup built their multi-tenant messaging analytics platform on Amazon Redshift

Post Syndicated from Gaurav Singh original https://aws.amazon.com/blogs/big-data/how-gupshup-built-their-multi-tenant-messaging-analytics-platform-on-amazon-redshift/

Gupshup is a leading conversational messaging platform, powering over 10 billion messages per month. Across verticals, thousands of large and small businesses in emerging markets use Gupshup to build conversational experiences across marketing, sales, and support. Gupshup’s carrier-grade platform provides a single messaging API for 30+ channels, a rich conversational experience-building tool kit for any use case, and a network of emerging market partnerships across messaging channels, device manufacturers, ISVs, and operators.

Objective

Gupshup wanted to build a messaging analytics platform that would:

  • Provide detailed insights, data, and reports about WhatsApp/SMS campaigns, and track the success of every text message sent by their end customers.
  • Make it easy to gain insight into trends, delivery rates, and speed.
  • Save time and eliminate unnecessary processes.

About Redshift and some relevant features for the use case

Amazon Redshift is a fully managed, petabyte-scale, massively parallel data warehouse that offers simple operations and high performance. It makes it fast, simple, and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Amazon Redshift extends beyond traditional data warehousing workloads, by integrating with the AWS cloud with features such as querying the data lake with Spectrum, semistructured data ingestion and querying with PartiQL, streaming ingestion from Amazon Kinesis and Amazon MSK, Redshift ML, federated queries to Amazon Aurora and Amazon RDS operational databases, and federated materialized views.

In this use case, Gupshup relies heavily on Amazon Redshift as its data warehouse to process billions of streaming events every month, performing intricate pipeline-like operations on that data and incrementally maintaining a hierarchy of aggregations on top of the raw data. By using Amazon Redshift materialized views, Gupshup has dramatically improved query performance on recurring and predictable workloads, such as dashboard queries from business intelligence (BI) tools, and has sped up and simplified its extract, load, and transform (ELT) data processing. Materialized views store commonly used precomputations and serve them to reduce latency on subsequent analytical queries, and their incremental refresh capability enables Gupshup to be more agile with less code: without writing complicated code for incremental updates, Gupshup was able to deliver data latency of roughly 15 minutes for some use cases.

Overall architecture and implementation details with Redshift materialized views

To meet these needs, Gupshup uses a CDC mechanism to extract data from its source systems and persist it in Amazon S3. The incremental data from S3 is loaded into Redshift, and a series of materialized view refreshes calculates the metrics. The compiled data is then imported into Aurora PostgreSQL Serverless for operational reporting. Gupshup chose Redshift for several reasons: its ability to incrementally refresh materialized views, which lets it process massive amounts of data progressively; its capacity for scaling, using concurrency scaling and elastic resize; and the RA3 architecture, which separates storage and compute so that one can scale without worrying about the other. Gupshup chose Aurora PostgreSQL as the operational reporting layer due to its anticipated increase in concurrency and its cost-effectiveness for queries that retrieve only precalculated metrics.

Incremental analytics is the main reason Gupshup uses Redshift. The diagram shows a simplified version of a typical data processing pipeline, where data arrives via multiple streams. The streams are joined together, then enriched by joining with master data tables, followed by a series of joins and aggregations. All of this needs to be performed incrementally, with a latency of about 30 minutes.

Gupshup uses Redshift’s incremental materialized view feature to accomplish this. All of the join, enrichment, and aggregation steps are written as SQL statements. The stream-to-stream joins are performed by ingesting both streams into a table sorted by the key fields; an incremental MV then aggregates the data by those key fields. Redshift automatically keeps the MVs refreshed incrementally as data arrives. The incremental view maintenance feature works even for hierarchical aggregations, with MVs built on other MVs, which allows Gupshup to build the entire processing pipeline incrementally. This helped Gupshup reduce cycle time during the POC and prototyping phases. Moreover, no separate effort is required to process historical data versus live streaming data.
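The incremental pattern can be sketched outside of SQL. The following Python toy model (with hypothetical campaign fields, not Gupshup’s actual schema) shows what an incremental refresh does conceptually: each refresh folds only newly arrived rows into the running aggregates instead of rescanning history.

```python
from collections import defaultdict

class IncrementalAggregate:
    """Toy model of an incrementally refreshed MV: each refresh folds in
    only the rows that arrived since the previous refresh."""

    def __init__(self):
        self.totals = defaultdict(int)  # key -> running aggregate
        self.processed = 0              # index of the last processed row

    def refresh(self, rows):
        # Scan only the delta, not the full history -- the core idea
        # behind incremental materialized view refresh.
        for key, value in rows[self.processed:]:
            self.totals[key] += value
        self.processed = len(rows)
        return dict(self.totals)

# Hypothetical raw stream: (campaign_id, delivered_count) pairs
events = [("c1", 10), ("c2", 5), ("c1", 3)]
agg = IncrementalAggregate()
agg.refresh(events)        # initial refresh processes all three rows
events.append(("c2", 7))   # new data arrives
agg.refresh(events)        # this refresh touches only the new row
```

Because a refresh only touches the delta, stacking such aggregates on top of one another mirrors the hierarchical MV-on-MV setup described above.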

Apart from incremental analytics, Redshift simplifies many operational aspects. For example, Gupshup uses the snapshot-restore feature to quickly create a green experimental cluster from an existing blue serving cluster. When the processing logic changes (which happens quite often in the prototyping stages), they need to reprocess all historical data; Gupshup uses Redshift’s elastic scaling feature to temporarily scale the cluster up and then scale it back down when done. They also use Redshift to directly power some of their high-concurrency dashboards, where the concurrency scaling feature of Redshift really comes in handy. In addition, they have many in-house data analysts who need to run ad hoc queries on live production data, and they use the workload management features of Redshift to make sure their analysts can run queries without affecting production queries.

Benefits realized with Amazon Redshift

  • On-Demand Scaling
  • Ease of use and maintenance with less code
  • Performance benefits with an incremental MV refresh

Conclusion

Gupshup, an enterprise messaging company, needed a scalable data warehouse solution to analyze billions of events generated each month. They chose Amazon Redshift to build a cloud data warehouse that could handle this scale of data and enable fast analytics.

By combining Redshift’s scalability, snapshots, workload management, and low-operations approach, Gupshup delivers data-driven insights with an analytics refresh rate of under 15 minutes.

Overall, Redshift’s scalability, performance, ease of management, and cost-effectiveness have allowed Gupshup to gain data-driven insights from billions of events in near real time. A scalable and robust data foundation is enabling Gupshup to build innovative messaging products and gain a competitive advantage.

“The incremental refresh of materialized views feature of Redshift allowed us to be more agile with less code. For some use cases, we are able to provide data latency of about 15 minutes, without having to write complex code for incremental updates. The incremental refresh feature is a main differentiating factor that gives Redshift an edge over some of its competitors. I request that you keep improving and enhancing it.”

– Pankaj Bisen, Director of AI and Analytics at Gupshup.


About the Authors

Shabi Abbas Sayed is a Senior Technical Account Manager at AWS. He is passionate about building scalable data warehouses and big data solutions, working closely with customers. He works with large ISV customers, helping them build and operate secure, resilient, scalable, and high-performance SaaS applications in the cloud.

Gaurav Singh is a Senior Solutions Architect at AWS, specializing in AI/ML and Generative AI. Based in Pune, India, he focuses on helping customers build, deploy, and migrate ML production workloads to SageMaker at scale. In his spare time, Gaurav loves to explore nature, read, and run.

Break data silos and stream your CDC data with Amazon Redshift streaming and Amazon MSK

Post Syndicated from Umesh Chaudhari original https://aws.amazon.com/blogs/big-data/break-data-silos-and-stream-your-cdc-data-with-amazon-redshift-streaming-and-amazon-msk/

Data loses value over time. We hear from our customers that they’d like to analyze their business transactions in real time. Traditionally, customers used batch-based approaches for data movement from operational systems to analytical systems. A batch load can run once or several times a day, which introduces latency in data movement and reduces the value of data for analytics. A Change Data Capture (CDC)-based approach has emerged as an alternative: it captures data changes and makes them available in data warehouses for further analytics in real time.

CDC tracks changes made in the source database, such as inserts, updates, and deletes, and continually applies those changes to the target database. When CDC frequency is high, the source database changes rapidly, and the target database (usually a data warehouse) needs to reflect those changes in near real time.

With the explosion of data, the number of data systems in organizations has grown. Data silos cause data to live in different sources, which makes it difficult to perform analytics.

To gain deeper and richer insights, you can bring all the changes from different data silos into one place, like a data warehouse. This post showcases how to use streaming ingestion to bring data to Amazon Redshift.

Redshift streaming ingestion provides low-latency, high-throughput data ingestion, which enables customers to derive insights in seconds instead of minutes. It’s simple to set up, and it directly ingests streaming data into your data warehouse from Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK) without the need to stage in Amazon Simple Storage Service (Amazon S3). You can create materialized views using SQL statements. After that, using materialized view refresh, you can ingest hundreds of megabytes of data per second.

Solution overview

In this post, we create low-latency data replication from Amazon Aurora MySQL to an Amazon Redshift data warehouse, using Redshift streaming ingestion from Amazon MSK. Using Amazon MSK, we securely stream data with a fully managed, highly available Apache Kafka service. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. We store CDC events in Amazon MSK for a set duration of time, which makes it possible to deliver the CDC events to additional destinations such as an Amazon S3 data lake.

We deploy the Debezium MySQL source Kafka connector on Amazon MSK Connect. Amazon MSK Connect makes it easy to deploy, monitor, and automatically scale connectors that move data between Apache Kafka clusters and external systems such as databases, file systems, and search indices. Amazon MSK Connect is fully compatible with Apache Kafka Connect, which enables you to lift and shift your Apache Kafka Connect applications with zero code changes.

This solution uses Amazon Aurora MySQL hosting the example database salesdb. Users of the database can perform row-level INSERT, UPDATE, and DELETE operations to produce change events in the example salesdb database. The Debezium MySQL source Kafka connector reads these change events and emits them to Kafka topics in Amazon MSK. Amazon Redshift then reads the messages from the Kafka topics using the Amazon Redshift streaming feature. Amazon Redshift lands these messages in a materialized view and processes them as they arrive.

You can see an example of how CDC represents a create event in the Debezium documentation. In our solution we use the op field for processing; this mandatory string describes the type of operation that caused the connector to generate the event. In the create example, c indicates that the operation created a row. Valid values for the op field are:

  • c = create
  • u = update
  • d = delete
  • r = read (applies only to snapshots)
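To make the op field concrete, here is a simplified sketch of dispatching on it. The event shape is abbreviated (not the connector’s full envelope), and the rows are hypothetical:

```python
# Simplified Debezium-style change events: "before"/"after" carry the row
# state, and "op" names the operation (c=create, u=update, d=delete, r=read).
events = [
    {"op": "c", "before": None, "after": {"CUST_ID": 1, "NAME": "Acme"}},
    {"op": "u", "before": {"CUST_ID": 1, "NAME": "Acme"},
                "after":  {"CUST_ID": 1, "NAME": "Acme Corp"}},
    {"op": "d", "before": {"CUST_ID": 1, "NAME": "Acme Corp"}, "after": None},
]

def apply_event(table, event):
    # For deletes the key is only present in "before"; otherwise use "after".
    row = event["before"] if event["op"] == "d" else event["after"]
    key = row["CUST_ID"]
    if event["op"] == "d":
        table.pop(key, None)   # a delete removes the row
    else:
        table[key] = row       # c, u, and r all upsert the latest state
    return table

table = {}
for e in events:
    apply_event(table, e)
# After create -> update -> delete, the table is empty again.
```

The stored procedure shown later in this post applies the same rule in SQL: deletes use the key from before the change, while creates and updates upsert the latest row state.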

The following diagram illustrates the solution architecture:

Architecture of the solution: the Debezium connector for MySQL, deployed on Amazon MSK Connect, reads change events from Amazon Aurora and ingests them into Amazon MSK, from which they are ingested into an Amazon Redshift materialized view.

The solution workflow consists of the following steps:

  • Amazon Aurora MySQL has a binary log (binlog) that records all operations (INSERT, UPDATE, DELETE) in the order in which they are committed to the database.
  • Amazon MSK Connect runs the source Kafka connector called Debezium connector for MySQL, which reads the binlog, produces change events for row-level INSERT, UPDATE, and DELETE operations, and emits the change events to Kafka topics in Amazon MSK.
  • An Amazon Redshift-provisioned cluster is the stream consumer and can read messages from the Kafka topics in Amazon MSK.
  • A materialized view in Amazon Redshift is the landing area for data read from the stream, which is processed as it arrives.
  • When the materialized view is refreshed, Amazon Redshift compute nodes allocate a group of Kafka partitions to each compute slice.
  • Each slice consumes data from the allocated partitions until the view reaches parity with the last offset for the Kafka topic.
  • Subsequent materialized view refreshes read data from the last offset of the previous refresh until they reach parity with the topic data.
  • Inside Amazon Redshift, we create a stored procedure to process the CDC records and update the target table.

Prerequisites

This post assumes you have a running Amazon MSK Connect stack in your environment with the following components:

  • Aurora MySQL hosting a database. In this post, you use the example database salesdb.
  • The Debezium MySQL connector running on Amazon MSK Connect, which connects to Amazon MSK in your Amazon Virtual Private Cloud (Amazon VPC).
  • An Amazon MSK cluster.

If you don’t have an Amazon MSK Connect stack, then follow the instructions in the MSK Connect lab setup and verify that your source connector replicates data changes to the Amazon MSK topics.

You should provision the Amazon Redshift cluster in the same VPC as the Amazon MSK cluster. If you haven’t deployed one, follow the steps in the AWS documentation.

We use AWS Identity and Access Management (AWS IAM) authentication for communication between Amazon MSK and Amazon Redshift cluster. Please make sure you have created an AWS IAM role with a trust policy that allows your Amazon Redshift cluster to assume the role. For information about how to configure the trust policy for the AWS IAM role, see Authorizing Amazon Redshift to access other AWS services on your behalf. After it’s created, the role should have the following AWS IAM policy, which provides permission for communication with the Amazon MSK cluster.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "MSKIAMpolicy",
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:Connect"
            ],
            "Resource": [
                "arn:aws:kafka:*:0123456789:cluster/xxx/xxx",
                "arn:aws:kafka:*:0123456789:topic/*/*/*"
            ]
        },
        {
            "Sid": "MSKPolicy",
            "Effect": "Allow",
            "Action": [
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": "arn:aws:kafka:*:0123456789:cluster/xxx/xxx"
        }
    ]
}

Replace the ARNs containing xxx in the example policy above with your Amazon MSK cluster’s ARN.

  • Also, verify that the Amazon Redshift cluster has access to the Amazon MSK cluster. In the Amazon Redshift cluster’s security group, add an inbound rule for the MSK security group allowing port 9098. To see how to manage a Redshift cluster security group, refer to Managing VPC security groups for a cluster.

Image: adding an inbound rule allowing port 9098 from the MSK security group to the Amazon Redshift cluster’s security group.

  • Then, in the Amazon MSK cluster’s security group, add an inbound rule allowing port 9098 from the leader node IP address of your Amazon Redshift cluster, as shown in the following diagram. You can find the IP address of your Amazon Redshift cluster’s leader node on the Properties tab of the cluster in the AWS Management Console.

Image: adding an inbound rule allowing port 9098 from the leader node IP address of the Amazon Redshift cluster to the Amazon MSK cluster’s security group.

Walkthrough

Navigate to the Amazon Redshift service from AWS Management Console, then set up Amazon Redshift streaming ingestion for Amazon MSK by performing the following steps:

  1. Set enable_case_sensitive_identifier to true – If you are using the default parameter group for your Amazon Redshift cluster, you won’t be able to set enable_case_sensitive_identifier to true. You can create a new parameter group with enable_case_sensitive_identifier set to true and attach it to the Amazon Redshift cluster. After you modify parameter values, you must reboot any clusters associated with the modified parameter group. It may take a few minutes for the Amazon Redshift cluster to reboot.

This configuration value determines whether name identifiers of databases, tables, and columns are case sensitive. Once done, open a new Amazon Redshift Query Editor V2 session so that the configuration changes we made are reflected, then follow the next steps.

  2. Create an external schema that maps to the streaming data source.
CREATE EXTERNAL SCHEMA MySchema
FROM MSK
IAM_ROLE 'arn:aws:iam::YourRole:role/msk-redshift-streaming'
AUTHENTICATION IAM
CLUSTER_ARN 'arn:aws:kafka:us-east-1:2073196*****:cluster/MSKCluster-msk-connect-lab/849b47a0-65f2-439e-b181-1038ea9d4493-10';
-- Replace the cluster ARN above with your own; this value is just an example.

Once done, verify that you see the following tables created from the MSK topics:

image shows tables created from MSK Topics

  3. Create a materialized view that references the external schema.
CREATE MATERIALIZED VIEW customer_debezium AUTO REFRESH YES AS
SELECT
    *,
    json_parse(kafka_value) AS payload
FROM "dev"."myschema"."salesdb.salesdb.CUSTOMER";
-- Replace myschema with the name you gave your external schema in step 2.

Now you can query the newly created materialized view customer_debezium using the following command:

SELECT * FROM "dev"."public"."customer_debezium" order by refresh_time desc;

Check the materialized view is populated with the CDC records

  4. REFRESH MATERIALIZED VIEW (optional). This step is optional because we already specified AUTO REFRESH YES while creating the MV (materialized view).
REFRESH MATERIALIZED VIEW "dev"."public"."customer_debezium";

NOTE: The materialized view above is auto-refreshed, which means that if you don’t see the records immediately, you should wait a few seconds and rerun the select statement. Amazon Redshift streaming ingestion also offers a manual refresh option, which allows you to refresh the object on demand. You can use the following query to pull streaming data into the Redshift object immediately.

SELECT * FROM "dev"."public"."customer_debezium" order by refresh_time desc;

images shows records from the customer_debezium MV

Process CDC records in Amazon Redshift

In the following steps, we create a staging table that holds the CDC data, a target table that holds the latest snapshot, and a stored procedure that processes the CDC records and updates the target table.

  1. Create the staging table: The staging table is a temporary table that holds all of the data that will be used to make changes to the target table, including both updates and inserts.
CREATE TABLE public.customer_stg (
    customer_id character varying(256) ENCODE raw DISTKEY,
    customer_name character varying(256) ENCODE lzo,
    market_segment character varying(256) ENCODE lzo,
    ts_ms bigint ENCODE az64,
    op character varying(2) ENCODE lzo,
    record_rank smallint ENCODE az64,
    refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE KEY SORTKEY (customer_id);
-- In this example we use LZO encoding because it works well for CHAR and
-- VARCHAR columns that store very long character strings. You can use
-- BYTEDICT instead if it matches your use case.
  2. Create the target table.

We use the customer_target table to load the processed CDC events.

CREATE TABLE public.customer_target (
    customer_id character varying(256) ENCODE raw DISTKEY,
    customer_name character varying(256) ENCODE lzo,
    market_segment character varying(256) ENCODE lzo,
    refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE KEY SORTKEY (customer_id);
  3. Create the debezium_last_extract table and insert a dummy value.

We need to store the timestamp of the last extracted CDC events, and we use the debezium_last_extract table for this purpose. For the initial record, we insert a dummy value, which enables us to compare the current and the next CDC processing timestamps.

CREATE TABLE public.debezium_last_extract (
process_name character varying(256) ENCODE lzo,
latest_refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE AUTO;

Insert into public.debezium_last_extract VALUES ('customer','1983-01-01 00:00:00');

SELECT * FROM "dev"."public"."debezium_last_extract";
  4. Create the stored procedure.

This stored procedure processes the CDC records and updates the target table with the latest changes.

CREATE OR REPLACE PROCEDURE public.incremental_sync_customer()
LANGUAGE plpgsql
AS $$
DECLARE
    sql VARCHAR(MAX) := '';
    max_refresh_time TIMESTAMP;
    staged_record_count BIGINT := 0;
BEGIN
    -- Get the last loaded refresh_time from the control table
    sql := 'SELECT MAX(latest_refresh_time) FROM debezium_last_extract WHERE process_name = ''customer'';';
    EXECUTE sql INTO max_refresh_time;

    -- Truncate the staging table (replace customer_stg with your staging table name)
    EXECUTE 'TRUNCATE customer_stg;';

    -- Insert (and transform) the latest change record for each member with a
    -- refresh time greater than the last loaded refresh time into the staging table
    EXECUTE 'INSERT INTO customer_stg ('||
        'select coalesce(payload.after."CUST_ID", payload.before."CUST_ID")::varchar as customer_id, payload.after."NAME"::varchar as customer_name, payload.after."MKTSEGMENT"::varchar as market_segment, payload.ts_ms::bigint, payload."op"::varchar, rank() over (partition by coalesce(payload.after."CUST_ID", payload.before."CUST_ID")::varchar order by payload.ts_ms::bigint desc) as record_rank, refresh_time from CUSTOMER_debezium where refresh_time > '''||max_refresh_time||''');';

    sql := 'SELECT COUNT(*) FROM customer_stg;';
    EXECUTE sql INTO staged_record_count;
    RAISE INFO 'Staged member records: %', staged_record_count;

    -- Delete records from the target table that also exist in the staging table
    -- (updated/deleted records); replace customer_target with your target table name
    EXECUTE 'DELETE FROM customer_target USING customer_stg WHERE customer_target.customer_id = customer_stg.customer_id';

    -- Insert all records from the staging table into the target table
    EXECUTE 'INSERT INTO customer_target SELECT customer_id, customer_name, market_segment, refresh_time FROM customer_stg WHERE record_rank = 1 AND op <> ''d''';

    -- Insert the max refresh time into the control table
    EXECUTE 'INSERT INTO debezium_last_extract SELECT ''customer'', max(refresh_time) FROM customer_target';
END;
$$;

Image: the incremental_sync_customer stored procedure created in the preceding step.
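The heart of the procedure — keep each key’s newest change that arrived after the watermark, drop deletes, and advance the watermark — can be mirrored in a small Python sketch (a simplification with hypothetical rows, not the SQL itself):

```python
def incremental_sync(target, staged, last_extract):
    """Mimic the stored procedure: take the newest change per customer_id
    with refresh_time past the watermark, then merge it into the target."""
    # Stage only changes newer than the last processed refresh_time.
    fresh = [r for r in staged if r["refresh_time"] > last_extract]
    # Equivalent of record_rank = 1: the latest change per key by ts_ms.
    latest = {}
    for r in fresh:
        cur = latest.get(r["customer_id"])
        if cur is None or r["ts_ms"] > cur["ts_ms"]:
            latest[r["customer_id"]] = r
    for cid, r in latest.items():
        target.pop(cid, None)        # DELETE ... USING the staging table
        if r["op"] != "d":           # re-insert everything except deletes
            target[cid] = r["name"]
    # Advance the watermark to the max refresh_time we processed.
    new_mark = max((r["refresh_time"] for r in fresh), default=last_extract)
    return target, new_mark

staged = [  # hypothetical staged CDC rows
    {"customer_id": 1, "name": "Acme",      "op": "c", "ts_ms": 1, "refresh_time": 10},
    {"customer_id": 1, "name": "Acme Corp", "op": "u", "ts_ms": 2, "refresh_time": 11},
    {"customer_id": 2, "name": "Globex",    "op": "c", "ts_ms": 1, "refresh_time": 11},
]
target, mark = incremental_sync({}, staged, last_extract=0)
# target now holds the latest state per customer; mark is the new watermark
```

Running the sketch again with only newer rows processes just the delta, which is exactly why the procedure stores the watermark in the debezium_last_extract control table.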

Test the solution

Update the example salesdb database hosted on Amazon Aurora

  1. Connect to your Amazon Aurora database from the Amazon Elastic Compute Cloud (Amazon EC2) instance named KafkaClientInstance.
  2. Replace the Amazon Aurora endpoint in the following command with your own endpoint, execute the command, and then use salesdb.
mysql -f -u master -h mask-lab-salesdb.xxxx.us-east-1.rds.amazonaws.com --password=S3cretPwd99

image shows the details of the RDS for MySQL
  3. Perform an update, an insert, and a delete in any of the tables created. You can also update a record more than once to check the last updated record later in Amazon Redshift.

image shows the insert, updates and delete operations performed on RDS for MySQL

  4. Invoke the stored procedure incremental_sync_customer created in the preceding steps from Amazon Redshift Query Editor v2. You can run the procedure manually using the following command, or schedule it.
call incremental_sync_customer();
  5. Check the target table for the latest changes. You’ll see that all the updates and deletes you made in the source table appear at the top, ordered by refresh_time.
SELECT * FROM "dev"."public"."customer_target" order by refresh_time desc;

Image: records from the customer_target table in descending order of refresh_time.

Extending the solution

In this solution, we showed CDC processing for the customer table, and you can use the same approach to extend it to other tables in the example salesdb database or add more databases to MSK Connect configuration property database.include.list.

Our proposed approach can work with any MySQL source supported by the Debezium MySQL source Kafka connector. Similarly, to extend this example to your workloads and use cases, you need to create the staging and target tables according to the schema of the source table. Then you need to update the coalesce(payload.after."CUST_ID",payload.before."CUST_ID")::varchar as customer_id statements with the column names and types in your source and target tables. As in the example in this post, we used LZO encoding because it works well for CHAR and VARCHAR columns that store very long character strings; you can use BYTEDICT instead if it matches your use case. Another consideration when creating target and staging tables is choosing a distribution style and key based on the data in the source database. Here we chose the KEY distribution style with customer_id, based on the source data and schema, following the best practices mentioned here.

Cleaning up

  1. Delete all the Amazon Redshift clusters
  2. Delete Amazon MSK Cluster and MSK Connect Cluster
  3. If you don’t want to delete the Amazon Redshift clusters, you can manually drop the MV and tables created during this post using the following commands:
DROP MATERIALIZED VIEW customer_debezium;
DROP TABLE public.customer_stg;
DROP TABLE public.customer_target;
DROP TABLE public.debezium_last_extract;

Also, please remove inbound security rules added to your Amazon Redshift and Amazon MSK Clusters, along with AWS IAM roles created in the Prerequisites section.

Conclusion

In this post, we showed you how Amazon Redshift streaming ingestion provides high-throughput, low-latency ingestion of streaming data from Amazon Kinesis Data Streams and Amazon MSK into an Amazon Redshift materialized view. We increased the speed and reduced the cost of streaming data into Amazon Redshift by eliminating the need for any intermediary services.

Furthermore, we showed how CDC data can be processed rapidly after generation, using a simple SQL interface that enables customers to perform near real-time analytics on a variety of data sources (e.g., Internet of Things [IoT] devices, system telemetry data, or clickstream data from a busy website or application).

As you explore options to simplify and enable near real-time analytics for your CDC data, we hope this post provides you with valuable guidance. We welcome any thoughts or questions in the comments section.


About the Authors

Umesh Chaudhari is a Streaming Solutions Architect at AWS. He works with AWS customers to design and build real time data processing systems. He has 13 years of working experience in software engineering including architecting, designing, and developing data analytics systems.

Vishal Khatri is a Sr. Technical Account Manager and Analytics specialist at AWS. Vishal works with State and Local Government helping educate and share best practices with customers by leading and owning the development and delivery of technical content while designing end-to-end customer solutions.

Amazon MSK now provides up to 29% more throughput and up to 24% lower costs with AWS Graviton3 support

Post Syndicated from Sai Maddali original https://aws.amazon.com/blogs/big-data/amazon-msk-now-provides-up-to-29-more-throughput-and-up-to-24-lower-costs-with-aws-graviton3-support/

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data.

Today, we’re excited to bring the benefits of Graviton3 to Kafka workloads, with Amazon MSK now offering M7g instances for new MSK provisioned clusters. AWS Graviton processors are custom Arm-based processors built by AWS to deliver the best price-performance for your cloud workloads. For example, when running an MSK provisioned cluster using M7g.4xlarge instances, you can achieve up to a 27% reduction in CPU usage and up to 29% higher write and read throughput compared to M5.4xlarge instances. These performance improvements, along with M7g’s lower prices, provide up to 24% in compute cost savings over M5 instances.
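As a rough back-of-the-envelope view of how the two headline figures compound (using only the numbers above; actual pricing varies by Region and instance size):

```python
# Headline figures from the announcement: up to 29% higher throughput and
# up to 24% lower compute cost on M7g versus M5.
throughput_gain = 1.29   # relative throughput, M7g vs. M5
cost_ratio = 1 - 0.24    # relative compute cost, M7g vs. M5

# Throughput per compute dollar improves by both factors combined.
price_performance = throughput_gain / cost_ratio
print(f"~{(price_performance - 1) * 100:.0f}% more throughput per compute dollar")
```

In the best case, the two effects multiply to roughly 70% more throughput per compute dollar, though your actual gains depend on workload shape.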

In February 2023, AWS launched new Graviton3-based M7g instances. M7g instances are equipped with DDR5 memory, which provides up to 50% higher memory bandwidth than the DDR4 memory used in previous generations. M7g instances also deliver up to 25% higher storage throughput and up to 88% increase in network throughput compared to similar sized M5 instances to deliver price-performance benefits for Kafka workloads. You can read more about M7g features in New Graviton3-Based General Purpose (m7g) and Memory-Optimized (r7g) Amazon EC2 Instances.

Here are the specs for the M7g instances on MSK:

Name vCPUs Memory Network Bandwidth Storage Bandwidth
M7g.large 2 8 GiB up to 12.5 Gbps up to 10 Gbps
M7g.xlarge 4 16 GiB up to 12.5 Gbps up to 10 Gbps
M7g.2xlarge 8 32 GiB up to 15 Gbps up to 10 Gbps
M7g.4xlarge 16 64 GiB up to 15 Gbps up to 10 Gbps
M7g.8xlarge 32 128 GiB 15 Gbps 10 Gbps
M7g.12xlarge 48 192 GiB 22.5 Gbps 15 Gbps
M7g.16xlarge 64 256 GiB 30 Gbps 20 Gbps

M7g instances on Amazon MSK

Organizations are adopting Amazon MSK to capture and analyze data in real time, run machine learning (ML) workflows, and build event-driven architectures. Amazon MSK enables you to reduce operational overhead and run your applications with higher availability and durability. It also offers consistent price-performance improvements with capabilities such as Tiered Storage. With compute making up a large portion of Kafka costs, customers wanted a way to optimize it further and saw Graviton instances as the quickest path. Amazon MSK has fully tested and validated M7g on all Kafka versions starting with version 2.8.2, making it ready for you to run critical workloads and benefit from Graviton3 cost savings.

You can get started by provisioning new clusters with the Graviton3-based M7g instances as the broker type using the AWS Management Console, APIs via the AWS SDK, and the AWS Command Line Interface (AWS CLI). M7g instances support all Amazon MSK and Kafka features, making it straightforward for you to run all your existing Kafka workloads with minimal changes. Amazon MSK supports Graviton3-based M7g instances from large through 16xlarge sizes to run all Kafka workloads.

Let’s take the M7g instances on MSK provisioned clusters for a test drive and see how it compares with Amazon MSK M5 instances.

M7g instances in action

Customers run a wide variety of workloads on Amazon MSK; some are latency sensitive, and some are throughput bound. In this post, we focus on M7g performance impact on throughput-bound workloads. M7g comes with an increase in network and storage throughput, providing a higher throughput per broker compared to an M5-based cluster.

To understand the implications, let’s look at how Kafka uses available throughput for writing or reading data. Every broker in the MSK cluster comes with a bounded storage and network throughput entitlement. Predominantly, writes in Kafka consume both storage and network throughput, whereas reads consume mostly network throughput. This is because a Kafka consumer is typically reading real-time data from a page cache and occasionally goes to disk to process old data. Therefore, the overall throughput gains also change based on the workload’s write to read throughput ratios.
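To see why the write-to-read ratio matters, consider a simplified per-broker network budget (both the numbers and the model are illustrative, not actual MSK entitlements):

```python
# Illustrative model of how writes and reads draw down a broker's bounded
# network throughput. Values are hypothetical, not MSK entitlements.
REPLICATION_FACTOR = 3

def network_used_mbps(ingress_mbps, consumer_groups):
    # Writes consume network for the client ingress plus the outbound
    # replication streams to the other (rf - 1) replicas.
    write_traffic = ingress_mbps * REPLICATION_FACTOR
    # Each consumer group re-reads the full ingress; it is usually served
    # from the page cache, so it costs network but little storage throughput.
    read_traffic = ingress_mbps * consumer_groups
    return write_traffic + read_traffic

# 20 MB/s of ingress with 2 consumer groups: 20*3 + 20*2 = 100 MB/s of network
print(network_used_mbps(20, 2))
```

Under this model, adding consumer groups eats into the same budget as replication traffic, which is why workloads with many consumers see different gains than write-heavy ones.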

Let’s look at the throughput gains based on an example. Our setup includes an MSK cluster with M7g.4xlarge instances and another with M5.4xlarge instances, with three nodes in three different Availability Zones. We also enabled TLS encryption, AWS Identity and Access Management (IAM) authentication, and a replication factor of 3 across both M7g and M5 MSK clusters. We also applied Amazon MSK best practices for broker configurations, including num.network.threads = 8 and num.io.threads = 16. On the client side for writes, we optimized the batch size with appropriate linger.ms and batch.size configurations. For the workload, we assumed 6 topics each with 64 partitions (384 per broker). For ingestion, we generated load with an average message size of 512 bytes and with one consumer group per topic. The amount of load sent to the clusters was identical.
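The batching settings mentioned above correspond to standard Apache Kafka producer configs; here is an illustrative example (these values are not the benchmark’s actual settings):

```python
# Standard Apache Kafka producer configs that control batching; the values
# below are illustrative, not the ones used in the benchmark.
producer_config = {
    "linger.ms": 5,             # wait up to 5 ms for a batch to fill
    "batch.size": 131072,       # 128 KiB batches amortize per-request overhead
    "compression.type": "lz4",  # fewer bytes on the wire per batch
    "acks": "all",              # wait for all in-sync replicas (RF 3 cluster)
}

# With the 512-byte average message size used in the workload above,
# one full batch holds this many messages:
messages_per_batch = producer_config["batch.size"] // 512
print(messages_per_batch)
```

Larger batches generally raise throughput at the cost of a few milliseconds of latency, which is the trade-off linger.ms and batch.size let you tune.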

As we ingest more data into the MSK cluster, the M7g.4xlarge instance supports higher throughput per broker, as shown in the following graph. After an hour of consistent writes, M7g.4xlarge brokers support up to 54 MB/s of write throughput vs. 42 MB/s with M5-based brokers, which represents a 29% increase.

We also see another important observation: M7g-based brokers consume far fewer CPU resources than M5-based brokers, even though they support 29% higher throughput. As seen in the following chart, CPU utilization of an M7g-based broker is on average 40%, whereas on an M5-based broker, it is 47%.

As covered previously, customers may see different performance improvements based on the number of consumer groups, batch sizes, and instance sizes. We recommend referring to MSK Sizing and Pricing to calculate M7g performance gains for your use case, or creating a cluster based on M7g instances and benchmarking the gains on your own.

Lower costs, lower operational burden, and higher resiliency

Since its launch, Amazon MSK has made it cost-effective to run your Kafka workloads, while still improving overall resiliency. Since day 1, you have been able to run brokers in multiple Availability Zones without worrying about additional networking costs. In October 2022, we launched Tiered Storage, which provides virtually unlimited storage at up to 50% lower costs. When you use Tiered Storage, you not only save on overall storage cost but also improve the overall availability and elasticity of your cluster.

Continuing down this path, we are now reducing compute costs for customers while still providing performance improvements. With M7g instances, Amazon MSK provides 24% savings on compute costs compared to similarly sized M5 instances. When you move to Amazon MSK, you can not only lower your operational overhead using features such as Amazon MSK Connect, Amazon MSK Replicator, and automatic Kafka version upgrades, but also improve overall resiliency and reduce your infrastructure costs.

Pricing and Regions

M7g instances on Amazon MSK are available today in the US (Ohio, N. Virginia, N. California, Oregon), Asia Pacific (Hyderabad, Mumbai, Seoul, Singapore, Sydney, Tokyo), Canada (Central), and EU (Ireland, London, Spain, Stockholm) Regions.

Refer to Amazon MSK pricing to learn more about pricing for Graviton3-based instances with Amazon MSK.

Summary

In this post, we discussed the performance gains achieved while using Graviton3-based M7g instances. These instances can provide significant improvements in read and write throughput compared to similarly sized M5 instances for Amazon MSK workloads. To get started, create a new cluster with M7g brokers using the AWS Management Console, and read our documentation for more information.


About the Authors

Sai Maddali is a Senior Manager of Product Management at AWS who leads the product team for Amazon MSK. He is passionate about understanding customer needs and using technology to deliver services that empower customers to build innovative applications. Besides work, he enjoys traveling, cooking, and running.

Umesh is a Streaming Solutions Architect at AWS. He works with AWS customers to design and build real-time data processing systems. He has 13 years of experience in software engineering, including architecting, designing, and developing data analytics systems.

Lanre Afod is a Solutions Architect focused on Global Financial Services at AWS, passionate about helping customers deploy secure, scalable, highly available, and resilient architectures within the AWS Cloud.

AWS Weekly Roundup – EC2 DL2q instances, PartyRock, Amplify’s 6th birthday, and more – November 20, 2023

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-ec2-dl2q-instances-partyrock-amplifys-6th-birthday-and-more-november-20-2023/

Last week I saw an astonishing 160+ new service launches. There were so many updates that we decided to publish a weekly roundup again. This continues the same innovative pace of the previous week as we are getting closer to AWS re:Invent 2023.

Our News Blog team is also finalizing new blog posts for re:Invent to introduce awesome launches with service teams for your reading pleasure. Jeff Barr shared The Road to AWS re:Invent 2023 to explain our blogging journey and process. Please stay tuned in the next week!

Last week’s launches
Here are some of the launches that caught my attention last week:

Amazon EC2 DL2q instances – New DL2q instances are powered by Qualcomm AI 100 Standard accelerators and are the first to feature Qualcomm’s AI technology in the public cloud. With eight Qualcomm AI 100 Standard accelerators and 128 GiB of total accelerator memory, you can run popular generative artificial intelligence (AI) applications and extend to edge devices across smartphones, autonomous driving, personal compute, and extended reality headsets to develop and validate these AI workloads before deploying.

PartyRock for Amazon Bedrock – We introduced PartyRock, a fun and intuitive hands-on, generative AI app-building playground powered by Amazon Bedrock. You can experiment, learn all about prompt engineering, build mini-apps, and share them with your friends—all without writing any code or creating an AWS account.

You also can now access the Meta Llama 2 Chat 13B foundation model and Cohere Command Light, Embed English, and multilingual models for Amazon Bedrock.

AWS Amplify celebrates its sixth birthday – We announced six new launches: a new documentation site, support for Next.js 14 with our hosting and JavaScript library, custom token providers and an automatic React Native social sign-in update for Amplify Auth, new ChangePassword and DeleteUser account settings components, and updates to all Amplify UI packages to use the new Amplify JavaScript v6. You can also use wildcard subdomains when using a custom domain with your Amplify application deployed to AWS Amplify Hosting.

Amplify docs site UI

Also check out other News Blog posts about major launches published in the past week:

Other AWS service launches
Here are some other bundled feature launches per AWS service:

Amazon Athena – You can use a new cost-based optimizer (CBO) to enhance query performance based on table and column statistics collected by the AWS Glue Data Catalog, and the Athena JDBC 3.x driver, a new alternative driver that supports almost all authentication plugins. You can also use Amazon EMR Studio to develop and run interactive queries on Amazon Athena.

Amazon CloudWatch – You can use a new CloudWatch metric called EBS Stalled I/O Check to monitor the health of your Amazon EBS volumes, regular expressions in the Amazon CloudWatch Logs Live Tail filter pattern syntax to search and match relevant log events, observability of the SAP Sybase ASE database in CloudWatch Application Insights, and up to two stats commands in a Logs Insights query to perform aggregations on the results.

Amazon CodeCatalyst – You can connect to an Amazon Virtual Private Cloud (Amazon VPC) from CodeCatalyst Workflows, provision infrastructure using Terraform within CodeCatalyst Workflows, access CodeCatalyst with your workforce identities configured in IAM Identity Center, and create teams made up of members of the CodeCatalyst space.

Amazon Connect – You can use a pre-built queue performance dashboard and Contact Lens conversational analytics dashboard to view and compare real-time and historical aggregated queue performance. You can also use quick responses for chats, such as typing ‘/#greet’ to insert a previously written, personalized response, and attachment scanning to detect malware or other unwanted content.

AWS Glue – AWS Glue for Apache Spark added six new database connectors: Teradata, SAP HANA, Azure SQL, Azure Cosmos DB, Vertica, and MongoDB, as well as native connectivity to Amazon OpenSearch Service.

AWS Lambda – You can see a single-pane view of metrics, logs, and traces in the AWS Lambda console and use advanced logging controls to natively capture logs in JSON structured format. You can view the SAM template on the Lambda console and export the function’s configuration to AWS Application Composer. AWS Lambda also supports Java 21 and Node.js 20 runtimes built on the new Amazon Linux 2023 runtime.

AWS Local Zones in Dallas – You can enable the new Local Zone in Dallas, Texas, us-east-1-dfw-2a, with Amazon EC2 C6i, M6i, R6i, C6gn, and M6g instances and Amazon EBS volume types gp2, gp3, io1, sc1, and st1. You can also access Amazon ECS, Amazon EKS, Application Load Balancer, and AWS Direct Connect in this new Local Zone to support a broad set of workloads at the edge.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) – You can standardize access control to Kafka resources using AWS Identity and Access Management (IAM) and build Kafka clients for Amazon MSK Serverless written in all programming languages. These are open source client helper libraries and code samples for popular languages, including Java, Python, Go, and JavaScript. Also, Amazon MSK now supports an enhanced version of Apache Kafka 3.6.0 that offers generally available Tiered Storage and automatically sends you storage capacity alerts when you are at risk of exhausting your storage.

Amazon OpenSearch Service Ingestion – You can migrate your data from Elasticsearch version 7.x clusters to the latest versions of Amazon OpenSearch Service and use persistent buffering to protect the durability of incoming data.

Amazon RDS – Amazon RDS for MySQL now supports creating active-active clusters using the Group Replication plugin, upgrading MySQL 5.7 snapshots to MySQL 8.0, and the Innovation Release version of MySQL 8.1.

Amazon RDS Custom for SQL Server extends point-in-time recovery support for up to 1,000 databases; supports Service Master Key Retention to use transparent data encryption (TDE), table- and column-level encryption, DBMail, and linked servers; and lets you use SQL Server Developer edition with bring your own media (BYOM).

Additionally, Amazon RDS Multi-AZ deployments with two readable standbys now support minor version upgrades and system maintenance updates with typically less than one second of downtime when using Amazon RDS Proxy.

AWS Partner Central – You can use an improved user experience in AWS Partner Central to build and promote your offerings and the new Investments tab in the Partner Analytics Dashboard to gain actionable insights. You can now link accounts and associated users between Partner Central and AWS Marketplace and use an enhanced co-sell experience with APN Customer Engagements (ACE) manager.

Amazon QuickSight – You can programmatically manage user access and custom permissions for roles to restrict QuickSight functionality, for IAM Identity Center and Active Directory accounts, using APIs. You can also use shared restricted folders, a Contributor role with support for data source asset types in folders, and the Custom Week Start feature, an addition designed to enhance the data analysis experience for customers across diverse industries and social contexts.

AWS Trusted Advisor – You can use new APIs to programmatically access Trusted Advisor best practices checks, recommendations, and prioritized recommendations and 37 new Amazon RDS checks that provide best practices guidance by analyzing DB instance configuration, usage, and performance data.

There’s a lot more launch news that I haven’t covered. See AWS What’s New for more details.

See you virtually in AWS re:Invent
Next week we’ll hear the latest from AWS, learn from experts, and connect with the global cloud community in Las Vegas. If you come, check out the agenda, session catalog, and attendee guides before your departure.

If you’re not able to attend re:Invent in person this year, we’re offering the option to livestream our Keynotes and Innovation Talks. With an online pass registration, you will have access to on-demand keynotes, Innovation Talks, and selected breakout sessions after the event.

Channy

Triggering AWS Lambda function from a cross-account Amazon Managed Streaming for Apache Kafka

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/compute/triggering-aws-lambda-function-from-a-cross-account-amazon-managed-streaming-for-apache-kafka/

This post is written by Subham Rakshit, Senior Specialist Solutions Architect, and Ismail Makhlouf, Senior Specialist Solutions Architect.

Many organizations use a multi-account strategy for stream processing applications. This involves decomposing the overall architecture into a single producer account and many consumer accounts. Within AWS, in the producer account, you can use Amazon Managed Streaming for Apache Kafka (Amazon MSK), and in the consumer accounts, AWS Lambda functions for event consumption. This blog post explains how you can trigger Lambda functions from a cross-account Amazon MSK cluster.

The Lambda event source mapping (ESM) for Amazon MSK continuously polls for new events from the Amazon MSK cluster, aggregates them into batches, and then triggers the target Lambda function. The ESM for Amazon MSK functions as a serverless set of Kafka consumers that ensures that each event is processed at least once. Additionally, events are processed in the same order they are received within each Kafka partition. The ESM also batches the stream of data and filters the events based on configured logic.
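To make the batching concrete, here is a minimal sketch of a consumer function behind this mapping. The event shape, with base64-encoded record values grouped per topic-partition, follows the documented Lambda event format for Amazon MSK; the field handling is simplified for illustration.

```python
import base64

def handler(event, context):
    """Minimal sketch of a Lambda function consuming a batch from Amazon MSK.

    Records arrive grouped by topic-partition with base64-encoded values;
    within each partition they keep the order in which they were produced.
    """
    decoded = []
    for records in event.get("records", {}).values():
        for record in records:
            decoded.append({
                "topic": record["topic"],
                "partition": record["partition"],
                "offset": record["offset"],
                "value": base64.b64decode(record["value"]).decode("utf-8"),
            })
    return decoded
```

A real consumer would process or persist each decoded record instead of returning the list, but the decode step is the part every MSK-triggered function shares.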

Overview

Amazon MSK supports two different deployment types: provisioned and serverless. Triggering a Lambda function from a cross-account Amazon MSK cluster is only supported with a provisioned cluster deployed within the same Region. To facilitate this functionality, Amazon MSK uses multi-VPC private connectivity, powered by AWS PrivateLink, which simplifies connecting Kafka consumers hosted in different AWS accounts to an Amazon MSK cluster.

The following diagram illustrates the architecture of this example:

Architecture Diagram

The architecture is divided into two parts: the producer and the consumer.

In the producer account, you have the Amazon MSK cluster with multi-VPC connectivity enabled. Multi-VPC connectivity is only available for authenticated Amazon MSK clusters. Cluster policies are required to grant permissions to other AWS accounts, allowing them to establish private connectivity to the Amazon MSK cluster. You can delegate permissions to relevant roles or users. When combined with AWS Identity and Access Management (IAM) client authentication, cluster policies offer fine-grained control over Kafka data plane permissions for connecting applications.

In the consumer account, you have the Lambda ESM for Amazon MSK and the managed VPC connection deployed within the same VPC. The managed VPC connection allows private connectivity from the consumer application VPC to the Amazon MSK cluster. The Lambda ESM for Amazon MSK connects to the cross-account Amazon MSK cluster via IAM authentication. It also supports SASL/SCRAM, and mutual TLS (mTLS) authenticated clusters. The ESM receives the event from the Kafka topic and invokes the Lambda function to process it.

Deploying the example application

To set up the Lambda function trigger from a cross-account Amazon MSK cluster as the event source, follow these steps. The AWS CloudFormation templates for deploying the example are accessible in the GitHub repository.

As a part of this example, some sample data is published using the Kafka console producer, and Lambda processes these events and writes them to Amazon S3.

Prerequisites

For this example, you need two AWS accounts. This post uses the following naming conventions:

  • Producer (for example, account No: 1111 1111 1111): Account that hosts the Amazon MSK cluster and Kafka client instance.
  • Consumer (for example, account No: 2222 2222 2222): Account that hosts the Lambda function and consumes events from Amazon MSK.

To get started:

  1. Clone the repository locally:
    git clone https://github.com/aws-samples/lambda-cross-account-msk.git
  2. Set up the producer account: you must configure the VPC networking and deploy the Amazon MSK cluster and a Kafka client instance to publish data. To do this, deploy the CloudFormation template producer-account.yaml from the AWS console and take note of the MSKClusterARN from the CloudFormation outputs tab.
  3. Set up the consumer account: you need the Lambda function, the IAM role used by the Lambda function, and the S3 bucket receiving the data. For this, deploy the CloudFormation template consumer-account.yaml from the AWS console with the input parameter MSKAccountId, which is the producer AWS account ID (for example, account Id: 1111 1111 1111). Note the LambdaRoleArn from the CloudFormation outputs tab.

Setting up multi-VPC connectivity in the Amazon MSK cluster

Once the accounts are created, you must enable connectivity between them. By enabling multi-VPC private connectivity in the Amazon MSK cluster, you set up the network connection to allow the cross-account consumers to connect to the cluster.

  1. In the producer account, navigate to the Amazon MSK console.
  2. Choose producer-cluster, and go to the Properties tab.
  3. Scroll to Networking settings, choose Edit, and select Turn on multi-VPC connectivity. This takes some time to complete, and then appears as follows.
Networking settings
  4. Add the necessary cluster policy to allow cross-account consumers to connect to Amazon MSK. In the producer account, deploy the CloudFormation template producer-msk-cluster-policy.yaml from the AWS console with the following input parameters:
    • MSKClusterArn – Amazon Resource Name (ARN) of the Amazon MSK cluster in the producer account. Find this information in the CloudFormation output of producer-account.yaml.
    • LambdaRoleArn – ARN of the IAM role attached to the Lambda function in the consumer account. Find this information in the CloudFormation output of consumer-account.yaml.
    • LambdaAccountId – Consumer AWS account ID (for example, account Id: 2222 2222 2222).
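The policy the template attaches is expected to look something like the following sketch, which grants the consumer account permission to discover the cluster and create a VPC connection to it. The action list follows the Amazon MSK multi-VPC connectivity documentation, but verify it against the actual producer-msk-cluster-policy.yaml template.

```python
import json

def cluster_policy(cluster_arn, consumer_account_id):
    """Sketch of a cross-account MSK cluster policy (verify against the template)."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": f"arn:aws:iam::{consumer_account_id}:root"},
            "Action": [
                "kafka:CreateVpcConnection",  # lets the consumer create the managed VPC connection
                "kafka:GetBootstrapBrokers",
                "kafka:DescribeCluster",
                "kafka:DescribeClusterV2",
            ],
            "Resource": cluster_arn,
        }],
    }

# Render the policy document for the consumer account used in this example:
#   print(json.dumps(cluster_policy("<MSKClusterARN>", "222222222222"), indent=2))
```

Scoping the Resource to the cluster ARN and the Principal to the consumer account keeps the grant as narrow as the example requires.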

Creating a Kafka topic in Amazon MSK and publishing events

In the producer account, navigate to the Amazon MSK console. Choose the Amazon MSK cluster named producer-cluster. Choose View client information to show the bootstrap server.

Client information

The CloudFormation template also deploys a Kafka client instance to create topics and publish events.

To access the client, go to the Amazon EC2 console and choose the instance producer-KafkaClientInstance1. Connect to the EC2 instance with Session Manager:

sudo su - ec2-user
#Set MSK Broker IAM endpoint
export BS=<<Provide IAM bootstrap address here>>

You must use the single-VPC private endpoint for the Amazon MSK cluster, not the multi-VPC private endpoint, because you are going to publish events from a Kafka console producer in the producer account.

Run these scripts to create the customer topic and publish sample events in the topic:

./kafka_create_topic.sh
./kafka_produce_events.sh

Creating a managed VPC connection in the consumer account

To establish a connection to the Amazon MSK cluster in the producer account, you must create a managed VPC connection in the consumer account. Lambda communicates with cross-account Amazon MSK through this managed VPC connection.

For detailed setup steps, read the Amazon MSK managed VPC connection documentation.

Configuring the Lambda ESM for Amazon MSK

The final step is to set up the Lambda ESM for Amazon MSK. Setting up the ESM enables you to connect to the Amazon MSK cluster in the producer account via the managed VPC endpoint. This allows you to trigger the Lambda function to process the data produced from the Kafka topic:

  1. In the consumer account, go to the Lambda console.
  2. Open the Lambda function msk-lambda-cross-account-iam.
  3. Go to the Configuration tab, select Triggers, and choose Add Trigger.
  4. For Trigger configuration, select Amazon MSK.

Lambda trigger

To configure this trigger:

  1. Select the shared Amazon MSK cluster. This automatically defaults to the IAM authentication that is used to connect to the cluster.
    MSK Lambda trigger
  2. By default, the Active trigger check box is enabled. This ensures that the trigger is in the active state after creation. For the other values:
    1. Keep the default Batch size of 100.
    2. Change the Starting Position to Trim horizon.
    3. Set the Topic name as customer.
    4. Set the Consumer Group ID as msk-lambda-iam.

Trigger configuration

Scroll to the bottom and choose Add. This starts creating the Amazon MSK trigger, which takes several minutes. After creation, the state of the trigger shows as Enabled.

Verifying the output on the consumer side

The Lambda function receives the events and writes them in an S3 bucket.

To validate that the function is working, go to the consumer account and navigate to the S3 console. Search for the cross-account-lambda-consumer-data-<<REGION>>-<<AWS Account Id>> bucket. In the bucket, you see the customer-data-<<datetime>>.csv files.

S3 bucket objects

Cleaning up

You must empty and delete the S3 bucket, managed VPC connection, and the Lambda ESM for Amazon MSK manually from the consumer account. Next, delete the CloudFormation stacks from the AWS console from both the producer and consumer accounts to remove all other resources created as a part of the example.

Conclusion

With Lambda and Amazon MSK, you can now build a decentralized application distributed across multiple AWS accounts. This post shows how you can set up Amazon MSK as an event source for cross-account Lambda functions and also walks you through the configuration required in both producer and consumer accounts.

For further reading on AWS Lambda with Amazon MSK as an event source, visit the documentation.

For more serverless learning resources, visit Serverless Land.

Converting Apache Kafka events from Avro to JSON using EventBridge Pipes

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/converting-apache-kafka-events-from-avro-to-json-using-eventbridge-pipes/

This post is written by Pascal Vogel, Solutions Architect, and Philipp Klose, Global Solutions Architect.

Event streaming with Apache Kafka has become an important element of modern data-oriented and event-driven architectures (EDAs), unlocking use cases such as real-time analytics of user behavior, anomaly and fraud detection, and Internet of Things event processing. Stream producers and consumers in Kafka often use schema registries to ensure that all components follow agreed-upon event structures when sending (serializing) and processing (deserializing) events to avoid application bugs and crashes.

A common schema format in Kafka is Apache Avro, which supports rich data structures in a compact binary format. To integrate Kafka with other AWS and third-party services more easily, AWS offers Amazon EventBridge Pipes, a serverless point-to-point integration service. However, many downstream services expect JSON-encoded events, requiring custom and repetitive schema validation and conversion logic from Avro to JSON in each downstream service.

This blog post shows how to reliably consume, validate, convert, and send Avro events from Kafka to AWS and third-party services using EventBridge Pipes, allowing you to reduce custom deserialization logic in downstream services. You can also use EventBridge event buses as targets in Pipes to filter and distribute events from Pipes to multiple targets, including cross-account and cross-Region delivery.

This blog describes two scenarios:

  1. Using Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS Glue Schema Registry.
  2. Using Confluent Cloud and the Confluent Schema Registry.

See the associated GitHub repositories for Glue Schema Registry or Confluent Schema Registry for full source code and detailed deployment instructions.

Kafka event streaming and schema validation on AWS

To build event streaming applications with Kafka on AWS, you can use Amazon MSK, offerings such as Confluent Cloud, or self-hosted Kafka on Amazon Elastic Compute Cloud (Amazon EC2) instances.

To avoid common issues in event streaming and event-driven architectures, such as data inconsistencies and incompatibilities, it is a recommended practice to define and share event schemas between event producers and consumers. In Kafka, schema registries are used to manage, evolve, and enforce schemas for event producers and consumers. The AWS Glue Schema Registry provides a central location to discover, manage, and evolve schemas. In the case of Confluent Cloud, the Confluent Schema Registry serves the same role. Both the Glue Schema Registry and the Confluent Schema Registry support common schema formats such as Avro, Protobuf, and JSON.

To integrate Kafka with AWS services, third-party services, and your own applications, you can use EventBridge Pipes. EventBridge Pipes helps you create point-to-point integrations between event sources and targets with optional filtering, transformation, and enrichment. EventBridge Pipes reduces the amount of integration code that you have to write and maintain when building EDAs.

Many AWS and third-party services expect JSON-encoded payloads (events) as input, meaning they cannot directly consume Avro or Protobuf payloads. To replace repetitive Avro-to-JSON validation and conversion logic in each consumer, you can use the EventBridge Pipes enrichment step. This solution uses an AWS Lambda function in the enrichment step to deserialize and validate Kafka events with a schema registry, including error handling with dead-letter queues, and convert events to JSON before passing them to downstream services.

Solution overview

Architecture overview of the solution

The solution presented in this blog post consists of the following key elements:

  1. The source of the pipe is a Kafka cluster deployed using MSK or Confluent Cloud. EventBridge Pipes reads events from the Kafka stream in batches and sends them to the enrichment function (see here for an example event).
  2. The enrichment step (Lambda function) deserializes and validates the events against the configured schema registry (Glue or Confluent), converts events from Avro to JSON with integrated error handling, and returns them to the pipe.
  3. The target of this example solution is an EventBridge custom event bus that is invoked by EventBridge Pipes with JSON-encoded events returned by the enrichment Lambda function. EventBridge Pipes supports a variety of other targets, including Lambda, AWS Step Functions, Amazon API Gateway, API destinations, and more, enabling you to build EDAs without writing integration code.
  4. In this sample solution, the event bus sends all events to Amazon CloudWatch Logs via an EventBridge rule. You can extend the example to invoke additional EventBridge targets.

Optionally, you can add OpenAPI 3 or JSONSchema Draft 4 schemas for your events in the EventBridge schema registry by either manually generating them from the Avro schema or using EventBridge schema discovery. This allows you to download code bindings for the JSON-converted events for various programming languages, such as JavaScript, Python, and Java, to correctly use them in your EventBridge targets.

The remainder of this blog post describes this solution for the Glue and Confluent schema registries with code examples.

EventBridge Pipes with the Glue Schema Registry

This section describes how to implement event schema validation and conversion from Avro to JSON using EventBridge Pipes and the Glue Schema Registry. You can find the source code and detailed deployment instructions on GitHub.

Prerequisites

You need an Amazon MSK Serverless cluster running and the Glue Schema Registry configured. This example includes an Avro schema and a Glue Schema Registry. See the following AWS blog post for an introduction to schema validation with the Glue Schema Registry: Validate, evolve, and control schemas in Amazon MSK and Amazon Kinesis Data Streams with AWS Glue Schema Registry.

EventBridge Pipes configuration

Use the AWS Cloud Development Kit (AWS CDK) template provided in the GitHub repository to deploy:

  1. An EventBridge pipe that connects to your existing Amazon MSK Serverless Kafka topic as the source via AWS Identity and Access Management (IAM) authentication.
  2. EventBridge Pipes reads events from your Kafka topic using the Amazon MSK source type.
  3. An enrichment Lambda function in Java to perform event deserialization, validation, and conversion from Avro to JSON.
  4. An Amazon Simple Queue Service (Amazon SQS) dead letter queue to hold events for which deserialization failed.
  5. An EventBridge custom event bus as the pipe target. An EventBridge rule sends all incoming events into a CloudWatch Logs log group.

For MSK-based sources, EventBridge supports configuration parameters, such as batch window, batch size, and starting position, which you can set using the parameters of the CfnPipe class in the example CDK stack.

The example EventBridge pipe consumes events from Kafka in batches of 10 because it is targeting an EventBridge event bus, which has a max batch size of 10. See batching and concurrency in the EventBridge Pipes User Guide to choose an optimal configuration for other targets.

EventBridge Pipes with the Confluent Schema Registry

This section describes how to implement event schema validation and conversion from Avro to JSON using EventBridge Pipes and the Confluent Schema Registry. You can find the source code and detailed deployment instructions on GitHub.

Prerequisites

To set up this solution, you need a Kafka stream running on Confluent Cloud as well as the Confluent Schema Registry set up. See the corresponding Schema Registry tutorial for Confluent Cloud to set up a schema registry for your Confluent Kafka stream.

To connect to your Confluent Cloud Kafka cluster, you need an API key for Confluent Cloud and Confluent Schema Registry. AWS Secrets Manager is used to securely store your Confluent secrets.

EventBridge Pipes configuration

Use the AWS CDK template provided in the GitHub repository to deploy:

  1. An EventBridge pipe that connects to your existing Confluent Kafka topic as the source via an API secret stored in Secrets Manager.
  2. EventBridge Pipes reads events from your Confluent Kafka topic using the self-managed Apache Kafka stream source type, which includes all non-MSK Kafka clusters.
  3. An enrichment Lambda function in Python to perform event deserialization, validation, and conversion from Avro to JSON.
  4. An SQS dead letter queue to hold events for which deserialization failed.
  5. An EventBridge custom event bus as the pipe target. An EventBridge rule writes all incoming events into a CloudWatch Logs log group.

For self-managed Kafka sources, EventBridge supports configuration parameters, such as batch window, batch size, and starting position, which you can set using the parameters of the CfnPipe class in the example CDK stack.

The example EventBridge pipe consumes events from Kafka in batches of 10 because it is targeting an EventBridge event bus, which has a max batch size of 10. See batching and concurrency in the EventBridge Pipes User Guide to choose an optimal configuration for other targets.

Enrichment Lambda functions

Both of the solutions described previously include an enrichment Lambda function for schema validation and conversion from Avro to JSON.

The Java Lambda function integrates with the Glue Schema Registry using the AWS Glue Schema Registry Library. The Python Lambda function integrates with the Confluent Schema Registry using the confluent-kafka library and uses Powertools for AWS Lambda (Python) to implement Serverless best practices such as logging and tracing.

The enrichment Lambda functions perform the following tasks:

  1. In the events polled from the Kafka stream by the EventBridge pipe, the key and value of the event are base64 encoded. Therefore, for each event in the batch passed to the function, the key and the value are decoded.
  2. The event key is assumed to be serialized by the producer as a string type.
  3. The event value is deserialized using the Glue Schema Registry Serde (Java) or the confluent-kafka AvroDeserializer (Python).
  4. The function then returns the successfully converted JSON events to the EventBridge pipe, which then invokes the target for each of them.
  5. Events for which Avro deserialization failed are sent to the SQS dead letter queue.
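The decode-and-deserialize loop in the steps above can be sketched in Python as follows. This is a minimal, hedged illustration: `deserialize_avro` is a stand-in for the real Glue Schema Registry Serde or confluent-kafka `AvroDeserializer` calls, and here it simply parses JSON bytes so the sketch is self-contained:

```python
import base64
import json


def deserialize_avro(raw: bytes) -> dict:
    # Stand-in for the real deserializer (Glue Schema Registry Serde in
    # Java, confluent-kafka AvroDeserializer in Python). To keep this
    # sketch self-contained it simply parses JSON bytes.
    return json.loads(raw)


def handler(event, context):
    output = []
    # The pipe groups records under "<topic>-<partition>" keys.
    for _topic_partition, records in event["records"].items():
        for record in records:
            # Keys and values arrive base64 encoded.
            key = base64.b64decode(record["key"]).decode("utf-8")
            value = deserialize_avro(base64.b64decode(record["value"]))
            output.append({"key": key, "value": value})
    # On an unhandled deserialization error, the pipe retries and eventually
    # routes the failed batch to the configured SQS dead letter queue.
    return output
```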

Conclusion

This blog post shows how to implement event consumption, Avro schema validation, and conversion to JSON using Amazon EventBridge Pipes, Glue Schema Registry, and Confluent Schema Registry.

The source code for the presented example is available in the AWS Samples GitHub repository for Glue Schema Registry and Confluent Schema Registry. For more patterns, visit the Serverless Patterns Collection.

For more serverless learning resources, visit Serverless Land.

Amazon MSK Serverless now supports Kafka clients written in all programming languages

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/amazon-msk-serverless-now-supports-kafka-clients-written-in-all-programming-languages/

Amazon MSK Serverless is a cluster type for Amazon Managed Streaming for Apache Kafka (Amazon MSK) that is the most straightforward way to run Apache Kafka clusters without having to manage compute and storage capacity. With MSK Serverless, you can run your applications without having to provision, configure, or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring and moving them to even load across a cluster.

With today’s launch, MSK Serverless now supports writes and reads from Kafka clients written in all programming languages. Administrators can simplify and standardize access control to Kafka resources using AWS Identity and Access Management (IAM). This support for IAM in Amazon MSK is based on SASL/OAUTHBEARER, an open standard for authorization and authentication.

In this post, we show how you can connect your applications to MSK Serverless with minimal code changes using the open-sourced client helper libraries and code samples for popular languages, including Java, Python, Go, JavaScript, and .NET. Using IAM authentication and authorization is the preferred choice of many customers because you can secure Kafka resources just like you do with all other AWS services. Additionally, you get all the other benefits of IAM, such as temporary role-based credentials, precisely scoped permission policies, and more. Now you can use MSK Serverless with IAM-based authentication more broadly with the support for multiple languages.

Solution overview

You can get started by using IAM principals as identities for your Apache Kafka clients and define identity policies to provide them precisely scoped access permissions. For example, you can create an IAM user and a policy that allows the user to write to a specific Kafka topic but restricts access to other resources without worrying about managing Kafka ACLs. After you provide the identity policies with the necessary permissions, you can configure client applications to use the IAM authentication with minimal code changes.

The code changes allow your clients to use SASL/OAUTHBEARER, a Kafka supported token-based access mechanism, to pass the credentials required for IAM authentication. With OAUTHBEARER support, you can build clients that can work across both Amazon MSK and other Kafka environments. In this post, we show how you can make these code changes by using the provided code libraries and examples.

With this launch, Amazon MSK provides new code libraries for the following programming languages in the AWS GitHub repo:

The following diagram shows the conceptual process flow of using SASL/OAUTHBEARER with IAM access control for non-Java clients.

The workflow contains the following steps:

  1. The client generates an OAUTHBEARER token with the help of the provided library. The token contains a signed base64 encoded transformation of your IAM identity credentials.
  2. The client sends this to Amazon MSK using the bootstrap address along with its request to access Apache Kafka resources.
  3. The MSK Serverless cluster decodes the OAUTHBEARER token, validates the credentials, and checks if the client is authorized to perform the requested action according to the policy attached to the IAM identity.
  4. When the token expires, the client Kafka library automatically refreshes the token by making another call to the specified token provider.

Create IAM identities and policies

IAM access control for non-Java clients is supported for MSK Serverless clusters at no additional cost. Before you start, you need to configure the IAM identities and policies that define the client’s permissions to access resources on the cluster. The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the action and resource elements, see Semantics of actions and resources.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}

Configure the client

You should make code changes to your application that allow the clients to use SASL/OAUTHBEARER to pass the credentials required for IAM authentication. You also need to make sure the security group associated with your MSK Serverless cluster has an inbound rule allowing traffic from the client applications in the associated VPCs on TCP port 9098.

You must use a Kafka client library that provides support for SASL with OAUTHBEARER authentication.

For this post, we use the Python programming language. We also use https://github.com/dpkp/kafka-python as our Kafka client library.

Amazon MSK provides a code library for each language that generates the OAUTHBEARER token.

  1. To get started working with the Amazon MSK IAM SASL signer for Python with your Kafka client library, run the following command:
    $ pip install aws-msk-iam-sasl-signer-python

  2. Import the installed Amazon MSK IAM SASL signer library in your code:
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import socket
    import time
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

  3. Next, your application code needs to define a token provider that wraps the function that generates new tokens:
    class MSKTokenProvider():
        def token(self):
            token, _ = MSKAuthTokenProvider.generate_auth_token('<your aws region>')
            return token

  4. Specify security_protocol as SASL_SSL and sasl_mechanism as oauthbearer in your Python Kafka client properties, and pass the token provider in the configuration object:
    tp = MSKTokenProvider()

    producer = KafkaProducer(
        bootstrap_servers='<Amazon MSK Serverless bootstrap string>',
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=tp,
        client_id='my.kafka.client.unique.id',
    )

You are now finished with all the code changes. For more examples of generating auth tokens or for more troubleshooting tips, refer to the following GitHub repo.

Conclusion

MSK Serverless now supports writes and reads from Kafka clients written in all programming languages. You can run your applications without having to configure and manage the infrastructure or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring, and ensures an even balance of partition distribution across brokers in the cluster (auto-balancing).

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

Amazon MSK IAM authentication now supports all programming languages

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/amazon-msk-iam-authentication-now-supports-all-programming-languages/

The AWS Identity and Access Management (IAM) authentication feature in Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports all programming languages. Administrators can simplify and standardize access control to Kafka resources using IAM. This support is based on SASL/OAUTHBEARER, an open standard for authorization and authentication. Both Amazon MSK provisioned and serverless cluster types support the new Amazon MSK IAM expansion to all programming languages.

In this post, we show how you can connect your applications to MSK clusters with minimal code changes using the open-sourced client helper libraries and code samples for popular languages, including Java, Python, Go, JavaScript, and .NET. You can also use standard IAM access controls such as temporary role-based credentials and precisely scoped permission policies more broadly with the multiple language support on Amazon MSK.

For clients that need to connect from other VPCs to an MSK cluster, whether in the same or a different AWS account, you can enable multi-VPC private connectivity and cluster policy support. IAM access control via cluster policy helps you manage all access to the cluster and topics in one place. For example, you can control which IAM principals have write access to certain topics, and which principals can only read from them. Users who are using IAM client authentication can also add permissions for required kafka-cluster actions in the cluster resource policy.
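As a hedged sketch (the account IDs, role name, and ARNs below are placeholders), a cluster policy granting a cross-account client permission to connect and read could look like:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:role/cross-account-kafka-client"
            },
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:999999999999:cluster/MyTestCluster/*",
                "arn:aws:kafka:us-east-1:999999999999:topic/MyTestCluster/*"
            ]
        }
    ]
}
```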

Solution overview

You can get started by using IAM principals as identities for your Apache Kafka clients and define identity policies to provide them precisely scoped access permissions. After IAM authentication is enabled for your cluster, you can configure client applications to use the IAM authentication with minimal code changes.

The code changes allow your clients to use SASL/OAUTHBEARER, a Kafka supported token-based access mechanism, to pass the credentials required for IAM authentication. In this post, we show how you can make these code changes by using the provided code libraries and examples.

With this launch, new code libraries for the following programming languages are available in the AWS GitHub repo:

The following diagram shows the conceptual process flow of using SASL/OAUTHBEARER with IAM access control for non-Java clients.

The workflow contains the following steps:

  1. The client generates an OAUTHBEARER token with the help of the provided library. The token contains a signed base64 encoded transformation of your IAM identity credentials.
  2. The client sends this to Amazon MSK using the IAM bootstrap broker addresses along with its request to access Apache Kafka resources.
  3. The MSK broker decodes the OAUTHBEARER token, validates the credentials, and checks if the client is authorized to perform the requested action according to the policy attached to the IAM identity.
  4. When the token expires, the client Kafka library automatically refreshes the token by making another call to the specified token provider.

Create IAM identities and policies

IAM access control for non-Java clients is supported for MSK clusters with Kafka version 2.7.1 and above. Before you start, you need to configure the IAM identities and policies that define the client’s permissions to access resources on the cluster. The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the action and resource elements, see Semantics of actions and resources.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}

Set up the MSK cluster

You need to enable the IAM access control authentication scheme for your MSK provisioned cluster and wait until the cluster finishes updating and returns to the Active state. This is because SASL/OAUTHBEARER uses the same broker addresses as IAM authentication.

Configure the client

You should make code changes to your application that allow the clients to use SASL/OAUTHBEARER to pass the credentials required for IAM authentication. Next, update your application to use the bootstrap server addresses for IAM authentication. You also need to make sure the security group associated with your MSK cluster has an inbound rule allowing traffic from the client applications in the same VPC as the cluster on TCP port 9098.

You must use a Kafka client library that provides support for SASL with OAUTHBEARER authentication.

In this post, we use the JavaScript programming language. We also use https://github.com/tulios/kafkajs as our Kafka client library.

Amazon MSK provides a code library for each language that generates the OAUTHBEARER token.

  1. To get started working with the Amazon MSK IAM SASL signer for JavaScript with your Kafka client library, run the following command:
    npm install https://github.com/aws/aws-msk-iam-sasl-signer-js

  2. You need to import the installed Amazon MSK IAM SASL signer library in your code:
    const { Kafka } = require('kafkajs')
    
    const { generateAuthToken } = require('aws-msk-iam-sasl-signer-js')

  3. Next, your application code needs to define a token provider that wraps the function that generates new tokens:
    async function oauthBearerTokenProvider(region) {
        // Uses AWS Default Credentials Provider Chain to fetch credentials
        const authTokenResponse = await generateAuthToken({ region });
        return {
            value: authTokenResponse.token
        }
    }

  4. Enable ssl and specify oauthbearer as the sasl mechanism in your KafkaJS client configuration, and pass the token provider in the configuration object:
    const run = async () => {
        const kafka = new Kafka({
            clientId: 'my-app',
        brokers: ['<bootstrap server addresses for IAM>'],
            ssl: true,
            sasl: {
                mechanism: 'oauthbearer',
                oauthBearerProvider: () => oauthBearerTokenProvider('us-east-1')
            }
        })
    
        const producer = kafka.producer()
        const consumer = kafka.consumer({ groupId: 'test-group' })
    
        // Producing
        await producer.connect()
        await producer.send({
            topic: 'test-topic',
            messages: [
                { value: 'Hello KafkaJS user!' },
            ],
        })
    
        // Consuming
        await consumer.connect()
        await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
    
        await consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                console.log({
                    partition,
                    offset: message.offset,
                    value: message.value.toString(),
                })
            },
        })
    }
    
    run().catch(console.error)

You are now finished with all the code changes. For more examples of generating auth tokens or for more troubleshooting tips, refer to the following GitHub repo.

Conclusion

IAM access control for Amazon MSK enables you to handle both authentication and authorization for your MSK cluster. This eliminates the need to use one mechanism for authentication and another for authorization. For example, when a client tries to write to your cluster, Amazon MSK uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster.

With today’s launch, Amazon MSK IAM authentication now supports all programming languages. This means you can connect applications written in any language without worrying about implementing separate authentication and authorization mechanisms. For workloads that require Amazon MSK multi-VPC private connectivity and cluster policy support, you can now simplify connectivity to your MSK brokers and manage all access to the cluster and topics in one place: the cluster policy.

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Scaling improvements when processing Apache Kafka with AWS Lambda

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/scaling-improvements-when-processing-apache-kafka-with-aws-lambda/

AWS Lambda is improving the automatic scaling behavior when processing data from Apache Kafka event-sources. Lambda is increasing the default number of initial consumers, improving how quickly consumers scale up, and helping to ensure that consumers don’t scale down too quickly. There is no additional action that you must take, and there is no additional cost.

Running Kafka on AWS

Apache Kafka is a popular open-source platform for building real-time streaming data pipelines and applications. You can deploy and manage your own Kafka solution on-premises or in the cloud on Amazon EC2.

Amazon Managed Streaming for Apache Kafka (MSK) is a fully managed service that makes it easier to build and run applications that use Kafka to process streaming data. MSK Serverless is a cluster type for Amazon MSK that allows you to run Kafka without having to manage and scale cluster capacity. It automatically provisions and scales capacity while managing the partitions in your topic, so you can stream data without thinking about right-sizing or scaling clusters. MSK Serverless offers a throughput-based pricing model, so you pay only for what you use. For more information, see Using Kafka to build your streaming application.

Using Lambda to consume records from Kafka

Processing streaming data can be complex in traditional, server-based architectures, especially if you must react in real-time. Many organizations spend significant time and cost managing and scaling their streaming platforms. In order to react fast, they must provision for peak capacity, which adds complexity.

Lambda and serverless architectures remove the undifferentiated heavy lifting when processing Kafka streams. You don’t have to manage infrastructure, can reduce operational overhead, lower costs, and scale on-demand. This helps you focus more on building streaming applications. You can write Lambda functions in a number of programming languages, which provide flexibility when processing streaming data.

Lambda event source mapping

Lambda can integrate natively with your Kafka environments as a consumer to process stream data as soon as it’s generated.

To consume streaming data from Kafka, you configure an event source mapping (ESM) on your Lambda functions. This is a resource managed by the Lambda service, which is separate from your function. It continually polls records from the topics in the Kafka cluster. The ESM optionally filters records and batches them into a payload. Then, it calls the Lambda Invoke API to deliver the payload to your Lambda function synchronously for processing.
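The payload the ESM delivers to the function is keyed by topic and partition, with base64-encoded record keys and values. A truncated sketch of its shape (the ARN, broker address, and topic name are placeholders):

```json
{
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcd1234",
    "bootstrapServers": "b-1.mycluster.kafka.us-east-1.amazonaws.com:9098",
    "records": {
        "my-topic-0": [
            {
                "topic": "my-topic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": "a2V5LTE=",
                "value": "eyJtZXNzYWdlIjogImhlbGxvIn0="
            }
        ]
    }
}
```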

As Lambda manages the pollers, you don’t need to manage a fleet of consumers across multiple teams. Each team can create and configure their own ESM with Lambda handling the polling.

AWS Lambda event source mapping

For more information on using Lambda to process Kafka streams, see the learning guide.

Scaling and throughput

Kafka uses partitions to increase throughput and spread the load of records to all brokers in a cluster.

The Lambda event source mapping resource includes pollers and processors. Pollers have consumers that read records from Kafka partitions. Poller assigners send them to processors, which batch the records and invoke your function.

When you create a Kafka event source mapping, Lambda allocates consumers to process all partitions in the Kafka topic. Previously, Lambda allocated a minimum of one processor for a consumer.

Lambda previous initial scaling

With these scaling improvements, Lambda allocates multiple processors to improve processing. This reduces the possibility of a single invoke slowing down the entire processing stream.

Lambda updated initial scaling

Each consumer sends records to multiple processors running in parallel to handle increased workloads. Records in each partition are only assigned to a single processor to maintain order.

Lambda automatically scales up or down the number of consumers and processors based on workload. Lambda samples the consumer offset lag of all the partitions in the topic every minute. If the lag is increasing, this means Lambda can’t keep up with processing the records from the partition.

The scaling algorithm accounts for the current offset lag and also the rate of new messages added to the topic. Lambda can reach the maximum number of consumers within three minutes to lower the offset lag as quickly as possible. Lambda also scales down less aggressively, ensuring records are processed more quickly and latency is reduced, particularly for bursty workloads.

Total processors for all pollers can only scale up to the total number of partitions in the topic.

After successful invokes, the poller periodically commits offsets to the respective brokers.

Lambda further scaling

You can monitor the throughput of your Kafka topic using consumer metrics consumer_lag and consumer_offset.

To check how many function invocations occur in parallel, you can also monitor the concurrency metrics for your function. The concurrency is approximately equal to the total number of processors across all pollers, depending on processor activity. For example, if three pollers have five processors running for a given ESM, the function concurrency would be approximately 15 (5 + 5 + 5).
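This approximation, together with the partition cap described earlier, can be sketched as a small calculation (an illustration of the arithmetic, not the actual scaling algorithm):

```python
def approximate_concurrency(processors_per_poller, partitions):
    """Rough function-concurrency estimate: one invocation per active
    processor, and total processors never exceed the partition count."""
    total_processors = sum(processors_per_poller)
    return min(total_processors, partitions)


# Three pollers with five processors each, against a 1000-partition topic:
print(approximate_concurrency([5, 5, 5], 1000))  # -> 15
```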

Seeing the improved scaling in action

There are a number of Serverless Patterns that you can use to process Kafka streams using Lambda. To set up Amazon MSK Serverless, follow the instructions in the GitHub repo:

  1. Create an example Amazon MSK Serverless topic with 1000 partitions:
    ./kafka-topics.sh --create --bootstrap-server "{bootstrap-server}" --command-config client.properties --replication-factor 3 --partitions 1000 --topic msk-1000p

  2. Add records to the topic using a UUID as a key to distribute records evenly across partitions. This example adds 13 million records:
    for x in {1..13000000}; do echo $(uuidgen -r),message_$x; done | ./kafka-console-producer.sh --broker-list "{bootstrap-server}" --topic msk-1000p --producer.config client.properties --property parse.key=true --property key.separator=, --producer-property acks=all

  3. Create a Python function based on this pattern, which logs the processed records.
  4. Amend the function code to insert a delay of 0.1 seconds to simulate record processing:
    import base64
    import time
    
    def lambda_handler(event, context):
        # Track the position of the message within the batch
        i = 1
        # Loop through the map for each key (combination of topic and partition)
        for record in event['records']:
            for messages in event['records'][record]:
                print("********************")
                print("Record number: " + str(i))
                print("Topic: " + str(messages['topic']))
                print("Partition: " + str(messages['partition']))
                print("Offset: " + str(messages['offset']))
                print("Timestamp: " + str(messages['timestamp']))
                print("TimestampType: " + str(messages['timestampType']))
                if messages.get('key') is not None:
                    decodedKey = base64.b64decode(messages['key']).decode('ascii')
                else:
                    decodedKey = "null"
                if messages.get('value') is not None:
                    decodedValue = base64.b64decode(messages['value']).decode('ascii')
                else:
                    decodedValue = "null"
                print("Key = " + str(decodedKey))
                print("Value = " + str(decodedValue))
                i = i + 1
                time.sleep(0.1)
        return {
            'statusCode': 200,
        }

  5. Configure the ESM to point to the previously created cluster and topic.
  6. Use the default batch size of 100, and set the StartingPosition to TRIM_HORIZON to process from the beginning of the stream.
  7. Deploy the function, which also adds and configures the ESM.
  8. View the Amazon CloudWatch ConcurrentExecutions and OffsetLag metrics to observe the processing.

With the scaling improvements, once the ESM is configured, the ESM and function scale up to handle the number of partitions.

Lambda automatic scaling improvement graph

Increasing data processing throughput

It is important that your function can keep pace with the rate of traffic. A growing offset lag means that the function processing cannot keep up. If the age is high in relation to the stream’s retention period, you can lose data as records expire from the stream.

The record age should generally not exceed 50% of the stream’s retention period. When it reaches 100% of the retention period, data is lost. One temporary mitigation is to increase the stream’s retention time, which gives you more time to resolve the issue before losing data.

There are several ways to improve processing throughput.

  1. Avoid processing unnecessary records by using content filtering to control which records Lambda sends to your function. This helps reduce traffic to your function, simplifies code, and reduces overall cost.
  2. Lambda allocates processors across all pollers based on the number of partitions up to a maximum of one concurrent Lambda function per partition. You can increase the number of processing Lambda functions by increasing the number of partitions.
  3. For compute intensive functions, you can increase the memory allocated to your function, which also increases the amount of virtual CPU available. This can help reduce the duration of a processing function.
  4. Lambda polls Kafka with a configurable batch size of records. You can increase the batch size to process more records in a single invocation. This can improve processing time and reduce costs, particularly if your function has an increased init time. A larger batch size increases the latency to process the first record in the batch, but potentially decreases the latency to process the last record in the batch. There is a tradeoff between cost and latency when optimizing a partition’s capacity and the decision depends on the needs of your workload.
  5. Ensure that your producers evenly distribute records across partitions using an effective partition key strategy. A workload is unbalanced when a single key dominates other keys, creating a hot partition, which impacts throughput.
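As an illustration of content filtering (a hedged sketch; the status field is a placeholder for whatever your JSON record values contain), the FilterCriteria on an event source mapping could look like:

```json
{
    "Filters": [
        {
            "Pattern": "{\"value\": {\"status\": [\"error\"]}}"
        }
    ]
}
```

With this filter in place, Lambda only invokes the function for records whose decoded JSON value has status set to error, reducing invocations and cost.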

See Increasing data processing throughput for some additional guidance.

Conclusion

Today, AWS Lambda is improving the automatic scaling behavior when processing data from Apache Kafka event-sources. Lambda is increasing the default number of initial consumers, improving how quickly they scale up, and ensuring they don’t scale down too quickly. There is no additional action that you must take, and there is no additional cost.

You can explore the scaling improvements with your existing workloads or deploy an Amazon MSK cluster and try one of the patterns to measure processing time.

To explore using Lambda to process Kafka streams, see the learning guide.

For more serverless learning resources, visit Serverless Land.