Tag Archives: distributed-systems

Consistent caching mechanism in Titus Gateway

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/consistent-caching-mechanism-in-titus-gateway-6cb89b9ce296

by Tomasz Bak and Fabio Kung

Introduction

Titus is the Netflix cloud container runtime that runs and manages containers at scale. In the time since it was first presented as an advanced Mesos framework, Titus has transparently evolved from being built on top of Mesos to Kubernetes, handling an ever-increasing volume of containers. As the number of Titus users increased over the years, the load and pressure on the system increased substantially. The original assumptions and architectural choices were no longer viable. This blog post presents how our current iteration of Titus deals with high API call volumes by scaling out horizontally.

We introduce a caching mechanism in the API gateway layer, allowing us to offload processing from singleton leader-elected controllers without giving up the strict data consistency guarantees that clients observe. Titus API clients always see the latest (not stale) version of the data regardless of which gateway node serves their request, and in which order.

Overview

The figure below depicts a simplified high-level architecture of a single Titus cluster (a.k.a cell):

Titus Job Coordinator is a leader elected process managing the active state of the system. Active data includes jobs and tasks that are currently running. When a new leader is elected it loads all data from external storage. Mutations are first persisted to the active data store before in-memory state is changed. Data for completed jobs and tasks is moved to the archive store first, and only then removed from the active data store and from the leader memory.

Titus Gateway handles user requests. A user request could be a job creation request, a query to the active data store, or a query to the archive store (the latter handled directly in Titus Gateway). Requests are load balanced across all Titus Gateway nodes. All reads are consistent, so it does not matter which Titus Gateway instance is serving a query. For example, it is OK to send writes through one instance, and do reads from another one with full data read consistency guarantees. Titus Gateways always connect to the current Titus Job Coordinator leader. During leader failovers, all writes and reads of the active data are rejected until a connection to the active leader is re-established.

In the original version of the system, all queries to the active data set were forwarded to a singleton Titus Job Coordinator. The freshest data is served to all requests, and clients never observe read-your-write or monotonic-read consistency issues¹:

Data consistency on the Titus API is highly desirable as it simplifies client implementation. Causal consistency, which includes read-your-writes and monotonic-reads, frees clients from implementing client-side synchronization mechanisms. In PACELC terms we choose PC/EC: we keep the same level of write availability as our previous system while improving our theoretical availability for reads.

For example, a batch workflow orchestration system may create multiple jobs which are part of a single workflow execution. After the jobs are created, it monitors their execution progress. If the system creates a new job, followed immediately by a query to get its status, and there is a data propagation lag, it might decide that the job was lost and a replacement must be created. In that scenario, the system would need to deal with the data propagation latency directly, for example, by use of timeouts or client-originated update tracking mechanisms. As Titus API reads are always consistently reflecting the up-to-date state, such workarounds are not needed.

With traffic growth, a single leader node handling all request volume started becoming overloaded. We started seeing increased response latencies and leader servers running at dangerously high utilization. To mitigate this issue we decided to handle all query requests directly from Titus Gateway nodes but still preserve the original consistency guarantees:

The state from Titus Job Coordinator is replicated over a persistent stream connection, with low event propagation latencies. A new wire protocol provided by Titus Job Coordinator allows monitoring of the cache consistency level and guarantees that clients always receive the latest data version. The cache is kept in sync with the current leader process. When there is a failover (because of node failures with the current leader or a system upgrade), a new snapshot from the freshly elected leader is loaded, replacing the previous cache state. Titus Gateways handling client requests can now be horizontally scaled out. The details and workings of these mechanisms are the primary topics of this blog post.

How do I know that my cache is up to date?

The answer is easy for systems that were built from the beginning with a consistent data versioning scheme and that can depend on clients to follow the established protocol. Kubernetes is a good example here. Each object and each collection read from the Kubernetes cluster has a unique revision, which is a monotonically increasing number. A user may request all changes since the last received revision. For more details, see Kubernetes API Concepts and the Shared Informer Pattern.

In our case, we did not want to change the API contract and impose additional constraints and requirements on our users. Doing so would require a substantial migration effort to move all clients off the old API with questionable value to the affected teams (except for helping us solve Titus' internal scalability problems). In our experience, such migrations require a nontrivial amount of work, particularly with the migration timeline not fully in our control.

To fulfill the existing API contract, we had to guarantee that for a request received at a time T₀, the data returned to the client is read from a cache that contains all state updates in Titus Job Coordinator up to time T₀.

The path over which data travels from Titus Job Coordinator to a Titus Gateway cache can be described as a sequence of event queues with different processing speeds:

A message generated by the event source may be buffered at any stage. Furthermore, as each event stream subscription from Titus Gateway to Titus Job Coordinator establishes a different instance of the processing pipeline, the state of the cache in each gateway instance may be vastly different.

Let’s assume a sequence of events E₁…E₁₀, and their location within the pipeline of two Titus Gateway instances at time T₁:

If a client makes a call to Titus Gateway 2 at time T₁, it will read version E₈ of the data. If it immediately makes a request to Titus Gateway 1, the cache there is behind with respect to the other gateway, so the client might read an older version of the data.

In both cases, data is not up to date in the caches. If a client created a new object at time T₀, and the object value is captured by an event update E₁₀, this object will be missing in both gateways at time T₁. This would be a surprise to a client that successfully completed a create request only to have the follow-up query return a not-found error (a read-your-write consistency violation).

The solution is to flush all the events created up to time T₁ and force clients to wait for the cache to receive them all. This work can be split into two different steps each with its own unique solution.

Implementation details

We solved the cache synchronization problem (as stated above) with a combination of two strategies:

  • Titus Gateway <-> Titus Job Coordinator synchronization protocol over the wire.
  • Usage of high-resolution monotonic time sources, like Java’s nano time, within a single server process. Java’s nano time is used as a logical time within a JVM to define an order for events happening in that JVM process. An alternative solution based on an atomic integer generator to order the events would work as well. Having a local logical time source avoids issues with distributed clock synchronization (a minimal sketch follows this list).
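As a minimal sketch of such a logical time source, assuming a single JVM and combining System.nanoTime() with an atomic counter (the class name is illustrative, not the actual Titus implementation):

import java.util.concurrent.atomic.AtomicLong;

/**
 * Local logical clock for ordering events within a single JVM process.
 * It never goes backwards, even if System.nanoTime() returns the same value
 * twice, so every call yields a strictly increasing timestamp.
 */
public final class LogicalClock {
    private final AtomicLong lastTimestamp = new AtomicLong(Long.MIN_VALUE);

    /** Returns a monotonically increasing logical timestamp. */
    public long next() {
        return lastTimestamp.updateAndGet(prev -> Math.max(prev + 1, System.nanoTime()));
    }
}

Each process (gateway or leader) keeps its own instance; timestamps are only ever compared within the process that produced them, which is why no distributed clock synchronization is needed.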

If Titus Gateways subscribed to the Titus Job Coordinator event stream without any synchronization steps, the amount of data staleness would be impossible to estimate. To guarantee that a Titus Gateway has received all state updates that happened up to some time Tₙ, an explicit synchronization between the two services must happen. Here is what the protocol we implemented looks like:

  1. Titus Gateway receives a client request (queryₐ).
  2. Titus Gateway makes a request to the local cache to fetch the latest version of the data.
  3. The local cache in Titus Gateway records the local logical time and sends it to Titus Job Coordinator in a keep-alive message (keep-aliveₐ).
  4. Titus Job Coordinator saves the keep-alive request together with the local logical time Tₐ of the request arrival in a local queue (KAₐ, Tₐ).
  5. Titus Job Coordinator sends state updates to Titus Gateway until the former observes a state update (event) with a timestamp past the recorded local logical time (E₁, E₂).
  6. At that time, Titus Job Coordinator sends an acknowledgment event for the keep-alive message (KAₐ keep-alive ACK).
  7. Titus Gateway receives the keep-alive acknowledgment and consequently knows that its local cache contains all state changes that happened up to the time when the keep-alive request was sent.
  8. At this point the original client request can be handled from the local cache, guaranteeing that the client will get a fresh enough version of the data (responseₐ).

This process is illustrated by the figure below:
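In code form, a highly simplified gateway-side version of steps 1 to 8 could look like the sketch below. It reuses the LogicalClock sketch above; KeepAliveChannel, LocalCache, and the one-second timeout are assumptions made for illustration, not the actual Titus classes or values.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Sketch of how a gateway could serve a consistent read from its local cache.
// All names are hypothetical; error handling is elided.
public final class ConsistentCacheReader<T> {

    /** Sends a keep-alive carrying a logical timestamp; completes when the leader acks it. */
    public interface KeepAliveChannel {
        CompletableFuture<Void> sendKeepAlive(long logicalTimestamp);
    }

    /** Read-only view of the state replicated from the leader. */
    public interface LocalCache<V> {
        V query(String query);
    }

    private final LogicalClock clock = new LogicalClock();
    private final KeepAliveChannel keepAliveChannel;
    private final LocalCache<T> localCache;

    public ConsistentCacheReader(KeepAliveChannel keepAliveChannel, LocalCache<T> localCache) {
        this.keepAliveChannel = keepAliveChannel;
        this.localCache = localCache;
    }

    public CompletableFuture<T> read(String query) {
        long requestTimestamp = clock.next();
        // The leader acknowledges the keep-alive only after it has pushed every event
        // that preceded the keep-alive's arrival into our replication stream; answering
        // from the cache after that point is what gives the caller read-your-writes.
        return keepAliveChannel.sendKeepAlive(requestTimestamp)
                .orTimeout(1, TimeUnit.SECONDS)
                .thenApply(ack -> localCache.query(query));
    }
}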

The procedure above explains how to synchronize a Titus Gateway cache with the source of truth in Titus Job Coordinator, but it does not address how the internal queues in Titus Job Coordinator are drained to the point where all relevant messages are processed. The solution here is to add a logical timestamp to each event and guarantee a minimum time interval between messages emitted inside the event stream. If not enough events are created because of data updates, a dummy message is generated and inserted into the stream. Dummy messages guarantee that each keep-alive request is acknowledged within a bounded time, and does not wait indefinitely until some change in the system happens. For example:

Ta, Tb, Tc, Td, and Te are high-resolution monotonic logical timestamps. At time Td a dummy message is inserted, so the interval between two consecutive events in the event stream is always below a configurable threshold. These timestamp values are compared with keep-alive request arrival timestamps to know when a keep-alive acknowledgment can be sent.
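On the other side of the connection, the leader-side bookkeeping can be sketched roughly as follows. It again reuses the LogicalClock sketch; the names and the timer-driven dummy emitter are illustrative assumptions, not the actual Titus Job Coordinator code.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongConsumer;

// Sketch of the per-subscription emitter in the leader. It interleaves real
// state-update events, periodic dummy events, and keep-alive acknowledgments.
public final class SubscriptionEmitter {
    private final LogicalClock clock = new LogicalClock();            // the leader's own local clock
    private final Queue<Long> pendingKeepAlives = new ArrayDeque<>(); // arrival timestamps (KAa, Ta)
    private final LongConsumer ackSender;                             // sends a keep-alive ACK for a timestamp
    private long lastEmittedTimestamp;

    public SubscriptionEmitter(LongConsumer ackSender) {
        this.ackSender = ackSender;
        this.lastEmittedTimestamp = clock.next();
    }

    /** Called when a keep-alive request arrives from a gateway. */
    public synchronized void onKeepAlive() {
        pendingKeepAlives.add(clock.next());
    }

    /** Called whenever an event (real or dummy) is pushed to the gateway. */
    public synchronized void onEventEmitted() {
        lastEmittedTimestamp = clock.next();
        // Acknowledge every keep-alive whose arrival time is now covered by the stream.
        while (!pendingKeepAlives.isEmpty() && pendingKeepAlives.peek() <= lastEmittedTimestamp) {
            ackSender.accept(pendingKeepAlives.poll());
        }
    }

    /** Called on a timer; emits a dummy event if the stream has been idle for too long. */
    public synchronized void onTick(long idleThresholdNanos, Runnable emitDummyEvent) {
        if (clock.next() - lastEmittedTimestamp > idleThresholdNanos) {
            emitDummyEvent.run();   // emitting the dummy eventually calls onEventEmitted()
        }
    }
}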

There are a few optimization techniques that can be used. Here are those implemented in Titus:

  • Before sending a keep-alive request for each new client request, wait a fixed interval and send a single keep-alive request for all requests that arrived during that time. The maximum rate of keep-alive requests is therefore constrained by 1 / max_interval. For example, if max_interval is set to 5ms, the maximum keep-alive request rate is 200 req / sec (see the sketch after this list).
  • Collapse multiple keep-alive requests in Titus Job Coordinator, sending a response to the latest one which has the arrival timestamp less than that of the timestamp of the last event sent over the network. On the Titus Gateway side, a keep-alive response with a given timestamp acknowledges all pending requests with keep-alive timestamps earlier or equal to the received one.
  • Do not wait for cache synchronization on requests that do not have ordering requirements, serving data from the local cache on each Titus Gateway. Clients that can tolerate eventual consistency can opt into this new API for lower response times and increased availability.
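To illustrate the first two optimizations, here is a sketch of a gateway-side batcher that coalesces keep-alive requests so that at most one is sent per interval. It builds on the KeepAliveChannel and LogicalClock sketches above and is an illustration, not the production logic.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: coalesce keep-alive requests so at most one is sent per interval.
// Shutdown handling and error policies are elided.
public final class KeepAliveBatcher {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final List<CompletableFuture<Void>> waiters = new ArrayList<>();
    private final ConsistentCacheReader.KeepAliveChannel channel;
    private final LogicalClock clock = new LogicalClock();

    public KeepAliveBatcher(ConsistentCacheReader.KeepAliveChannel channel, long maxIntervalMillis) {
        this.channel = channel;
        scheduler.scheduleAtFixedRate(this::flush, maxIntervalMillis, maxIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Called once per client request; completes when a covering keep-alive is acknowledged. */
    public synchronized CompletableFuture<Void> awaitFreshCache() {
        CompletableFuture<Void> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        return waiter;
    }

    private synchronized void flush() {
        if (waiters.isEmpty()) {
            return;
        }
        List<CompletableFuture<Void>> batch = new ArrayList<>(waiters);
        waiters.clear();
        // One keep-alive, timestamped at flush time, covers every request that
        // arrived before it; its ACK releases the whole batch at once.
        channel.sendKeepAlive(clock.next()).whenComplete((ack, error) -> batch.forEach(waiter -> {
            if (error != null) {
                waiter.completeExceptionally(error);
            } else {
                waiter.complete(null);
            }
        }));
    }
}

With max_interval set to 5ms, this caps keep-alive traffic at 200 requests per second per gateway, regardless of the client request rate.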

Given the mechanism described so far, let’s try to estimate the maximum wait time of a client request that arrives at Titus Gateway under different scenarios. Let’s assume that the maximum keep-alive interval is 5ms, and the maximum interval between events emitted in Titus Job Coordinator is 2ms.

Assuming that the system runs idle (no changes made to the data), and the client request arrives at a time when a new keep-alive request wait time starts, the cache update latency is equal to 7 milliseconds + network propagation delay + processing time. If we ignore the processing time and assume that the network propagation delay is <1ms given we have to only send back a small keep-alive response, we should expect an 8ms delay in the typical case. If the client request does not have to wait for the keep-alive to be sent, and the keep-alive request is acknowledged immediately in Titus Job Coordinator, the delay is equal to network propagation delay + processing time, which we estimated to be <1ms. The average delay introduced by cache synchronization is around 4ms.

Network propagation delays and stream processing times start to become a more important factor as the number of state change events and client requests increases. However, Titus Job Coordinator can now dedicate its capacity for serving high bandwidth streams to a finite number of Titus Gateways, relying on the gateway instances to serve client requests, instead of serving payloads to all client requests itself. Titus Gateways can then be scaled out to match client request volumes.

We ran empirical tests for scenarios of low and high request volumes, and the results are presented in the next section.

Performance test results

To show how the system performs with and without the caching mechanism, we ran two tests:

  • A test with a low/moderate load showing a median latency increase due to overhead from the cache synchronization mechanism, but better 99th percentile latencies.
  • A test with load close to the peak of Titus Job Coordinator capacity, above which the original system collapses. Previous results hold, showing better scalability with the caching solution.

A single request in the tests below consists of one query. The query is of moderate size: a collection of 100 records, with a serialized response size of ~256KB. The total payload (serialized response size times the request rate) requires a network bandwidth of ~2Gbps in the first test and ~8Gbps in the second one.

Moderate load level

This test shows the impact of cache synchronization on query latency in a moderately loaded system. The query rate in this test is set to 1K requests/second.

Median latency without caching is half of what we observe with the introduction of the caching mechanism, due to the added synchronization delays. In exchange, the worst-case 99th percentile latencies are 90% lower, dropping from 292 milliseconds without a cache to 30 milliseconds with the cache.

Load level close to Titus Job Coordinator maximum

If Titus Job Coordinator has to handle all query requests (when the cache is not enabled), it handles the traffic well up to 4K test queries / second, and breaks down (sharp latency increase and a rapid drop of throughput) at around 4.5K queries/sec. The maximum load test is thus kept at 4K queries/second.

Without caching enabled the 99th percentile hovers around 1000ms, and the 80th percentile is around 336ms, compared with the cache-enabled 99th percentile at 46ms and 80th percentile at 22ms. The median still looks better on the setup with no cache at 17ms vs 19ms when the cache is enabled. It should be noted however that the system with caching enabled scales out linearly to more request load while keeping the same latency percentiles, while the no-cache setup collapses with a mere ~15% additional load increase.

Doubling the load when the caching is enabled does not increase the latencies at all. Here are latency percentiles when running 8K query requests/second:

Conclusion

After reaching the limit of vertical scaling of our previous system, we were pleased to implement a real solution that provides (in a practical sense) unlimited scalability of Titus read-only API. We were able to achieve better tail latencies with a minor sacrifice in median latencies when traffic is low, and gained the ability to horizontally scale out our API gateway processing layer to handle growth in traffic without changes to API clients. The upgrade process was completely transparent, and no single client observed any abnormalities or changes in API behavior during and after the migration.

The mechanism described here can be applied to any system relying on a singleton leader elected component as the source of truth for managed data, where the data fits in memory and latency is low.

As for prior art, there is ample coverage of cache coherence protocols in the literature, both in the context of multiprocessor architectures (Adve & Gharachorloo, 1996) and distributed systems (Gwertzman & Seltzer, 1996). Our work fits within mechanisms of client polling and invalidation protocols explored by Gwertzman and Seltzer (1996) in their survey paper. Central timestamping to facilitate linearizability in read replicas is similar to the Calvin system (example real-world implementations in systems like FoundationDB) as well as the replica watermarking in AWS Aurora.

¹ Designing Data-Intensive Applications is an excellent book that goes into detail about consistency models discussed in this blog post.

² Adve, S. V., & Gharachorloo, K. (1996). Shared memory consistency models: A tutorial. Computer, 29(12), 66–76.

³ Gwertzman, J., & Seltzer, M. I. (1996, January). World Wide Web Cache Consistency. In USENIX annual technical conference (Vol. 141, p. 152).



Orchestrating Data/ML Workflows at Scale With Netflix Maestro

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/orchestrating-data-ml-workflows-at-scale-with-netflix-maestro-aaa2b41b800c

by Jun He, Akash Dwivedi, Natallia Dzenisenka, Snehal Chennuru, Praneeth Yenugutala, Pawan Dixit

At Netflix, Data and Machine Learning (ML) pipelines are widely used and have become central for the business, representing diverse use cases that go beyond recommendations, predictions and data transformations. A large number of batch workflows run daily to serve various business needs. These include ETL pipelines, ML model training workflows, batch jobs, etc. As Big data and ML became more prevalent and impactful, the scalability, reliability, and usability of the orchestrating ecosystem have increasingly become more important for our data scientists and the company.

In this blog post, we introduce and share learnings on Maestro, a workflow orchestrator that can schedule and manage workflows at a massive scale.

Motivation

Scalability and usability are essential to enable large-scale workflows and support a wide range of use cases. Our existing orchestrator (Meson) has worked well for several years. It schedules around 70 thousand workflows and half a million jobs per day. Due to its popularity, the number of workflows managed by the system has grown exponentially. We started seeing signs of scale issues, like:

  • Slowness during peak traffic moments like 12 AM UTC, leading to increased operational burden. The scheduler on-call has to closely monitor the system during non-business hours.
  • Meson was based on a single leader architecture with high availability. As the usage increased, we had to vertically scale the system to keep up and were approaching AWS instance type limits.

With the high growth of workflows in the past few years (increasing at more than 100% a year), the need for a scalable data workflow orchestrator has become paramount for Netflix’s business needs. After surveying the current landscape of workflow orchestrators, we decided to develop a next-generation system that can scale horizontally to spread the jobs across a cluster consisting of hundreds of nodes. It addresses the key challenges we face with Meson and achieves operational excellence.

Challenges in Workflow Orchestration

Scalability

The orchestrator has to schedule hundreds of thousands of workflows, millions of jobs every day and operate with a strict SLO of less than 1 minute of scheduler-introduced delay even when there are spikes in the traffic. At Netflix, the peak traffic load can be a few orders of magnitude higher than the average load. For example, a lot of our workflows are run around midnight UTC. Hence, the system has to withstand bursts in traffic while still maintaining the SLO requirements. Additionally, we would like to have a single scheduler cluster to manage most user workflows for operational and usability reasons.

Another dimension of scalability to consider is the size of the workflow. In the data domain, it is common to have a super large number of jobs within a single workflow. For example, a workflow to backfill hourly data for the past five years can lead to 43800 jobs (24 * 365 * 5), each of which processes data for an hour. Similarly, ML model training workflows usually consist of tens of thousands of training jobs within a single workflow. Those large-scale workflows might create hotspots and overwhelm the orchestrator and downstream systems. Therefore, the orchestrator has to manage a workflow consisting of hundreds of thousands of jobs in a performant way, which is also quite challenging.

Usability

Netflix is a data-driven company, where key decisions are driven by data insights, from the pixel color used on the landing page to the renewal of a TV-series. Data scientists, engineers, non-engineers, and even content producers all run their data pipelines to get the necessary insights. Given the diverse backgrounds, usability is a cornerstone of a successful orchestrator at Netflix.

We would like our users to focus on their business logic and let the orchestrator solve cross-cutting concerns like scheduling, processing, error handling, security etc. It needs to provide different grains of abstractions for solving similar problems, high-level to cater to non-engineers and low-level for engineers to solve their specific problems. It should also provide all the knobs for configuring their workflows to suit their needs. In addition, it is critical for the system to be debuggable and to surface all errors for users to troubleshoot, as this improves the UX and reduces the operational burden.

Providing abstractions for the users is also needed to save valuable time on creating workflows and jobs. We want users to rely on shared templates and reuse their workflow definitions across their team, saving time and effort on creating the same functionality. Using job templates across the company also helps with upgrades and fixes: when the change is made in a template it’s automatically updated for all workflows that use it.

However, usability is challenging as it is often opinionated. Different users have different preferences and might ask for different features. Sometimes, the users might ask for the opposite features or ask for some niche cases, which might not necessarily be useful for a broader audience.

Introducing Maestro

Maestro is the next generation Data Workflow Orchestration platform to meet the current and future needs of Netflix. It is a general-purpose workflow orchestrator that provides a fully managed workflow-as-a-service (WAAS) to the data platform at Netflix. It serves thousands of users, including data scientists, data engineers, machine learning engineers, software engineers, content producers, and business analysts, for various use cases.

Maestro is highly scalable and extensible to support existing and new use cases and offers enhanced usability to end users. Figure 1 shows the high-level architecture.

Figure 1. Maestro high level architecture

In Maestro, a workflow is a DAG (Directed Acyclic Graph) of individual units of job definition called Steps. Steps can have dependencies, triggers, workflow parameters, metadata, step parameters, configurations, and branches (conditional or unconditional). In this blog, we use step and job interchangeably. A workflow instance is an execution of a workflow; similarly, an execution of a step is called a step instance. Instance data include the evaluated parameters and other information collected at runtime to provide different kinds of execution insights. The system consists of three main microservices, which we will expand upon in the following sections.

Maestro ensures the business logic is run in isolation. Maestro launches a unit of work (a.k.a. a Step) in a container and ensures the container is launched with the user's/application's identity. Launching with identity ensures the work runs on behalf of the user/application; the identity is later used by downstream systems to validate whether an operation is allowed. For example, the user/application identity is checked by the data warehouse to validate whether a table read/write is allowed.
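As a rough illustration of the data model described above (the record and field names below are made up for this sketch and are far simpler than Maestro's actual schema):

import java.util.List;
import java.util.Map;

// Illustrative data model only; Maestro's real definitions are much richer.
record WorkflowDefinition(String id, List<StepDefinition> steps, Map<String, Object> workflowParams) {}

record StepDefinition(String id, String type, List<String> dependsOn, Map<String, Object> stepParams) {}

record WorkflowInstance(String workflowId, long instanceId, Map<String, Object> evaluatedParams,
                        List<StepInstance> stepInstances) {}

record StepInstance(String stepId, String state, Map<String, Object> evaluatedParams) {}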

Workflow Engine

Workflow engine is the core component, which manages workflow definitions, the lifecycle of workflow instances, and step instances. It provides rich features to support:

  • Any valid DAG patterns
  • Popular data flow constructs like sub workflow, foreach, conditional branching etc.
  • Multiple failure modes to handle step failures with different error retry policies
  • Flexible concurrency control to throttle the number of executions at workflow/step level
  • Step templates for common job patterns like running a Spark query or moving data to Google sheets
  • Parameter code injection using a customized expression language
  • Workflow definition and ownership management
  • Timeline including all state changes and related debug info

We use the Netflix open source project Conductor as a library to manage the workflow state machine in Maestro. It ensures that each step defined in a workflow is enqueued and dequeued with an at-least-once guarantee.

Time-Based Scheduling Service

The time-based scheduling service starts new workflow instances at the scheduled time specified in workflow definitions. Users can define the schedule using a cron expression or using periodic schedule templates like hourly, weekly, etc. This service is lightweight and provides an at-least-once scheduling guarantee. The Maestro engine service deduplicates the triggering requests to achieve an exactly-once guarantee when scheduling workflows.
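One way to picture that deduplication step is the sketch below, where the idempotency key is the workflow id plus the scheduled trigger time. The key format and the in-memory store are assumptions for illustration, not Maestro's actual mechanism, which would persist the keys durably.

import java.time.Instant;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: turn the scheduler's at-least-once triggers into exactly-once workflow
// starts by deduplicating on (workflowId, scheduledTime). A real implementation
// would persist the keys (e.g. a unique constraint in the database) instead of
// keeping them in memory.
public final class TriggerDeduplicator {
    private final Set<String> seenTriggers = ConcurrentHashMap.newKeySet();

    /** Returns true if this trigger should start a new workflow instance. */
    public boolean shouldStart(String workflowId, Instant scheduledTime) {
        return seenTriggers.add(workflowId + "@" + scheduledTime.toEpochMilli());
    }
}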

Time-based triggering is popular due to its simplicity and ease of management. But sometimes, it is not efficient. For example, the daily workflow should process the data when the data partition is ready, not always at midnight. Therefore, on top of manual and time-based triggering, we also provide event-driven triggering.

Signal Service

Maestro supports event-driven triggering over signals, which are pieces of messages carrying information such as parameter values. Signal triggering is efficient and accurate because we don’t waste resources checking if the workflow is ready to run, instead we only execute the workflow when a condition is met.

Signals are used in two ways:

  • A trigger to start new workflow instances
  • A gating function to conditionally start a step (e.g., data partition readiness)

The signal service's goals are to:

  • Collect and index signals
  • Register and handle workflow trigger subscriptions
  • Register and handle the step gating functions
  • Capture the lineage of workflow triggers and steps unblocked by a signal

Figure 2. Signal service high level architecture

The Maestro signal service consumes all the signals from different sources, e.g. all the warehouse table updates, S3 events, or a workflow releasing a signal, and then generates the corresponding triggers by correlating a signal with its subscribed workflows. In addition to the transformation between external signals and workflow triggers, this service is also responsible for step dependencies, looking up the received signals in the history. Like the scheduling service, the signal service together with the Maestro engine achieves exactly-once triggering guarantees.
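A highly simplified sketch of the correlation step between an incoming signal and the registered trigger subscriptions might look as follows (all names are illustrative; lineage capture, step gating, and exactly-once bookkeeping are omitted):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Sketch: match incoming signals (e.g. "table db.events updated") against
// registered workflow trigger subscriptions.
public final class SignalMatcher {
    public record Signal(String name, Map<String, String> payload) {}
    public record Subscription(String workflowId, String signalName) {}

    private final Map<String, List<Subscription>> subscriptionsBySignal = new ConcurrentHashMap<>();
    private final Consumer<Subscription> triggerWorkflow;

    public SignalMatcher(Consumer<Subscription> triggerWorkflow) {
        this.triggerWorkflow = triggerWorkflow;
    }

    public void subscribe(Subscription subscription) {
        subscriptionsBySignal
                .computeIfAbsent(subscription.signalName(), key -> new CopyOnWriteArrayList<>())
                .add(subscription);
    }

    public void onSignal(Signal signal) {
        // Fan out the signal to every workflow subscribed to it.
        subscriptionsBySignal.getOrDefault(signal.name(), List.of()).forEach(triggerWorkflow);
    }
}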

Signal service also provides the signal lineage, which is useful in many cases. For example, a table updated by a workflow could lead to a chain of downstream workflow executions. Most of the time these workflows are owned by different teams, and the signal lineage helps the upstream and downstream workflow owners to see who depends on whom.

Orchestration at Scale

All services in the Maestro system are stateless and can be horizontally scaled out. All requests are processed via distributed queues for message passing. With a shared-nothing architecture, Maestro can horizontally scale to manage the states of millions of workflow and step instances at the same time.

CockroachDB is used for persisting workflow definitions and instance state. We chose CockroachDB as it is an open-source distributed SQL database that provides strong consistency guarantees and can be scaled horizontally without much operational overhead.

It is hard to support super large workflows in general. For example, a workflow definition can explicitly define a DAG consisting of millions of nodes. With that number of nodes in a DAG, the UI cannot render it well. We therefore have to enforce some constraints while still supporting valid use cases consisting of hundreds of thousands (or even millions) of step instances in a workflow instance.

Based on our findings and user feedback, in practice:

  • Users don’t want to manually write the definitions for thousands of steps in a single workflow definition, which is hard to manage and navigate in the UI. When such a use case exists, it is always feasible to decompose the workflow into smaller sub workflows.
  • Users expect to repeatedly run a certain part of DAG hundreds of thousands (or even millions) times with different parameter settings in a given workflow instance. So at runtime, a workflow instance might include millions of step instances.

Therefore, we enforce a workflow DAG size limit (e.g. 1K) and we provide a foreach pattern that allows users to define a sub DAG within a foreach block and iterate the sub DAG with a larger limit (e.g. 100K). Note that a foreach can be nested within another foreach, so users can run millions or billions of steps in a single workflow instance.

In Maestro, foreach itself is a step in the original workflow definition. Foreach is internally treated as another workflow, which scales in the same way as any other Maestro workflow based on the number of step executions in the foreach loop. The execution of the sub DAG within a foreach is delegated to separate workflow instances. The foreach step then monitors and collects the status of those foreach workflow instances, each of which manages the execution of one iteration.

Figure 3. Maestro’s scalable foreach design to support super large iterations

With this design, the foreach pattern supports sequential and nested loops with high scalability. It is easy to manage and troubleshoot, as users can see the overall loop status at the foreach step or view each iteration separately.

Workflow Platform for Everyone

We aim to make Maestro user friendly and easy to learn for users with different backgrounds. We make minimal assumptions about users' programming proficiency: they can bring their business logic in multiple ways, including, but not limited to, a bash script, a Jupyter notebook, a Java jar, a Docker image, a SQL statement, or a few clicks in the UI using parameterized workflow templates.

User Interfaces

Maestro provides multiple domain-specific languages (DSLs), including YAML, Python, and Java, for end users to define their workflows, which are decoupled from their business logic. Users can also talk directly to the Maestro API to create workflows using the JSON data model. We found that a human-readable DSL is popular and plays an important role in supporting different use cases. The YAML DSL is the most popular one due to its simplicity and readability.

Here is an example workflow defined by different DSLs.

Figure 4. An example workflow defined by YAML, Python, and Java DSLs

Additionally, users can also generate certain types of workflows in the UI or use other libraries, e.g.:

  • In Notebook UI, users can directly schedule to run the chosen notebook periodically.
  • In Maestro UI, users can directly schedule to move data from one source (e.g. a data table or a spreadsheet) to another periodically.
  • Users can use Metaflow library to create workflows in Maestro to execute DAGs consisting of arbitrary Python code.

Parameterized Workflows

Often, users want to define a dynamic workflow to adapt to different scenarios. Based on our experience, a completely dynamic workflow is less favorable and hard to maintain and troubleshoot. Instead, Maestro provides three features to assist users in defining a parameterized workflow:

  • Conditional branching
  • Sub-workflow
  • Output parameters

Instead of dynamically changing the workflow DAG at runtime, users can define those changes as sub workflows and then invoke the appropriate sub workflow at runtime, because the sub workflow id is a parameter, which is evaluated at runtime. Additionally, using output parameters, users can produce different results from an upstream job step and then iterate through those within a foreach, pass them to a sub workflow, or use them in downstream steps.

Here is an example (using the YAML DSL) of a backfill workflow with two steps. In step1, the step computes the backfill date range and returns the dates. Next, the foreach step uses the dates from step1 to create the foreach iterations. Finally, each backfill job gets a date from the foreach and backfills the data based on that date.

Workflow:
  id: demo.pipeline
  jobs:
    - job:
        id: step1
        type: NoOp
        '!dates': return new int[]{20220101,20220102,20220103}; #SEL
    - foreach:
        id: step2
        params:
          date: ${dates@step1} #reference an upstream step parameter
        jobs:
          - job:
              id: backfill
              type: Notebook
              notebook:
                input_path: s3://path/to/notebook.ipynb
                arg1: $date #pass the foreach parameter into notebook
Figure 5. An example of using parameterized workflow for backfill data

The parameter system in Maestro is completely dynamic with code injection support. Users can write the code in Java syntax as the parameter definition. We developed our own secured expression language (SEL) to ensure security. It only exposes limited functionality and includes additional checks (e.g. the number of iterations in a loop statement) in the language parser.

Execution Abstractions

Maestro provides multiple levels of execution abstraction. Users can choose to use a provided step type and set its parameters. This helps to encapsulate the business logic of commonly used operations, making it very easy for users to create jobs. For example, for the Spark step type, all users have to do is specify the needed parameters like the Spark SQL query, memory requirements, etc., and Maestro does everything behind the scenes to create the step. If we have to make a change in the business logic of a certain step, we can do so seamlessly for users of that step type.

If provided step types are not enough, users can also develop their own business logic in a Jupyter notebook and then pass it to Maestro. Advanced users can develop their own well-tuned docker image and let Maestro handle the scheduling and execution.

Additionally, we abstract common functions or reusable patterns from various use cases and add them to Maestro in a loosely coupled way by introducing job templates, which are parameterized notebooks. This is different from step types, as templates provide a combination of various steps. Advanced users also leverage this feature to ship common patterns for their own teams. While creating a new template, users can define the list of required/optional parameters with their types and register the template with Maestro. Maestro validates the parameters and types at push and run time. In the future, we plan to extend this functionality to make it very easy for users to define templates for their teams and for all employees. In some cases, sub-workflows are also used to define common sub DAGs to achieve multi-step functions.

Moving Forward

We are taking Big Data Orchestration to the next level and constantly solving new problems and challenges, so please stay tuned. If you are motivated to solve large-scale orchestration problems, please join us: we are hiring.



Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support…

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/timestone-netflixs-high-throughput-low-latency-priority-queueing-system-with-built-in-support-1abf249ba95f

Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support for Non-Parallelizable Workloads

by Kostas Christidis

Introduction

Timestone is a high-throughput, low-latency priority queueing system we built in-house to support the needs of Cosmos, our media encoding platform. Over the past 2.5 years, its usage has increased, and Timestone is now also the priority queueing engine backing Conductor, our general-purpose workflow orchestration engine, and BDP Scheduler, the scheduler for large-scale data pipelines. All in all, millions of critical workflows within Netflix now flow through Timestone on a daily basis.

Timestone clients can create queues, enqueue messages with user-defined deadlines and metadata, then dequeue these messages in an earliest-deadline-first (EDF) fashion. Filtering for EDF messages with criteria (e.g. “messages that belong to queue X and have metadata Y”) is also supported.

One of the things that make Timestone different from other priority queues is its support for a construct we call exclusive queues — this is a means to mark chunks of work as non-parallelizable, without requiring any locking or coordination on the consumer side; everything is taken care of by the exclusive queue in the background. We explain the concept in detail in the sections that follow.

Why Timestone

When designing the successor to Reloaded — our media encoding system — back in 2018 (see “Background” section in The Netflix Cosmos Platform), we needed a priority queueing system that would provide queues between the three components in Cosmos (Figure 1):

  1. the API framework (Optimus),
  2. the forward chaining rule engine (Plato), and
  3. the serverless computing layer (Stratum)
Figure 1. A video encoding application built on top of Cosmos. Notice the three Cosmos subsystems: Optimus, an API layer mapping external requests to internal business models, Plato, a workflow layer for business rule modeling, and Stratum, the serverless layer for running stateless and computational-intensive functions. Source: The Netflix Cosmos Platform

Some of the key requirements this priority queueing system would need to satisfy:

1. A message can only be assigned to one worker at any given time. The work that tends to happen in Cosmos is resource-intensive, and can fan out to thousands of actions. Assume then, that there is replication lag between the replicas in our data store, and we present as dequeueable to worker B the message that was just dequeued by worker A via a different node. When we do that, we waste significant compute cycles. This requirement then throws eventually consistent solutions out of the window, and means we want linearizable consistency at the queue level.

2. Allow for non-parallelizable work.

Given that Plato is continuously polling all workflow queues for more work to execute —

While Plato is executing a workflow for a given project (a request for work on a given service) —

Then Plato should not be able to dequeue additional requests for work for that project on that workflow. Otherwise Plato’s inference engine will evaluate the workflow prematurely, and may move the workflow to an incorrect state.

There exists then, a certain type of work in Cosmos that should not be parallelizable, and the ask is for the queueing system to support this type of access pattern natively. This requirement gave birth to the exclusive queue concept. We explain how exclusive queues work in Timestone in the “Key Concepts” section.

3. Allow for dequeueing and queue depth querying using filters (metadata key-value pairs)

4. Allow for the automatic creation of a queue upon message ingestion

5. Render a message dequeueable within a second of ingestion

We built Timestone because we could not find an off-the-shelf solution that meets these requirements.

System Architecture

Timestone is a gRPC-based service. We use protocol buffers to define the interface of our service and the structure of our request and response messages. The system diagram for the application is shown in Figure 2.

Figure 2. Timestone system diagram. Arrows link all the components touched during a typical Timestone client-server interaction. Numbers in red indicate sequence steps. Identical numbers identify concurrent steps.

System of record

The system of record is a durable Redis cluster. Every write request (see Step 1 — note that this includes dequeue requests since they alter the state of the queue) that reaches the cluster (Step 2) is persisted to a transaction log before a response is sent back to the server (Step 3).

Inside the database, we represent each queue with a sorted set where we rank message IDs (see “Message” section) according to priority. We persist messages and queue configurations (see “Queues” section) in Redis as hashes. All data structures related to a queue — from the messages it contains to the in-memory secondary indexes needed to support dequeue-by-filter — are placed in the same Redis shard. We achieve this by having them share a common prefix, specific to the queue in question. We then codify this prefix as a Redis hash tag. Each message carries a payload (see “Message” section) that can weigh up to 32 KiB.

Almost all of the interactions between Timestone and Redis (see “Message States” section) are codified as Lua scripts. In most of these Lua scripts, we tend to update a number of data structures. Since Redis guarantees that each script is executed atomically, a successful script execution is guaranteed to leave the system in a consistent (in the ACID sense) state.
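As an illustration of the hash-tag technique mentioned above, the sketch below shows how a queue's keys could share a common prefix so that they land on the same Redis shard; the key names are made up and do not reflect Timestone's actual layout:

// Illustrative key naming only. Everything between '{' and '}' is the Redis
// hash tag: keys with the same tag hash to the same slot, and therefore the
// same shard, which is what allows Lua scripts to touch them all atomically.
public final class QueueKeys {
    private final String prefix;

    public QueueKeys(String queueName) {
        this.prefix = "{queue:" + queueName + "}";
    }

    public String prioritySortedSet() { return prefix + ":priority"; }        // sorted set of message IDs
    public String messageHash(String messageId) { return prefix + ":msg:" + messageId; }
    public String configHash() { return prefix + ":config"; }
}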

All API operations are queue-scoped. All API operations that modify state are idempotent.

Secondary indexes

For observability purposes, we capture information about incoming messages and their transition between states in two secondary indexes maintained on Elasticsearch.

When we get a write response from Redis, we concurrently (a) return the response to the client, and (b) convert this response into an event that we post to a Kafka cluster, as shown in Step 4. Two Flink jobs — one for each type of index we maintain — consume the events from the corresponding Kafka topics, and update the indexes in Elasticsearch.

One index (“current”) gives users a best-effort view into the current state of the system, while the other index (“historic”) gives users a best effort longitudinal view for messages, allowing them to trace the messages as they flow through Timestone, and answer questions such as time spent in a state, and number of processing errors. We maintain a version counter for each message; every write operation increments that counter. We rely on that version counter to order the events in the historic index.

Events are stored in the Elasticsearch cluster for a finite number of days.

Current Usage at Netflix

The system is dequeue heavy. We see 30K dequeue requests per second (RPS) with a P99 latency of 45ms. In comparison, we see 1.2K enqueue RPS at 25ms P99 latency. We regularly see 5K RPS enqueue bursts at 85ms P99 latency.

15B messages have been enqueued to Timestone since the beginning of the year; these messages have been dequeued 400B times. Pending messages regularly reach 10M.

Usage is expected to double next year, as we migrate the rest of Reloaded, our legacy media encoding system, to Cosmos.

Key Concepts

Message

A message carries an opaque payload, a user-defined priority (see “Priority” section), an optional (mandatory for exclusive queues) set of metadata key-value pairs that can be used for filter-based dequeueing, and an optional invisibility duration.

Any message that is placed into a queue can be dequeued a finite number of times. We call these attempts; each dequeue invocation on a message decreases the attempts left on it.

Priority

The priority of a message is expressed as an integer value; the lower the value, the higher the priority. While an application is free to use whatever range they see fit, the norm is to use Unix timestamps in milliseconds (e.g. 1661990400000 for 9/1/2022 midnight UTC).

Figure 3. A snippet from the PriorityClass enum used by a streaming encoding pipeline in Cosmos. The values in parentheses indicate the offset in days.

It is also entirely up to the application to define its own priority levels. For instance a streaming encoding pipeline within Cosmos uses mail priority classes, as shown in Figure 3. Messages belonging to the standard class use the time of enqueue as their priority, while all other classes have their priority values adjusted in multiples of ∼10 years. The priority is set at the workflow rule level, but can be overridden if the request carries a studio tag, such as DAY_OF_BROADCAST.
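As a hedged illustration of this scheme, the enum below mirrors the idea behind Figure 3 with made-up class names and offsets; the real classes and values differ.

import java.time.Duration;

// Illustrative only: priority classes as fixed offsets added to the enqueue
// time. Lower resulting values are dequeued first, so a negative offset means
// "ahead of standard traffic".
public enum PriorityClass {
    EXPRESS(-7300),    // ~20 years earlier than standard (hypothetical)
    PRIORITY(-3650),   // ~10 years earlier than standard (hypothetical)
    STANDARD(0),       // priority == time of enqueue
    BULK(3650);        // ~10 years later than standard (hypothetical)

    private final long offsetDays;

    PriorityClass(long offsetDays) {
        this.offsetDays = offsetDays;
    }

    public long priorityFor(long enqueueTimeEpochMillis) {
        return enqueueTimeEpochMillis + Duration.ofDays(offsetDays).toMillis();
    }
}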

Message States

Within a queue, a Timestone message belongs to one of six states (Figure 4):

  1. invisible
  2. pending
  3. running
  4. completed
  5. canceled
  6. errored

In general, a message can be enqueued with or without invisibility, which makes the message invisible or pending respectively. Invisible messages become pending when their invisibility window elapses.

A worker can dequeue a pending earliest-deadline-first message from a queue by specifying the amount of time (lease duration) they will be processing it for. Dequeueing messages in batch is also supported. This moves the message to the running state.

The same worker can then issue a complete call to Timestone within the allotted lease window to move the message to the completed state, or issue a lease extension call if they want to maintain control of the message. (A worker can also move a message, typically a running one, to the canceled state to signal it is no longer needed for processing.)

If none of these calls are issued on time, the message becomes dequeueable again, and this attempt on the message is spent. If there are no attempts left on the message, it is moved automatically to the errored state.

The terminal states (completed, errored, and canceled) are garbage-collected periodically by a background process.

Messages can move states either when a worker invokes an API operation, or when Timestone runs its background processes (Figure 4, marked in red — these run periodically). Figure 4 shows the complete state transition diagram.

Figure 4. State transition diagram for Timestone messages.
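A compact sketch of the same lifecycle, with the transition table reconstructed from the prose above (treat it as an approximation of Figure 4, not a copy of it):

import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Approximation of the Timestone message lifecycle reconstructed from the text.
public enum MessageState {
    INVISIBLE, PENDING, RUNNING, COMPLETED, CANCELED, ERRORED;

    private static final Map<MessageState, Set<MessageState>> ALLOWED = Map.of(
            INVISIBLE, EnumSet.of(PENDING),                               // invisibility window elapses
            PENDING, EnumSet.of(RUNNING),                                 // dequeued by a worker
            RUNNING, EnumSet.of(COMPLETED, CANCELED, PENDING, ERRORED),   // complete, cancel, lease expiry
            COMPLETED, EnumSet.noneOf(MessageState.class),                // terminal, garbage-collected
            CANCELED, EnumSet.noneOf(MessageState.class),                 // terminal, garbage-collected
            ERRORED, EnumSet.noneOf(MessageState.class));                 // terminal, garbage-collected

    public boolean canTransitionTo(MessageState next) {
        return ALLOWED.get(this).contains(next);
    }
}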

Queues

All incoming messages are stored in queues. Within a queue, messages are sorted by their priority date. Timestone can host an arbitrary number of user-created queues, and offers a set of API operations for queue management, all revolving around a queue configuration object. Data we store in this object includes the queue’s type (see the rest of this section), the lease duration that applies to dequeued messages, the invisibility duration that applies to enqueued messages, the number of times a message can be dequeued, and whether enqueueing or dequeueing is temporarily blocked. Note that a message producer can override the default lease or invisibility duration by setting it at the message level during enqueue.

All queues in Timestone fall into two types: simple and exclusive.

When an exclusive queue is created, it is associated with a user-defined exclusivity key — for example project. All messages posted to that queue must carry this key in their metadata. For instance, a message with project=foo will be accepted into the queue; a message without the project key will not be. In this example, we call foo, the value that corresponds to the exclusivity key, the message’s exclusivity value.

The contract for exclusive queues is that at any point in time, there can be only up to one consumer per exclusivity value. Therefore, if the project-based exclusive queue in our example has two messages with the key-value pair project=foo in it, and one of them is already leased out to a worker, the other one is not dequeueable. This is depicted in Figure 5.

Figure 5. When worker_2 issues a dequeue call, they lease msg_2 instead of msg_1, even though msg_1 has a higher priority. That happens because the queue is exclusive, and the exclusive value foo is already leased out.

In a simple queue no such contract applies, and there is no tight coupling with message metadata keys. A simple queue works as your typical priority queue, simply ordering messages in an earliest-deadline-first fashion.
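To make the contrast concrete, here is an in-memory sketch of the exclusive-queue dequeue rule described above; it ignores Redis, leases, attempts, and filters entirely, and all names are illustrative.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Sketch: earliest-deadline-first dequeue that skips messages whose exclusivity
// value is already leased out. Not the actual Timestone logic.
public final class ExclusiveQueueSketch {
    public record Message(String id, long priority, Map<String, String> metadata) {}

    private final String exclusivityKey;                              // e.g. "project"
    private final List<Message> pending = new ArrayList<>();
    private final Set<String> leasedExclusivityValues = new HashSet<>();

    public ExclusiveQueueSketch(String exclusivityKey) {
        this.exclusivityKey = exclusivityKey;
    }

    public synchronized void enqueue(Message message) {
        pending.add(message);
    }

    /** Returns the highest-priority message whose exclusivity value is not currently leased. */
    public synchronized Optional<Message> dequeue() {
        pending.sort(Comparator.comparingLong(Message::priority));    // lower value = higher priority
        for (int i = 0; i < pending.size(); i++) {
            Message candidate = pending.get(i);
            String exclusivityValue = candidate.metadata().get(exclusivityKey);
            if (leasedExclusivityValues.add(exclusivityValue)) {      // not leased yet: lease it
                pending.remove(i);
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /** Called when the worker completes (or cancels) the message, releasing its exclusivity value. */
    public synchronized void complete(Message message) {
        leasedExclusivityValues.remove(message.metadata().get(exclusivityKey));
    }
}

In the Figure 5 scenario, foo is already leased out, so dequeue() skips msg_1 and returns msg_2, exactly as described above.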

What We Are Working On

Some of the things we’re working on:

  1. As the usage of Timestone within Cosmos grows, so does the need to support a range of queue depth queries. To solve this, we are building a dedicated query service that uses a distinct query model.
  2. As noted above (see the “System of record” section), a queue and its contents can currently occupy only one Redis shard. Hot queues, however, can grow big, especially when compute capacity is scarce. We want to support arbitrarily large queues, which has us building support for queue sharding.
  3. Messages can carry up to 4 key-value pairs. We currently use all of these key-value pairs to populate the secondary indexes used during dequeue-by-filter. This operation is exponentially complex both in terms of time and space (O(2^n)). We are switching to lexicographical ordering on sorted sets to drop the number of indexes by half, and handle metadata in a more cost-efficient manner.

We may be covering our work on the above in follow-up posts. If these kinds of problems sound interesting to you, and if you like the challenges of building distributed systems for the Netflix Content and Studio ecosystem at scale in general, you should consider joining us.

Acknowledgements

Poorna Reddy, Kostas Christidis, Aravindan Ramkumar, Surafel Korse, Jiaofen Xu, Anoop Panicker, and Kishore Banala have contributed to this project. We thank Charles Zhao, Olof Johansson, Frank San Miguel, Dmitry Vasilyev, Prudhvi Chaganti, and the rest of the Cosmos team for their constructive feedback while developing and operating Timestone.



Let’s Architect! Architecting with custom chips and accelerators

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-custom-chips-and-accelerators/

It’s hard to imagine a world without computer chips. They are at the heart of the devices that we use to work and play every day. Currently, Amazon Web Services (AWS) is offering customers the next generation of computer chips, with lower cost, higher performance, and a reduced carbon footprint.

This edition of Let’s Architect! focuses on custom computer chips, accelerators, and technologies developed by AWS, such as the AWS Nitro System, custom-designed Arm-based AWS Graviton processors that support data-intensive workloads, as well as AWS Trainium and AWS Inferentia chips, which are optimized for machine learning training and inference.

In this post, we discuss these new AWS technologies, their main characteristics, and how to take advantage of them in your architecture.

Deliver high performance ML inference with AWS Inferentia

As Deep Learning models become increasingly large and complex, the training cost for these models increases, as well as the inference time for serving.

With AWS Inferentia, machine learning practitioners can deploy complex neural-network models that are built and trained on popular frameworks, such as Tensorflow, PyTorch, and MXNet on AWS Inferentia-based Amazon EC2 Inf1 instances.

This video introduces you to the main concepts of AWS Inferentia, a service designed to reduce both cost and latency for inference. To speed up inference, AWS Inferentia shares a model across multiple chips, places the pieces inside the on-chip cache, and then streams the data through a pipeline for low-latency predictions.

Presenters walk through the structure of the chip and software considerations, as well as anecdotes from the Amazon Alexa team, who use AWS Inferentia to serve predictions. If you want to learn more about high throughput coupled with low latency, explore Achieve 12x higher throughput and lowest latency for PyTorch Natural Language Processing applications out-of-the-box on AWS Inferentia on the AWS Machine Learning Blog.

AWS Inferentia shares a model across different chips to speed up inference


AWS Lambda Functions Powered by AWS Graviton2 Processor – Run Your Functions on Arm and Get Up to 34% Better Price Performance

AWS Lambda is a serverless, event-driven compute service that enables code to run from virtually any type of application or backend service, without provisioning or managing servers. Lambda uses a high-availability compute infrastructure and performs all of the administration of the compute resources, including server- and operating-system maintenance, capacity-provisioning, and automatic scaling and logging.

AWS Graviton processors are designed to deliver the best price and performance for cloud workloads. AWS Graviton3 processors are the latest in the AWS Graviton processor family and provide up to 25% higher compute performance, two times higher floating-point performance, and two times faster cryptographic workload performance compared with AWS Graviton2 processors. You can migrate AWS Lambda functions to Graviton in minutes and get as much as 19% better performance at approximately 20% lower cost (compared with x86).

Comparison between x86 and Arm/Graviton2 results for the AWS Lambda function computing prime numbers


Powering next-gen Amazon EC2: Deep dive on the Nitro System

The AWS Nitro System is a collection of building-block technologies that includes AWS-built hardware offload and security components. It is powering the next generation of Amazon EC2 instances, with a broadening selection of compute, storage, memory, and networking options.

In this session, dive deep into the Nitro System, reviewing its design and architecture, exploring new innovations to the Nitro platform, and understanding how it allows for faster innovation and increased security while reducing costs.

Traditionally, hypervisors protect the physical hardware and BIOS; virtualize the CPU, storage, and networking; and provide a rich set of management capabilities. With the AWS Nitro System, AWS breaks apart those functions and offloads them to dedicated hardware and software.

AWS Nitro System separates functions and offloads them to dedicated hardware and software, in place of a traditional hypervisor


How Amazon migrated a large ecommerce platform to AWS Graviton

In this re:Invent 2021 session, we learn about the benefits Amazon’s ecommerce Datapath platform has realized with AWS Graviton.

With a range of 25%-40% performance gains across 53,000 Amazon EC2 instances worldwide for Prime Day 2021, the Datapath team is lowering their internal costs with AWS Graviton’s improved price performance. Explore the software updates that were required to achieve this and the testing approach used to optimize and validate the deployments. Finally, learn about the Datapath team’s migration approach that was used for their production deployment.

AWS Graviton2: core components

See you next time!

Thanks for exploring custom computer chips, accelerators, and technologies developed by AWS. Join us in a couple of weeks when we talk more about architectures and the daily challenges faced while working with distributed systems.

Other posts in this series

Looking for more architecture content?

AWS Architecture Center provides reference architecture diagrams, vetted architecture solutions, Well-Architected best practices, patterns, icons, and more!

Let’s Architect! Modern data architectures

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-modern-data-architectures/

With the rapid growth of data coming from data platforms and applications, and continuous improvements in state-of-the-art machine learning algorithms, data is becoming a key asset for companies.

Modern data architectures include data mesh—a recent style that represents a paradigm shift, in which data is treated as a product and data architectures are designed around business domains. This type of approach supports the idea of distributed data, where each business domain focuses on the quality of the data it produces and exposes to the consumers.

In this edition of Let’s Architect!, we focus on data mesh and how it is designed on AWS, plus other approaches to adopt modern architectural patterns.

Design a data mesh architecture using AWS Lake Formation and AWS Glue

Domain Driven Design (DDD) is a software design approach where a solution is divided into domains aligned with business capabilities, software, and organizational boundaries. Unlike software architectures, data architectures are often designed around technologies rather than business domains.

In this blog, you can learn about data mesh, an architectural pattern that applies the principles of DDD to data architectures. Data are organized into domains and considered the product that each team owns and offers for consumption.

A data mesh design organizes around data domains. Each domain owns multiple data products with their own data and technology stacks

Building Data Mesh Architectures on AWS

In this video, discover how to use the data mesh approach on AWS, and specifically how to implement design patterns for building a data mesh architecture with AWS services in the cloud.

This is a pragmatic presentation to get a quick understanding of data mesh fundamentals, the benefits/challenges, and the AWS services that you can use to build it. This video provides additional context for the aforementioned blog post and includes several examples of the benefits of modern data architectures.

This diagram demonstrates the pattern for sharing data catalogs between producer domains and consumer domains

Build a modern data architecture on AWS with Amazon AppFlow, AWS Lake Formation, and Amazon Redshift

In this blog, you can learn how to build a modern data strategy using AWS managed services to ingest data from sources like Salesforce. It also discusses how to automatically create metadata catalogs, share data seamlessly between the data lake and data warehouse, and create alerts in the event of an orchestrated data workflow failure.

The second part of the post explains how a data warehouse can be built by using an agile data modeling pattern, as well as how ELT jobs were quickly developed, orchestrated, and configured to perform automated data quality testing.

A data platform architecture and the subcomponents used to build it

AWS Lake Formation Workshop

With a modern data architecture on AWS, architects and engineers can rapidly build scalable data lakes; use a broad and deep collection of purpose-built data services; and ensure compliance via unified data access, security, and governance. As data mesh is a modern architectural pattern, you can build it using a service like AWS Lake Formation.

Familiarize yourself with new technologies and services not only by learning how they work, but also by building prototypes and projects to gain hands-on experience. This workshop allows builders to become familiar with the features of AWS Lake Formation and its integrations with other AWS services.

A data catalog is a key component in a data mesh architecture. AWS Glue crawlers interact with data stores and other elements to populate the data catalog

See you next time!

Thanks for joining our discussion on data mesh! See you in a couple of weeks when we talk more about architectures and the challenges that we face every day while working with distributed systems.

Other posts in this series

Looking for more architecture content?

AWS Architecture Center provides reference architecture diagrams, vetted architecture solutions, Well-Architected best practices, patterns, icons, and more!

How we store and process millions of orders daily

Post Syndicated from Grab Tech original https://engineering.grab.com/how-we-store-millions-orders

Introduction

In the real world, after a passenger places a GrabFood order from the Grab App, the merchant-partner will prepare the order. A driver-partner will then collect the food and deliver it to the passenger. Have you ever wondered what happens in the backend system? The Grab Order Platform is a distributed system that processes millions of GrabFood or GrabMart orders every day. This post aims to share the journey of how we designed the database solution that powers the order platform.

Background

What are the design goals when building the database solution? We collected the requirements by analysing query patterns and traffic patterns.

Query patterns

Here are some important query examples that the Order Platform supports:

  1. Write queries:

    a. Create an order.

    b. Update an order.

  2. Read queries:

    a. Get order by id.

    b. Get ongoing orders by passenger id.

    c. Get historical orders by various conditions.

    d. Get order statistics (for example, get the number of orders)

We can break down queries into two categories: transactional queries and analytical queries. Transactional queries are critical to online order creation and completion, including the write queries and read queries such as 2a and 2b. Analytical queries like 2c and 2d retrieve historical orders or order statistics on demand; they are not essential to online order processing.

Traffic patterns

Grab’s Order Platform processes a significant amount of transaction data every month.

During peak hours, the write Queries Per Second (QPS) is three times that of primary-key reads, while the range-query QPS is four times that of primary-key reads.

Design goals

From the query and traffic patterns, we arrived at the following three design goals:

  1. Stability – the database solution must be able to handle high read and write QPS. Online order processing queries must have high availability. Even when some part of the system is down, we must be able to provide a degraded experience that still lets end users create and complete an order.
  2. Scalability and cost – the database solution must be able to support the fast evolution of business requirements, given that we now handle up to a million orders per month. The solution must also be cost effective at a large scale.
  3. Consistency – strong consistency for transactional queries, and eventual consistency for analytical queries.

Solution

The first design principle towards a stable and scalable database solution is to use different databases to serve transactional and analytical queries, also known as OLTP and OLAP queries. An OLTP database serves queries critical to online order processing and keeps data for only a short period of time. Meanwhile, an OLAP database has the same set of data, but serves our historical and statistical queries. This database keeps data for a longer time.

What are the benefits from this design principle? From a stability point of view, we can choose different databases which can better fulfil our different query patterns and QPS requirements. An OLTP database is the single source of truth for online order processing; any failure in the OLAP database will not affect online transactions. From a scalability and cost point of view, we can choose a flexible database for OLAP to support our fast evolution of business requirements. We can maintain less data in our OLTP database while keeping some older data in our OLAP database.

To ensure that the data in both databases are consistent, we introduced the second design principle – data ingestion pipeline. In Figure 1, Order Platform writes data to the OLTP database to process online orders and asynchronously pushes the data into the data ingestion pipeline. The data ingestion pipeline ensures that the OLAP database data is eventually consistent.

Figure 1: Order Platform database solution overview

Architecture details

OLTP database

There are two categories of OLTP queries, the key-value queries (for example, load by order id) and the batch queries (for example, Get ongoing orders by passenger id). We use DynamoDB as the database to support these OLTP queries.

Why DynamoDB?

  1. Scalable and highly available: the tables of DynamoDB are partitioned and each partition is three-way replicated.
  2. Support for strongly consistent reads by primary key.
  3. DynamoDB has a mechanism called adaptive capacity to handle hotkey traffic. Internally, DynamoDB will distribute higher capacity to high-traffic partitions, and isolate frequently accessed items to a dedicated partition. This way, the hotkey can utilise the full capacity of an entire partition, which is up to 3000 read capacity units and 1000 write capacity units.
Figure 2: DynamoDB table structure overview. Source: Amazon Web Services (2019, 28 April)

Each DynamoDB table has many items, and each item has attributes. Each item also has a partition key and a sort key. The partition key is used for key-value queries, and the sort key is used for range queries. In our case, the table contains multiple order items, and the partition key is the order ID, so we can easily support key-value queries by the partition key.

| order_id (PK) | state     | pax_id | created_at | pax_id_gsi |
|---------------|-----------|--------|------------|------------|
| order1        | Ongoing   | Alice  | 9:00am     | Alice      |
| order2        | Ongoing   | Alice  | 9:30am     | Alice      |
| order3        | Completed | Alice  | 8:30am     |            |
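To make the key-value access pattern concrete, here is a minimal sketch using boto3 against a hypothetical table named orders; the table and attribute names are illustrative assumptions, not Grab's actual schema.

```python
import boto3

dynamodb = boto3.resource("dynamodb")
orders = dynamodb.Table("orders")  # hypothetical table name

# Key-value query: load a single order by its partition key.
# ConsistentRead requests a strongly consistent read on the primary key.
response = orders.get_item(
    Key={"order_id": "order1"},
    ConsistentRead=True,
)
order = response.get("Item")  # None if the order does not exist
```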

Batch queries like ‘Get ongoing orders by passenger id’ are supported by DynamoDB Global Secondary Index (GSI). A GSI is like a normal DynamoDB table, which also has keys and attributes.

In our case, we have a GSI where the partition key is pax_id_gsi. The pax_id_gsi attribute is linked to the main table, and the GSI is maintained by DynamoDB and kept eventually consistent with the main table. If the Order Platform queries ongoing orders for Alice, two items will be returned from the GSI table.

| pax_id_gsi (PK) | created_at (SK) | order_id |
|-----------------|-----------------|----------|
| Alice           | 9:00am          | order1   |
| Alice           | 9:30am          | order2   |
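A batch query against the GSI could look like the sketch below; the index name pax_id_gsi-index is an assumption, and a production implementation would also page through results with LastEvaluatedKey.

```python
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")
orders = dynamodb.Table("orders")  # hypothetical table name

# Batch query: get ongoing orders for a passenger via the GSI.
# GSI reads are eventually consistent; the index name is assumed here.
response = orders.query(
    IndexName="pax_id_gsi-index",
    KeyConditionExpression=Key("pax_id_gsi").eq("Alice"),
)
ongoing_orders = response["Items"]  # order1 and order2 in the example above
```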

We also make use of an advanced feature of GSI named sparse index to support ongoing order queries. When we update order status from ongoing to completed, at the same time, we set the pax_id_gsi to empty, so that the linked item in the GSI will be automatically deleted by DynamoDB. At any time, the GSI table only stores the ongoing orders. We use a sparse index mechanism to control our table size for better performance and to be more cost effective.
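A sketch of how the sparse index behaviour could be driven from application code: completing an order updates the state and removes the pax_id_gsi attribute in a single update, so DynamoDB drops the corresponding item from the GSI. The attribute names are assumptions for illustration.

```python
import boto3

dynamodb = boto3.resource("dynamodb")
orders = dynamodb.Table("orders")  # hypothetical table name

# Mark the order completed and clear pax_id_gsi in one update.
# Removing the GSI key attribute makes DynamoDB delete the item
# from the sparse index, so the GSI only ever holds ongoing orders.
orders.update_item(
    Key={"order_id": "order1"},
    UpdateExpression="SET #s = :completed REMOVE pax_id_gsi",
    ExpressionAttributeNames={"#s": "state"},
    ExpressionAttributeValues={":completed": "Completed"},
)
```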

The next problem is data retention. This is achieved with the DynamoDB Time To Live (TTL) feature: DynamoDB auto-scans expired items and deletes them. The challenge is that adding TTL to big tables puts a heavy load on the background scanner and might result in an outage. Our solution was to add a TTL attribute only to new items in the table. We then manually deleted the items without TTL attributes, and ran a script to delete items whose TTL attributes were too old. After this process, the table size was small enough that we could enable the TTL feature on the previously added TTL attribute without any concern. The retention period of our DynamoDB data is three months.
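The TTL rollout described above could be driven by calls like the following sketch; the expires_at attribute name and the three-month window are assumptions.

```python
import time
import boto3

client = boto3.client("dynamodb")
dynamodb = boto3.resource("dynamodb")
orders = dynamodb.Table("orders")  # hypothetical table name

THREE_MONTHS = 90 * 24 * 3600

# Write new items with an epoch-seconds TTL attribute (assumed name).
orders.put_item(Item={
    "order_id": "order4",
    "state": "Ongoing",
    "pax_id": "Alice",
    "expires_at": int(time.time()) + THREE_MONTHS,
})

# Once old items have been cleaned up, enable TTL on that attribute.
client.update_time_to_live(
    TableName="orders",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expires_at"},
)
```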

Cost-wise, DynamoDB charges by storage size and by provisioned read/write capacity. The provisioned capacity is auto-scalable and the cost is effectively on-demand, so it is generally cheaper than RDS.

OLAP database

We use MySQL RDS as the database to support historical and statistical OLAP queries.

Why not Aurora? We chose RDS mainly because it is a mature database solution. Even though Aurora can provide better high availability, RDS is enough to support our less critical use cases. Cost-wise, Aurora charges by data storage and by the number of requested Input/Output Operations per Second (IOPS), whereas RDS charges only by data storage. As we are using General Purpose (SSD) storage, IOPS is included at no extra cost and supports up to 16k IOPS.

We use MySQL partitioning for data retention. The order table is partitioned by creation time monthly. Since the data access pattern is mostly by month, the partition key can reduce cross-partition queries. Partitions older than six months are dropped at the beginning of each month.
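A sketch of what monthly range partitioning and retention could look like in MySQL, issued here through PyMySQL; the table, column, partition names, and connection details are assumptions for illustration.

```python
import pymysql

# Hypothetical connection details.
conn = pymysql.connect(host="olap-rds-host", user="app",
                       password="change-me", database="orders_olap")

CREATE_PARTITIONED_TABLE = """
CREATE TABLE orders (
    order_id   VARCHAR(64) NOT NULL,
    pax_id     VARCHAR(64) NOT NULL,
    state      VARCHAR(16) NOT NULL,
    created_at DATETIME    NOT NULL,
    PRIMARY KEY (order_id, created_at)
)
PARTITION BY RANGE (TO_DAYS(created_at)) (
    PARTITION p202201 VALUES LESS THAN (TO_DAYS('2022-02-01')),
    PARTITION p202202 VALUES LESS THAN (TO_DAYS('2022-03-01')),
    PARTITION pmax    VALUES LESS THAN MAXVALUE
)
"""

# Retention job run at the start of each month: drop the partition
# that has fallen outside the six-month window.
DROP_OLD_PARTITION = "ALTER TABLE orders DROP PARTITION p202107"

with conn.cursor() as cur:
    cur.execute(CREATE_PARTITIONED_TABLE)
    cur.execute(DROP_OLD_PARTITION)
conn.commit()
```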

Data ingestion pipeline

Figure 3: Data Ingestion Pipeline Architecture.

A Kafka stream is used to process data in the data ingestion pipeline. We chose Kafka because it has a 99.95% SLA and is not tied to any particular OLTP or OLAP database type.

Even though Kafka provides a 99.95% SLA, there is still a chance of stream producer failures. When the producer fails, we store the message in Amazon Simple Queue Service (SQS) and retry. If the retry also fails, the message is moved to the SQS dead letter queue (DLQ), to be consumed at a later time.
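A minimal sketch of this producer-side fallback path, assuming the kafka-python client and boto3; the topic name, queue URL, and event shape are placeholders, and the real pipeline is more involved.

```python
import json
import boto3
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=["kafka-broker:9092"],  # placeholder broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
sqs = boto3.client("sqs")
RETRY_QUEUE_URL = "https://sqs.example/order-ingestion-retry"  # placeholder URL


def publish_order_event(event: dict) -> None:
    """Push an order event to Kafka; fall back to SQS if the produce fails."""
    try:
        # .get() blocks until the broker acknowledges or the timeout expires.
        producer.send("order-events", value=event).get(timeout=10)
    except KafkaError:
        # Producer failure: park the event in SQS so a retrier can replay it.
        # Messages that keep failing move to the DLQ via the SQS redrive policy.
        sqs.send_message(QueueUrl=RETRY_QUEUE_URL, MessageBody=json.dumps(event))
```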

On the stream consumer side, we use back-off retry at both stream and database levels to ensure consistency. In a worst-case scenario, we can rewind the stream events from Kafka.

It is important for the data ingestion pipeline to handle duplicate messages and out-of-order messages.

Duplicate messages are handled by a database-level unique key (for example, order ID + creation time).

For out-of-order messages, we implemented the following two mechanisms (a minimal consumer sketch covering both follows the list):

  1. Version update: we only apply an update if it is more recent than the data already stored. The precision of the update time is in microseconds, which is enough for most of the use cases.
  2. Upsert: if an update event arrives before its create event, we simulate an upsert operation.
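Here is a minimal consumer-side sketch of how deduplication, version checks, and upserts could be combined in a single MySQL statement; the table, column names, and connection details are assumptions rather than the actual Grab implementation.

```python
import pymysql

# Hypothetical connection details.
conn = pymysql.connect(host="olap-rds-host", user="app",
                       password="change-me", database="orders_olap")

# Upsert + version check in one statement: insert the row if it is new
# (this covers update events arriving before create events), otherwise
# only overwrite columns when the incoming event is more recent. The
# unique key on (order_id, created_at) also absorbs duplicate messages.
UPSERT_SQL = """
INSERT INTO orders (order_id, created_at, state, pax_id, updated_at)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
    state      = IF(VALUES(updated_at) > updated_at, VALUES(state), state),
    pax_id     = IF(VALUES(updated_at) > updated_at, VALUES(pax_id), pax_id),
    updated_at = GREATEST(updated_at, VALUES(updated_at))
"""


def apply_event(event: dict) -> None:
    with conn.cursor() as cur:
        cur.execute(UPSERT_SQL, (event["order_id"], event["created_at"],
                                 event["state"], event["pax_id"],
                                 event["updated_at"]))
    conn.commit()
```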

Impact

After launching our solution this year, we have saved significantly on cloud costs. In the earlier solution, the Order Platform wrote synchronously to both DynamoDB and Aurora, and the data was kept forever.

Conclusion

In terms of stability, we use DynamoDB as the critical OLTP database to ensure high availability for online order processing. Scalability-wise, we use RDS as the OLAP database to support our quickly evolving business requirements by using rich, multiple indexes. Cost efficiency is achieved by data retention in both databases. For consistency, we built a single source of truth in the OLTP database and an OLAP database that is eventually consistent with the help of the data ingestion pipeline.

What’s next?

Currently, the database solution is running on the production environment. Even though the database solution is proven to be stable, scalable and consistent, we still see some potential areas of improvement.

We use MySQL RDS for OLAP data storage. Even though MySQL is stable and cost effective, it is difficult to serve more complicated queries like free text search. Hence, we plan to explore other NoSQL databases like ElasticSearch.

We hope this post helps you understand how we store Grab orders and fulfil the queries from the Grab Order Platform.

References

Amazon Web Services. (2019, 28 April) Build with DynamoDB: S1 E1 – Intro to Amazon DynamoDB [Video]. YouTube.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Formulating ‘Out of Memory Kill’ Prediction on the Netflix App as a Machine Learning Problem

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/formulating-out-of-memory-kill-prediction-on-the-netflix-app-as-a-machine-learning-problem-989599029109

by Aryan Mehra
with
Farnaz Karimdady Sharifabad, Prasanna Vijayanathan, Chaïna Wade, Vishal Sharma and Mike Schassberger

Aim and Purpose — Problem Statement

The purpose of this article is to give insights into analyzing and predicting “out of memory” or OOM kills on the Netflix App. Unlike powerful compute devices, TVs and set-top boxes usually have tight memory constraints. More importantly, low resource availability, or the “out of memory” scenario, is one of the common reasons for crashes/kills. We at Netflix, as a streaming service running on millions of devices, have a tremendous amount of data about device capabilities/characteristics and runtime data in our big data platform. With large data comes the opportunity to leverage it for predictive and classification-based analysis. Specifically, if we are able to predict or analyze Out of Memory kills, we can take device-specific actions to pre-emptively lower performance in favor of not crashing, aiming to give the user the ultimate Netflix experience within the “performance vs pre-emptive action” tradeoff limitations. A major advantage of predicting and taking pre-emptive action is that we can act to better the user experience.

This is done by first elaborating on the dataset curation stage, especially focusing on device capabilities and OOM-kill-related memory readings. We also highlight steps and guidelines for exploratory analysis and prediction to understand Out of Memory kills on a sample set of devices. Since memory management is not something one usually associates with classification problems, this blog focuses on formulating the problem as an ML problem and the data engineering that goes along with it. We also explore graphical analysis of the labeled dataset and suggest some feature engineering and accuracy measures for future exploration.

Challenges of Dataset Curation and Labeling

Unlike other Machine Learning tasks, OOM kill prediction is tricky because the dataset is pulled from different sources: device characteristics come from our on-field knowledge, and runtime memory data comes from real-time user data pushed to our servers.

Secondly, and more importantly, the sheer volume of runtime data is enormous. Devices running Netflix log memory usage at fixed intervals. Since the Netflix App does not get killed very often (fortunately!), most of these entries represent normal/ideal/as-expected runtime states. The dataset is thus very biased/skewed. We will soon see how we actually label which entries are erroneous and which are not.

Dataset Features and Components

The schema figure above describes the two components of the dataset: device capabilities/characteristics and runtime memory data. The two are joined together based on attributes that uniquely match a memory entry with its device’s capabilities. These attributes may be different for different streaming services; for us at Netflix, this is a combination of the device type, app session ID and software development kit version (SDK version). We now explore each of these components individually, while highlighting the nuances of the data pipeline and pre-processing.

Device Capabilities

Not all device capabilities reside in one source table, requiring multiple if not several joins to gather the data. While creating the device capability table, we decided to index it by a composite primary key of (device type ID, SDK version). Given these two attributes, Netflix can uniquely identify several of the device capabilities. Some nuances while creating this dataset come from the on-field domain knowledge of our engineers. Some features (as an example) include Device Type ID, SDK Version, Buffer Sizes, Cache Capacities, UI resolution, Chipset Manufacturer and Brand.

Major Milestones in Data Engineering for Device Characteristics

Structuring the data in an ML-consumable format: The device capability data needed for the prediction was distributed in over three different schemas across the Big Data Platform. Joining them together and building a single indexable schema that can directly become a part of a bigger data pipeline is a big milestone.

Dealing with ambiguities and missing data: Sometimes the entries in BDP are contaminated with testing entries and NULL values, along with ambiguous values that have no meaning or just simply contradictory values due to unreal test environments. We deal with all of this by a simple majority voting (statistical mode) on the view that is indexed by the device type ID and SDK version from the user query. We thus verify the hypothesis that actual device characteristics are always in majority in the data lake.

Incorporating On-site and field knowledge of devices and engineers: This is probably the single most important achievement of the task because some of the features mentioned above (and some of the ones redacted) involved engineering the features manually. Example: Missing values or NULL values might mean the absence of a flag or feature in some attribute, while it might require extra tasks in others. So if we have a missing value for a feature flag, that might mean “False”, whereas a missing value in some buffer size feature might mean that we need subqueries to fetch and fill the missing data.

Runtime Memory, OOM Kill Data and ground truth labeling

Runtime data is always increasing and constantly evolving. The tables and views we use are refreshed every 24 hours, and joining any two such tables requires tremendous compute and time resources. In order to curate this part of the dataset, we suggest the tips given below (written from the point of view of SparkSQL-like distributed query processors):

  • Filtering the entries (conditions) before the JOIN, using WHERE and LEFT JOIN clauses carefully (a minimal PySpark-style sketch follows this list). Conditions that eliminate entries after the join operation are much more expensive than when elimination happens before the join. Filtering early also prevents the system from running out of memory during execution of the query.
  • Restricting testing and analysis to one day and one device at a time. It is always good to pick a single high-frequency day like New Year’s or Memorial Day to increase frequency counts and get normalized distributions across various features.
  • Striking a balance between driver and executor memory configurations in SparkSQL-like systems. Too high an allocation may fail and restrict system processes. Too low a memory allocation may fail at the time of a local collect or when the driver tries to accumulate the results.
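A minimal PySpark-style sketch of the first tip, filtering both sides down to a single day and device type before the join; the table and column names are assumptions, not the actual Netflix schema.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("oom-dataset").getOrCreate()

# Filter each side down to one day and one device type *before* joining,
# so the join only touches the rows we actually need.
memory = (spark.table("runtime_memory_readings")          # hypothetical table
          .where((F.col("dateint") == 20220530) &
                 (F.col("device_type_id") == 123)))
capabilities = (spark.table("device_capabilities")        # hypothetical table
                .where(F.col("device_type_id") == 123))

labeled_input = memory.join(
    capabilities,
    on=["device_type_id", "sdk_version"],
    how="left",
)
```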

Labeling the data — Ground Truth

An important aspect of the dataset is to understand what features will be available to us at inference time. Thus, memory data (which contains the navigational level and memory reading) can be labeled using the OOM kill data, but the latter cannot be reflected in the input features. The best way to do this is to use a sliding-window approach where we label the memory readings of the sessions in a fixed window before the OOM kill as erroneous, and the rest of the entries as non-erroneous. In order to make the labeling more granular, and bring more variation into a binary classification model, we propose a graded-window approach as explained by the image below. Basically, it assigns higher levels to memory readings closer to the OOM kill, making it a multi-class classification problem. Level 4 is the nearest to the OOM kill (within a range of 2 minutes), whereas Level 0 is beyond 5 minutes of any OOM kill ahead of it. We note here that the device and session of the OOM kill instance and the memory reading need to match for the sanity of the labeling. The confusion matrix and the model’s results can later be reduced to binary if need be.
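To make the graded-window idea concrete, here is a small sketch that assigns a label to a memory reading from its time distance to the next OOM kill of the same device and session. Only the Level 4 (within 2 minutes) and Level 0 (beyond 5 minutes) boundaries come from the description above; the intermediate thresholds are assumptions for illustration.

```python
from datetime import timedelta


def grade_memory_reading(reading_ts, next_oom_kill_ts):
    """Assign a label 0-4 based on how close the reading is to the next
    OOM kill of the same device and session (None if no kill follows)."""
    if next_oom_kill_ts is None:
        return 0
    gap = next_oom_kill_ts - reading_ts
    if gap < timedelta(0):          # the kill happened before this reading
        return 0
    if gap <= timedelta(minutes=2):
        return 4                    # nearest to the kill
    if gap <= timedelta(minutes=3):
        return 3                    # assumed intermediate threshold
    if gap <= timedelta(minutes=4):
        return 2                    # assumed intermediate threshold
    if gap <= timedelta(minutes=5):
        return 1                    # assumed intermediate threshold
    return 0                        # beyond 5 minutes of any kill
```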

Summary of OOM Prediction — Problem Formulation

The dataset now consists of several entries, each of which has certain runtime features (navigational level and memory reading in our case) and device characteristics (a mix of over 15 features that may be numerical, boolean or categorical). The output variable is the graded or ungraded classification variable, labeled in accordance with the section above, primarily based on the nearness of the memory reading stamp to the OOM kill. Now we can use any multi-class classification algorithm, such as ANNs, XGBoost, AdaBoost, or ElasticNet with softmax. Thus we have successfully formulated the problem of OOM kill prediction for a device streaming Netflix.
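As a sketch of the formulation, a multi-class model could be trained along these lines; the feature names, file path, and model choice are placeholders rather than the actual Netflix setup, and any of the algorithms named above could be dropped in instead.

```python
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# One row per memory reading, already joined with device capabilities
# and labeled 0-4 as in the graded-window section (column names assumed).
df = pd.read_parquet("oom_training_set.parquet")   # placeholder path

categorical = ["device_type_id", "chipset_manufacturer", "brand"]
numerical = ["memory_reading", "navigational_level", "buffer_size", "cache_capacity"]

# One-hot encode categoricals; numerical features pass through unchanged.
X = pd.get_dummies(df[categorical + numerical], columns=categorical)
y = df["oom_level"]                                # 0 (far from any kill) .. 4 (near a kill)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42)

# Any multi-class learner can be swapped in here (XGBoost, ANNs, ...);
# class_weight="balanced" partially compensates for the heavy skew.
clf = RandomForestClassifier(n_estimators=200, class_weight="balanced", n_jobs=-1)
clf.fit(X_train, y_train)
print(clf.score(X_test, y_test))
```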

Data Analysis and Observations

Without diving very deep into the actual devices and results of the classification, we now show some examples of how we could use the structured data for some preliminary analysis and make observations. We do so by just looking at the peak of OOM kills in a distribution over the memory readings within 5 minutes prior to the kill.

Different device types

The graph above shows how, even without doing any modeling, the structured data can give us immense knowledge about the memory domain. For example, the early peaks (marked in red) are mostly crashes not visible to users, but were marked erroneously as user-facing crashes. The peaks marked in green are real user-facing crashes. Device 2 is an example of a sharp peak towards the higher memory range, with a sharp decline and almost no entries after the peak ends. Hence, for Device 1 and Device 2, the task of OOM prediction is relatively easier, after which we can start taking pre-emptive action to lower our memory usage. In the case of Device 3, we have an approximately Gaussian distribution, indicating that the OOM kills occur across the memory range and the decline is not very sharp.

Feature Engineering, Accuracy Measures and Future Work Directions

We leave the reader with some ideas to engineer more features and accuracy measures specific to the memory usage context in a streaming environment for a device.

  • We could manually engineer features on memory to utilize the time-series nature of the memory value when aggregated over a user’s session. Suggestions include a running mean of the last 3 values, or the difference between the current entry and a running exponential average (a small pandas sketch follows this list). The analysis of the growth of memory by the user could give insights into whether the kill was caused by in-app streaming demand, or due to external factors.
  • Another feature could be the time spent in different navigational levels. Internally, the app caches several pre-fetched data, images, descriptions etc, and the time spent in the level could indicate whether or not those caches are cleared.
  • When deciding on accuracy measures for the problem, it is important to analyze the distinction between false positives and false negatives. The dataset (fortunately for Netflix!) will be highly biased; as an example, over 99.1% of entries are non-kill related. In general, false negatives (not predicting the kill when the app is actually killed) are more detrimental than false positives (predicting a kill even though the app could have survived). Because the kill happens rarely (0.9% in this example), even if we end up lowering memory and performance 2% of the time and catch almost all of the 0.9% OOM kills, we will have eliminated approximately all OOM kills with the tradeoff of lowering the performance/clearing the cache an extra 1.1% of the time (false positives).
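A small pandas sketch of the first suggestion, computing a running mean and an exponential-average delta over each session's memory readings; the column names, file path, and smoothing factor are assumptions.

```python
import pandas as pd

# Memory readings ordered by time within each session (column names assumed).
df = pd.read_parquet("memory_readings.parquet")    # placeholder path
df = df.sort_values(["session_id", "timestamp"])

grouped = df.groupby("session_id")["memory_reading"]

# Running mean of the last 3 readings within the session.
df["mem_rolling_mean_3"] = grouped.transform(
    lambda s: s.rolling(3, min_periods=1).mean())

# Difference between the current reading and its running exponential average.
df["mem_ewm_delta"] = df["memory_reading"] - grouped.transform(
    lambda s: s.ewm(alpha=0.3, adjust=False).mean())
```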

Summary

This post has focused on shedding light on dataset curation and engineering when dealing with memory and low-resource crashes for streaming services on devices. We also cover the distinction between non-changing attributes and runtime attributes, and strategies to join them to make one cohesive dataset for OOM kill prediction. We covered labeling strategies that involved graded-window-based approaches and explored some graphical analysis on the structured dataset. Finally, we ended with some future directions and possibilities for feature engineering and accuracy measurements in the memory context.

Stay tuned for further posts on memory management and the use of ML modeling to deal with systemic and low latency data collected at the device level. We will try to soon post results of our models on the dataset that we have created.

Acknowledgements
I would like to thank the members of various teams — Partner Engineering (Mihir Daftari, Akshay Garg), TVUI team (Andrew Eichacker, Jason Munning), Streaming Data Team, Big Data Platform Team, Device Ecosystem Team and Data Science Engineering Team (Chris Pham), for all their support.


Formulating ‘Out of Memory Kill’ Prediction on the Netflix App as a Machine Learning Problem was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Rapid Event Notification System at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/rapid-event-notification-system-at-netflix-6deb1d2b57d1

By: Ankush Gulati, David Gevorkyan
Additional credits: Michael Clark, Gokhan Ozer

Intro

Netflix has more than 220 million active members who perform a variety of actions throughout each session, ranging from renaming a profile to watching a title. Reacting to these actions in near real-time to keep the experience consistent across devices is critical for ensuring an optimal member experience. This is not an easy task, considering the wide variety of supported devices and the sheer volume of actions our members perform. To this end, we developed a Rapid Event Notification System (RENO) to support use cases that require server initiated communication with devices in a scalable and extensible manner.

In this blog post, we will give an overview of the Rapid Event Notification System at Netflix and share some of the learnings we gained along the way.

Motivation

With the rapid growth in Netflix member base and the increasing complexity of our systems, our architecture has evolved into an asynchronous one that enables both online and offline computation. Providing a seamless and consistent Netflix experience across various platforms (iOS, Android, smart TVs, Roku, Amazon FireStick, web browser) and various device types (mobile phones, tablets, televisions, computers, set top boxes) requires more than the traditional request-response model. Over time, we’ve seen an increase in use cases where backend systems need to initiate communication with devices to notify them of member-driven changes or experience updates quickly and consistently.

Use cases

  • Viewing Activity
    When a member begins to watch a show, their “Continue Watching” list should be updated across all of their devices to reflect that viewing.
  • Personalized Experience Refresh
    The Netflix recommendation engine continuously refreshes recommendations for every member. The updated recommendations need to be delivered to the device in a timely manner for an optimal member experience.
  • Membership Plan Changes
    Members often change their plan types, leading to a change in their experience that must be immediately reflected across all of their devices.
  • Member “My List” Updates
    When members update their “My List” by adding or removing titles, the changes should be reflected across all of their devices.
  • Member Profile Changes
    When members update their account settings like add/delete/rename profiles or change their preferred maturity level for content, these updates must be reflected across all of their devices.
  • System Diagnostic Signals
    In special scenarios, we need to send diagnostic signals to the Netflix app on devices to help troubleshoot problems and enable tracing capabilities.

Design Decisions

In designing the system, we made a few key decisions that helped shape the architecture of RENO:

  1. Single Events Source
  2. Event Prioritization
  3. Hybrid Communication Model
  4. Targeted Delivery
  5. Managing High RPS

Single Events Source

The use cases we wanted to support originate from various internal systems and member actions, so we needed to listen for events from several different microservices. At Netflix, our near-real-time event flow is managed by an internal distributed computation framework called Manhattan (you can learn more about it here). We leveraged Manhattan’s event management framework to create a level of indirection serving as the single source of events for RENO.

Event Prioritization

Considering the use cases were wide ranging both in terms of their sources and their importance, we built segmentation into the event processing. For example, a member-triggered event such as “change in a profile’s maturity level” should have a much higher priority than a “system diagnostic signal”. We thus assigned a priority to each use case and sharded event traffic by routing to priority-specific queues and the corresponding event processing clusters. This separation allows us to tune system configuration and scaling policies independently for different event priorities and traffic patterns.
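An illustrative sketch of this kind of priority-based sharding: each use case maps to a priority tier, and each tier maps to its own queue so the corresponding processing clusters can be scaled and tuned independently. The use case names, tiers, and queue URLs are placeholders, not RENO's actual configuration.

```python
import json
import boto3

sqs = boto3.client("sqs")

# Hypothetical mapping from use case to priority tier, and from tier to queue.
USE_CASE_PRIORITY = {
    "profile_maturity_level_change": "HIGH",
    "my_list_update": "MEDIUM",
    "system_diagnostic_signal": "LOW",
}
PRIORITY_QUEUE_URL = {
    "HIGH": "https://sqs.example/reno-high-priority",     # placeholder URLs
    "MEDIUM": "https://sqs.example/reno-medium-priority",
    "LOW": "https://sqs.example/reno-low-priority",
}


def route_event(event: dict) -> None:
    """Shard an incoming event onto the queue for its priority tier."""
    priority = USE_CASE_PRIORITY.get(event["use_case"], "LOW")
    sqs.send_message(
        QueueUrl=PRIORITY_QUEUE_URL[priority],
        MessageBody=json.dumps(event),
    )
```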

Hybrid Communication Model

As mentioned earlier in this post, one key challenge for a service like RENO is supporting multiple platforms. While a mobile device is almost always connected to the internet and reachable, a smart TV is only online while in use. This network connection heterogeneity made choosing a single delivery model difficult. For example, relying entirely on a Pull model, wherein the device frequently calls home for updates, would result in chatty mobile apps. That in turn would trigger the per-app communication limits that the iOS and Android platforms enforce (we also need to be considerate of low-bandwidth connections). On the other hand, using only a Push mechanism would lead smart TVs to miss notifications while they are powered off during most of the day. We therefore chose a hybrid Push AND Pull communication model, wherein the server tries to deliver notifications to all devices immediately using Push notifications, and devices call home at various stages of the application lifecycle.

Using a Push-and-Pull delivery model combination also supports devices limited to a single communication model. This includes older, legacy devices that do not support Push Notifications.

Targeted Delivery

Considering the use cases were wide ranging in terms of both sources and target device types, we built support for device specific notification delivery. This capability allows notifying specific device categories as per the use case. When an actionable event arrives, RENO applies the use case specific business logic, gathers the list of devices eligible to receive this notification and attempts delivery. This helps limit the outgoing traffic footprint considerably.

Managing High RPS

With over 220 million members, we were conscious of the fact that a service like RENO needs to process many events per member during a viewing session. At peak times, RENO serves about 150k events per second. Such a high RPS during specific times of the day can create a thundering herd problem and put strain on internal and external downstream services. We therefore implemented a few optimizations:

  • Event Age
    Many events that need to be notified to the devices are time sensitive, and they are of little or no value unless sent almost immediately. To avoid processing old events, a staleness filter is applied as a gating check: if an event’s age is older than a configurable threshold, it is not processed (a minimal sketch of this check, together with event deduplication, follows this list). This filter weeds out events that have no value to the devices early in the processing phase and protects the queues from being flooded due to stale upstream events that may have been backed up.
  • Online Devices
    To reduce the ongoing traffic footprint, notifications are sent only to devices that are currently online by leveraging an existing registry that is kept up-to-date by Zuul (learn more about it here).
  • Scaling Policies
    To address the thundering herd problem and to keep latencies under acceptable thresholds, the cluster scale-up policies are configured to be more aggressive than the scale-down policies. This approach enables the computing power to catch up quickly when the queues grow.
  • Event Deduplication
    Both the iOS and Android platforms aggressively restrict the level of activity generated by backgrounded apps, which is why incoming events are deduplicated in RENO. Duplicate events can occur in the case of high RPS, and they are merged together when doing so does not cause any loss of context for the device.
  • Bulkheaded Delivery
    Multiple downstream services are used to send push notifications to different device platforms including external ones like Apple Push Notification Service (APNS) for Apple devices and Google’s Firebase Cloud Messaging (FCM) for Android. To safeguard against a downstream service bringing down the entire notification service, the event delivery is parallelized across different platforms, making it best-effort per platform. If a downstream service or platform fails to deliver the notification, the other devices are not blocked from receiving push notifications.
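A minimal sketch of the event-age gate and deduplication described above; the threshold values, key fields, and in-memory cache are illustrative simplifications of what a real event processor would use.

```python
import time

STALENESS_THRESHOLD_SECONDS = 60          # illustrative; configurable in practice
DEDUPE_WINDOW_SECONDS = 30                # illustrative; configurable in practice
_recently_seen: dict[str, float] = {}     # event key -> last processed time


def should_process(event: dict) -> bool:
    """Drop stale events and merge duplicates within a short window."""
    now = time.time()

    # Event age gate: old events carry little or no value for the device.
    if now - event["created_at_epoch"] > STALENESS_THRESHOLD_SECONDS:
        return False

    # Deduplication: skip an identical (member, device, type) event seen recently.
    key = f'{event["member_id"]}:{event["device_id"]}:{event["event_type"]}'
    last_seen = _recently_seen.get(key)
    if last_seen is not None and now - last_seen < DEDUPE_WINDOW_SECONDS:
        return False

    _recently_seen[key] = now
    return True
```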

Architecture

As shown in the diagram above, the RENO service can be broken down into the following components.

Event Triggers

Member actions and system-driven updates that require refreshing the experience on members’ devices.

Event Management Engine

The near-real-time event flow management framework at Netflix referred to as Manhattan can be configured to listen to specific events and forward events to different queues.

Event Priority Based Queues

Amazon SQS queues populated by priority-based event forwarding rules, which are set up in Manhattan to allow priority-based sharding of traffic.

Event Priority Based Clusters

AWS Instance Clusters that subscribe to the corresponding queues with the same priority. They process all the events arriving on those queues and generate actionable notifications for devices.

Outbound Messaging System

The Netflix messaging system that sends in-app push notifications to members is used to send RENO-produced notifications on the last mile to mobile devices. This messaging system is described in this blog post.

For notifications to web, TV & other streaming devices, we use a homegrown push notification solution called Zuul Push that provides “always-on” persistent connections with online devices. To learn more about the Zuul Push solution, listen to this talk from a Netflix colleague.

Persistent Store

A Cassandra database that stores all the notifications emitted by RENO for each device to allow those devices to poll for their messages at their own cadence.

Observability

At Netflix, we put a strong emphasis on building robust monitoring into our systems to provide a clear view of system health. For a high RPS service like RENO that relies on several upstream systems as its traffic source and simultaneously produces heavy traffic for different internal and external downstream systems, it is important to have a strong combination of metrics, alerting and logging in place. For alerting, in addition to the standard system health metrics such as CPU, memory, and performance, we added a number of “edge-of-the-service” metrics and logging to capture any aberrations from upstream or downstream systems. Furthermore, in addition to real-time alerting, we added trend analysis for important metrics to help catch longer term degradations. We instrumented RENO with a real time stream processing application called Mantis (you can learn more about it here). It allowed us to track events in real-time over the wire at device specific granularity thus making debugging easier. Finally, we found it useful to have platform-specific alerting (for iOS, Android, etc.) in finding the root causes of issues faster.

Wins

  • Can easily support new use cases
  • Scales horizontally with higher throughput

When we set out to build RENO, the goal was limited to the “Personalized Experience Refresh” use case of the product. As the design of RENO evolved, support for new use cases became possible, and RENO was quickly positioned as the centralized rapid notification service for all product areas at Netflix.

The design decisions we made early on paid off, such as making the addition of new use cases a “plug-and-play” solution and providing a hybrid delivery model across all platforms. We were able to onboard additional product use cases at a fast pace, thus unblocking a lot of innovation.

An important learning in building this platform was ensuring that RENO could scale horizontally as more types of events and higher throughput was needed over time. This ability was primarily achieved by allowing sharding based on either event type or priority, along with using an asynchronous event driven processing model that can be scaled by simply adding more machines for event processing.

Looking Ahead

As Netflix’s member base continues to grow at a rapid pace, it is increasingly beneficial to have a service like RENO that helps give our members the best and most up to date Netflix experience. From membership related updates to contextual personalization, and more — we are continually evolving our notifications portfolio as we continue to innovate on our member experience. Architecturally, we are evaluating opportunities to build in more features such as guaranteed message delivery and message batching that can open up more use cases and help reduce the communication footprint of RENO.

Building Great Things Together

We are just getting started on this journey to build impactful systems that help propel our business forward. The core to bringing these engineering solutions to life is our direct collaboration with our colleagues and using the most impactful tools and technologies available. If this is something that excites you, we’d love for you to join us.


Rapid Event Notification System at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.