Tag Archives: Event Sourcing

Towards a Reliable Device Management Platform

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/towards-a-reliable-device-management-platform-4f86230ca623

By Benson Ma, Alok Ahuja

Introduction

At Netflix, hundreds of different device types, from streaming sticks to smart TVs, are tested every day through automation to ensure that new software releases continue to deliver the quality of the Netflix experience that our customers enjoy. In addition, Netflix continuously works with its partners (such as Roku, Samsung, LG, Amazon) to port the Netflix SDK to their new and upcoming devices (TVs, smart boxes, etc), to ensure the quality bar is reached before allowing the Netflix application on the device to go out into the world. The Partner Infrastructure team at Netflix provides solutions to support these two significant efforts by enabling device management at scale.

Background

To normalize the diversity of networking environments across both the Netflix and Partner networks and create a consistent and controllable computing environment on which users can run regression and Netflix application certification testing for devices, the Partner Infrastructure team provides a customized embedded computer called the Reference Automation Environment (RAE). Complementing the hardware is the software on the RAE and in the cloud, and bridging the software on both ends is a bi-directional control plane. Together, they form the Device Management Platform, which is the infrastructural foundation for Netflix Test Studio (NTS). Users then effectively run tests by connecting their devices to the RAE in a plug-and-play fashion.

The platform allows for effective device management at scale, and its feature set is broadly divided into two areas:

  1. Provide a service-level abstraction for controlling devices and their environments (hardware and software topologies).
  2. Collect and aggregate information and state updates for all devices attached to the RAEs in the fleet.

In this blog post, we will focus on the latter feature set.

Over the lifecycle of a device connected to the RAE, the device can change attributes at any time. For example, when running tests, the state of the device will change from “available for testing” to “in test.” In addition, because many of these devices are pre-production devices and thus subject to frequent firmware changes, attributes that are generally static in production devices can sometimes change as well, such as the MAC address and the Electronic Serial Number (ESN) assigned to the Netflix installation on the device. As such, it is critical to keep device information up to date so that device tests work properly. In the Device Management Platform, this is achieved by having device updates be event-sourced through the control plane to the cloud so that NTS will always have the most up-to-date information about the devices available for testing. The challenge, then, is to be able to ingest and process these events in a scalable manner, i.e., scaling with the number of devices, which will be the focus of this blog post.

System Setup

Architecture

The following diagram summarizes the architecture description:

Figure 1: Event-sourcing architecture of the Device Management Platform.

The RAE is configured to be effectively a router that devices under test (DUTs) are connected to. On the RAE, there exists a service called the Local Registry, which is responsible for detecting, onboarding, and maintaining information about all devices connected to the LAN side of the RAE. When a new hardware device is connected, the Local Registry detects and collects a set of information about it, such as networking information and ESN. At periodic intervals, the Local Registry probes the device to check on its connection status. As the device attributes and properties change over time, these changes are saved into the Local Registry and simultaneously published upstream to the Device Management Platform’s control plane. In addition to attribute changes, a complete snapshot of the device record is published upstream by the Local Registry at regular intervals as a form of state reconciliation. These checkpoint events enable faster state reconstruction by consumers of the data feed while guarding against missed updates.

On the cloud side, a service called the Cloud Registry ingests the device information updates published by the Local Registry instances, processes them, and subsequently pushes materialized data into a datastore backed by CockroachDB. CockroachDB was chosen as the backing data store because it offers SQL capabilities and our data model for the device records is normalized. In addition, unlike other SQL stores, CockroachDB is designed from the ground up to be horizontally scalable, which addresses our concerns about the Cloud Registry’s ability to scale with the number of devices onboarded onto the Device Management Platform.

Control Plane

MQTT forms the basis of the control plane for the Device Management Platform. MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT) and was designed as a highly lightweight yet reliable publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT clients connect to the MQTT broker and send messages prefixed with a topic. The broker, in turn, is responsible for receiving all messages, filtering them, determining which clients are subscribed to which topics, and sending the messages to the subscribed clients accordingly. The key features that make MQTT highly appealing to us are its support for hierarchical topics, client authentication and authorization, per-topic ACLs, and bi-directional request/response message patterns, all of which are crucial for the business use cases we have for the control plane.

Inside the control plane, device commands and device information updates are prefixed with a topic string that includes both the RAE serial number and the device_session_id, which is a UUID corresponding to a device session. Embedding these two pieces of information into the topic of every message allows us to apply topic ACLs and effectively control which RAEs and DUTs users can see and interact with, while keeping them safely isolated from other users’ devices.

Since Kafka is a supported messaging platform at Netflix, a bridge is established between the two protocols to allow cloud-side services to communicate with the control plane. Through the bridge, MQTT messages are converted directly into Kafka records, where the record key is set to the MQTT topic that the message was assigned to. Because device information updates published over MQTT carry the device_session_id in the topic, all device information updates for a given device session land on the same Kafka partition, giving us a well-defined message order for consumption.
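
To make the ordering guarantee concrete, here is a minimal sketch of the bridging idea, with the MQTT topic used as the Kafka record key; the Kafka topic name and payload handling are placeholders rather than Netflix’s actual bridge code:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object MqttToKafkaBridgeSketch {
  // Forward one MQTT message onto the Kafka topic backing the bridge.
  def forward(producer: KafkaProducer[String, Array[Byte]],
              mqttTopic: String,
              payload: Array[Byte]): Unit = {
    // Using the MQTT topic (which embeds the RAE serial number and the
    // device_session_id) as the record key means Kafka's default partitioner
    // hashes the key, so every update for a given device session lands on the
    // same partition and is therefore consumed in a well-defined order.
    val record = new ProducerRecord[String, Array[Byte]]("device-updates", mqttTopic, payload)
    producer.send(record)
  }
}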

Canary Test Workloads

In addition to serving the regular message traffic between users and DUTs, the control plane itself is stress-tested at roughly 3-hour intervals, where nearly 3000 ephemeral MQTT clients are created to connect to and generate flash traffic on the MQTT brokers. This is intended to be a canary test to verify that the brokers are online and able to handle sudden influxes of client connections and high message loads. As such, we can see that the traffic load on the Device Management Platform’s control plane is very dynamic over time.

Adherence to the Paved-Path

At Netflix, we emphasize building out solutions that use paved-path tooling as much as possible (see posts here and here). In particular, the flavor of Spring Boot Native maintained by the Runtime team is the basis for many of the web services developed inside Netflix (including the Cloud Registry). The Netflix Spring package comes with all the integrations needed for applications to work seamlessly within the Netflix ecosystem. In particular, the Kafka integration is the most relevant for this blog post.

Translating to System Requirements

Given the system setup that we have described, we came up with a list of fundamental business requirements that the Cloud Registry’s Kafka-based device updates processing solution must address.

Back-Pressure Support

Because the processing workload varies significantly over time, the solution must first and foremost scale with the message load by providing back-pressure support as defined in the Reactive Streams specification — in other words, the solution should be able to switch between push- and pull-based back-pressure models depending on whether the downstream is able to keep up with the message production rate.

In-Order Processing

The semantics of correct device information updates ingestion requires that messages be consumed in the order that they are produced. Since message order is guaranteed per Kafka partition, and all updates for a given device session are assigned to the same partition, this means that the order of processing of updates for each device can be enforced as long as only one thread is assigned per partition. At the same time, events arriving on different partitions should be processed in parallel for maximum throughput.
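
As an illustration of this consumption model, the following is a minimal Alpakka-Kafka sketch (not the Cloud Registry’s actual code) in which partitions are processed in parallel while messages within each partition are handled strictly one at a time; the topic name, consumer group, bootstrap servers, and process() function are placeholders:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.Future

object InOrderProcessingSketch {
  implicit val system: ActorSystem = ActorSystem("device-updates-processor")
  import system.dispatcher

  // Placeholder for the real device-update handling logic.
  def process(record: ConsumerRecord[String, Array[Byte]]): Future[Unit] =
    Future.successful(())

  val consumerSettings: ConsumerSettings[String, Array[Byte]] =
    ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("cloud-registry")          // placeholder

  val stream =
    Consumer
      // One sub-stream per assigned Kafka partition, processed in parallel...
      .committablePartitionedSource(consumerSettings, Subscriptions.topics("device-updates"))
      .mapAsyncUnordered(parallelism = 16) { case (_, partitionSource) =>
        partitionSource
          // ...while mapAsync(1) within a partition preserves per-session order.
          .mapAsync(1)(msg => process(msg.record).map(_ => msg.committableOffset))
          .runWith(Committer.sink(CommitterSettings(system)))
      }
      .runWith(Sink.ignore)
}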

Fault Tolerance

If the underlying KafkaConsumer crashes due to ephemeral system or network events, it should be automatically restarted. If an exception is thrown during the consumption of a message, the exception should be gracefully caught, and message consumption should seamlessly continue after the offending message is dropped.
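
A minimal sketch of this behavior using Akka Streams’ restart and supervision facilities might look as follows; the backoff values, topic name, and processing step are illustrative assumptions, not the actual Cloud Registry configuration:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{RestartSource, Sink}
import akka.stream.{ActorAttributes, RestartSettings, Supervision}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.duration._

object FaultTolerantConsumptionSketch {
  implicit val system: ActorSystem = ActorSystem("device-updates-processor")

  val consumerSettings: ConsumerSettings[String, Array[Byte]] =
    ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("cloud-registry")          // placeholder

  // If processing a message throws, drop the offending message and keep going.
  val resumeOnError: Supervision.Decider = _ => Supervision.Resume

  // If the stream (or the underlying KafkaConsumer) fails, restart it with
  // exponential backoff instead of letting the failure take the service down.
  val restartSettings: RestartSettings =
    RestartSettings(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)

  val stream =
    RestartSource
      .onFailuresWithBackoff(restartSettings) { () =>
        Consumer
          .plainSource(consumerSettings, Subscriptions.topics("device-updates"))
          .map(record => record) // placeholder for the real processing step
          .withAttributes(ActorAttributes.supervisionStrategy(resumeOnError))
      }
      .runWith(Sink.ignore)
}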

Graceful Shutdown

Application shutdowns are necessary and inevitable when a service is re-deployed, or its instance group is resized. As such, processor shutdowns should be invokable from outside of the Kafka consumption context to facilitate graceful application termination. In addition, since Kafka messages are usually pulled down in batches by the KafkaConsumer, the implemented solution should, upon receiving the shutdown signal, consume and drain all the already-fetched messages remaining in its internal queue prior to shutting down.
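
Alpakka-Kafka exposes this drain-then-stop behavior through its DrainingControl materialized value; a minimal sketch (with placeholder topic, consumer group, and processing step) might look like this:

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Keep
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.Future

object GracefulShutdownSketch {
  implicit val system: ActorSystem = ActorSystem("device-updates-processor")
  import system.dispatcher

  val consumerSettings: ConsumerSettings[String, Array[Byte]] =
    ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("cloud-registry")          // placeholder

  val control: DrainingControl[Done] =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("device-updates"))
      .mapAsync(1)(msg => Future.successful(msg.committableOffset)) // placeholder processing
      .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
      .mapMaterializedValue(DrainingControl.apply[Done])
      .run()

  // Called from outside the Kafka consumption context (e.g. an application
  // shutdown hook): stop polling, finish the messages already fetched,
  // complete any outstanding offset commits, and then terminate the stream.
  def shutdown(): Future[Done] = control.drainAndShutdown()
}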

Paved-Path Integration

As mentioned earlier, Spring is heavily employed as the paved-path solution for developing services at Netflix, and the Cloud Registry is a Spring Boot Native application. Thus, the implemented solution must integrate with Netflix Spring facilities for authentication and metrics support at the very minimum — the former for access to the Kafka clusters and the latter for service monitoring and alerts. In addition, the lifecycle management of the implemented solution must also be integrated into Spring’s lifecycle management.

Long-Term Maintainability

The implemented solution must be friendly enough for long-term maintenance support. This means that it must at the very least be unit- and functional-testable for rapid and iterative feedback-driven development, and the code must be reasonably ergonomic to lower the learning curve for new maintainers.

Adopting a Stream Processing Framework

There are many frameworks available for reliable stream processing that integrate into web services (Kafka Streams, Spring KafkaListener, Project Reactor, Flink, and Alpakka-Kafka, to name a few). We chose Alpakka-Kafka as the basis of the Kafka processing solution for the following reasons.

  1. Alpakka-Kafka turns out to satisfy all of the system requirements we laid out, including the need for Netflix Spring integration. It further provides advanced and fine-grained control over stream processing, including automatic back-pressure support and streams supervision.
  2. Compared to the other solutions that may satisfy all of our system requirements, Akka is a much more lightweight framework, with its integration into a Spring Boot application being relatively short and concise. In addition, Akka and Alpakka-Kafka code is much less terse than the other solutions out there, which lowers the learning curve for maintainers.
  3. The maintenance costs over time for an Alpakka-Kafka-based solution are much lower than those for the other solutions, as both Akka and Alpakka-Kafka are mature ecosystems in terms of documentation and community support, having been around for at least 12 and 6 years, respectively.

The construction of the Alpakka-based Kafka processing pipeline can be summarized with the following diagram:

Figure 2: Kafka processing pipeline employed by the Cloud Registry.

Implementation

The integration of Alpakka-Kafka streams with the Netflix Spring application context is very straightforward and is implemented as follows:

  1. Import the Alpakka-Kafka library in build.gradle, but exclude the kafka-client transitive dependency that comes packaged with it so that the Netflix internal-enhanced variant is used.
  2. Build a Spring @Configuration class that autowires the KafkaProperties bean injected by the Netflix Spring runtime and, using the Kafka settings available from that bean, constructs an Alpakka-Kafka ConsumerSettings bean (a sketch follows this list).
  3. Construct an Alpakka-Kafka processing graph using the ConsumerSettings bean as an input.
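
As a rough illustration of step 2, the configuration class might look something like the sketch below, here using the standard Spring Boot KafkaProperties class as a stand-in for the bean injected by the Netflix Spring runtime; the consumer group and deserializers are assumptions:

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.{Bean, Configuration}
import scala.jdk.CollectionConverters._

@Configuration
class AlpakkaKafkaConfiguration {

  @Bean
  def actorSystem(): ActorSystem = ActorSystem("cloud-registry")

  @Bean
  def consumerSettings(system: ActorSystem,
                       kafkaProperties: KafkaProperties): ConsumerSettings[String, Array[Byte]] = {
    // Reuse the Kafka settings injected by the Spring runtime so that the
    // paved-path authentication and metrics configuration is picked up.
    val props: Map[String, String] =
      kafkaProperties.buildConsumerProperties().asScala.map { case (k, v) => k -> v.toString }.toMap

    ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
      .withProperties(props)
      .withGroupId("cloud-registry") // placeholder consumer group
  }
}

The ConsumerSettings bean produced here is then passed into the Alpakka-Kafka processing graph in step 3.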

Because this integration explicitly uses the Netflix-enhanced KafkaConsumer and the Netflix Spring-injected Kafka settings, the authentication and metrics-logging facilities that come with the paved-path Spring KafkaListener are immediately available to the Alpakka-Kafka-based solution.

Testing

Functional testing of the Alpakka-Kafka consumers is very straightforward with the EmbeddedKafka library, which provides an in-memory Kafka instance to run tests against. To keep testing tractable as the Kafka message processing pipeline grows in complexity, the message processing code was separated from the Alpakka-Kafka graph code. This allowed the message processing code to be covered by plain functional tests while minimizing the surface area that the EmbeddedKafka-based Kafka integration tests need to cover.

Results

Prior to Alpakka-Kafka

The original Kafka processing solution implemented in the Cloud Registry was built on Spring KafkaListener, primarily due to its immediate availability as a paved-path solution provided by Netflix Spring. A timeline of the transition from Spring KafkaListener to Alpakka-Kafka is presented here for a better understanding of the motivations for the transition.

Memory and GC Troubles

The Spring KafkaListener-based solution was deployed earlier in the year, at a time when messages on the Kafka topic were sparse because the Local Registry was not yet fully in production. Upstream event sourcing was fully enabled on the producer side at around 2021–07–15 15:00 PST. By the following morning, alerts were received regarding high memory consumption and GC latencies, to the point where the service was unresponsive to HTTP requests. An investigation of the JVM memory dump revealed an internal Kafka message concurrent queue whose size had grown uncontrollably to over 1.3 million elements.

The cause of this abnormal queue growth was Spring KafkaListener’s lack of native back-pressure support. With KafkaListener, the Kafka message fetch rate is fixed on application startup, although it can be adjusted by tuning the max-poll-interval-ms and max-poll-records configuration values, which must be determined empirically beforehand for best performance. This setup is neither optimal nor break-proof, since the Kafka message processing rate varies with environmental factors, such as database latencies in our system setup. As a result, the KafkaListener effectively over-consumes messages over time, which manifests as growth of its internal message queue.

After doubling the number of service instances and increasing the instance sizes with only mediocre success, the decision was made to look into an alternative Kafka processing solution with full back-pressure management capabilities.

Kafka Topic Metrics

The enabling of event-sourcing from the Local Registry significantly increased the Device Management Platform’s control plane traffic, as evidenced by the 9x growth in the Kafka topic’s message publication rate, from 100 messages / 90 kB incoming per second to 900 messages / 840 kB incoming per second (Figure 3).

Figure 3: Message traffic over time before and after event-sourcing was enabled.

The spikes that occur at 3-hour intervals correspond to the canary runs mentioned earlier, which effectively load-test the Kafka topic with a flood of new records; hereafter, they will be referred to as burst events. While the average message publication rate is low compared to data systems that produce hundreds of thousands, if not millions, of events per second, it does highlight the significance of having back-pressure management in place even at the lower end of the message load spectrum.

Kafka Consumption Improvements with Alpakka-Kafka

We now compare the Kafka consumption between the Spring KafkaListener-based Kafka processing solution and the Alpakka-Kafka-based solution, the latter of which was deployed to production on 2021–07–23 18:00 PST. In particular, we will look at three indicators of Kafka consumption performance: the message fetch rate, the max consumer lag, and the commit rate.

Fetch Request Metrics

Upon deployment of the Alpakka-Kafka-based processor, we made a few observations:

  • Prior to the deployment, the number of fetch calls over time generally remained unchanged across burst events but was otherwise actually quite unstable over time (Figure 4).
  • After the deployment, the fetch calls over time followed a 1:1 correspondence with the Kafka topic’s message publication rate, including the interval burst events (Figure 4). Outside of the burst event windows, the number of fetch calls over time was very stable.
  • Surprisingly, the average number of records fetched per fetch request during the burst events windows decreased compared to that of the Spring KafkaListener-based processor (Figure 5).

What we can infer from these observations is that, with native back-pressure support in place, the Alpakka-Kafka-based processor is able to dynamically scale its Kafka consumption such that it is never under-consuming or over-consuming Kafka messages. This behavior keeps the processor constantly busy enough, but without overloading it with a growing queue of messages pulled from Kafka that eventually overflows the JVM’s memory and GC capacity.

Figure 4: Record fetch calls made by the KafkaConsumer over time, before and after deployment of the Alpakka-Kafka-based processor.
Figure 5: Average number of records fetched per fetch request over time, before and after deployment.

Max Consumer Lag

Aside from JVM and service uptime, the most significant improvements with the Alpakka-Kafka-based processor showed up in the Kafka consumer lag metrics. While the Spring KafkaListener was deployed, the max consumer lag floated long-term at around 60,000 records, excluding the burst event time windows (this is not visually discernible from the graph due to the orders-of-magnitude differences in plotted values). From a functional point of view, this was unacceptable, as such a large constant lag means that device information updates take long enough to propagate into the service that the delay is noticeable to our users. The situation worsens during the burst event windows, where the max consumer lag increases to over 100 million records (Figure 6).

Since the deployment of the Alpakka-Kafka-based processor, the max consumer lag over time has averaged zero outside of the burst event windows. Inside the burst event windows, the max consumer lag increases ephemerally to roughly 20,000 records, with only one outlier in the 48-hour period since deployment (Figure 7). These metrics show us that the Kafka consumption patterns employed by Alpakka-Kafka and the streaming capabilities of Akka in general perform exceptionally well at scale, from quiet periods to sudden, huge message loads.

Figure 6: Max consumer lag of the KafkaConsumer over time, before and after deployment.
Figure 7: Max consumer lag of the KafkaConsumer over time, magnified to the time window some time after deployment.

Commit Rate and Average Commit Latency

When a Kafka consumer fetches records, it can perform manual or automatic offset commits; this is configurable through enable.auto.commit. Contrary to the name, the semantics of manual vs. auto commit refer not so much to how the offset commits are performed as to when they happen relative to the record fetch-process cycle. With auto commits, messages are acknowledged as received as soon as they are fetched, irrespective of processing, whereas with manual commits, the consumer can decide to acknowledge only after a message has been properly processed.

By default, when enable.auto.commit is set to false, the Spring KafkaListener performs an offset commit every time a record is processed, i.e., the acknowledgement mode is set to AckMode.RECORD. This is exceedingly inefficient, and is known to reduce the message consumption throughput of the consumer. With the Alpakka-Kafka-based processor, we opted for making record commits in batches (set to 1000 by default), with a max interval of 1 second allowed between commits. This behavior is similar to the AckMode.COUNT_TIME acknowledgement mode in Spring KafkaListener, but with the added benefit of automatically attempting to complete outstanding commit requests when the Kafka consumption fails or terminates.
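
Expressed with Alpakka-Kafka’s CommitterSettings, that batching policy might look like the following sketch (the batch size and interval mirror the values above; everything else is illustrative):

import akka.actor.ActorSystem
import akka.kafka.CommitterSettings
import scala.concurrent.duration._

object CommitSettingsSketch {
  implicit val system: ActorSystem = ActorSystem("cloud-registry")

  val committerSettings: CommitterSettings =
    CommitterSettings(system)
      .withMaxBatch(1000)        // commit after at most 1000 offsets have accumulated...
      .withMaxInterval(1.second) // ...or after 1 second, whichever comes first
}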

Under a manual offset commit scheme, it is always possible to re-process Kafka messages in the case of failures. To retain the (mainly) exactly-once processing provided by the automatic offset commit scheme, the Kafka processor was updated to store device updates using idempotent upserts, i.e., an upsert is performed only if the timestamp of the record in the database is earlier than the timestamp of the update being upserted. This effectively ensures exactly-once processing on a per-event basis.
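
A minimal sketch of such an idempotent upsert against CockroachDB is shown below, using a hypothetical device_records table and columns; it illustrates the conditional-upsert idea rather than the Cloud Registry’s actual schema or query. CockroachDB accepts this PostgreSQL-style ON CONFLICT clause.

import java.sql.{Connection, Timestamp}

object IdempotentUpsertSketch {
  // Returns 1 if the row was inserted or updated, 0 if the stored record was already newer.
  def upsertDevice(conn: Connection, sessionId: String, attributesJson: String, updatedAt: Timestamp): Int = {
    val sql =
      """INSERT INTO device_records (device_session_id, attributes, updated_at)
        |VALUES (?, ?, ?)
        |ON CONFLICT (device_session_id) DO UPDATE
        |  SET attributes = excluded.attributes,
        |      updated_at = excluded.updated_at
        |  WHERE excluded.updated_at > device_records.updated_at""".stripMargin

    val stmt = conn.prepareStatement(sql)
    try {
      stmt.setString(1, sessionId)
      stmt.setString(2, attributesJson)
      stmt.setTimestamp(3, updatedAt)
      stmt.executeUpdate()
    } finally stmt.close()
  }
}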

With the deployment of the Alpakka-Kafka-based processor, the commit rate was significantly lowered, from roughly 7 kB/sec to 50 bytes/sec (Figure 8), while the average commit latency increased from 1 ms to 12 ms (Figure 9). Nonetheless, this is a considerable reduction in the network overhead spent on committing offsets, and it has contributed significantly to the improved throughput of the Kafka processing.

Figure 8: Rate of offset commits made by the KafkaConsumer over time, before and after deployment.
Figure 9: Average latency per offset commit over time, before and after deployment.

Conclusion

Kafka streams processing can be difficult to get right. Many system implementation details need to be considered in light of the business requirements. Fortunately, the primitives provided by Akka streams and Alpakka-Kafka empower us to achieve exactly this by allowing us to build streaming solutions that match the business workflows we have while scaling up developer productivity in building out and maintaining these solutions. With the Alpakka-Kafka-based processor in place in the Cloud Registry, we have ensured fault tolerance in the consumer side of the control plane, which is key to enabling accurate and reliable device state aggregation within the Device Management Platform.

Though we have achieved fault-tolerant message consumption, it is only one aspect of the design and implementation of the Device Management Platform. The reliability of the platform and its control plane rests on significant work in several areas, including the MQTT transport, authentication and authorization, and systems monitoring, all of which we plan to discuss in detail in future blog posts. In the meantime, as a result of this work, we can expect the Device Management Platform to continue to scale to increasing workloads over time as we onboard ever more devices into our systems.


Towards a Reliable Device Management Platform was originally published in the Netflix TechBlog on Medium.

Democratizing Fare Storage at scale using Event Sourcing

Post Syndicated from Grab Tech original https://engineering.grab.com/democratizing-fare-storage-at-scale-using-event-sourcing

From humble beginnings, Grab has expanded across different markets in the last couple of years. We’ve added a wide range of features to the Grab platform to continue to delight our customers and driver-partners, and we have had to continually find ways to improve our existing solutions to better support these new features.

In this blog, we discuss how we built Fare Storage, Grab’s single source of truth fare data store, and how we overcame the challenges to make it more reliable and scalable to support our expanding features.

High-level Flow

To set some context for this blog, let’s define some key terms before proceeding. A Fare is a dollar amount calculated to move someone or something from point A to point B. And, a Fee is a dollar amount added to or subtracted from the original fare amount for any additional service.

Now that you’re acquainted with the key concepts, let’s take a look at the following image. It illustrates that features such as Destination Change Fee, Waiting Fee, Cancellation Fee, Tolls, Promos, Surcharges, and many others store additional fee breakdowns along with the original fare. This set of information is crucial for generating receipts and for debugging. However, our legacy storage system wasn’t designed to host massive quantities of information effectively.

Sample Flow with Fee Breakdown

In our legacy architecture, we stored all the booking and fare-related information in a single relational database table. Adding new fare fields and breakdowns required changes in our critical booking system, making iterations prohibitively expensive and hindering innovation.

The need to store the fare information and metadata for every additional feature along with other booking information resulted in a bloated booking entity. With millions of bookings created every day at Grab, this posed a scaling and stability threat to our booking service storage. Moreover, the legacy storage only tracked the latest value of fare and lacked a holistic view of all the modifications to the fare. So, debugging the fare was also a massive chore for our Engineering and Tech Operations teams.

Drafting a solution

The shortcomings of our legacy system led us to explore options for decoupling the fare and its metadata storage from the booking details. We wanted to build a platform that can store and provide access to both fare and its audit trail.

High-level functional requirements for the new fare store were:

  • Provide a platform to store and retrieve fare and associated breakdowns, with no tight coupling between services.
  • Act as a single source-of-truth for fare and associated fees in the Grab ecosystem.
  • Enable clients to access the metadata of fare change events in real-time, enabling the Product team to innovate freely.
  • Provide smooth access to a fare’s audit trail, improving the response time to our customers’ queries.

Non-functional requirements for the fare store were:

  • High availability for the read and write APIs, with latencies of a few milliseconds.
  • Handle concurrent updates to the fare gracefully.
  • Detect duplicate events for a booking for the same transaction.

Storing change sequence with Event Sourcing

Our legacy storage solution used a defined schema and only stored the latest state of the fare. We needed an audit trail-based storage system with fast querying capabilities that can store and retrieve changes in chronological order.

The Event Sourcing pattern stood out as a flexible architectural choice, as it allows us to store and query the sequence of changes in the order they occurred. In his blog, Martin Fowler describes Event Sourcing as:

“The fundamental idea of Event Sourcing is to ensure that every change to the state of an application is captured in an event object and that these event objects are themselves stored in the sequence they were applied for the same lifetime as the application state itself.”

With the Event Sourcing pattern, we store all the fare changes as events in the order they occurred for a booking. We iterate through these events to retrieve a complete snapshot of the modifications to the fare.

A sample Fare Event looks like this:

message Event {
  // type of the event, ADD, SUB, SET, resilient
  EventType type = 1;
  // value which was added, subtracted or modified
  double value = 2;
  // fare for the booking after applying discount
  double fare = 3;

  ...

  // description bytes generated by SDK
  bytes description = 11;
  //transactionID for the EventType
  string transactionID = 12;
}
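
For illustration, replaying such events to rebuild the current fare amounts to a simple fold over the chronologically ordered event list. The sketch below is written in Scala purely for illustration (Grab’s services are written in Go), with event types and fields loosely mirroring the sample above:

sealed trait EventType
case object Add extends EventType
case object Sub extends EventType
case object Set extends EventType

final case class FareEvent(eventType: EventType, value: Double, transactionID: String)

object FareReplaySketch {
  // Fold the chronologically ordered events into the current fare value.
  def currentFare(events: Seq[FareEvent]): Double =
    events.foldLeft(0.0) { (fare, event) =>
      event.eventType match {
        case Add => fare + event.value
        case Sub => fare - event.value
        case Set => event.value
      }
    }
}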

The Event Sourcing pattern also enables us to use the Command Query Responsibility Segregation (CQRS) pattern to decouple the read responsibilities for different use cases.

CQRS Pattern

Clients of the fare life cycle read the current fare and create events to change the fare value as per their logic. Clients can also access fare events when required. This pattern enables clients to modify fares independently and gives them visibility into the sequence of changes for different business needs.

The diagram below describes the overall fare life cycle, from creation and modification to display, using the event store.

Overall Fare Life Cycle

Architecture overview

Fare Cycle Architecture

Clients interact with the Fare LifeCycle service through an SDK. The SDK offers various features such as metadata serialization and deserialization, retries, and timeout configurations, some of which are discussed later.

The Fare LifeCycle Store service uses DynamoDB as the Event Store to persist and read the fare change events, backed by a cache for eventually consistent reads. For further processing, such as archiving and the generation of receipts, the successfully updated events are streamed out to a message queue system.

Ensuring the integrity of the fare sequence

Democratizing the responsibility of fare modification means that multiple services might try to update the fare in parallel without prior synchronization. Concurrent fare updates for the same booking might result in a race condition. Concurrency and consistency problems are perennial challenges in distributed storage systems.

Let’s understand why the ordering of fare updates is important. Business rules for different cities and countries regulate the pricing features based on local market conditions and prevailing laws. For example, in some scenarios, Tolls and Waiting Fees may not be eligible for discounts or promotions. The service applying discounts needs to consider this information while applying a discount. Therefore, updates to the fare are not independent of the previous fare events.

Fare Integrity

We needed a mechanism to detect race conditions and handle them appropriately to ensure the integrity of the fare. To handle race conditions based on our use case, we explored Pessimistic and Optimistic locking mechanisms.

All the expected fare change events happen based on certain conditions being true or false. For example, less than 1% of the bookings have a payment change request initiated by passengers during a ride. And, the probability of multiple similar changes happening on the same booking is rather low. Optimistic Locking offers both efficiency and correctness for our requirements where the chances of race conditions are low, and the records are independent of each other.

The logic to calculate the fare/surcharge is coupled with the business logic of the system that calculates the fare component or fees. So, handling data race conditions on the data store layer was not an acceptable option either. It made more sense to let the clients handle it and keep the storage system decoupled from the business logic to compute the fare.

Optimistic Locking

To achieve Optimistic Locking, we store a fare version and increment it on every successful update. The client must pass the version it read in order to modify the fare. Should there be a version mismatch between the update query and the current fare, the update is rejected. On a version mismatch, the client reads the updated checksum (version) and retries with the recalculated fare.
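
A minimal compare-and-set sketch of this scheme is shown below (in Scala purely for illustration; Grab’s services are written in Go). The store interface, retry count, and recalculation hook are hypothetical:

final case class FareRecord(bookingID: String, fare: Double, version: Long)

trait FareStore {
  def read(bookingID: String): FareRecord
  // Returns true only if expectedVersion still matches the stored version;
  // on success the store persists the new fare and increments the version.
  def compareAndSet(bookingID: String, expectedVersion: Long, newFare: Double): Boolean
}

object OptimisticLockingSketch {
  @annotation.tailrec
  def updateFare(store: FareStore,
                 bookingID: String,
                 recalculate: FareRecord => Double,
                 retriesLeft: Int = 3): Boolean = {
    val current = store.read(bookingID)
    val newFare = recalculate(current)
    if (store.compareAndSet(bookingID, current.version, newFare)) true
    // Version mismatch: another client updated the fare first, so re-read and retry.
    else if (retriesLeft > 0) updateFare(store, bookingID, recalculate, retriesLeft - 1)
    else false
  }
}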

Idempotency of event updates

The next challenge we came across was how to handle client retries while ensuring that we do not duplicate the same event for a booking. Clients might encounter sporadic errors as a result of network-related issues even though the update was actually successful. Under such circumstances, clients retry the same event, resulting in duplicate events. Duplicate events not only require extra storage space, they also impair the clients’ understanding of whether an action was applied to the fare more than once.

As discussed in the previous section, retrying with the same version would fail due to the version mismatch. If the previous attempt successfully modified the fare, it would also update the version.

However, clients might not know if their update modified the version or if any other clients updated the data. Relying on clients to check for event duplication makes the client-side complex and leaves a chance of error if the clients do not handle it correctly.

Solution for Duplicate Events

To handle duplicate events, we associate each event with a unique UUID (transactionID), generated on the client side using a UUID library from the Fare LifeCycle service SDK. Before updating the fare, we check whether the transactionID is already in the list of successful transaction IDs. If we identify a non-unique transactionID, we return a duplicate-event error to the client.

For a unique transactionID, we append it to the list of transaction IDs and save the list to the Event Store along with the event.
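
The duplicate check therefore reduces to a set-membership test on the stored transaction IDs, as in the following illustrative sketch (Scala here only for illustration; the names and types are hypothetical):

final case class DuplicateEventError(transactionID: String)
  extends Exception(s"duplicate event: $transactionID")

final case class FareState(fare: Double, appliedTransactionIDs: Set[String])

object IdempotentEventSketch {
  // Reject the event if its transactionID has already been applied; otherwise
  // apply the change and record the transactionID alongside the new state.
  def applyEvent(state: FareState, transactionID: String, delta: Double): Either[DuplicateEventError, FareState] =
    if (state.appliedTransactionIDs.contains(transactionID))
      Left(DuplicateEventError(transactionID))
    else
      Right(FareState(
        fare = state.fare + delta,
        appliedTransactionIDs = state.appliedTransactionIDs + transactionID))
}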

Schema-less metadata

Metadata are the breakdowns associated with the fare. We require the metadata of specific fee/fare calculations for the generation of receipts and for debugging purposes. The storage system and the various clients, however, need not know the metadata definition of every event.

One goal for our data store was to give our clients the flexibility to add new fields to existing metadata or to define new metadata without changing the API. We adopted an SDK-based approach for metadata, where clients interact with the Fare LifeCycle service via SDK. The SDK has the following responsibilities for metadata:

  1. Serialize the metadata into bytes before making an API call to the Fare LifeCycle service.
  2. Deserialize the metadata bytes returned by the Fare LifeCycle service into a Go struct for client access.

Fare LifeCycle SDK

Serializing and deserializing the metadata on the client-side decoupled it from the Fare LifeCycle Store API. This helped teams update the metadata without deploying the storage service each time.

For reading the breakdown, clients pass the metadata bytes to the SDK along with the Event Type, and the SDK converts them back into the corresponding proto schema. With this approach, clients can update the metadata without changing the Data Store Service.
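
Conceptually, the SDK pairs each event type with a codec that turns metadata into opaque bytes and back, along the lines of the sketch below (Scala and the WaitingFeeMetadata type are purely illustrative stand-ins for the Go structs and proto schemas used in practice):

trait MetadataCodec[A] {
  def serialize(metadata: A): Array[Byte]
  def deserialize(bytes: Array[Byte]): A
}

object MetadataSdkSketch {
  // Hypothetical metadata type for a waiting-fee event.
  final case class WaitingFeeMetadata(waitingMinutes: Int, ratePerMinute: Double)

  // The storage layer only ever sees the opaque bytes this codec produces.
  val waitingFeeCodec: MetadataCodec[WaitingFeeMetadata] = new MetadataCodec[WaitingFeeMetadata] {
    def serialize(m: WaitingFeeMetadata): Array[Byte] =
      s"${m.waitingMinutes},${m.ratePerMinute}".getBytes("UTF-8") // stand-in for proto encoding
    def deserialize(bytes: Array[Byte]): WaitingFeeMetadata = {
      val parts = new String(bytes, "UTF-8").split(",")
      WaitingFeeMetadata(parts(0).toInt, parts(1).toDouble)
    }
  }
}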

Conclusion

The Fare LifeCycle service enabled us to revolutionize the fare storage at scale for Grab’s ecosystem of services. Further benefits realized with the system are:

  • The feature agnostic platform helped us to reduce the time-to-market for our hyper-local features so that we can further outserve our customers and driver-partners.
  • Decoupling the fare information from the booking information also helped us to achieve a better separation of concerns between services.
  • Improved the overall reliability and scalability of the Grab platform by decoupling fare and booking information, allowing them to scale independently of each other.
  • Reduced unnecessary coupling between services when fetching fare-related information and updating fares.
  • The audit trail of fare changes in chronological order reduced the time needed to debug fares and improved our responses to customers’ fare-related queries.

We hope this post gave you a closer look at how we used the Event Sourcing pattern to build a data store and how we handled a few caveats and challenges in the process.


Authored by Sourabh Suman on behalf of the Pricing team at Grab. Special thanks to Karthik Gandhi, Kurni Famili, ChandanKumar Agarwal, Adarsh Koyya, Niteesh Mehra, Sebastian Wong, Matthew Saw, Muhammad Muneer, and Vishal Sharma for their contributions.


Join us

Grab is more than just the leading ride-hailing and mobile payments platform in Southeast Asia. We use data and technology to improve everything from transportation to payments and financial services across a region of more than 620 million people. We aspire to unlock the true potential of Southeast Asia and look for like-minded individuals to join us on this ride.

If you share our vision of driving South East Asia forward, apply to join our team today.

Optimally scaling Kafka consumer applications

Post Syndicated from Grab Tech original https://engineering.grab.com/optimally-scaling-kafka-consumer-applications

Earlier this year, we took you on a journey on how we built and deployed our event sourcing and stream processing framework at Grab. We’re happy to share that we’re able to reliably maintain our uptime and continue to service close to 400 billion events a week. We haven’t stopped there though. To ensure that we can scale our framework as the Grab business continuously grows, we have spent efforts optimizing our infrastructure.

In this article, we will dive deeper into our Kubernetes infrastructure setup for our stream processing framework. We will cover why and how we focus on optimal scalability and availability of our infrastructure.

Quick Architecture Recap

Coban Platform Architecture

The Coban platform provides lightweight Golang plugin architecture-based data processing pipelines running in Kubernetes. These are essentially Kafka consumer pods that consume data, process it, and then materialize the results into various sinks (RDBMS, other Kafka topics).

Anatomy of a Processing Pod

Anatomy of a Processing Pod

Each stream processing pod (the smallest unit of a pipeline’s deployment) has three top level components:

  • Trigger: An interface that connects directly to the source of the data and converts it into an event channel.
  • Runtime: This is the app’s entry point and the orchestrator of the pod. It manages the worker pools, triggers, event channels, and lifecycle events.
  • Pipeline plugin: This is provided by the user, and conforms to a contract that the platform team publishes. It contains the domain logic for the pipeline and houses the pipeline orchestration defined by a user based on our Stream Processing Framework.

Optimal Scaling

We initially architected our Kubernetes setup around horizontal pod autoscaling (HPA), which scales the number of pods per deployment based on CPU and memory usage. With HPA, the CPU and memory per pod stay at the values specified in the deployment manifest, while the number of pods scales horizontally as the load changes.

These were the areas of application wastage we observed on our platform:

  • As Grab’s traffic is uneven, we’d always have to provision for peak traffic. As users would not (or could not) always account for ramps, they would be fairly liberal with setting limit values (CPU and memory), leading to resource wastage.
  • Pods often had uneven traffic distribution despite a fairly even partition load distribution in Kafka. The Stream Processing Framework (SPF) is essentially Kafka consumers consuming from Kafka topics, so as the number of pods scaled in and out, the partition load per pod became unequal.

Vertically Scaling with Fixed Number of Pods

We initially kept the number of pods for a pipeline equal to the number of partitions in the topic the pipeline consumes from. This ensured even distribution of partitions to each pod providing balanced consumption. In order to abstract this from the end user, we automated the application deployment process to directly call the Kafka API to fetch the number of partitions during runtime.
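
For illustration, fetching the partition count at deploy time can be done with the Kafka AdminClient along these lines (a sketch only; Grab’s actual deployment tooling and Go code are not shown):

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

object PartitionCountSketch {
  // Returns the number of partitions for the topic, so the deployment can set
  // the pod count equal to the partition count.
  def partitionCount(bootstrapServers: String, topic: String): Int = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    val admin = AdminClient.create(props)
    try {
      admin
        .describeTopics(java.util.Collections.singleton(topic))
        .all()              // KafkaFuture[Map[topic -> TopicDescription]]
        .get()
        .get(topic)
        .partitions()
        .size()
    } finally admin.close()
  }
}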

After achieving a fixed number of pods for the pipeline, we wanted to move away from HPA. We wanted our pods to scale up and down as the load increases or decreases without any manual intervention. Vertical pod autoscaling (VPA) solves this problem as it relieves us from any manual operation for setting up resources for our deployment.

We just deploy the application and let VPA handle the resources required for its operation. VPA is known not to be very responsive to quick load changes, as it trains its model on the deployment’s load trend over a period of time before recommending optimal resources. This process ensures optimal resource allocation for our pipelines based on their historical throughput trends.

We saw a ~45% reduction in our total resource usage versus resources requested after moving from HPA to VPA with a fixed number of pods.


Managing Availability

We broadly classify our workloads as latency sensitive (critical) and latency tolerant (non-critical). As a result, we could optimize scheduling and cost efficiency using priority classes and overprovisioning on heterogeneous node types on AWS.

Kubernetes Priority Classes

The main cost of running EKS in AWS is attributed to the EC2 machines that form the worker nodes of the Kubernetes cluster. Running On-Demand instances brings all the guarantees of instance availability, but it is very expensive. Hence, our first action to drive cost optimisation was to include Spot instances in our worker node groups.

With the uncertainty of losing a spot instance, we started assigning priority to our various applications. We then let the user choose the priority of their pipeline depending on their use case. Different priorities would result in different node affinity to different kinds of instance groups (On-Demand/Spot). For example, Critical pipelines (latency sensitive) run on On-Demand worker node groups and Non-critical pipelines (latency tolerant) on Spot instance worker node groups.

We use priority classes as a method of preemption, as well as node affinity to steer pipelines of a given priority onto the corresponding node group.

Overprovisioning

With Spot instances running, we realised we needed our cluster to respond quickly to failures. We wanted to achieve quick rescheduling of evicted pods, so we added overprovisioning to our cluster. This means we keep some noop pods running in our worker node groups, occupying free space to allow quick scheduling of evicted or newly deploying pods.

The overprovisioned pods are the lowest-priority pods, and thus can be preempted by any pod waiting in the queue for scheduling. We used the cluster proportional autoscaler to decide the right number of these overprovisioned pods, which scales up and down in proportion to cluster size (i.e., the number of nodes and CPUs in the worker node group). This relieves us from tuning the number of noop pods as the cluster scales up or down over time, keeping the free space proportional to the current cluster capacity.

Lastly, overprovisioning also helped improve the deployment time because there is no dependency on the time required for Auto Scaling Groups (ASG) to add a new node to the cluster every time we want to deploy a new application.

Future Improvements

Evolution is an ongoing process. In the next few months, we plan to work on custom resources for combining VPA and fixed deployment size. Our current architecture setup works fine for now, but we would like to create a more tuneable in-house CRD (Custom Resource Definition) for VPA that incorporates rightsizing our Kubernetes deployments horizontally.


Authored By Shubham Badkur on behalf of the Coban team at Grab – Ryan Ooi, Karan Kamath, Hui Yang, Yuguang Xiao, Jump Char, Jason Cusick, Shrinand Thakkar, Dean Barlan, Shivam Dixit, Andy Nguyen, and Ravi Tandon.

