Curbing Connection Churn in Zuul

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/curbing-connection-churn-in-zuul-2feb273a3598

By Arthur Gonigberg, Argha C

Plaintext Past

When Zuul was designed and developed, there was an inherent assumption that connections were effectively free, given we weren’t using mutual TLS (mTLS). It’s built on top of Netty, using event loops for non-blocking execution of requests, one loop per core. To reduce contention among event loops, we created connection pools for each, keeping them completely independent. The result is that the entire request-response cycle happens on the same thread, significantly reducing context switching.

There is also a significant downside. If each event loop has a connection pool that connects to every origin (our name for backend) server, the total connection count is the product of event loops, origin servers, and Zuul instances. For example, a 16-core box connecting to an 800-server origin would have 12,800 connections. If the Zuul cluster has 100 instances, that’s 1,280,000 connections. That’s a significant amount and certainly more than is necessary relative to the traffic on most clusters.
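The arithmetic above can be checked with a few lines of Python. This is only a sketch of the multiplication described in the text; the function and parameter names are illustrative and do not come from Zuul.

```python
def total_connections(event_loops: int, origin_servers: int, zuul_instances: int = 1) -> int:
    """Upper bound on open connections when every event loop pools every origin server."""
    return event_loops * origin_servers * zuul_instances


per_instance = total_connections(event_loops=16, origin_servers=800)
cluster_wide = total_connections(event_loops=16, origin_servers=800, zuul_instances=100)
print(per_instance)  # 12800
print(cluster_wide)  # 1280000
```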

As streaming has grown over the years, these numbers multiplied with bigger Zuul and origin clusters. More acutely, if a traffic spike occurs and Zuul instances scale up, the connections open to origins multiply along with them. Although this has been a known issue for a long time, it was never a critical pain point until we moved large streaming applications to mTLS and our Envoy-based service mesh.

Fixing the Flows

The first step in improving connection overhead was implementing HTTP/2 (H2) multiplexing to the origins. Multiplexing allows the reuse of existing connections by creating multiple streams per connection, each able to send a request. Rather than requiring a connection for every request, we could reuse the same connection for many simultaneous requests. The more we reuse connections, the less overhead we have in establishing mTLS sessions with roundtrips, handshaking, and so on.

Although Zuul has had H2 proxying for some time, it never supported multiplexing. It effectively treated H2 connections as HTTP/1 (H1). For backward compatibility with existing H1 functionality, we modified the H2 connection bootstrap to create a stream and immediately release the connection back into the pool. Future requests will then be able to reuse the existing connection without creating a new one. Ideally, the connections to each origin server should converge towards 1 per event loop. It seems like a minor change, but it had to be seamlessly integrated into our existing metrics and connection bookkeeping.

The standard way to initiate H2 connections is, over TLS, via an upgrade with ALPN (Application-Layer Protocol Negotiation). ALPN allows us to gracefully downgrade back to H1 if the origin doesn’t support H2, so we can broadly enable it without impacting customers. Service mesh being available on many services made testing and rolling out this feature very easy because it enables ALPN by default. It meant that no work was required by service owners who were already on service mesh and mTLS.

Sadly, our plan hit a snag when we rolled out multiplexing. Although the feature was stable and functionally there was no impact, we didn’t get a reduction in overall connections. Because some origin clusters were so large, and we were connecting to them from all event loops, there wasn’t enough re-use of existing connections to trigger multiplexing. Even though we were now capable of multiplexing, we weren’t utilizing it.

Divide and Conquer

H2 multiplexing improves connection spikes under load, when there is a large demand for all the existing connections, but it didn’t help in steady state. Partitioning the whole origin into subsets would allow us to reduce total connection counts while leveraging multiplexing to maintain existing throughput and headroom.

We had discussed subsetting many times over the years, but there was concern about disrupting load balancing with the algorithms available. An even distribution of traffic to origins is critical for accurate canary analysis and preventing hot-spotting of traffic on origin instances.

Subsetting was also top of mind after reading a recent ACM paper published by Google. It describes an improvement on their long-standing Deterministic Subsetting algorithm that they’ve used for many years. The Ringsteady algorithm (figure below) creates an evenly distributed ring of servers (yellow nodes) and then walks the ring to allocate them to each front-end task (blue nodes).

The figure above is from Google’s ACM paper

The algorithm relies on the idea of low-discrepancy numeric sequences to create a naturally balanced distribution ring that is more consistent than one built on a randomness-based consistent hash. The particular sequence used is a binary variant of the Van der Corput sequence. As long as the sequence of added servers is monotonically incrementing, for each additional server, the distribution will be evenly balanced between 0–1. Below is an example of what the binary Van der Corput sequence looks like.
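For reference, the binary Van der Corput sequence can be generated by mirroring the binary digits of an index around the radix point. The following is a minimal Python sketch; the function name is chosen for illustration and is not taken from Zuul or from the paper.

```python
def van_der_corput(n: int) -> float:
    """Return the n-th element of the base-2 Van der Corput sequence.

    The binary digits of n are mirrored around the radix point,
    e.g. 6 = 0b110 becomes 0.011 in binary, which is 0.375.
    """
    value, denominator = 0.0, 1.0
    while n > 0:
        denominator *= 2
        n, bit = divmod(n, 2)
        value += bit / denominator
    return value


first_eight = [van_der_corput(i) for i in range(8)]
print(first_eight)
# [0.0, 0.5, 0.25, 0.75, 0.125, 0.625, 0.375, 0.875]
```

Each new value lands in the largest remaining gap, which is why the ring stays evenly filled as the index grows.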

Another big benefit of this distribution is that it provides a consistent expansion of the ring as servers are removed and added over time, evenly spreading new nodes among the subsets. This results in the stability of subsets and no cascading churn based on origin changes over time. Each node added or removed will only affect one subset, and new nodes will be added to a different subset every time.

Here’s a more concrete demonstration of the sequence above, in decimal form, with each number between 0–1 assigned to 4 subsets. In this example, each subset has 0.25 of that range depicted with its own color.

You can see that each new node added is balanced across subsets extremely well. If 50 nodes are added quickly, they will get distributed just as evenly. Similarly, if a large number of nodes are removed, it will affect all subsets equally.

The real killer feature, though, is that if a node is removed or added, it doesn’t require all the subsets to be shuffled and recomputed. Every single change will generally only create or remove one connection. This will hold for bigger changes, too, reducing almost all churn in the subsets.
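The balance and stability properties described above can be illustrated with a small Python sketch. It assumes the simplest possible mapping, where the range 0–1 is cut into equal slices and a node belongs to the slice its sequence value falls into. The helper names are hypothetical, and the real ring walk in the paper and in Zuul is more involved.

```python
from collections import Counter


def van_der_corput(n: int) -> float:
    """n-th element of the base-2 Van der Corput sequence."""
    value, denominator = 0.0, 1.0
    while n > 0:
        denominator *= 2
        n, bit = divmod(n, 2)
        value += bit / denominator
    return value


def subset_of(sequence_number: int, num_subsets: int) -> int:
    """Map a node's ring position to one of num_subsets equal slices of [0, 1)."""
    return int(van_der_corput(sequence_number) * num_subsets)


def assign(node_count: int, num_subsets: int) -> dict:
    """Assign nodes 0..node_count-1 to subsets by their sequence number."""
    return {n: subset_of(n, num_subsets) for n in range(node_count)}


before = assign(8, 4)
after = assign(9, 4)  # one more node joins the ring

sizes = sorted(Counter(before.values()).items())
moved = {n for n in before if before[n] != after[n]}
next_four = [subset_of(n, 4) for n in range(8, 12)]

print(sizes)      # [(0, 2), (1, 2), (2, 2), (3, 2)]
print(moved)      # set()
print(next_four)  # [0, 2, 1, 3]
```

Adding the ninth node moves none of the existing nodes, and the next four arrivals each land in a different subset.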

Zuul’s Take

Our approach to implement this in Zuul was to integrate with Eureka service discovery changes and feed them into a distribution ring, based on the ideas discussed above. When new origins register in Zuul, we load their instances and create a new ring, and from then on, manage it with incremental deltas. We also take the additional step of shuffling the order of nodes before adding them to the ring. This helps prevent accidental hot spotting or overlap among Zuul instances.

The quirk in any load balancing algorithm from Google is that they do their load balancing centrally. Their centralized service creates subsets and load balances across their entire fleet, with a global view of the world. To use this algorithm, the key insight was to apply it to the event loops rather than the instances themselves. This allows us to continue having decentralized, client-side load balancing while also having the benefits of accurate subsetting. Although Zuul continues connecting to all origin servers, each event loop’s connection pool only gets a small subset of the whole. We end up with a singular, global view of the distribution that we can control on each instance — and a single sequence number that we can increment for each origin’s ring.

When a request comes in, Netty assigns it to an event loop, and it remains there for the duration of the request-response lifecycle. After running the inbound filters, we determine the destination and load the connection pool for this event loop. This will pull from a mapping of loop-to-subset, giving us the limited set of nodes we’re looking for. We then load balance using a modified choice-of-2, as discussed before. If this sounds familiar, it’s because there are no fundamental changes to how Zuul works. The only difference is that we provide a loop-bound subset of nodes to the load balancer as a starting point for its decision.
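The request path above can be sketched in Python. This is an illustration under stated assumptions, not Zuul's code: subsets are cut with a simple stride rather than from the distribution ring, the load balancer is a plain choice-of-2 rather than Zuul's modified variant, and all names (`build_loop_subsets`, `choose_node`, `in_flight`) are hypothetical.

```python
import random


def build_loop_subsets(nodes, num_subsets, num_event_loops):
    """Split nodes into num_subsets slices and map each event loop to one slice."""
    subsets = [nodes[i::num_subsets] for i in range(num_subsets)]
    return {loop: subsets[loop % num_subsets] for loop in range(num_event_loops)}


def choose_node(subset, in_flight, rng):
    """Plain choice-of-2: sample two candidates and keep the less loaded one."""
    first, second = rng.sample(subset, 2)
    return first if in_flight[first] <= in_flight[second] else second


nodes = [f"origin-{i}" for i in range(400)]
loop_subsets = build_loop_subsets(nodes, num_subsets=8, num_event_loops=16)

# A request pinned to event loop 3 only ever sees that loop's 50 nodes.
rng = random.Random(42)
in_flight = {node: 0 for node in nodes}
target = choose_node(loop_subsets[3], in_flight, rng)
in_flight[target] += 1

print(len(loop_subsets[3]))                # 50
print(loop_subsets[3] == loop_subsets[11]) # True: each subset is held by 2 loops
```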

Another insight we had was that we needed to replicate the number of subsets among the event loops. This allows us to maintain low connection counts for large and small origins. At the same time, having a reasonable subset size ensures we can continue providing good balance and resiliency features for the origin. Most origins require this because they are not big enough to create enough instances in each subset.

However, we also don’t want to change this replication factor too often because it would cause a reshuffling of the entire ring and introduce a lot of churn. After a lot of iteration, we ended up implementing this by starting with an “ideal” subset size. We achieve this by computing the subset size that would achieve the ideal replication factor for a given cardinality of origin nodes. We can scale the replication factor across origins by growing our subsets until the desired subset size is achieved, especially as they scale up or down based on traffic patterns. Finally, we work backward to divide the ring into even slices based on the computed subset size.

Our ideal subset size is roughly 25–50 nodes, so an origin with 400 nodes will have 8 subsets of 50 nodes. On a 32-core instance, we’ll have a replication factor of 4. However, that also means that between 200 and 400 nodes, we’re not shuffling the subsets at all. An example of this subset recomputation is in the rollout graphs below.
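One way to reproduce the numbers in this paragraph is sketched below in Python. It rests on assumptions that the post does not confirm: the number of subsets is a power of two, the event loop count is also a power of two, and the number of subsets doubles only when the subset size would exceed 50. Zuul's actual computation may differ.

```python
def plan_subsets(origin_nodes: int, event_loops: int, max_subset_size: int = 50):
    """Return (num_subsets, subset_size, replication_factor) for one origin.

    Assumption: the subset count doubles until each subset holds at most
    max_subset_size nodes, capped at one subset per event loop.
    """
    num_subsets = 1
    while origin_nodes / num_subsets > max_subset_size and num_subsets < event_loops:
        num_subsets *= 2
    subset_size = -(-origin_nodes // num_subsets)  # ceiling division
    replication_factor = event_loops // num_subsets
    return num_subsets, subset_size, replication_factor


print(plan_subsets(400, 32))  # (8, 50, 4)
print(plan_subsets(201, 32))  # (8, 26, 4)
print(plan_subsets(401, 32))  # (16, 26, 2)
```

Under these assumptions the division stays at 8 for any origin larger than 200 and up to 400 nodes, so the ring is not re-sliced while the origin scales within that range.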

An interesting challenge here was to satisfy the dual constraints of origin nodes with a range of cardinality, and the number of event loops that hold the subsets. Our goal is to scale the subsets as we run on instances with more event loops, with a sub-linear increase in overall connections, and sufficient replication for availability guarantees. Scaling the replication factor elastically, as described above, helped us achieve this.

Subsetting Success

The results were outstanding. We saw improvements across all key metrics on Zuul, but most importantly, there was a significant reduction in total connection counts and churn.

Total Connections

This graph (as well as the ones below) shows a week’s worth of data, with the typical diurnal cycle of Netflix usage. Each of the 3 colors represents our deployment regions in AWS, and the blue vertical line shows when we turned on the feature.

Total connections at peak were significantly reduced in all 3 regions, by a factor of 10. This is a huge improvement, and it makes sense if you dig into how subsetting works. For example, a machine running 16 event loops could have 8 subsets, with each subset on 2 event loops. That means we’re dividing an origin by 8, hence an 8x improvement. As to why peak improvement goes up to 10x, it’s probably related to reduced churn (below).

Churn

This graph is a good proxy for churn. It shows how many TCP connections Zuul is opening per second. You can see the before and after very clearly. Looking at the peak-to-peak improvement, there is roughly an 8x improvement.

The decrease in churn is a testament to the stability of the subsets, even as origins scale up, down, and redeploy over time.

Looking specifically at connections created in the pool, the reduction is even more impressive:

The peak-to-peak reduction is massive and clearly shows how stable this distribution is. Although hard to see on the graph, the reduction went from thousands per second at peak down to about 60. There is effectively no churn of connections, even at peak traffic.

Load Balancing

The key constraint to subsetting is ensuring that the load balance on the backends is still consistent and evenly distributed. You’ll notice all the RPS on origin nodes grouped tightly, as expected. The thicker lines represent the subset size and the total origin size.

Balance at deploy
Balance 12 hours after deploy

In the second graph, you’ll note that we recompute the subset size (blue line) because the origin (purple line) became large enough that we could get away with less replication in the subsets. In this case, we went from a subset size of 100 for 400 servers (a division of 4) to 50 (a division of 8).

System Metrics

Given the significant reduction in connections, we saw reduced CPU utilization (~4%), heap usage (~15%), and latency (~3%) on Zuul, as well.

Zuul canary metrics

Rolling it Out

As we rolled this feature out to our largest origins — streaming playback APIs — we saw the pattern above continue, but with scale, it became more impressive. On some Zuul shards, we saw a reduction of as much as 13 million connections at peak, with almost no churn.

Today the feature is rolled out widely. We’re serving the same amount of traffic but with tens of millions fewer connections. Despite the reduction of connections, there is no decrease in resiliency or load balancing. H2 multiplexing allows us to scale up requests separately from connections, and our subsetting algorithm ensures an even traffic balance.

Although challenging to get right, subsetting is a worthwhile investment.

Acknowledgments

We would also like to thank Peter Ward, Paul Wankadia, and Kavita Guliani at Google for developing this algorithm and publishing their work for the benefit of the industry.


Curbing Connection Churn in Zuul was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Consistent caching mechanism in Titus Gateway

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/consistent-caching-mechanism-in-titus-gateway-6cb89b9ce296

by Tomasz Bak and Fabio Kung

Introduction

Titus is the Netflix cloud container runtime that runs and manages containers at scale. In the time since it was first presented as an advanced Mesos framework, Titus has transparently evolved from being built on top of Mesos to Kubernetes, handling an ever-increasing volume of containers. As the number of Titus users increased over the years, the load and pressure on the system increased substantially. The original assumptions and architectural choices were no longer viable. This blog post presents how our current iteration of Titus deals with high API call volumes by scaling out horizontally.

We introduce a caching mechanism in the API gateway layer, allowing us to offload processing from singleton leader-elected controllers without giving up the strict data consistency guarantees that clients observe. Titus API clients always see the latest (not stale) version of the data regardless of which gateway node serves their request, and in which order.

Overview

The figure below depicts a simplified high-level architecture of a single Titus cluster (a.k.a. cell):

Titus Job Coordinator is a leader elected process managing the active state of the system. Active data includes jobs and tasks that are currently running. When a new leader is elected it loads all data from external storage. Mutations are first persisted to the active data store before in-memory state is changed. Data for completed jobs and tasks is moved to the archive store first, and only then removed from the active data store and from the leader memory.

Titus Gateway handles user requests. A user request could be a job creation request, a query to the active data store, or a query to the archive store (the latter handled directly in Titus Gateway). Requests are load balanced across all Titus Gateway nodes. All reads are consistent, so it does not matter which Titus Gateway instance is serving a query. For example, it is OK to send writes through one instance, and do reads from another one with full data read consistency guarantees. Titus Gateways always connect to the current Titus Job Coordinator leader. During leader failovers, all writes and reads of the active data are rejected until a connection to the active leader is re-established.

In the original version of the system, all queries to the active data set were forwarded to a singleton Titus Job Coordinator. The freshest data is served to all requests, and clients never observe read-your-write or monotonic-read consistency issues¹:

Data consistency on the Titus API is highly desirable as it simplifies client implementation. Causal consistency, which includes read-your-writes and monotonic-reads, frees clients from implementing client-side synchronization mechanisms. In PACELC terms we choose PC/EC and have the same level of availability for writes as our previous system while improving our theoretical availability for reads.

For example, a batch workflow orchestration system may create multiple jobs which are part of a single workflow execution. After the jobs are created, it monitors their execution progress. If the system creates a new job, followed immediately by a query to get its status, and there is a data propagation lag, it might decide that the job was lost and a replacement must be created. In that scenario, the system would need to deal with the data propagation latency directly, for example, by use of timeouts or client-originated update tracking mechanisms. As Titus API reads are always consistently reflecting the up-to-date state, such workarounds are not needed.

With traffic growth, a single leader node handling all request volume started becoming overloaded. We started seeing increased response latencies and leader servers running at dangerously high utilization. To mitigate this issue we decided to handle all query requests directly from Titus Gateway nodes but still preserve the original consistency guarantees:

The state from Titus Job Coordinator is replicated over a persistent stream connection, with low event propagation latencies. A new wire protocol provided by Titus Job Coordinator allows monitoring of the cache consistency level and guarantees that clients always receive the latest data version. The cache is kept in sync with the current leader process. When there is a failover (because of node failures with the current leader or a system upgrade), a new snapshot from the freshly elected leader is loaded, replacing the previous cache state. Titus Gateways handling client requests can now be horizontally scaled out. The details and workings of these mechanisms are the primary topics of this blog post.

How do I know that my cache is up to date?

It is an easy answer for systems that were built from the beginning with a consistent data versioning scheme and can depend on clients to follow the established protocol. Kubernetes is a good example here. Each object and each collection read from the Kubernetes cluster has a unique revision which is a monotonically increasing number. A user may request all changes since the last received revision. For more details, see Kubernetes API Concepts and the Shared Informer Pattern.

In our case, we did not want to change the API contract and impose additional constraints and requirements on our users. Doing so would require a substantial migration effort to move all clients off the old API with questionable value to the affected teams (except for helping us solve Titus' internal scalability problems). In our experience, such migrations require a nontrivial amount of work, particularly with the migration timeline not fully in our control.

To fulfill the existing API contract, we had to guarantee that for a request received at a time T₀, the data returned to the client is read from a cache that contains all state updates in Titus Job Coordinator up to time T₀.

The path over which data travels from Titus Job Coordinator to a Titus Gateway cache can be described as a sequence of event queues with different processing speeds:

A message generated by the event source may be buffered at any stage. Furthermore, as each event stream subscription from Titus Gateway to Titus Job Coordinator establishes a different instance of the processing pipeline, the state of the cache in each gateway instance may be vastly different.

Let’s assume a sequence of events E₁…E₁₀, and their location within the pipeline of two Titus Gateway instances at time T₁:

If a client makes a call to Titus Gateway 2 at the time T₁, it will read version E₈ of the data. If it immediately makes a request to Titus Gateway 1, the cache there is behind with respect to the other gateway so the client might read an older version of the data.

In both cases, data is not up to date in the caches. If a client created a new object at time T₀, and the object value is captured by an event update E₁₀, this object will be missing in both gateways at time T₁. This comes as a surprise to a client that successfully completed a create request, only for the follow-up query to return a not-found error (a read-your-write consistency violation).

The solution is to flush all the events created up to time T₁ and force clients to wait for the cache to receive them all. This work can be split into two different steps each with its own unique solution.

Implementation details

We solved the cache synchronization problem (as stated above) with a combination of two strategies:

  • Titus Gateway <-> Titus Job Coordinator synchronization protocol over the wire.
  • Usage of high-resolution monotonic time sources like Java’s nano time within a single server process. Java’s nano time is used as a logical time within a JVM to define an order for events happening in the JVM process. An alternative solution based on an atomic integer values generator to order the events would suffice as well. Having the local logical time source avoids issues with distributed clock synchronization.

If Titus Gateways subscribed to the Titus Job Coordinator event stream without synchronization steps, the amount of data staleness would be impossible to estimate. To guarantee that a Titus Gateway received all state updates that happened until some time Tₙ an explicit synchronization between the two services must happen. Here is what the protocol we implemented looks like:

  1. Titus Gateway receives a client request (queryₐ).
  2. Titus Gateway makes a request to the local cache to fetch the latest version of the data.
  3. The local cache in Titus Gateway records the local logical time and sends it to Titus Job Coordinator in a keep-alive message (keep-aliveₐ).
  4. Titus Job Coordinator saves the keep-alive request together with the local logical time Tₐ of the request arrival in a local queue (KAₐ, Tₐ).
  5. Titus Job Coordinator sends state updates to Titus Gateway until the former observes a state update (event) with a timestamp past the recorded local logical time (E1, E2).
  6. At that time, Titus Job Coordinator sends an acknowledgment event for the keep-alive message (KAₐ keep-alive ACK).
  7. Titus Gateway receives the keep-alive acknowledgment and consequently knows that its local cache contains all state changes that happened up to the time when the keep-alive request was sent.
  8. At this point the original client request can be handled from the local cache, guaranteeing that the client will get a fresh enough version of the data (responseₐ).
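The steps above can be sketched as a single-process Python simulation. It is a toy model under several assumptions: one gateway and one event stream, an integer counter as the logical clock (the atomic-integer alternative mentioned earlier), and a dummy event emitted on every query as a stand-in for the timer-driven dummy messages. The class and method names are hypothetical and do not reflect the Titus wire protocol.

```python
import itertools
from collections import deque


class Coordinator:
    """Toy stand-in for Titus Job Coordinator with a single event stream."""

    def __init__(self):
        self._clock = itertools.count(1)  # logical time, local to this process
        self.outbox = deque()             # events not yet sent to the gateway
        self.keep_alives = deque()        # (keep_alive_id, arrival_time)

    def _now(self):
        return next(self._clock)

    def write(self, key, value):
        self.outbox.append(("update", self._now(), key, value))

    def emit_dummy(self):
        self.outbox.append(("dummy", self._now(), None, None))

    def keep_alive(self, keep_alive_id):
        # Step 4: remember the keep-alive together with its arrival time.
        self.keep_alives.append((keep_alive_id, self._now()))

    def send_next(self):
        # Steps 5-6: send one event, then ACK every keep-alive that
        # arrived before that event's timestamp.
        event = self.outbox.popleft()
        messages = [event]
        while self.keep_alives and self.keep_alives[0][1] < event[1]:
            keep_alive_id, _ = self.keep_alives.popleft()
            messages.append(("ack", event[1], keep_alive_id, None))
        return messages


class Gateway:
    """Toy stand-in for a Titus Gateway with a local cache."""

    def __init__(self, coordinator):
        self.coordinator = coordinator
        self.cache = {}
        self.acked = set()
        self._ids = itertools.count(1)

    def query(self, key):
        keep_alive_id = next(self._ids)
        self.coordinator.keep_alive(keep_alive_id)  # steps 2-3
        self.coordinator.emit_dummy()               # stands in for the timer
        while keep_alive_id not in self.acked:      # step 7
            for kind, _, a, b in self.coordinator.send_next():
                if kind == "update":
                    self.cache[a] = b
                elif kind == "ack":
                    self.acked.add(a)
        return self.cache.get(key)                  # step 8


coordinator = Coordinator()
gateway = Gateway(coordinator)
coordinator.write("job-1", "Accepted")
stale = gateway.cache.get("job-1")  # the update is still queued
fresh = gateway.query("job-1")      # waits for the keep-alive ACK first
print(stale, fresh)                 # None Accepted
```

Reading the cache directly misses the queued update, while the synchronized query sees it, which is the read-your-write guarantee the protocol is meant to preserve.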

This process is illustrated by the figure below:

The procedure above explains how to synchronize a Titus Gateway cache with the source of truth in Titus Job Coordinator, but it does not address how the internal queues in Titus Job Coordinator are drained to the point where all relevant messages are processed. The solution here is to add a logical timestamp to each event and guarantee a minimum time interval between messages emitted inside the event stream. If not enough events are created because of data updates, a dummy message is generated and inserted into the stream. Dummy messages guarantee that each keep-alive request is acknowledged within a bounded time, and does not wait indefinitely until some change in the system happens. For example:

Ta, Tb, Tc, Td, and Te are high-resolution monotonic logical timestamps. At time Td a dummy message is inserted, so the interval between two consecutive events in the event stream is always below a configurable threshold. These timestamp values are compared with keep-alive request arrival timestamps to know when a keep-alive acknowledgment can be sent.
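The dummy-message rule can be sketched as a small Python function. The function name, the tuple layout, and the choice to place each dummy exactly one threshold after the previous message are assumptions made for illustration.

```python
def with_dummy_events(event_times, max_gap):
    """Insert dummy entries so consecutive timestamps never differ by more than max_gap.

    event_times must be sorted in ascending order.
    """
    stream = []
    last = None
    for t in event_times:
        while last is not None and t - last > max_gap:
            last += max_gap
            stream.append((last, "dummy"))
        stream.append((t, "event"))
        last = t
    return stream


stream = with_dummy_events([0, 1, 2, 7], max_gap=2)
print(stream)
# [(0, 'event'), (1, 'event'), (2, 'event'), (4, 'dummy'), (6, 'dummy'), (7, 'event')]
```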

There are a few optimization techniques that can be used. Here are those implemented in Titus:

  • Before sending a keep-alive request for each new client request, wait a fixed interval and send a single keep-alive request for all requests that arrived during that time. So the maximum rate of keep-alive requests is constrained by 1 / max_interval. For example, if max_interval is set to 5ms, the max keep alive request rate is 200 req / sec.
  • Collapse multiple keep-alive requests in Titus Job Coordinator, sending a response only to the latest one whose arrival timestamp is earlier than the timestamp of the last event sent over the network. On the Titus Gateway side, a keep-alive response with a given timestamp acknowledges all pending requests with keep-alive timestamps earlier than or equal to the received one.
  • Do not wait for cache synchronization on requests that do not have ordering requirements, serving data from the local cache on each Titus Gateway. Clients that can tolerate eventual consistency can opt into this new API for lower response times and increased availability.
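The first two optimizations can be illustrated with a short Python sketch: the rate bound from batching, and the gateway-side rule that one acknowledgment releases every pending request with an earlier or equal keep-alive timestamp. The function names and the dictionary layout are hypothetical.

```python
def max_keep_alive_rate(max_interval_ms: float) -> float:
    """Upper bound on keep-alive requests per second when batching per interval."""
    return 1000 / max_interval_ms


def acknowledge(pending: dict, ack_timestamp: int):
    """Release every pending request whose keep-alive timestamp is <= ack_timestamp.

    pending maps a request id to the timestamp of the keep-alive it waits on.
    Returns (released request ids, requests still waiting).
    """
    released = sorted(r for r, ts in pending.items() if ts <= ack_timestamp)
    remaining = {r: ts for r, ts in pending.items() if ts > ack_timestamp}
    return released, remaining


rate = max_keep_alive_rate(5)
released, remaining = acknowledge({"q1": 10, "q2": 15, "q3": 20}, ack_timestamp=15)
print(rate)       # 200.0
print(released)   # ['q1', 'q2']
print(remaining)  # {'q3': 20}
```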

Given the mechanism described so far, let’s try to estimate the maximum wait time of a client request that arrived at Titus Gateway for different scenarios. Let’s assume that the maximum keep alive interval is 5ms, and the maximum interval between events emitted in Titus Job Coordinator is 2ms.

Assuming that the system runs idle (no changes made to the data), and the client request arrives at a time when a new keep-alive request wait time starts, the cache update latency is equal to 7 milliseconds (5ms keep-alive wait + 2ms event interval) + network propagation delay + processing time. If we ignore the processing time and assume that the network propagation delay is <1ms given we have to only send back a small keep-alive response, we should expect a delay of up to 8ms in this worst case. If the client request does not have to wait for the keep-alive to be sent, and the keep-alive request is acknowledged immediately in Titus Job Coordinator, the delay is equal to network propagation delay + processing time, which we estimated to be <1ms. The average delay introduced by cache synchronization is around 4ms.
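The estimate above is simple enough to restate as a Python sketch. It ignores processing time, as the text does, and treats the network delay as a parameter. Taking the midpoint of the two extremes as the average is an assumption of this sketch.

```python
def sync_delay_bounds_ms(keep_alive_interval_ms, event_interval_ms, network_ms):
    """Return (best, worst, midpoint) cache synchronization delay in milliseconds."""
    best = network_ms
    worst = keep_alive_interval_ms + event_interval_ms + network_ms
    return best, worst, (best + worst) / 2


upper = sync_delay_bounds_ms(5, 2, network_ms=1)
lower = sync_delay_bounds_ms(5, 2, network_ms=0)
print(upper)  # (1, 8, 4.5)
print(lower)  # (0, 7, 3.5)
```

With a network delay somewhere below 1ms, the midpoint falls between 3.5ms and 4.5ms, in line with the figure of around 4ms.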

Network propagation delays and stream processing times start to become a more important factor as the number of state change events and client requests increases. However, Titus Job Coordinator can now dedicate its capacity for serving high bandwidth streams to a finite number of Titus Gateways, relying on the gateway instances to serve client requests, instead of serving payloads to all client requests itself. Titus Gateways can then be scaled out to match client request volumes.

We ran empirical tests for scenarios of low and high request volumes, and the results are presented in the next section.

Performance test results

To show how the system performs with and without the caching mechanism, we ran two tests:

  • A test with a low/moderate load showing a median latency increase due to overhead from the cache synchronization mechanism, but better 99th percentile latencies.
  • A test with load close to the peak of Titus Job Coordinator capacity, above which the original system collapses. Previous results hold, showing better scalability with the caching solution.

A single request in the tests below consists of one query. The query is of a moderate size, which is a collection of 100 records, with a serialized response size of ~256KB. The total payload (response size times the query rate) requires a network bandwidth of ~2Gbps in the first test and ~8Gbps in the second one.
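The bandwidth figures follow from the response size and the query rates used in the two tests (1K and 4K queries/second). A quick Python check, assuming 1KB = 1000 bytes and a hypothetical helper name:

```python
def required_gbps(response_kb: int, queries_per_second: int) -> float:
    """Network bandwidth needed to serve responses of response_kb at the given rate."""
    bits_per_second = response_kb * 1000 * 8 * queries_per_second
    return bits_per_second / 1e9


moderate = required_gbps(256, 1000)
peak = required_gbps(256, 4000)
print(round(moderate, 3))  # 2.048
print(round(peak, 3))      # 8.192
```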

Moderate load level

This test shows the impact of cache synchronization on query latency in a moderately loaded system. The query rate in this test is set to 1K requests/second.

Median latency without caching is half of what we observe with the introduction of the caching mechanism, due to the added synchronization delays. In exchange, the worst-case 99th percentile latencies are 90% lower, dropping from 292 milliseconds without a cache to 30 milliseconds with the cache.

Load level close to Titus Job Coordinator maximum

If Titus Job Coordinator has to handle all query requests (when the cache is not enabled), it handles the traffic well up to 4K test queries / second, and breaks down (sharp latency increase and a rapid drop of throughput) at around 4.5K queries/sec. The maximum load test is thus kept at 4K queries/second.

Without caching enabled the 99th percentile hovers around 1000ms, and the 80th percentile is around 336ms, compared with the cache-enabled 99th percentile at 46ms and 80th percentile at 22ms. The median still looks better on the setup with no cache at 17ms vs 19ms when the cache is enabled. It should be noted however that the system with caching enabled scales out linearly to more request load while keeping the same latency percentiles, while the no-cache setup collapses with a mere ~15% additional load increase.

Doubling the load when the caching is enabled does not increase the latencies at all. Here are latency percentiles when running 8K query requests/second:

Conclusion

After reaching the limit of vertical scaling of our previous system, we were pleased to implement a real solution that provides (in a practical sense) unlimited scalability of the Titus read-only API. We were able to achieve better tail latencies with a minor sacrifice in median latencies when traffic is low, and gained the ability to horizontally scale out our API gateway processing layer to handle growth in traffic without changes to API clients. The upgrade process was completely transparent, and not a single client observed any abnormalities or changes in API behavior during and after the migration.

The mechanism described here can be applied to any system relying on a singleton leader elected component as the source of truth for managed data, where the data fits in memory and latency is low.

As for prior art, there is ample coverage of cache coherence protocols in the literature, both in the context of multiprocessor architectures (Adve & Gharachorloo, 1996) and distributed systems (Gwertzman & Seltzer, 1996). Our work fits within mechanisms of client polling and invalidation protocols explored by Gwertzman and Seltzer (1996) in their survey paper. Central timestamping to facilitate linearizability in read replicas is similar to the Calvin system (example real-world implementations in systems like FoundationDB) as well as the replica watermarking in AWS Aurora.

¹ Designing Data-Intensive Applications is an excellent book that goes into detail about consistency models discussed in this blog post.

² Adve, S. V., & Gharachorloo, K. (1996). Shared memory consistency models: A tutorial. Computer, 29(12), 66–76.

³ Gwertzman, J., & Seltzer, M. I. (1996, January). World Wide Web Cache Consistency. In USENIX annual technical conference (Vol. 141, p. 152).


Consistent caching mechanism in Titus Gateway was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Achieving observability in async workflows

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/achieving-observability-in-async-workflows-cd89b923c784

Written by Colby Callahan, Megha Manohara, and Mike Azar.

Managing and operating asynchronous workflows can be difficult without the proper tools and architecture that put observability, debugging, and tracing at the forefront.

Imagine getting paged outside normal work hours — users are having trouble with the application you’re responsible for, and you start diving into logs. However, they are scattered across multiple systems, and there isn’t an easy way to tie related messages together. Once you finally find useful identifiers, you may begin writing SQL queries against your production database to find out what went wrong. You’re joining tables, resolving status types, cross-referencing data manually with other systems, and by the end of it all you ask yourself why?

An upset on-call

This was the experience for us as the backend team on Prodicle Distribution, which is one of the many services offered in the suite of content production-facing applications called Prodicle.

Prodicle is one of the many applications at the exciting intersection of connecting the world of content productions to Netflix Studio Engineering. It enables a Production Office Coordinator to keep a Production’s cast, crew, and vendors organized and up to date with the latest information throughout the course of a title’s filming (e.g., Netflix original series such as La Casa De Papel), as well as with Netflix Studio.

Users of Prodicle: Production Office Coordinator on their job

As the adoption of Prodicle grew over time, Productions asked for more features, which led to the system quickly evolving in multiple programming languages under different teams. When our team took ownership of Prodicle Distribution, we decided to revamp the service and expand its implementation to multiple UI clients built for web, Android and iOS.

Prodicle Distribution

Prodicle Distribution allows a production office coordinator to send secure, watermarked documents, such as scripts, to crew members as attachments or links, and track delivery. One distribution job might result in several thousand watermarked documents and links being created. If a job has 10 files and 20 recipients, then we have 10 x 20 = 200 unique watermarked documents and (optionally) links associated with them depending on the type of the Distribution job. The recipients of watermarked documents are able to access these documents and links in their email as well as in the Prodicle mobile application.
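The fan-out arithmetic above can be sketched in a few lines. This is only an illustration: the file names and recipient addresses are made up, and the real service's job model is not described in this post.

```python
from itertools import product


def watermark_jobs(files, recipients):
    """Return one (file, recipient) pair per unique watermarked document."""
    return list(product(files, recipients))


# Hypothetical inputs: 10 files and 20 recipients, as in the example above.
files = [f"script_{i}.pdf" for i in range(10)]
recipients = [f"crew_{i}@example.com" for i in range(20)]

jobs = watermark_jobs(files, recipients)
print(len(jobs))  # 10 x 20 = 200
```

Because the count is a product, adding either a file or a recipient grows the job by a whole row or column of watermarks, which is why a single distribution can reach thousands of documents.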

Prodicle Distribution

Our service is required to be elastic and handle bursty traffic. It also needs to handle third-party integration with Google Drive, making copies of PDFs with watermarks specific to each recipient, adding password protection, creating revocable links, generating thumbnails, and sending emails and push notifications. We are expected to process 1,000 watermarks for a single distribution in a minute, with non-linear latency growth as the number of watermarks increases. The goal is to process these documents as fast as possible and reliably deliver them to recipients while offering strong observability to both our users and internal teams.

Prodicle Distribution Requirements

Asynchronous workflow

Previously, the Distribution feature of Prodicle was treated as its own unique application. In late 2019, our team started integrating it with the rest of the ecosystem by writing a thin Java Domain Graph Service (DGS) to wrap the asynchronous watermarking functionality that was then in Ruby on Rails. The watermarking functionality, at the start, was a simple offering with various Google Drive integrations for storage and links. Our team was responsible for Google integrations, watermarking, bursty traffic management, and on-call support for this application. We had to traverse multiple codebases and observability systems to debug errors and inefficiencies in the system. Things got hairy. New feature requests were adding to the maintenance burden for the team.

Initial offering of Prodicle Distribution backend

When we decided to migrate the asynchronous workflow to Java, we landed on these additional requirements: 1. We wanted a scalable service that was near real-time, 2. We wanted a workflow orchestrator with good observability for developers, and 3. We wanted to delegate the responsibility of watermarking and bursty traffic management for our asynchronous functions to appropriate teams.

Migration consideration for Prodicle Distribution’s asynchronous workflow

We evaluated what it would take to do this ourselves or rely on the offerings from our platform teams: Conductor and one of the newer offerings, Cosmos. Even though Cosmos was developed for asynchronous media processing, we worked with the Cosmos team to expand it to generic file processing and tune the workflow platform for our near real-time use case. Early prototypes and load tests validated that the offering could meet our needs. We leaned into Cosmos because of the low variance in latency through the system, separation of concerns between the API, workflow, and the function systems, ease of load testing, customizable API layer and notifications, support for File I/O abstractions and elastic functions. Another benefit was their observability portal and its search capabilities. We also migrated the ownership of watermarking to another internal team so that we could focus on developing and supporting additional features.

Current architecture of Prodicle Distribution on Cosmos

With Cosmos, we are well-positioned to expand to future use cases like watermarking on images and videos. The Cosmos team is dedicated to improving features and functionality over the next year to make observations of our async workflows even better. It is great to have a team that will be improving the platform in the background as we continue our application development. We expect the performance and scaling to continue to get better without much effort on our part. We also expect other services to move some of their processing functionality into Cosmos, which makes integrations even easier because services can expose a function within the platform instead of gRPC or REST endpoints. The more services move to Cosmos, the bigger the value proposition becomes.

Deployed to Production for Productions

With productions returning to work in the midst of a global pandemic, the adoption of Prodicle Distribution grew 10x between June 2020 and April 2021. Starting in January 2021, we incrementally released Prodicle Distribution on Cosmos and completed the migration in April 2021. We now support hundreds of productions, with tens of thousands of Distribution jobs, and millions of watermarks every month.

With our migration of Prodicle Distribution to Cosmos, we are able to use their observability portal called Nirvana to debug our workflow and bottlenecks.

Observing Prodicle Distribution on Cosmos in Nirvana

Now that we have a platform team dedicated to the management of our async infrastructure and watermarking, our team can better maintain and support the distribution of documents. Since our migration, the number of support tickets has decreased. It is now easier for the on-call engineer and the developers to find the associated logs and traces while visualizing the state of the asynchronous workflow and data in the whole system.

A stress-free on-call


Achieving observability in async workflows was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Growth Engineering at Netflix- Creating a Scalable Offers Platform

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/growth-engineering-at-netflix-creating-a-scalable-offers-platform-69330136dd87

by Eric Eiswerth

Background

Netflix has been offering streaming video-on-demand (SVOD) for over 10 years. Throughout that time we’ve primarily relied on 3 plans (Basic, Standard, & Premium), combined with the 30-day free trial to drive global customer acquisition. The world has changed a lot in this time. Competition for people’s leisure time has increased, the device ecosystem has grown phenomenally, and consumers want to watch premium content whenever they want, wherever they are, and on whatever device they prefer. We need to be constantly adapting and innovating as a result of this change.

The Growth Engineering team is responsible for executing growth initiatives that help us anticipate and adapt to this change. In particular, it’s our job to design and build the systems and protocols that enable customers from all over the world to sign up for Netflix with the plan features and incentives that best suit their needs. For more background on Growth Engineering and the signup funnel, please have a look at our previous blog post that covers the basics. Alternatively, here’s a quick review of what the typical user journey for a signup looks like:

Signup Funnel Dynamics

There are 3 steps in a basic Netflix signup. We refer to these steps that comprise a user journey as a signup flow. Each step of the flow serves a distinct purpose.

  1. Introduction and account creation
    Highlight our value propositions and begin the account creation process.
  2. Plans & offers
    Highlight the various types of Netflix plans, along with any potential offers.
  3. Payment
    Highlight the various payment options we have so customers can choose what suits their needs best.

The primary focus for the remainder of this post will be step 2: plans & offers. In particular, we’ll define plans and offers, review the legacy architecture and some of its shortcomings, and dig into our new architecture and some of its advantages.

Plans & Offers

Definitions

Let’s define what a plan and an offer are at Netflix. A plan is essentially a set of features with a price.

An offer is an incentive that typically involves a monetary discount or superior product features for a limited amount of time. Broadly speaking, an offer consists of one or more incentives and a set of attributes.

When we merge these two concepts together and present them to the customer, we have the plan selection page (shown above). Here, you can see that we have 3 plans and a 30-day free trial offer, regardless of which plan you choose. Let’s take a deeper look at the architecture, protocols, and systems involved.

Legacy Architecture

As previously mentioned, Netflix has had a relatively static set of plans and offers since the inception of streaming. As a result of this simple product offering, the architecture was also quite straightforward. It consisted of a small set of XML files that were loaded at runtime and stored in local memory. This was a perfectly sufficient design for many years. However, there are some downsides as the company continues to grow and the product continues to evolve. To name a few:

  • Updating XML files is error-prone and manual in nature.
  • A full deployment of the service is required whenever the XML files are updated.
  • Updating the XML files requires engaging domain experts from the backend engineering team that owns these files. This pulls them away from other business-critical work and can be a distraction.
  • A flat domain object structure forced client-side logic to extract the relevant plan and offer information needed to render the UI. For example, consider the data structure for a 30-day free trial on the Basic plan.
    {
      "offerId": 123,
      "planId": 111,
      "price": "$8.99",
      "hasSD": true,
      "hasHD": false,
      "hasFreeTrial": true,
      etc…
    }
  • As the company matures and our product offering adapts to our global audience, all of the above issues are exacerbated further.

Below is a visual representation of the various systems involved in retrieving plan and offer data. Moving forward, we’ll refer to the combination of plan and offer data simply as SKU (Stock Keeping Unit) data.

New Architecture

If you recall from our previous blog post, Growth Engineering owns the business logic and protocols that allow our UI partners to build lightweight and flexible applications for almost any platform. This implies that the presentation layer should be devoid of any business logic and should simply be responsible for rendering data that is passed to it. In order to accomplish this we have designed a microservice architecture that emphasizes the Separation of Concerns design principle. Consider the updated system interaction diagram below:

There are 2 noteworthy changes that are worth discussing further. First, notice the presence of a dedicated SKU Eligibility Service. This service contains specialized business logic that used to be part of the Orchestration Service. By migrating this logic to a new microservice we simplify the Orchestration Service, clarify ownership over the domain, and unlock new use cases since it is now possible for other services not shown in this diagram to also consume eligible SKU data.

Second, notice that the SKU Service has been extended to a platform, which now leverages a rules engine and SKU catalog DB. This platform unlocks tremendous business value since product-oriented teams are now free to use the platform to experiment with different product offerings for our global audience, with little to no code changes required. This means that engineers can spend less time doing tedious work and more time designing creative solutions to better prepare us for future needs. Let’s take a deeper look at the role of each service involved in retrieving SKU data, starting from the visitor’s device and working our way down the stack.

Step 1 — Device sends a request for the plan selection page
As discussed in our previous Growth Engineering blog post, we use a custom JSON protocol between our client UIs and our middle-tier Orchestration Service. A browser request to retrieve the plan selection page shown above might look as follows:

GET /plans
{
  "flow": "browser",
  "mode": "planSelection"
}

As you can see, there are 2 critical pieces of information in this request:

  • Flow — The flow is a way to identify the platform. This allows the Orchestration Service to route the request to the appropriate platform-specific request handling logic.
  • Mode — This is essentially the name of the page being requested.

Given the flow and mode, the Orchestration Service can then process the request.
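One way to picture routing on flow and mode is a handler registry keyed by the pair. This is a minimal sketch under assumptions: the `handles` decorator, `HANDLERS` table, and `route` function are hypothetical names for illustration, not the Orchestration Service's actual API.

```python
from typing import Callable, Dict, Tuple

Handler = Callable[[dict], dict]

# Hypothetical registry: (flow, mode) -> platform-specific handler.
HANDLERS: Dict[Tuple[str, str], Handler] = {}


def handles(flow: str, mode: str):
    """Register a handler for one flow/mode combination."""
    def register(fn: Handler) -> Handler:
        HANDLERS[(flow, mode)] = fn
        return fn
    return register


@handles("browser", "planSelection")
def browser_plan_selection(request: dict) -> dict:
    # A real handler would call downstream services; this one only echoes.
    return {"flow": request["flow"], "mode": request["mode"], "fields": {}}


def route(request: dict) -> dict:
    """Dispatch a request to the handler registered for its flow and mode."""
    key = (request.get("flow"), request.get("mode"))
    handler = HANDLERS.get(key)
    if handler is None:
        raise LookupError(f"no handler for flow={key[0]!r}, mode={key[1]!r}")
    return handler(request)


response = route({"flow": "browser", "mode": "planSelection"})
```

Keying on both values keeps platform-specific logic separate while letting every platform share the same page names.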

Step 2 — Request is routed to the Orchestration Service for processing
The Orchestration Service is responsible for validating upstream requests, orchestrating calls to downstream services, and composing JSON responses during a signup flow. For this particular request the Orchestration Service needs to retrieve the SKU data from the SKU Eligibility Service and build the JSON response that can be consumed by the UI layer.

The JSON response for this request might look something like below. Notice the difference in data structures from the legacy implementation. This new contextual representation facilitates greater reuse, as well as potentially supporting offers other than a 30 day free trial:

{
  "flow": "browser",
  "mode": "planSelection",
  "fields": {
    "skus": [
      {
        "id": 123,
        "incentives": ["FREE_TRIAL"],
        "plan": {
          "name": "Basic",
          "quality": "SD",
          "price": "$8.99",
          ...
        }
        ...
      },
      {
        "id": 456,
        "incentives": ["FREE_TRIAL"],
        "plan": {
          "name": "Standard",
          "quality": "HD",
          "price": "$13.99",
          ...
        }
        ...
      },
      {
        "id": 789,
        "incentives": ["FREE_TRIAL"],
        "plan": {
          "name": "Premium",
          "quality": "UHD",
          "price": "$17.99",
          ...
        }
        ...
      }
    ],
    "selectedSku": {
      "type": "Numeric",
      "value": 789
    },
    "nextAction": {
      "type": "Action",
      "withFields": [
        "selectedSku"
      ]
    }
  }
}

As you can see, the response contains a list of SKUs, the selected SKU, and an action. The action corresponds to the button on the page, and withFields specifies which fields the server expects to be sent back when the button is clicked.
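To make the role of withFields concrete, here is a small sketch of how a client might assemble what it sends back when the button is clicked. The `build_submission` helper and its fallback-to-default behavior are assumptions for illustration; the post does not describe the client implementation.

```python
def build_submission(response: dict, user_input: dict) -> dict:
    """Collect the values for every field named in nextAction.withFields."""
    fields = response["fields"]
    payload = {}
    for name in fields["nextAction"]["withFields"]:
        # Prefer what the user chose; fall back to the server-provided value.
        payload[name] = user_input.get(name, fields[name]["value"])
    return payload


# Trimmed-down version of the response shown above.
response = {
    "fields": {
        "selectedSku": {"type": "Numeric", "value": 789},
        "nextAction": {"type": "Action", "withFields": ["selectedSku"]},
    }
}

default_choice = build_submission(response, {})
standard_choice = build_submission(response, {"selectedSku": 456})
```

Because the server names the fields it expects, the client needs no page-specific knowledge of what to submit.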

Step 3 & 4 — Determine eligibility and retrieve eligible SKUs from SKU Eligibility Service
Netflix is a global company and we often have different SKUs in different regions. This means we need to distinguish between availability of SKUs and eligibility for SKUs. You can think of eligibility as something that is applied at the user level, while availability is at the country level. The SKU Platform contains the global set of SKUs and as a result, is said to control the availability of SKUs. Eligibility for SKUs is determined by the SKU Eligibility Service. This distinction creates clear ownership boundaries and enables the Growth Engineering team to focus on surfacing the correct SKUs for our visitors.

This centralization of eligibility logic in the SKU Eligibility Service also enables innovation in different parts of the product that have traditionally been ignored. Different services can now interface directly with the SKU Eligibility Service in order to retrieve SKU data.

Step 5 — Retrieve eligible SKUs from SKU Platform
The SKU Platform consists of a rules engine, a database, and application logic. The database contains the plans, prices and offers. The rules engine provides a means to extract available plans and offers when certain conditions within a rule match. Let’s consider a simple example where we attempt to retrieve offers in the US.
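As a rough illustration of rule matching, the sketch below treats a rule as a set of conditions plus the offers it yields, and matches it against a customer context. The `OfferRule` class, the `freeTrialEligible` attribute, and the rule contents are hypothetical; the post does not describe the rules engine's data model.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class OfferRule:
    conditions: Dict[str, object]  # attribute name -> required value
    offers: Tuple[str, ...]        # offers returned when the rule matches

    def matches(self, context: Dict[str, object]) -> bool:
        """A rule matches when every one of its conditions holds."""
        return all(context.get(k) == v for k, v in self.conditions.items())


# Hypothetical ruleset: eligibility arrives as just another condition,
# computed upstream and passed in with the customer context.
RULES = [
    OfferRule({"country": "US", "freeTrialEligible": True}, ("FREE_TRIAL",)),
]


def match_offers(context: Dict[str, object], rules=RULES) -> List[str]:
    """Return the offers of every rule whose conditions match the context."""
    matched: List[str] = []
    for rule in rules:
        if rule.matches(context):
            matched.extend(rule.offers)
    return matched


us_offers = match_offers({"country": "US", "freeTrialEligible": True})
```

A context that fails any condition, such as a different country or an ineligible visitor, simply matches no rule and gets an empty list.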

Keeping the Separation of Concerns in mind, notice that the SKU Platform has only one core responsibility. It is responsible for managing all Netflix SKUs. It provides access to these SKUs via a simple API that takes customer context and attempts to match it against the set of SKU rules. SKU eligibility is computed upstream and is treated just as any other condition would be in the SKU ruleset. By not coupling the concepts of eligibility and availability into a single service, we enable increased developer productivity since each team is able to focus on their core competencies and any change in eligibility does not affect the SKU Platform. One of the core tenets of a platform is the ability to support self-service. This negates the need to engage the backend domain experts for every desired change. The SKU Platform supports this via lightweight configuration changes to rules that do not require a full deployment. The next step is to invest further into self-service and support rule changes via a SKU UI. Stay tuned for more details on this, as well as more details on the internals of the new SKU Platform in one of our upcoming blog posts.

Conclusion

This work was a large cross-functional effort. We rebuilt our offers and plans from the ground up. It resulted in systems changes, as well as interaction changes between teams. Where there was once ambiguity, we now have clearly defined ownership over SKU availability and eligibility. We are now capable of introducing new plans and offers in various markets around the globe in order to meet our customers’ needs, with minimal engineering effort.

Let’s review some of the advantages the new architecture has over the legacy implementation. To name a few:

  • Domain objects that have a more reusable and extensible “shape”. This shape facilitates code reuse at the UI layer as well as the service layers.
  • A SKU Platform that enables product innovation with minimal engineering involvement. This means engineers can focus on more challenging and creative solutions for other problems. It also means fewer engineering teams are required to support initiatives in this space.
  • Configuration instead of code for updating SKU data, which improves innovation velocity.
  • Lower latency as a result of fewer service calls, which means fewer errors for our visitors.

The world is constantly changing. Device capabilities continue to improve. How, when, and where people want to be entertained continues to evolve. With these types of continued investments in infrastructure, the Growth Engineering team is able to build a solid foundation for future innovations that will allow us to continue to deliver the best possible experience for our members.

Join Growth Engineering and help us build the next generation of services that will allow the next 200 million subscribers to experience the joy of Netflix.


Growth Engineering at Netflix- Creating a Scalable Offers Platform was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Growth Engineering at Netflix — Automated Imagery Generation

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/growth-engineering-at-netflix-automated-imagery-generation-5a105fd51569

by Eric Eiswerth

Background

There’s a good chance you’ve visited the Netflix homepage. In the Growth Engineering team, we refer to this as the top of the signup funnel. For more background on the signup funnel and Growth Engineering’s role in it, please read our initial post on the topic: Growth Engineering at Netflix — Accelerating Innovation. The primary focus of this post will be the top of the signup funnel. In particular, the Netflix homepage:

As discussed in our previous post, Growth Engineering owns the business logic and protocols that allow our UI partners to build lightweight and flexible applications for almost any platform. In some cases, like the homepage, this even involves providing appropriate imagery (e.g., the background image shown above). In this post, we’ll take a deep dive into the journey of content-based imagery on the Netflix homepage.

Motivation

At Netflix we do one thing, entertainment, and we aim to do it really well. We live and breathe TV shows and films, and we want everyone to be able to enjoy them too. That’s why we aspire to have best-in-class stories across genres, and believe people should have access to new voices, cultures and perspectives. The member-focused teams at Netflix are responsible for making sure the member experience is relevant and personalized, ensuring that this content is shown to the right people at the right time. But what about non-members, those who are simply interested in signing up for Netflix? How should we highlight our content and convey our value propositions to them?

The Solution

The main mechanism for highlighting our content in the signup flow is through content-based imagery. Before designing a solution it’s important to understand the main product requirements for such a feature:

  • The content needs to be new, relevant, and regional (not all countries have the same catalogue).
  • The artwork needs to appeal to a broader audience. The non-member homepage serves a very broad audience and is not personalized to the extent of the member experience.
  • The imagery needs to be localized.
  • We need to be able to easily determine what imagery is present for a given platform, region, and language.
  • The homepage needs to load in a reasonable amount of time, even in poor network conditions.

Unpacking Product Requirements

Given the scale we require and the product requirements listed above, there are a number of technical requirements:

  • A list of titles for the asset, in some order.
  • Ensure the titles are appropriate for a broad audience, which means all titles need to be tagged with metadata.
  • Localized images for each of the titles.
  • Different assets for different device types and screen sizes.
  • Server-generated assets, since client-side generation would require the retrieval of many individual images, which would increase latency and time-to-render.
  • To reduce latency, assets should be generated in an offline fashion and not in real time.
  • The assets need to be compressed, without reducing quality significantly.
  • The assets will need to be stored somewhere and we’ll need to generate URLs for each of them.
  • We’ll need to figure out how to provide the correct asset URL for a given request.
  • We’ll need to build a search index so that the assets can be searchable.

Given this set of requirements, we can effectively break this work down into 3 functional buckets:

The Design

For our design, we decided to build 3 separate microservices, mapping to the aforementioned functional buckets. Let’s take a look at each of these services in turn.

Asset Generation

The Asset Generation Service is responsible for generating groups of assets, which we call asset groups. Each request will generate a single asset group that will contain one or more assets. To support the demands of our stakeholders we designed a Domain Specific Language (DSL) that we call an asset generation recipe. An asset generation request contains a recipe. Below is an example of a simple recipe:

{
  "titleIds": [12345, 23456, 34567, …],
  "countries": ["US"],
  "type": "perspective", // this is the design of the asset
  "rows": 10, // the number of rows and columns can control density
  "cols": 15,
  "padding": 10, // padding between individual images
  "columnOffsets": [0, 0, 0, 0…], // the y-offset for each column
  "rowOffsets": [0, -100, 0, -100, …], // the x-offset for each row
  "size": [1920, 1080] // size in pixels
}

This recipe can then be issued via an HTTP POST request to the Asset Generation Service. The recipe can then be translated into ImageMagick commands that can do the heavy lifting. At a high level, the following diagram captures the necessary steps required to build an asset.
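To give a feel for how the grid parameters in a recipe could turn into image placements, here is a sketch that computes a position for each tile. The layout formula, the cycling through titleIds, and the `tile_positions` name are all assumptions made for illustration; the post does not specify how the service interprets a recipe or which ImageMagick commands it emits.

```python
def tile_positions(recipe: dict) -> list:
    """Compute a top-left corner and size for every tile in the grid."""
    width, height = recipe["size"]
    rows, cols, pad = recipe["rows"], recipe["cols"], recipe["padding"]
    # Assumed layout: equal tiles with `pad` pixels around each one.
    tile_w = (width - pad * (cols + 1)) // cols
    tile_h = (height - pad * (rows + 1)) // rows
    titles = recipe["titleIds"]
    placements = []
    for row in range(rows):
        for col in range(cols):
            index = row * cols + col
            placements.append({
                # Assumption: reuse titles in order when the grid is larger.
                "titleId": titles[index % len(titles)],
                "x": pad + col * (tile_w + pad) + recipe["rowOffsets"][row],
                "y": pad + row * (tile_h + pad) + recipe["columnOffsets"][col],
                "w": tile_w,
                "h": tile_h,
            })
    return placements


# A deliberately tiny recipe so the numbers are easy to check by hand.
small_recipe = {
    "titleIds": [12345, 23456, 34567],
    "rows": 2,
    "cols": 3,
    "padding": 10,
    "columnOffsets": [0, 0, 0],
    "rowOffsets": [0, -50],
    "size": [340, 230],
}
placements = tile_positions(small_recipe)
```

Shifting alternate rows with a negative x-offset, as the second row does here, is what produces a staggered look instead of a strict grid.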

Generating a single localized asset is a big achievement, but we still need to store the asset somewhere and have the ability to search for it. This requires an asset storage solution.

Asset Storage

We refer to asset storage and management simply as asset management. We felt it would be beneficial to create a separate microservice for asset management for 2 reasons. First, asset generation is CPU intensive and bursty. We can leverage high performance VMs in AWS to generate the assets. We can scale up when generation is occurring and scale down when there is no batch in the queue. However, it would be cost-inefficient to leverage this same hardware for lightweight and more consistent traffic patterns that an asset management service requires.

Let’s take a look at the internals of the Asset Management Service.

At this point we’ve laid out all the details in order to generate a content-based asset and have it stored as part of an asset group, which is persisted and indexed. The next thing to consider is, how do we retrieve an asset in real time and surface it on the Netflix homepage?

If you recall from our previous blog post, Growth Engineering owns a service called the Orchestration Service. It is a mid-tier service that emits a custom JSON data structure containing fields that are consumed by the UI. The UI can then use these fields to control the presentation in the UI layer. There are two approaches for adding fields to the Orchestration Service’s response. First, the fields can be coded by hand. Second, fields can be added through configuration using a service we call the Customization Service. Since assets will need to be periodically refreshed and we want this process to be entirely automated, it makes sense to pursue the configuration-based approach. To accomplish this, the Asset Management Service needs to translate an asset group into a rule definition for the Customization Service.

Customization Service

Let’s review the Orchestration Service and introduce the Customization Service. The Orchestration Service emits fields in response to upstream requests. For the homepage, there are typically only a small number of fields provided by the Orchestration Service. These fields are supplied by application code. For example:

{
  "fields": {
    "email": {
      "type": "StringField",
      "value": ""
    },
    "nextAction": {
      "type": "Action",
      "withFields": ["email"]
    }
  }
}

The Orchestration Service also supports fields supplied by configuration. We call these adaptive fields. Adaptive fields are provided by the Customization Service. The Customization Service is a rules engine that emits the adaptive fields. For example, a rule to provide the background image for the homepage in the en-US locale would look as follows:

{
  "country": "US",
  "language": "en",
  "platform": "browser",
  "resolution": "high"
}

The corresponding payload for such a rule might look as follows:

{
  "backgroundImage": "https://cdn.netflix.com/bgimageurl.jpg"
}

Bringing this all together, the response from the Orchestration Service would now look as follows:

{
  "fields": {
    "email": {
      "type": "StringField",
      "value": ""
    },
    "nextAction": {
      "type": "Action",
      "withFields": ["email"]
    }
  },
  "adaptiveFields": {
    "backgroundImage": "https://cdn.netflix.com/bgimageurl.jpg"
  }
}
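The composition of coded fields and adaptive fields could be sketched as follows. The rule and payload mirror the examples above, but the `adaptive_fields` and `compose_response` functions, and the representation of a rule as a (conditions, payload) pair, are assumptions for illustration rather than the Customization Service's real interface.

```python
# Hypothetical in-memory ruleset: (conditions, payload) pairs.
CUSTOMIZATION_RULES = [
    (
        {"country": "US", "language": "en",
         "platform": "browser", "resolution": "high"},
        {"backgroundImage": "https://cdn.netflix.com/bgimageurl.jpg"},
    ),
]


def adaptive_fields(context: dict, rules=CUSTOMIZATION_RULES) -> dict:
    """Merge the payloads of every rule whose conditions match the context."""
    merged = {}
    for conditions, payload in rules:
        if all(context.get(k) == v for k, v in conditions.items()):
            merged.update(payload)
    return merged


def compose_response(fields: dict, context: dict) -> dict:
    """Combine hand-coded fields with any adaptive fields for this request."""
    response = {"fields": fields}
    adaptive = adaptive_fields(context)
    if adaptive:
        response["adaptiveFields"] = adaptive
    return response


coded_fields = {
    "email": {"type": "StringField", "value": ""},
    "nextAction": {"type": "Action", "withFields": ["email"]},
}
us_context = {"country": "US", "language": "en",
              "platform": "browser", "resolution": "high"}
homepage = compose_response(coded_fields, us_context)
```

Refreshing the artwork then only requires publishing a new payload for the rule; the code that builds the response does not change.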

At this point, we are now able to generate an asset, persist it, search it, and generate customization rules for it. The generated rules then enable us to return a particular asset for a particular request. Let’s put it all together and review the system interaction diagram.

We now have all the pieces in place to automatically generate artwork and have that artwork appear on the Netflix homepage for a given request. At least one open question remains: how can we scale asset generation?

Scaling Asset Generation

Arguably, there are a number of approaches that could be used to scale asset generation. We opted for an all-or-nothing approach: all assets for a given recipe need to be generated as a single asset group. This enables smooth rollback in case of any errors. Additionally, asset generation is CPU intensive and each recipe can produce thousands of assets as a result of the number of platform, region, and language permutations. Even with high performance VMs, generating thousands of assets can take a long time. As a result, we needed to find a way to distribute asset generation across multiple VMs. Here’s what the final architecture looked like.

Briefly, let’s review the steps:

  1. The batch process is initiated by a cron job. The job executes a script that contains an asset generation recipe.
  2. The Asset Generation Service receives the request and creates asset generation tasks that can be distributed across any number of Asset Generation Worker nodes. One of the nodes is elected as the leader via ZooKeeper. Its job is to coordinate asset generation across the other workers and ensure all assets get generated.
  3. Once the leader node has all the assets, it creates an asset group in the Asset Management Service. The Asset Management Service persists, indexes, and uploads the assets to the CDN.
  4. Finally, the Asset Management Service creates rules from the asset group and pushes the rules to the Customization Service. Once the data is published in the Customization Service, the Orchestration Service can supply the correct URLs in its JSON response by invoking the Customization Service with a request context that matches a given set of rules.
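The all-or-nothing fan-out in the steps above can be approximated on a single machine. In this sketch a thread pool stands in for the worker nodes and a plain function stands in for image generation; leader election via ZooKeeper is not modeled, and every name here is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product


def generate_asset(task):
    """Stand-in for the CPU-heavy generation of one asset."""
    platform, country, language = task
    return f"{platform}-{country}-{language}.jpg"


def generate_asset_group(platforms, countries, languages,
                         generate=generate_asset, workers=4):
    """Generate every permutation, or fail without producing a group."""
    tasks = list(product(platforms, countries, languages))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(generate, task) for task in tasks]
        # result() re-raises a worker's exception, so a single failed
        # task aborts the call and no partial group is returned.
        assets = [future.result() for future in futures]
    return {"assets": assets}


group = generate_asset_group(["browser", "tv"], ["US", "CA"], ["en", "fr"])
print(len(group["assets"]))  # 2 x 2 x 2 = 8
```

Returning the group only after every task succeeds is what makes rollback simple: a failed batch never creates anything that needs to be undone downstream.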

Conclusion

Automated asset generation has proven to be an extremely valuable investment. It is low-maintenance, high-leverage, and has allowed us to experiment with a variety of different types of assets on different platforms and on different parts of the product. This project was technically challenging and highly rewarding, both to the engineers involved in the project, and to the business. The Netflix homepage has come a long way over the last several years.

We’re hiring! Join Growth Engineering and help us build the future of Netflix.


Growth Engineering at Netflix — Automated Imagery Generation was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.