All posts by Netflix Technology Blog

Recommending for Long-Term Member Satisfaction at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/recommending-for-long-term-member-satisfaction-at-netflix-ac15cada49ef

By Jiangwei Pan, Gary Tang, Henry Wang, and Justin Basilico

Introduction

Our mission at Netflix is to entertain the world. Our personalization algorithms play a crucial role in delivering on this mission for all members by recommending the right shows, movies, and games at the right time. This goal extends beyond immediate engagement; we aim to create an experience that brings lasting enjoyment to our members. Traditional recommender systems often optimize for short-term metrics like clicks or engagement, which may not fully capture long-term satisfaction. We strive to recommend content that not only engages members in the moment but also enhances their long-term satisfaction, which increases the value they get from Netflix and makes them more likely to remain members.

Recommendations as Contextual Bandit

One simple way we can view recommendations is as a contextual bandit problem. When a member visits, that visit becomes the context, our system selects an action (which recommendations to show), and the member then provides various types of feedback. These feedback signals can be immediate (skips, plays, thumbs up/down, or adding items to their playlist) or delayed (completing a show or renewing their subscription). We can define reward functions that reflect the quality of the recommendations from these feedback signals and then train a contextual bandit policy on historical data to maximize the expected reward.
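
To make this framing concrete, here is a minimal sketch of that loop using synthetic data, a simple greedy policy, and inverse propensity scoring for off-policy evaluation. None of the feature names, model choices, or numbers reflect Netflix's actual system; they are purely illustrative.

import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(0)
n_logs, n_features, n_items = 10_000, 8, 20

# Logged data: context at visit time, item shown by a (uniform) logging policy,
# the logging propensity, and the observed reward for the shown item.
X = rng.normal(size=(n_logs, n_features))
shown = rng.integers(0, n_items, size=n_logs)
propensity = np.full(n_logs, 1.0 / n_items)

true_w = rng.normal(size=n_features)
item_affinity = rng.normal(size=n_items)
p_reward = 1.0 / (1.0 + np.exp(-(0.5 * X @ true_w + item_affinity[shown])))
reward = rng.binomial(1, p_reward)

# One simple approach: model E[reward | context, item] and act greedily on it.
item_onehot = np.eye(n_items)[shown]
reward_model = LogisticRegression(max_iter=1000).fit(np.hstack([X, item_onehot]), reward)

def policy(x):
    """Recommend the item with the highest predicted reward for context x."""
    candidates = np.hstack([np.tile(x, (n_items, 1)), np.eye(n_items)])
    return int(np.argmax(reward_model.predict_proba(candidates)[:, 1]))

# Off-policy estimate of the new policy's value via inverse propensity scoring (IPS).
idx = slice(0, 2_000)
chosen = np.array([policy(x) for x in X[idx]])
ips_value = np.mean((chosen == shown[idx]) * reward[idx] / propensity[idx])
print(f"estimated reward per visit under the new policy: {ips_value:.3f}")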

Improving Recommendations: Models and Objectives

There are many ways that a recommendation model can be improved. Improvements may come from more informative input features, more data, different architectures, more parameters, and so forth. In this post, we focus on a less-discussed lever: improving the recommender objective by defining a reward function that better reflects long-term member satisfaction.

Retention as Reward?

Member retention might seem like an obvious reward for optimizing long-term satisfaction, because satisfied members should stay subscribed. However, it has several drawbacks:

  • Noisy: Retention can be influenced by numerous external factors, such as seasonal trends, marketing campaigns, or personal circumstances unrelated to the service.
  • Low Sensitivity: Retention is only sensitive for members on the verge of canceling their subscription, so it does not capture the full spectrum of member satisfaction.
  • Hard to Attribute: Members might cancel only after a series of bad recommendations.
  • Slow to Measure: We only get one signal per account per month.

Due to these challenges, optimizing for retention alone is impractical.

Proxy Rewards

Instead, we can train our bandit policy to optimize a proxy reward function that is highly aligned with long-term member satisfaction while being sensitive to individual recommendations. The proxy reward r(user, item) is a function of the user’s interaction with the recommended item. For example, if we recommend “One Piece” and a member plays it, subsequently completes it, and gives it a thumbs-up, a simple proxy reward might be defined as r(user, item) = f(play, complete, thumb).
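
As a concrete, deliberately toy illustration of such a function — the weights below are invented purely for the example; the real function is the product of the reward engineering described later in this post:

def proxy_reward(play: bool, complete: bool, thumb: int) -> float:
    """Toy r(user, item) = f(play, complete, thumb); thumb is +1, -1, or 0 (no rating)."""
    reward = 0.0
    if play:
        reward += 1.0        # the member at least started the title
    if complete:
        reward += 2.0        # finishing is a stronger signal than starting
    reward += 1.5 * thumb    # explicit feedback gets extra weight
    return reward

# The "One Piece" example above: played, completed, thumbs-up.
print(proxy_reward(play=True, complete=True, thumb=+1))   # 4.5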

Click-through rate (CTR)

Click-through rate (CTR), or in our case play-through rate, can be viewed as a simple proxy reward where r(user, item) = 1 if the user clicks a recommendation and 0 otherwise. CTR is a common feedback signal that generally reflects user preference expectations. It is a simple yet strong baseline for many recommendation applications. In some cases, such as ads personalization where the click is the target action, CTR may even be a reasonable reward for production models. However, in most cases, over-optimizing CTR can lead to promoting clickbaity items, which may harm long-term satisfaction.

Beyond CTR

To align the proxy reward function more closely with long-term satisfaction, we need to look beyond simple interactions, consider all types of user actions, and understand their true implications on user satisfaction.

We give a few examples in the Netflix context:

  • Fast season completion ✅: Completing a season of a recommended TV show in one day is a strong sign of enjoyment and long-term satisfaction.
  • Thumbs-down after completion ❌: Completing a TV show in several weeks followed by a thumbs-down indicates low satisfaction despite significant time spent.
  • Playing a movie for just 10 minutes ❓: In this case, the user’s satisfaction is ambiguous. The brief engagement might indicate that the user decided to abandon the movie, or it could simply mean the user was interrupted and plans to finish the movie later, perhaps the next day.
  • Discovering new genres ✅ ✅: Watching more Korean or game shows after “Squid Game” suggests the user is discovering something new. This discovery was likely even more valuable since it led to a variety of engagements in a new area for a member.

Reward Engineering

Reward engineering is the iterative process of refining the proxy reward function to align with long-term member satisfaction. It is similar to feature engineering, except that it can be derived from data that isn’t available at serving time. Reward engineering involves four stages: hypothesis formation, defining a new proxy reward, training a new bandit policy, and A/B testing. Below is a simple example.

Challenge: Delayed Feedback

User feedback used in the proxy reward function is often delayed or missing. For example, a member may decide to play a recommended show for just a few minutes on the first day and take several weeks to fully complete the show. This completion feedback is therefore delayed. Additionally, some user feedback may never occur; while we may wish otherwise, not all members provide a thumbs-up or thumbs-down after completing a show, leaving us uncertain about their level of enjoyment.

We could wait longer to observe feedback, but how long should we wait for delayed feedback before computing the proxy rewards? If we wait too long (e.g., weeks), we miss the opportunity to update the bandit policy with the latest data. In a highly dynamic environment like Netflix, a stale bandit policy can degrade the user experience and be particularly bad at recommending newer items.

Solution: predict missing feedback

We aim to update the bandit policy shortly after making a recommendation while also defining the proxy reward function based on all user feedback, including delayed feedback. Since delayed feedback has not been observed at the time of policy training, we can predict it. This prediction occurs for each training example with delayed feedback, using already observed feedback and other relevant information up to the training time as input features. Thus, the prediction also gets better as time progresses.

The proxy reward is then calculated for each training example using both observed and predicted feedback. These training examples are used to update the bandit policy.
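
Here is a minimal sketch of this two-step idea. The features, the logistic model, and the reward weights (the same toy weights as the earlier sketch) are illustrative stand-ins, not the production setup.

import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(1)

# Step 1: train a delayed-feedback model on older examples whose final feedback
# (here, an eventual thumbs-up) has already been observed.
minutes_watched = rng.uniform(0, 120, size=5_000)
completed = (minutes_watched > 60).astype(float)
p_thumb_up = 1.0 / (1.0 + np.exp(-(0.03 * minutes_watched + 1.5 * completed - 3.0)))
thumb_up = rng.binomial(1, p_thumb_up)

X_observed = np.column_stack([minutes_watched, completed])
thumb_model = LogisticRegression().fit(X_observed, thumb_up)

# Step 2: for a fresh training example whose thumb feedback has not arrived yet,
# substitute the predicted probability for the missing signal when computing the
# proxy reward.
def proxy_reward_with_prediction(minutes: float, complete: float) -> float:
    p_up = thumb_model.predict_proba([[minutes, complete]])[0, 1]
    expected_thumb = 2.0 * p_up - 1.0          # expected value of a +1/-1 thumb
    return 1.0 * (minutes > 0) + 2.0 * complete + 1.5 * expected_thumb

print(proxy_reward_with_prediction(minutes=15.0, complete=0.0))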

But aren’t we still only relying on observed feedback in the proxy reward function? Yes, because delayed feedback is predicted based on observed feedback. However, it is simpler to reason about rewards using all feedback directly. For instance, the delayed thumbs-up prediction model may be a complex neural network that takes into account all observed feedback (e.g., short-term play patterns). It’s more straightforward to define the proxy reward as a simple function of the thumbs-up feedback rather than a complex function of short-term interaction patterns. It can also be used to adjust for potential biases in how feedback is provided.

The reward engineering diagram is updated with an optional delayed feedback prediction step.

Two types of ML models

It’s worth noting that this approach employs two types of ML models:

  • Delayed Feedback Prediction Models: These models predict p(final feedback | observed feedback). The predictions are used to define and compute proxy rewards for bandit policy training examples. As a result, these models are used offline during bandit policy training.
  • Bandit Policy Models: These models are used in the bandit policy π(item | user; r) to generate recommendations online and in real-time.

Challenge: Online-Offline Metric Disparity

Improved input features or neural network architectures often lead to better offline model metrics (e.g., AUC for classification models). However, when these improved models are subjected to A/B testing, we often observe flat or even negative online metrics, the very metrics that quantify long-term member satisfaction.

This online-offline metric disparity usually occurs when the proxy reward used in the recommendation policy is not fully aligned with long-term member satisfaction. In such cases, a model may achieve higher proxy rewards (offline metrics) but result in worse long-term member satisfaction (online metrics).

Nevertheless, the model improvement is genuine. One approach to resolve this is to further refine the proxy reward definition to align better with the improved model. When this tuning results in positive online metrics, the model improvement can be effectively productized. See [1] for more discussions on this challenge.

Summary and Open Questions

In this post, we provided an overview of our reward engineering efforts to align Netflix recommendations with long-term member satisfaction. While retention remains our north star, it is not easy to optimize directly. Therefore, our efforts focus on defining a proxy reward that is aligned with long-term satisfaction and sensitive to individual recommendations. Finally, we discussed the unique challenge of delayed user feedback at Netflix and proposed an approach that has proven effective for us. Refer to [2] for an earlier overview of the reward innovation efforts at Netflix.

As we continue to improve our recommendations, several open questions remain:

  • Can we learn a good proxy reward function automatically by correlating behavior with retention?
  • How long should we wait for delayed feedback before using its predicted value in policy training?
  • How can we leverage Reinforcement Learning to further align the policy with long-term satisfaction?

References

[1] Deep learning for recommender systems: A Netflix case study. AI Magazine 2021. Harald Steck, Linas Baltrunas, Ehtsham Elahi, Dawen Liang, Yves Raimond, Justin Basilico.

[2] Reward innovation for long-term member satisfaction. RecSys 2023. Gary Tang, Jiangwei Pan, Henry Wang, Justin Basilico.



Improve Your Next Experiment by Learning Better Proxy Metrics From Past Experiments

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/improve-your-next-experiment-by-learning-better-proxy-metrics-from-past-experiments-64c786c2a3ac

By Aurélien Bibaut, Winston Chou, Simon Ejdemyr, and Nathan Kallus

We are excited to share our work on how to learn good proxy metrics from historical experiments at KDD 2024. This work addresses a fundamental question for technology companies and academic researchers alike: how do we establish that a treatment that improves short-term (statistically sensitive) outcomes also improves long-term (statistically insensitive) outcomes? Or, faced with multiple short-term outcomes, how do we optimally trade them off for long-term benefit?

For example, in an A/B test, you may observe that a product change improves the click-through rate. However, the test does not provide enough signal to measure a change in long-term retention, leaving you in the dark as to whether this treatment makes users more satisfied with your service. The click-through rate is a proxy metric (S, for surrogate, in our paper) while retention is a downstream business outcome or north star metric (Y). We may even have several proxy metrics, such as other types of clicks or the length of engagement after click. Taken together, these form a vector of proxy metrics.

The goal of our work is to understand the true relationship between the proxy metric(s) and the north star metric — so that we can assess a proxy’s ability to stand in for the north star metric, learn how to combine multiple metrics into a single best one, and better explore and compare different proxies.

Several intuitive approaches to understanding this relationship have surprising pitfalls:

  • Looking only at user-level correlations between the proxy S and north star Y. Continuing the example from above, you may find that users with a higher click-through rate also tend to have a higher retention. But this does not mean that a product change that improves the click-through rate will also improve retention (in fact, promoting clickbait may have the opposite effect). This is because, as any introductory causal inference class will tell you, there are many confounders between S and Y — many of which you can never reliably observe and control for.
  • Looking naively at treatment effect correlations between S and Y. Suppose you are lucky enough to have many historical A/B tests. Further imagine the ordinary least squares (OLS) regression line through a scatter plot of Y on S in which each point represents the (S,Y)-treatment effect from a previous test. Even if you find that this line has a positive slope, you unfortunately cannot conclude that product changes that improve S will also improve Y. The reason for this is correlated measurement error — if S and Y are positively correlated in the population, then treatment arms that happen to have more users with high S will also have more users with high Y.

Between these naive approaches, we find that the second one is the easier trap to fall into. This is because the dangers of the first approach are well-known, whereas covariances between estimated treatment effects can appear misleadingly causal. In reality, these covariances can be severely biased compared to what we actually care about: covariances between true treatment effects. In the extreme — such as when the negative effects of clickbait are substantial but clickiness and retention are highly correlated at the user level — the true relationship between S and Y can be negative even if the OLS slope is positive. Only more data per experiment could diminish this bias — using more experiments as data points will only yield more precise estimates of the badly biased slope. At first glance, this would appear to imperil any hope of using existing experiments to detect the relationship.
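
This bias is easy to reproduce in a toy simulation (all numbers below are invented purely to illustrate the mechanism): even when the true effects imply that improving S hurts Y, positively correlated user-level noise can flip the sign of the naive slope fit on estimated treatment effects.

import numpy as np

rng = np.random.default_rng(7)
K, n = 200, 2_000                     # number of experiments, users per arm

# True (unobservable) treatment effects: improving S hurts Y (true slope -0.5).
true_dS = rng.normal(0.0, 0.02, size=K)
true_dY = -0.5 * true_dS + rng.normal(0.0, 0.002, size=K)

# User-level S and Y are positively correlated, which induces positively
# correlated measurement error in the per-experiment effect estimates.
rho = 0.8
unit_cov = np.array([[1.0, rho], [rho, 1.0]])

est_dS, est_dY = np.empty(K), np.empty(K)
for k in range(K):
    treat = rng.multivariate_normal([true_dS[k], true_dY[k]], unit_cov, size=n)
    ctrl = rng.multivariate_normal([0.0, 0.0], unit_cov, size=n)
    est_dS[k], est_dY[k] = treat.mean(axis=0) - ctrl.mean(axis=0)

true_slope = np.polyfit(true_dS, true_dY, 1)[0]
naive_slope = np.polyfit(est_dS, est_dY, 1)[0]
print(f"true slope: {true_slope:+.2f}   naive slope on estimates: {naive_slope:+.2f}")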

This figure shows a hypothetical treatment effect covariance matrix between S and Y (white line; negative correlation), a unit-level sampling covariance matrix creating correlated measurement errors between these metrics (black line; positive correlation), and the covariance matrix of estimated treatment effects which is a weighted combination of the first two (orange line; no correlation).

To overcome this bias, we propose better ways to leverage historical experiments, inspired by techniques from the literature on weak instrumental variables. More specifically, we show that three estimators are consistent for the true proxy/north-star relationship under different constraints (the paper provides more details and should be helpful for practitioners interested in choosing the best estimator for their setting):

  • A Total Covariance (TC) estimator allows us to estimate the OLS slope from a scatter plot of true treatment effects by subtracting the scaled measurement error covariance from the covariance of estimated treatment effects. Under the assumption that the correlated measurement error is the same across experiments (homogeneous covariances), the bias of this estimator is inversely proportional to the total number of units across all experiments, as opposed to the number of members per experiment. A rough numerical sketch of this idea appears after this list.
  • Jackknife Instrumental Variables Estimation (JIVE) converges to the same OLS slope as the TC estimator but does not require the assumption of homogeneous covariances. JIVE eliminates correlated measurement error by removing each observation’s data from the computation of its instrumented surrogate values.
  • A Limited Information Maximum Likelihood (LIML) estimator is statistically efficient as long as there are no direct effects between the treatment and Y (that is, S fully mediates all treatment effects on Y). We find that LIML is highly sensitive to this assumption and recommend TC or JIVE for most applications.
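
Continuing the toy simulation above (this snippet reuses unit_cov, n, est_dS, and est_dY from it), here is a rough sketch of the Total Covariance idea under strong simplifying assumptions, namely equal arm sizes and a known, homogeneous user-level covariance; the paper develops the actual estimators and the conditions they require.

# Sampling covariance of a difference-in-means estimate with n users in each arm.
meas_cov = 2.0 * unit_cov / n

tc_slope = (np.cov(est_dS, est_dY)[0, 1] - meas_cov[0, 1]) / \
           (np.var(est_dS, ddof=1) - meas_cov[0, 0])
print(f"TC-corrected slope: {tc_slope:+.2f}")   # close to the true -0.5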

Our methods yield linear structural models of treatment effects that are easy to interpret. As such, they are well-suited to the decentralized and rapidly-evolving practice of experimentation at Netflix, which runs thousands of experiments per year on many diverse parts of the business. Each area of experimentation is staffed by independent Data Science and Engineering teams. While every team ultimately cares about the same north star metrics (e.g., long-term revenue), it is highly impractical for most teams to measure these in short-term A/B tests. Therefore, each has also developed proxies that are more sensitive and directly relevant to their work (e.g., user engagement or latency). To complicate matters more, teams are constantly innovating on these secondary metrics to find the right balance of sensitivity and long-term impact.

In this decentralized environment, linear models of treatment effects are a highly useful tool for coordinating efforts around proxy metrics and aligning them towards the north star:

  1. Managing metric tradeoffs. Because experiments in one area can affect metrics in another area, there is a need to measure all secondary metrics in all tests, but also to understand the relative impact of these metrics on the north star. This is so we can inform decision-making when one metric trades off against another metric.
  2. Informing metrics innovation. To minimize wasted effort on metric development, it is also important to understand how metrics correlate with the north star “net of” existing metrics.
  3. Enabling teams to work independently. Lastly, teams need simple tools in order to iterate on their own metrics. Teams may come up with dozens of variations of secondary metrics, and slow, complicated tools for evaluating these variations are unlikely to be adopted. Conversely, our models are easy and fast to fit, and are actively used to develop proxy metrics at Netflix.

We are thrilled about the research and implementation of these methods at Netflix — while also continuing to strive for great and always better, per our culture. For example, we still have some way to go to develop a more flexible data architecture to streamline the application of these methods within Netflix. Interested in helping us? See our open job postings!

For feedback on this blog post and for supporting and making this work better, we thank Apoorva Lal, Martin Tingley, Patric Glynn, Richard McDowell, Travis Brooks, and Ayal Chen-Zion.



Investigation of a Cross-regional Network Performance Issue

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/investigation-of-a-cross-regional-network-performance-issue-422d6218fdf1

Hechao Li, Roger Cruz

Cloud Networking Topology

Netflix operates a highly efficient cloud computing infrastructure that supports a wide array of applications essential for our SVOD (Subscription Video on Demand), live streaming, and gaming services. Utilizing Amazon Web Services (AWS), our infrastructure is hosted across multiple geographic regions worldwide. This global distribution allows our applications to deliver content more effectively by serving traffic closer to our customers. Like any distributed system, our applications occasionally require data synchronization between regions to maintain seamless service delivery.

The following diagram shows a simplified cloud network topology for cross-region traffic.

The Problem At First Glance

Our Cloud Network Engineering on-call team received a request to address a network issue affecting an application with cross-region traffic. Initially, it appeared that the application was experiencing timeouts, likely due to suboptimal network performance. As we all know, the longer the network path, the more devices the packets traverse, increasing the likelihood of issues. For this incident, the client application is located in an internal subnet in the US region while the server application is located in an external subnet in a European region. Therefore, it is natural to blame the network since packets need to travel long distances through the internet.

As network engineers, our initial reaction when the network is blamed is typically, “No, it can’t be the network,” and our task is to prove it. Given that there were no recent changes to the network infrastructure and no reported AWS issues impacting other applications, the on-call engineer suspected a noisy neighbor issue and sought assistance from the Host Network Engineering team.

Blame the Neighbors

In this context, a noisy neighbor issue occurs when a container shares a host with other network-intensive containers. These noisy neighbors consume excessive network resources, causing other containers on the same host to suffer from degraded network performance. Despite each container having bandwidth limitations, oversubscription can still lead to such issues.

Upon investigating other containers on the same host — most of which were part of the same application — we quickly eliminated the possibility of noisy neighbors. The network throughput for both the problematic container and all others was significantly below the set bandwidth limits. We attempted to resolve the issue by removing these bandwidth limits, allowing the application to utilize as much bandwidth as necessary. However, the problem persisted.

Blame the Network

We observed some TCP packets in the network marked with the RST flag, a flag indicating that a connection should be immediately terminated. Although the frequency of these packets was not alarmingly high, the presence of any RST packets still raised suspicion on the network. To determine whether this was indeed a network-induced issue, we conducted a tcpdump on the client. In the packet capture file, we spotted one TCP stream that was closed after exactly 30 seconds.

In the packet capture, the SYN was sent at 18:47:06. After the 3-way handshake (SYN, SYN-ACK, ACK), traffic flowed normally, with nothing unusual until a FIN at 18:47:36, exactly 30 seconds later.

The packet capture results clearly indicated that it was the client application that initiated the connection termination by sending a FIN packet. Following this, the server continued to send data; however, since the client had already decided to close the connection, it responded with RST packets to all subsequent data from the server.

To ensure that the client wasn’t closing the connection due to packet loss, we also conducted a packet capture on the server side to verify that all packets sent by the server were received. This task was complicated by the fact that the packets passed through a NAT gateway (NGW), which meant that on the server side, the client’s IP and port appeared as those of the NGW, differing from those seen on the client side. Consequently, to accurately match TCP streams, we needed to identify the TCP stream on the client side, locate the raw TCP sequence number, and then use this number as a filter on the server side to find the corresponding TCP stream.
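
As a sketch of that matching step: the investigation used tcpdump and Wireshark, but the same idea can be expressed with scapy. The file names, the port, and the assumption that the first data packet identifies the stream of interest are all illustrative.

from scapy.all import rdpcap, TCP   # third-party: pip install scapy

client_pkts = rdpcap("client.pcap")

# Raw (absolute) TCP sequence number of the client's first data packet in the
# stream of interest; 443 stands in for the real server port.
target_seq = next(p[TCP].seq for p in client_pkts
                  if p.haslayer(TCP) and p[TCP].dport == 443 and len(p[TCP].payload) > 0)

# The NAT gateway rewrites the client IP/port, but raw sequence numbers pass
# through unchanged, so they identify the same stream in the server capture.
server_pkts = rdpcap("server.pcap")
matches = [p for p in server_pkts if p.haslayer(TCP) and p[TCP].seq == target_seq]
print(f"{len(matches)} packet(s) on the server side carry raw seq {target_seq}")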

With packet capture results from both the client and server sides, we confirmed that all packets sent by the server were correctly received before the client sent a FIN.

Now, from the network point of view, the story is clear. The client initiated the connection requesting data from the server. The server kept sending data to the client with no problem. However, at a certain point, despite the server still having data to send, the client chose to terminate the reception of data. This led us to suspect that the issue might be related to the client application itself.

Blame the Application

In order to fully understand the problem, we now need to understand how the application works. As shown in the diagram below, the application runs in the us-east-1 region. It reads data from cross-region servers and writes the data to consumers within the same region. The client runs as containers, whereas the servers are EC2 instances.

Notably, the cross-region read was problematic while the write path was smooth. Most importantly, there is a 30-second application-level timeout for reading the data. The application (client) errors out if it fails to read an initial batch of data from the servers within 30 seconds. When we increased this timeout to 60 seconds, everything worked as expected. This explains why the client initiated a FIN — because it lost patience waiting for the server to transfer data.

Could it be that the server was updated to send data more slowly? Could it be that the client application was updated to receive data more slowly? Could it be that the data volume became too large to be completely sent out within 30 seconds? Sadly, we received negative answers for all 3 questions from the application owner. The server had been operating without changes for over a year, there were no significant updates in the latest rollout of the client, and the data volume had remained consistent.

Blame the Kernel

If both the network and the application weren’t changed recently, then what changed? In fact, we discovered that the issue coincided with a recent Linux kernel upgrade from version 6.5.13 to 6.6.10. To test this hypothesis, we rolled back the kernel upgrade and it did restore normal operation to the application.

Honestly speaking, at that time I didn’t believe it was a kernel bug because I assumed the TCP implementation in the kernel should be solid and stable (Spoiler alert: How wrong was I!). But we were also out of ideas from other angles.

There were about 14k commits between the good and bad kernel versions. Engineers on the team methodically and diligently bisected between the two versions. When the bisecting was narrowed to a couple of commits, a change with “tcp” in its commit message caught our attention. The final bisecting confirmed that this commit was our culprit.

Interestingly, while reviewing the email history related to this commit, we found that another user had reported a Python test failure following the same kernel upgrade. Although their solution was not directly applicable to our situation, it suggested that a simpler test might also reproduce our problem. Using strace, we observed that the application configured the following socket options when communicating with the server:

[pid 1699] setsockopt(917, SOL_IPV6, IPV6_V6ONLY, [0], 4) = 0
[pid 1699] setsockopt(917, SOL_SOCKET, SO_KEEPALIVE, [1], 4) = 0
[pid 1699] setsockopt(917, SOL_SOCKET, SO_SNDBUF, [131072], 4) = 0
[pid 1699] setsockopt(917, SOL_SOCKET, SO_RCVBUF, [65536], 4) = 0
[pid 1699] setsockopt(917, SOL_TCP, TCP_NODELAY, [1], 4) = 0

We then developed a minimal client-server C application that transfers a file from the server to the client, with the client configuring the same set of socket options. During testing, we used a 10M file, which represents the volume of data typically transferred within 30 seconds before the client issues a FIN. On the old kernel, this cross-region transfer completed in 22 seconds, whereas on the new kernel, it took 39 seconds to finish.
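
The reproducer itself was a small C program; purely for illustration, a minimal Python client applying the same socket options from the strace output might look like the sketch below, where the server address, port, and transfer size are placeholders.

import socket, time

HOST, PORT, NBYTES = "server.example.internal", 9000, 10 * 1024 * 1024   # placeholders

s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 131072)
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)   # the option that matters here
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

s.connect((HOST, PORT))
start, received = time.monotonic(), 0
while received < NBYTES:
    chunk = s.recv(65536)
    if not chunk:
        break
    received += len(chunk)
print(f"received {received} bytes in {time.monotonic() - start:.1f}s")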

The Root Cause

With the help of the minimal reproduction setup, we were ultimately able to pinpoint the root cause of the problem. In order to understand the root cause, it’s essential to have a grasp of the TCP receive window.

TCP Receive Window

Simply put, the TCP receive window is how the receiver tells the sender “This is how many bytes you can send me without me ACKing any of them”. In our case the sender is the server and the receiver is the client, so the window advertised by the client caps how much unacknowledged data the server can have in flight.

The Window Size

Now that we know the TCP receive window size can affect throughput, the question is: how is the window size calculated? As an application writer, you can’t set the window size directly; however, you can decide how much memory you want to use for buffering received data. This is configured using the SO_RCVBUF socket option we saw in the strace result above. Note, however, that the value of this option specifies how much application data can be queued in the receive buffer. From man 7 socket:

SO_RCVBUF

Sets or gets the maximum socket receive buffer in bytes.
The kernel doubles this value (to allow space for
bookkeeping overhead) when it is set using setsockopt(2),
and this doubled value is returned by getsockopt(2). The
default value is set by the
/proc/sys/net/core/rmem_default file, and the maximum
allowed value is set by the /proc/sys/net/core/rmem_max
file. The minimum (doubled) value for this option is 256.

This means, when the user gives a value X, then the kernel stores 2X in the variable sk->sk_rcvbuf. In other words, the kernel assumes that the bookkeeping overhead is as much as the actual data (i.e. 50% of the sk_rcvbuf).
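
This doubling is easy to observe directly; the snippet below is Linux-specific, per the man page above.

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
print(s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))   # 131072 on Linux
s.close()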

sysctl_tcp_adv_win_scale

However, the assumption above may not be true because the actual overhead really depends on a lot of factors such as Maximum Transmission Unit (MTU). Therefore, the kernel provided this sysctl_tcp_adv_win_scale which you can use to tell the kernel what the actual overhead is. (I believe 99% of people also don’t know how to set this parameter correctly and I’m definitely one of them. You’re the kernel, if you don’t know the overhead, how can you expect me to know?).

According to the sysctl doc,

tcp_adv_win_scale — INTEGER

Obsolete since linux-6.6.

Count buffering overhead as bytes/2^tcp_adv_win_scale (if tcp_adv_win_scale > 0) or bytes - bytes/2^(-tcp_adv_win_scale), if it is <= 0.

Possible values are [-31, 31], inclusive.

Default: 1

For 99% of people, we’re just using the default value 1, which in turn means the overhead is calculated by rcvbuf/2^tcp_adv_win_scale = 1/2 * rcvbuf. This matches the assumption when setting the SO_RCVBUF value.

Let’s recap. Assume you set SO_RCVBUF to 65536, which is the value set by the application as shown in the setsockopt syscall. Then we have:

  • SO_RCVBUF = 65536
  • rcvbuf = 2 * 65536 = 131072
  • overhead = rcvbuf / 2 = 131072 / 2 = 65536
  • receive window size = rcvbuf - overhead = 131072 - 65536 = 65536

(Note, this calculation is simplified. The real calculation is more complex.)

In short, the receive window size before the kernel upgrade was 65536. With this window size, the application was able to transfer 10M data within 30 seconds.

The Change

This commit obsoleted sysctl_tcp_adv_win_scale and introduced a scaling_ratio that can more accurately calculate the overhead or window size, which is the right thing to do. With the change, the window size is now rcvbuf * scaling_ratio.

So how is scaling_ratio calculated? It is calculated as skb->len/skb->truesize, where skb->len is the length of the TCP data in an skb and truesize is the total size of the skb. This is surely a more accurate ratio based on real data rather than a hardcoded 50%. Now, here is the next question: during the TCP handshake, before any data is transferred, how do we decide the initial scaling_ratio? The answer is that a magic and conservative ratio was chosen, with the value being roughly 0.25.

Now we have:

  • SO_RCVBUF = 65536
  • rcvbuf = 2 * 65536 = 131072
  • receive window size = rcvbuf * 0.25 = 131072 * 0.25 = 32768

In short, the receive window size halved after the kernel upgrade. Hence the throughput was cut in half, causing the data transfer time to double.
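
A back-of-envelope check makes this concrete: for a window-limited transfer, throughput is roughly the receive window divided by the round-trip time. Assuming a cross-region RTT of about 140 ms (an assumed figure, not a measurement from this incident), the numbers land in the same ballpark as the observed 22-second and 39-second transfers.

rtt = 0.140                        # seconds; assumed US <-> EU round trip
for window in (65536, 32768):      # receive window before vs. after the upgrade
    throughput = window / rtt      # bytes per second, ignoring slow start etc.
    seconds = 10 * 1024 * 1024 / throughput
    print(f"window {window:>6} B -> ~{throughput / 1024:.0f} KiB/s, 10 MiB in ~{seconds:.0f}s")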

Naturally, you may ask, I understand that the initial window size is small, but why doesn’t the window grow when we have a more accurate ratio of the payload later (i.e. skb->len/skb->truesize)? With some debugging, we eventually found out that the scaling_ratio does get updated to a more accurate skb->len/skb->truesize, which in our case is around 0.66. However, another variable, window_clamp, is not updated accordingly. window_clamp is the maximum receive window allowed to be advertised, which is also initialized to 0.25 * rcvbuf using the initial scaling_ratio. As a result, the receive window size is capped at this value and can’t grow bigger.

The Fix

In theory, the fix is to update window_clamp along with scaling_ratio. However, in order to have a simple fix that doesn’t introduce other unexpected behaviors, our final fix was to increase the initial scaling_ratio from 25% to 50%. This will make the receive window size backward compatible with the original default sysctl_tcp_adv_win_scale.

Meanwhile, notice that the problem is not only caused by the changed kernel behavior but also by the fact that the application sets SO_RCVBUF and has a 30-second application-level timeout. In fact, the application is Kafka Connect, and both settings are the default configurations (receive.buffer.bytes=64k and request.timeout.ms=30s). We also created a Kafka ticket to change receive.buffer.bytes to -1 to allow Linux to auto-tune the receive window.
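
For reference, the relevant consumer properties look roughly like this; the names and defaults are the ones described above, while the exact file they live in depends on how the connector and worker are configured.

# Defaults that were in effect (as described above):
# receive.buffer.bytes=65536   -> 64 KiB, so the client sets SO_RCVBUF explicitly
# request.timeout.ms=30000     -> the 30-second application-level timeout

# Proposed change: let Linux auto-tune the receive buffer instead of pinning it
receive.buffer.bytes=-1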

Conclusion

This was a very interesting debugging exercise that covered many layers of Netflix’s stack and infrastructure. While technically it wasn’t the “network” to blame, this time the culprit turned out to be a software component that makes up the network (i.e., the TCP implementation in the kernel).

If tackling such technical challenges excites you, consider joining our Cloud Infrastructure Engineering teams. Explore opportunities by visiting Netflix Jobs and searching for Cloud Engineering positions.

Acknowledgments

Special thanks to our stunning colleagues Alok Tiagi, Artem Tkachuk, Ethan Adams, Jorge Rodriguez, Nick Mahilani, Tycho Andersen and Vinay Rayini for investigating and mitigating this issue. We would also like to thank Linux kernel network expert Eric Dumazet for reviewing and applying the patch.



Java 21 Virtual Threads – Dude, Where’s My Lock?

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/java-21-virtual-threads-dude-wheres-my-lock-3052540e231d

Getting real with virtual threads

By Vadim Filanovsky, Mike Huang, Danny Thomas and Martin Chalupa

Intro

Netflix has an extensive history of using Java as our primary programming language across our vast fleet of microservices. As we pick up newer versions of Java, our JVM Ecosystem team seeks out new language features that can improve the ergonomics and performance of our systems. In a recent article, we detailed how our workloads benefited from switching to generational ZGC as our default garbage collector when we migrated to Java 21. Virtual threads is another feature we are excited to adopt as part of this migration.

For those new to virtual threads, they are described as “lightweight threads that dramatically reduce the effort of writing, maintaining, and observing high-throughput concurrent applications.” Their power comes from their ability to be suspended and resumed automatically via continuations when blocking operations occur, thus freeing the underlying operating system threads to be reused for other operations. Leveraging virtual threads can unlock higher performance when utilized in the appropriate context.

In this article we discuss one of the peculiar cases that we encountered along our path to deploying virtual threads on Java 21.

The problem

Netflix engineers raised several independent reports of intermittent timeouts and hung instances to the Performance Engineering and JVM Ecosystem teams. Upon closer examination, we noticed a set of common traits and symptoms. In all cases, the apps affected ran on Java 21 with SpringBoot 3 and embedded Tomcat serving traffic on REST endpoints. The instances that experienced the issue simply stopped serving traffic even though the JVM on those instances remained up and running. One clear symptom characterizing the onset of this issue is a persistent increase in the number of sockets in closeWait state as illustrated by the graph below:

Collected diagnostics

Sockets remaining in closeWait state indicate that the remote peer closed the socket, but it was never closed on the local instance, presumably because the application failed to do so. This can often indicate that the application is hanging in an abnormal state, in which case application thread dumps may reveal additional insight.

In order to troubleshoot this issue, we first leveraged our alerts system to catch an instance in this state. Since we periodically collect and persist thread dumps for all JVM workloads, we can often retroactively piece together the behavior by examining these thread dumps from an instance. However, we were surprised to find that all our thread dumps show a perfectly idle JVM with no clear activity. Reviewing recent changes revealed that these impacted services enabled virtual threads, and we knew that virtual thread call stacks do not show up in jstack-generated thread dumps. To obtain a more complete thread dump containing the state of the virtual threads, we used the “jcmd Thread.dump_to_file” command instead. As a last-ditch effort to introspect the state of JVM, we also collected a heap dump from the instance.

Analysis

Thread dumps revealed thousands of “blank” virtual threads:

#119821 "" virtual

#119820 "" virtual

#119823 "" virtual

#120847 "" virtual

#119822 "" virtual
...

These are the VTs (virtual threads) for which a thread object is created, but has not started running, and as such, has no stack trace. In fact, there were approximately the same number of blank VTs as the number of sockets in closeWait state. To make sense of what we were seeing, we need to first understand how VTs operate.

A virtual thread is not mapped 1:1 to a dedicated OS-level thread. Rather, we can think of it as a task that is scheduled to a fork-join thread pool. When a virtual thread enters a blocking call, like waiting for a Future, it relinquishes the OS thread it occupies and simply remains in memory until it is ready to resume. In the meantime, the OS thread can be reassigned to execute other VTs in the same fork-join pool. This allows us to multiplex a lot of VTs to just a handful of underlying OS threads. In JVM terminology, the underlying OS thread is referred to as the “carrier thread” to which a virtual thread can be “mounted” while it executes and “unmounted” while it waits. A great in-depth description of virtual threads is available in JEP 444.

In our environment, we utilize a blocking model for Tomcat, which in effect holds a worker thread for the lifespan of a request. By enabling virtual threads, Tomcat switches to virtual execution. Each incoming request creates a new virtual thread that is simply scheduled as a task on a Virtual Thread Executor. We can see Tomcat creates a VirtualThreadExecutor here.

Tying this information back to our problem, the symptoms correspond to a state when Tomcat keeps creating a new web worker VT for each incoming request, but there are no available OS threads to mount them onto.

Why is Tomcat stuck?

What happened to our OS threads and what are they busy with? As described here, a VT will be pinned to the underlying OS thread if it performs a blocking operation while inside a synchronized block or method. This is exactly what is happening here. Here is a relevant snippet from a thread dump obtained from the stuck instance:

#119515 "" virtual
java.base/jdk.internal.misc.Unsafe.park(Native Method)
java.base/java.lang.VirtualThread.parkOnCarrierThread(VirtualThread.java:661)
java.base/java.lang.VirtualThread.park(VirtualThread.java:593)
java.base/java.lang.System$2.parkVirtualThread(System.java:2643)
java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:219)
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:754)
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:990)
java.base/java.util.concurrent.locks.ReentrantLock$Sync.lock(ReentrantLock.java:153)
java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:322)
zipkin2.reporter.internal.CountBoundedQueue.offer(CountBoundedQueue.java:54)
zipkin2.reporter.internal.AsyncReporter$BoundedAsyncReporter.report(AsyncReporter.java:230)
zipkin2.reporter.brave.AsyncZipkinSpanHandler.end(AsyncZipkinSpanHandler.java:214)
brave.internal.handler.NoopAwareSpanHandler$CompositeSpanHandler.end(NoopAwareSpanHandler.java:98)
brave.internal.handler.NoopAwareSpanHandler.end(NoopAwareSpanHandler.java:48)
brave.internal.recorder.PendingSpans.finish(PendingSpans.java:116)
brave.RealSpan.finish(RealSpan.java:134)
brave.RealSpan.finish(RealSpan.java:129)
io.micrometer.tracing.brave.bridge.BraveSpan.end(BraveSpan.java:117)
io.micrometer.tracing.annotation.AbstractMethodInvocationProcessor.after(AbstractMethodInvocationProcessor.java:67)
io.micrometer.tracing.annotation.ImperativeMethodInvocationProcessor.proceedUnderSynchronousSpan(ImperativeMethodInvocationProcessor.java:98)
io.micrometer.tracing.annotation.ImperativeMethodInvocationProcessor.process(ImperativeMethodInvocationProcessor.java:73)
io.micrometer.tracing.annotation.SpanAspect.newSpanMethod(SpanAspect.java:59)
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
java.base/java.lang.reflect.Method.invoke(Method.java:580)
org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:637)
...

In this stack trace, we enter the synchronization in brave.RealSpan.finish(RealSpan.java:134). This virtual thread is effectively pinned — it is mounted to an actual OS thread even while it waits to acquire a reentrant lock. There are 3 VTs in this exact state and another VT identified as “<redacted> @DefaultExecutor – 46542” that also follows the same code path. These 4 virtual threads are pinned while waiting to acquire a lock. Because the app is deployed on an instance with 4 vCPUs, the fork-join pool that underpins VT execution also contains 4 OS threads. Now that we have exhausted all of them, no other virtual thread can make any progress. This explains why Tomcat stopped processing the requests and why the number of sockets in closeWait state keeps climbing. Indeed, Tomcat accepts a connection on a socket, creates a request along with a virtual thread, and passes this request/thread to the executor for processing. However, the newly created VT cannot be scheduled because all of the OS threads in the fork-join pool are pinned and never released. So these newly created VTs are stuck in the queue, while still holding the socket.

Who has the lock?

Now that we know VTs are waiting to acquire a lock, the next question is: Who holds the lock? Answering this question is key to understanding what triggered this condition in the first place. Usually a thread dump indicates who holds the lock with either “- locked <0x…> (at …)” or “Locked ownable synchronizers,” but neither of these show up in our thread dumps. As a matter of fact, no locking/parking/waiting information is included in the jcmd-generated thread dumps. This is a limitation in Java 21 and will be addressed in the future releases. Carefully combing through the thread dump reveals that there are a total of 6 threads contending for the same ReentrantLock and associated Condition. Four of these six threads are detailed in the previous section. Here is another thread:

#119516 "" virtual
java.base/java.lang.VirtualThread.park(VirtualThread.java:582)
java.base/java.lang.System$2.parkVirtualThread(System.java:2643)
java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:219)
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:754)
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:990)
java.base/java.util.concurrent.locks.ReentrantLock$Sync.lock(ReentrantLock.java:153)
java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:322)
zipkin2.reporter.internal.CountBoundedQueue.offer(CountBoundedQueue.java:54)
zipkin2.reporter.internal.AsyncReporter$BoundedAsyncReporter.report(AsyncReporter.java:230)
zipkin2.reporter.brave.AsyncZipkinSpanHandler.end(AsyncZipkinSpanHandler.java:214)
brave.internal.handler.NoopAwareSpanHandler$CompositeSpanHandler.end(NoopAwareSpanHandler.java:98)
brave.internal.handler.NoopAwareSpanHandler.end(NoopAwareSpanHandler.java:48)
brave.internal.recorder.PendingSpans.finish(PendingSpans.java:116)
brave.RealScopedSpan.finish(RealScopedSpan.java:64)
...

Note that while this thread seemingly goes through the same code path for finishing a span, it does not go through a synchronized block. Finally here is the 6th thread:

#107 "AsyncReporter <redacted>"
java.base/jdk.internal.misc.Unsafe.park(Native Method)
java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:221)
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:754)
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1761)
zipkin2.reporter.internal.CountBoundedQueue.drainTo(CountBoundedQueue.java:81)
zipkin2.reporter.internal.AsyncReporter$BoundedAsyncReporter.flush(AsyncReporter.java:241)
zipkin2.reporter.internal.AsyncReporter$Flusher.run(AsyncReporter.java:352)
java.base/java.lang.Thread.run(Thread.java:1583)

This is actually a normal platform thread, not a virtual thread. Paying particular attention to the line numbers in this stack trace, it is peculiar that the thread seems to be blocked within the internal acquire() method after completing the wait. In other words, this calling thread owned the lock upon entering awaitNanos(). We know the lock was explicitly acquired here. However, by the time the wait completed, it could not reacquire the lock. Summarizing our thread dump analysis:

There are 5 virtual threads and 1 regular thread waiting for the lock. Out of those 5 VTs, 4 of them are pinned to the OS threads in the fork-join pool. There’s still no information on who owns the lock. As there’s nothing more we can glean from the thread dump, our next logical step is to peek into the heap dump and introspect the state of the lock.

Inspecting the lock

Finding the lock in the heap dump was relatively straightforward. Using the excellent Eclipse MAT tool, we examined the objects on the stack of the AsyncReporter non-virtual thread to identify the lock object. Reasoning about the current state of the lock was perhaps the trickiest part of our investigation. Most of the relevant code can be found in the AbstractQueuedSynchronizer.java. While we don’t claim to fully understand the inner workings of it, we reverse-engineered enough of it to match against what we see in the heap dump. This diagram illustrates our findings:

First off, the exclusiveOwnerThread field is null (2), signifying that no one owns the lock. We have an “empty” ExclusiveNode (3) at the head of the list (waiter is null and status is cleared) followed by another ExclusiveNode with waiter pointing to one of the virtual threads contending for the lock — #119516 (4). The only place we found that clears the exclusiveOwnerThread field is within the ReentrantLock.Sync.tryRelease() method (source link). There we also set state = 0 matching the state that we see in the heap dump (1).

With this in mind, we traced the code path to release() the lock. After successfully calling tryRelease(), the lock-holding thread attempts to signal the next waiter in the list. At this point, the lock-holding thread is still at the head of the list, even though ownership of the lock is effectively released. The next node in the list points to the thread that is about to acquire the lock.

To understand how this signaling works, let’s look at the lock acquire path in the AbstractQueuedSynchronizer.acquire() method. Grossly oversimplifying, it’s an infinite loop, where threads attempt to acquire the lock and then park if the attempt was unsuccessful:

while (true) {
    if (tryAcquire()) {
        return; // lock acquired
    }
    park();
}

When the lock-holding thread releases the lock and signals to unpark the next waiter thread, the unparked thread iterates through this loop again, giving it another opportunity to acquire the lock. Indeed, our thread dump indicates that all of our waiter threads are parked on line 754. Once unparked, the thread that managed to acquire the lock should end up in this code block, effectively resetting the head of the list and clearing the reference to the waiter.

To restate this more concisely, the lock-owning thread is referenced by the head node of the list. Releasing the lock notifies the next node in the list while acquiring the lock resets the head of the list to the current node. This means that what we see in the heap dump reflects the state when one thread has already released the lock but the next thread has yet to acquire it. It’s a weird in-between state that should be transient, but our JVM is stuck here. We know thread #119516 was notified and is about to acquire the lock because of the ExclusiveNode state we identified at the head of the list. However, thread dumps show that thread #119516 continues to wait, just like other threads contending for the same lock. How can we reconcile what we see between the thread and heap dumps?

The lock with no place to run

Knowing that thread #119516 was actually notified, we went back to the thread dump to re-examine the state of the threads. Recall that we have 6 total threads waiting for the lock with 4 of the virtual threads each pinned to an OS thread. These 4 will not yield their OS thread until they acquire the lock and proceed out of the synchronized block. #107 “AsyncReporter <redacted>” is a regular platform thread, so nothing should prevent it from proceeding if it acquires the lock. This leaves us with the last thread: #119516. It is a VT, but it is not pinned to an OS thread. Even if it’s notified to be unparked, it cannot proceed because there are no more OS threads left in the fork-join pool to schedule it onto. That’s exactly what happens here — although #119516 is signaled to unpark itself, it cannot leave the parked state because the fork-join pool is occupied by the 4 other VTs waiting to acquire the same lock. None of those pinned VTs can proceed until they acquire the lock. It’s a variation of the classic deadlock problem, but instead of 2 locks we have one lock and a semaphore with 4 permits as represented by the fork-join pool.

Now that we know exactly what happened, it was easy to come up with a reproducible test case.

Conclusion

Virtual threads are expected to improve performance by reducing overhead related to thread creation and context switching. Despite some sharp edges as of Java 21, virtual threads largely deliver on their promise. In our quest for more performant Java applications, we see further virtual thread adoption as a key towards unlocking that goal. We look forward to Java 23 and beyond, which brings a wealth of upgrades and hopefully addresses the integration between virtual threads and locking primitives.

This exploration highlights just one type of issue that performance engineers solve at Netflix. We hope this glimpse into our problem-solving approach proves valuable to others in their future investigations.



Maestro: Netflix’s Workflow Orchestrator

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/maestro-netflixs-workflow-orchestrator-ee13a06f9c78

By Jun He, Natallia Dzenisenka, Praneeth Yenugutala, Yingyi Zhang, and Anjali Norwood

TL;DR

We are thrilled to announce that the Maestro source code is now open to the public! Please visit the Maestro GitHub repository to get started. If you find it useful, please give us a star.

What is Maestro

Maestro is a general-purpose, horizontally scalable workflow orchestrator designed to manage large-scale workflows such as data pipelines and machine learning model training pipelines. It oversees the entire lifecycle of a workflow, from start to finish, including retries, queuing, task distribution to compute engines, and so on. Users can package their business logic in various formats such as Docker images, notebooks, bash scripts, SQL, Python, and more. Unlike traditional workflow orchestrators that only support Directed Acyclic Graphs (DAGs), Maestro supports both acyclic and cyclic workflows and includes multiple reusable patterns such as foreach loops, subworkflows, and conditional branches.

Our Journey with Maestro

Since we first introduced Maestro in this blog post, we have successfully migrated hundreds of thousands of workflows to it on behalf of users with minimal interruption. The transition was seamless, and Maestro has met our design goals by handling our ever-growing workloads. Over the past year, we’ve seen a remarkable 87.5% increase in executed jobs. Maestro now launches thousands of workflow instances and runs half a million jobs daily on average, and has completed around 2 million jobs on particularly busy days.

Scalability and Versatility

Maestro is a fully managed workflow orchestrator that provides Workflow-as-a-Service to thousands of end users, applications, and services at Netflix. It supports a wide range of workflow use cases, including ETL pipelines, ML workflows, A/B test pipelines, and pipelines that move data between different storage systems. Maestro’s horizontal scalability ensures it can manage both a large number of workflows and a large number of jobs within a single workflow.

At Netflix, workflows are intricately connected. Splitting them into smaller groups and managing them across different clusters adds unnecessary complexity and degrades the user experience. This approach also requires additional mechanisms to coordinate these fragmented workflows. Since Netflix’s data tables are housed in a single data warehouse, we believe a single orchestrator should handle all workflows accessing it.

Join us on this exciting journey by exploring the Maestro GitHub repository and contributing to its ongoing development. Your support and feedback are invaluable as we continue to improve the Maestro project.

Introducing Maestro

Netflix Maestro offers a comprehensive set of features designed to meet the diverse needs of both engineers and non-engineers. It includes the common functions and reusable patterns applicable to various use cases in a loosely coupled way.

A workflow definition is written in a JSON format. Maestro combines user-supplied fields with those managed by Maestro to form a flexible and powerful orchestration definition. An example can be found in the Maestro repository wiki.

A Maestro workflow definition comprises two main sections: properties and the versioned workflow, including its metadata. Properties include author and owner information and execution settings. Maestro preserves key properties across workflow versions, such as author and owner information, run strategy, and concurrency settings. This consistency simplifies management and aids in troubleshooting. If the ownership of the current workflow changes, the new owner can claim ownership of the workflow without creating a new workflow version. Users can also enable triggering or alerting features for a given workflow via the properties.

Versioned workflow includes attributes like a unique identifier, name, description, tags, timeout settings, and criticality levels (low, medium, high) for prioritization. Each workflow change creates a new version, enabling tracking and easy reversion, with the active or the latest version used by default. A workflow consists of steps, which are the nodes in the workflow graph defined by users. Steps can represent jobs, another workflow via a subworkflow step, or a loop via a foreach step. Steps consist of unique identifiers, step types, tags, input and output step parameters, step dependencies, retry policies, failure modes, step outputs, etc. Maestro supports configurable retry policies based on error types to enhance step resilience.

This high-level overview of Netflix Maestro’s workflow definition and properties highlights its flexibility for defining complex workflows. We dive into some of the most useful features in the following sections.

Workflow Run Strategy

Users want to automate data pipelines while retaining control over the execution order. This is crucial when workflows cannot run in parallel or must halt current executions when new ones occur. Maestro uses predefined run strategies to decide whether a workflow instance should run. Here are the run strategies Maestro offers.

Sequential Run Strategy
This is the default strategy used by Maestro, which runs workflows one at a time based on First-In-First-Out (FIFO) order. With this run strategy, Maestro runs workflows in the order they are triggered. Note that an execution does not depend on previous states: once a workflow instance reaches a terminal state, whether it succeeded or not, Maestro starts the next one in the queue.

Strict Sequential Run Strategy
With this run strategy, Maestro will run workflows in the order they are triggered but block execution if there’s a blocking error in the workflow instance history. Newly triggered workflow instances are queued until the error is resolved by manually restarting the failed instances or marking the failed ones unblocked.

An example of strict sequential run strategy

In the above example, run5 fails at 5AM, and later runs are queued but do not run. When someone manually marks run5 unblocked or restarts it, the workflow execution resumes. This run strategy is useful for time-insensitive but business-critical workflows. It gives workflow owners the option to review the failures later and unblock the executions after verifying correctness.

First-only Run Strategy
With this run strategy, Maestro ensures that the running workflow is complete before queueing a new workflow instance. If a new workflow instance is queued while the current one is still running, Maestro will remove the queued instance. Maestro will execute a new workflow instance only if there is no workflow instance currently running, effectively turning off queuing with this run strategy. This approach helps to avoid idempotency issues by not queuing new workflow instances.

Last-only Run Strategy
With this run strategy, Maestro ensures the running workflow is the latest triggered one and keeps only the last instance. If a new workflow instance is queued while there is an existing workflow instance already running, Maestro will stop the running instance and execute the newly triggered one. This is useful if a workflow is designed to always process the latest data, such as processing the latest snapshot of an entire table each time.

Parallel with Concurrency Limit Run Strategy
With this run strategy, Maestro runs multiple triggered workflow instances in parallel, constrained by a predefined concurrency limit. This helps fan out and distribute the execution, enabling the processing of large amounts of data within the time limit. A common use case for this strategy is backfilling old data.
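
To make the behavioral differences concrete, the sketch below contrasts how each strategy could treat a newly triggered instance. All names and data structures here are illustrative assumptions, not Maestro’s implementation.

import java.util.Deque;

// A minimal sketch (names are assumptions) of how the predefined run strategies
// treat a newly triggered workflow instance.
final class RunStrategySketch {
  enum RunStrategy { SEQUENTIAL, STRICT_SEQUENTIAL, FIRST_ONLY, LAST_ONLY, PARALLEL_WITH_LIMIT }

  static void onTrigger(RunStrategy strategy, Deque<String> running, Deque<String> queued,
                        int concurrencyLimit, String newInstance) {
    switch (strategy) {
      // FIFO: queue the instance; it starts once the current one reaches a terminal state.
      case SEQUENTIAL, STRICT_SEQUENTIAL -> queued.addLast(newInstance);
      // Run only if nothing is running; otherwise the new instance is removed (no queueing).
      case FIRST_ONLY -> {
        if (running.isEmpty()) {
          running.addLast(newInstance);
        }
      }
      // Keep only the latest instance: stop the running one and execute the new one.
      case LAST_ONLY -> {
        running.clear();
        running.addLast(newInstance);
      }
      // Run in parallel up to the predefined concurrency limit; queue the rest.
      case PARALLEL_WITH_LIMIT -> {
        if (running.size() < concurrencyLimit) {
          running.addLast(newInstance);
        } else {
          queued.addLast(newInstance);
        }
      }
    }
  }

  // Whether the next queued instance may start once the current one terminates:
  // sequential proceeds regardless of previous states, while strict sequential blocks
  // until failed instances are restarted or marked unblocked.
  static boolean canStartNext(RunStrategy strategy, boolean blockingErrorInHistory) {
    return strategy != RunStrategy.STRICT_SEQUENTIAL || !blockingErrorInHistory;
  }
}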

Parameters and Expression Language Support

In Maestro, parameters play an important role. Maestro supports dynamic parameters with code injection, which is extremely useful and powerful. This feature significantly enhances the flexibility and dynamism of workflows, allowing parameters to control execution logic and enable state sharing between workflows and their steps, as well as between upstream and downstream steps. Together with other Maestro features, parameter support makes workflow definitions dynamic and enables users to define parameterized workflows for complex use cases.

However, code injection introduces significant security and safety concerns. For example, users might unintentionally write an infinite loop that creates an array and appends items to it, eventually crashing the server with out-of-memory (OOM) issues. While one approach could be to ask users to embed the injected code within their business logic instead of the workflow definition, this would impose additional work on users and tightly couple their business logic with the workflow. In certain cases, this approach also prevents users from designing complex parameterized workflows.

To mitigate these risks and assist users in building parameterized workflows, we developed our own customized expression language parser: a simple, secure, and safe expression language (SEL). SEL supports code injection while incorporating validations during syntax tree parsing to protect the system. It leverages the Java Security Manager to restrict access, ensuring a secure and controlled environment for code execution.

Simple, Secure, and Safe Expression Language (SEL)
SEL is an in-house simple, secure, and safe expression language designed to address the risks associated with code injection within Maestro parameterized workflows. Its grammar and syntax follow the Java Language Specification (JLS); SEL supports a subset of the JLS, focusing on Maestro use cases. For example, it supports data types for all Maestro parameter types, raising errors, datetime handling, and many predefined utility methods. SEL also includes additional runtime checks, such as loop iteration limits, array size checks, and object memory size limits, to enhance security and reliability. For more details about SEL, please refer to the Maestro GitHub documentation.

Output Parameters
To further enhance parameter support, Maestro allows for callable step execution, which returns output parameters from user execution back to the system. The output data is transmitted to Maestro via its REST API, ensuring that the step runtime does not have direct access to the Maestro database. This approach significantly reduces security concerns.

Parameterized Workflows
Thanks to the powerful parameter support, users can easily create parameterized workflows in addition to static ones. Users enjoy defining parameterized workflows because they are easy to manage and troubleshoot while being powerful enough to solve complex use cases.

  • Static workflows are simple and easy to use but come with limitations. Often, users have to duplicate the same workflow multiple times to accommodate minor changes. Additionally, workflows and jobs cannot share state without using parameters.
  • On the other hand, completely dynamic workflows can be challenging to manage and support. They are difficult to debug or troubleshoot and hard for others to reuse.
  • Parameterized workflows strike a balance by being initialized step by step at runtime based on user-defined parameters. This approach provides great flexibility for users to control the execution at runtime while remaining easy to manage and understand.

As we described in the previous Maestro blog post, parameter support enables the creation of complex parameterized workflows, such as backfill data pipelines.

Workflow Execution Patterns

Maestro provides multiple useful building blocks that allow users to easily define dataflow patterns or other workflow patterns. It provides support for common patterns directly within the Maestro engine. Direct engine support not only enables us to optimize these patterns but also ensures a consistent approach to implementing them. Next, we will talk about the three major building blocks that Maestro provides.

Foreach Support
In Maestro, the foreach pattern is modeled as a dedicated step within the original workflow definition. Each iteration of the foreach loop is internally treated as a separate workflow instance, which scales in the same way as any other Maestro workflow based on the step executions (i.e. a sub-graph) defined within the foreach block. The execution of the sub-graph within a foreach step is delegated to separate workflow instances. The foreach step then monitors and collects the status of these foreach workflow instances, each of which manages the execution of a single iteration. For more details, please refer to our previous Maestro blog post.

The foreach pattern is frequently used to repeatedly run the same jobs with different parameters, such as data backfilling or machine learning model tuning. It would be tedious and time-consuming to require users to explicitly define each iteration in the workflow definition (potentially hundreds of thousands of iterations). Additionally, users would need to create new workflows whenever the foreach range changes, further complicating the process.
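
The delegation described above can be sketched roughly as follows; the class and field names are hypothetical, and the real engine persists and schedules iteration instances rather than holding them in memory.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// A minimal sketch (names are assumptions) of the foreach pattern: each iteration runs
// as its own internal workflow instance executing the sub-graph, while the foreach step
// monitors and collects the statuses of those instances.
final class ForeachSketch {
  record IterationInstance(int loopIndex, Object loopParam, String status) {}

  static List<IterationInstance> launchIterations(List<Object> loopParams) {
    List<IterationInstance> iterations = new ArrayList<>();
    for (int i = 0; i < loopParams.size(); i++) {
      // In Maestro this would start a separate workflow instance for the sub-graph;
      // here we only record the per-iteration bookkeeping.
      iterations.add(new IterationInstance(i, loopParams.get(i), "CREATED"));
    }
    return iterations;
  }

  // The foreach step completes once every iteration instance reaches a terminal state.
  static boolean allTerminal(List<IterationInstance> iterations) {
    Set<String> terminal = Set.of("SUCCEEDED", "FAILED", "STOPPED");
    return iterations.stream().allMatch(it -> terminal.contains(it.status()));
  }
}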

Conditional Branch Support
The conditional branch feature allows subsequent steps to run only if specific conditions in the upstream step are met. These conditions are defined using SEL expressions, which are evaluated at runtime. Combined with other building blocks, this lets users build powerful workflows, e.g. performing remediation if an audit check step fails and then running the job again.

Subworkflow Support
The subworkflow feature allows a workflow step to run another workflow, enabling the sharing of common functions across multiple workflows. This effectively enables “workflow as a function” and allows users to build a graph of workflows. For example, we have observed complex workflows consisting of hundreds of subworkflows to process data across hundreds of tables, where subworkflows are provided by multiple teams.

These patterns can be combined together to build composite patterns for complex workflow use cases. For instance, we can loop over a set of subworkflows or run nested foreach loops. One example that Maestro users developed is an auto-recovery workflow that utilizes both conditional branch and subworkflow features to handle errors and retry jobs automatically.

An example of auto-recovery ETL workflows

In this example, subworkflow `job1` runs another workflow consisting of extract-transform-load (ETL) and audit jobs. Next, a status check job leverages the Maestro parameter and SEL support to retrieve the status of the previous job. Based on this status, it can decide whether to complete the workflow or to run a recovery job to address any data issues. After resolving the issue, it then executes subworkflow `job2`, which runs the same workflow as subworkflow `job1`.

Step Runtime and Step Parameter

Step Runtime Interface
In Maestro, we use step runtime to describe a job at execution time. The step runtime interface defines two pieces of information:

  1. A set of basic APIs to control the behavior of a step instance at execution runtime.
  2. Some simple data structures to track step runtime state and execution result.

Maestro offers a few step runtime implementations, such as the foreach step runtime and the subworkflow step runtime (mentioned in the previous section). Each implementation defines its own logic for the start, execute, and terminate operations. At runtime, these operations control how a step instance is initialized, how its business logic is performed, and how execution is terminated under certain conditions (e.g. manual intervention by users).

Also, the Maestro step runtime internally keeps track of the runtime state as well as the execution result of the step. The runtime state is used to determine the next state transition of the step and to tell whether it has failed or terminated. The execution result hosts both step artifacts and the timeline of step execution history, which are accessible to subsequent steps.
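
As a rough illustration of this contract, here is a minimal sketch; the interface, type, and state names are assumptions for illustration rather than Maestro’s actual API.

import java.util.List;
import java.util.Map;

// A minimal sketch (all names are assumptions) of the step runtime contract: lifecycle
// operations plus simple structures for runtime state and execution result.
public interface StepRuntimeSketch {
  // Controls how a step instance is initialized.
  Result start(Map<String, Object> stepParams);

  // Performs the business logic of the step.
  Result execute(Map<String, Object> stepParams);

  // Terminates the execution under certain conditions (e.g. manual intervention by users).
  Result terminate(Map<String, Object> stepParams);

  // The runtime state drives the next state transition and tells whether the step
  // has failed or terminated.
  enum State { CONTINUE, DONE, USER_ERROR, PLATFORM_ERROR, STOPPED }

  // The execution result hosts step artifacts and a timeline of execution history,
  // both accessible to subsequent steps.
  record Result(State state, Map<String, Object> artifacts, List<String> timeline) {}
}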

Step Parameter Merging
To control step behavior dynamically, Maestro supports injecting both runtime parameters and tags into the step runtime. This makes a Maestro step flexible enough to absorb runtime changes (e.g. overridden parameters) before actually being started. Maestro internally maintains a step parameter map that is initially empty and is updated by merging step parameters in the order below:

  • Default General Parameters: Parameter merging starts from the default parameters that every step should have. For example, workflow_instance_id, step_instance_uuid, step_attempt_id, and step_id are required parameters for each Maestro step. They are internally reserved by Maestro and cannot be passed by users.
  • Injected Parameters: Maestro then merges injected parameters (if present) into the parameter map. Injected parameters come from the step runtime and are dynamically generated based on the step schema. Each type of step can have its own schema with specific parameters associated with it, and the step schema can evolve independently without updating Maestro code.
  • Default Typed Parameters: After injecting runtime parameters, Maestro merges default parameters related to a specific type of step. For example, the foreach step has loop_params and loop_index default parameters, which are internally set by Maestro and used only for foreach steps.
  • Workflow and Step Info Parameters: These parameters contain information about the step and the workflow it belongs to, such as identity information (e.g. workflow_id), and are merged into the step parameter map if present.
  • Undefined New Parameters: When starting or restarting a Maestro workflow instance, users can specify new step parameters that are not present in the initial step definition. ParamsManager merges these parameters to ensure they are available at execution time.
  • Step Definition Parameters: These step parameters are defined by users at definition time and are merged if they are not empty.
  • Run and Restart Parameters: When starting or restarting a Maestro workflow instance, users can override defined parameters by providing run or restart parameters. These two types of parameters are merged last so that the step runtime sees the most recent and accurate parameter space.

The parameters merging logic can be visualized in the diagram below.

Diagram of the parameters merging logic
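
The merge order described above can be sketched as a series of map merges in which later sources override earlier ones; the method and argument names below are illustrative, not Maestro’s internal API.

import java.util.LinkedHashMap;
import java.util.Map;

// A minimal sketch (names are assumptions) of the step parameter merge order; each
// putAll overrides earlier values for the same key, so run/restart parameters win.
final class ParamsMergeSketch {
  static Map<String, Object> mergeStepParams(
      Map<String, Object> defaultGeneral,      // e.g. workflow_instance_id, step_id
      Map<String, Object> injected,            // generated from the step schema
      Map<String, Object> defaultTyped,        // e.g. loop_params, loop_index for foreach
      Map<String, Object> workflowAndStepInfo, // identity info such as workflow_id
      Map<String, Object> undefinedNew,        // new params passed at start/restart
      Map<String, Object> stepDefinition,      // params defined by users at definition time
      Map<String, Object> runAndRestart) {     // run/restart overrides, merged last
    Map<String, Object> merged = new LinkedHashMap<>();
    merged.putAll(defaultGeneral);
    merged.putAll(injected);
    merged.putAll(defaultTyped);
    merged.putAll(workflowAndStepInfo);
    merged.putAll(undefinedNew);
    merged.putAll(stepDefinition);
    merged.putAll(runAndRestart);
    return merged;
  }
}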

Step Dependencies and Signals

Steps in the Maestro workflow graph can express execution dependencies using step dependencies. A step dependency specifies the data-related conditions required for a step to start execution. These conditions are usually defined based on signals, which are messages carrying information such as parameter values, and can be published through step outputs or external systems like SNS or Kafka.

Signals in Maestro serve both the signal trigger pattern and the signal dependency (publisher-subscriber) pattern. One step can publish an output signal that can unblock the execution of multiple other steps that depend on it. A signal definition includes a list of mapped parameters, allowing Maestro to perform “signal matching” on a subset of fields. Additionally, Maestro supports signal operators like <, >, etc., on signal parameter values.

Netflix has built various abstractions on top of the concept of signals. For instance, an ETL workflow can update a table with data and send signals that unblock steps in downstream workflows dependent on that data. Maestro supports “signal lineage,” which allows users to navigate all historical instances of signals and the workflow steps that publish or consume those signals. Signal triggering guarantees exactly-once execution for the workflow subscribing to a signal or a set of joined signals. This approach is efficient, as it conserves resources by only executing the workflow or step when the specified conditions in the signals are met. These advanced abstractions are implemented in a signal service; please refer to the Maestro blog for further details.
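
As a rough sketch of the signal matching idea, assuming hypothetical names and numeric signal parameters only:

import java.util.Map;

// A minimal sketch (all names are assumptions) of "signal matching": a step dependency
// is satisfied when a published signal carries a mapped parameter whose value matches,
// optionally compared with operators such as < or >.
final class SignalMatchSketch {
  enum Op { EQ, LT, GT }

  record Condition(String param, Op op, long value) {}

  static boolean matches(Map<String, Long> signalParams, Condition cond) {
    Long actual = signalParams.get(cond.param());
    if (actual == null) {
      return false; // the published signal does not carry the mapped parameter
    }
    return switch (cond.op()) {
      case EQ -> actual == cond.value();
      case LT -> actual < cond.value();
      case GT -> actual > cond.value();
    };
  }
}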

Breakpoint

Maestro allows users to set breakpoints on workflow steps, functioning similarly to code-level breakpoints in an IDE. When a workflow instance executes and reaches a step with a breakpoint, that step enters a “paused” state. This halts the workflow graph’s progression until a user manually resumes from the breakpoint. If multiple instances of a workflow step are paused at a breakpoint, resuming one instance will only affect that specific instance, leaving the others in a paused state. Deleting the breakpoint will cause all paused step instances to resume.

This feature is particularly useful during the initial development of a workflow, allowing users to inspect step executions and output data. It is also beneficial when running a step multiple times in a “foreach” pattern with various input parameters. Setting a single breakpoint on a step will cause all iterations of the foreach loop to pause at that step for debugging purposes. Additionally, the breakpoint feature allows human intervention during the workflow execution and can also be used for other purposes, e.g. supporting mutating step states while the workflow is running.

Timeline

Maestro includes a step execution timeline, capturing all significant events such as execution state machine changes and the reasoning behind them. This feature is useful for debugging, providing insights into the status of a step. For example, it logs transitions such as "Created" and "Evaluating params". An example of a timeline is included here for reference. Step runtime implementations can add events to the timeline to surface execution information to end users.

Retry Policies

Maestro supports retry policies for steps that reach a terminal state due to failure. Users can specify the number of retries and configure retry policies, including delays between retries and exponential backoff strategies, in addition to fixed interval retries. Maestro distinguishes between two types of retries: “platform” and “user.” Platform retries address platform-level errors unrelated to user logic, while user retries are for user-defined conditions. Each type can have its own set of retry policies.

Automatic retries are beneficial for handling transient errors that can be resolved without user intervention. Maestro also provides the flexibility to set retries to zero for non-idempotent steps to avoid retries. This ensures that users have control over how retries are managed based on their specific requirements.
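
A simplified sketch of how such a retry policy could compute the next delay; the field names and the -1 "no more retries" convention are assumptions for illustration, and separate policies could be held for platform and user errors.

// A minimal sketch (names are assumptions) of a retry policy supporting fixed intervals
// (backoffFactor = 1.0) or exponential backoff, with retries disabled via maxRetries = 0.
final class RetryPolicySketch {
  record Policy(int maxRetries, long baseDelayMillis, double backoffFactor, long maxDelayMillis) {}

  // Returns -1 when the step should not be retried again.
  static long nextDelayMillis(Policy policy, int attemptsSoFar) {
    if (attemptsSoFar >= policy.maxRetries()) {
      return -1; // exhausted; maxRetries = 0 avoids retrying non-idempotent steps
    }
    double delay = policy.baseDelayMillis() * Math.pow(policy.backoffFactor(), attemptsSoFar);
    return (long) Math.min(delay, policy.maxDelayMillis());
  }
}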

Aggregated View

Because a workflow instance can have multiple runs, it is important for users to see an aggregated state of all steps in the workflow instance. The aggregated view is computed by merging the base aggregated view with the step statuses of the current run. For example, in the figure below simulating a simple case, step1 and step2 succeeded in the first run, step3 failed, and step4 and step5 never started. When the user restarts, run 2 starts from step3, skipping step1 and step2, which succeeded in the previous run. After all steps succeed, the aggregated view shows the final states for all steps.

An example of aggregated views
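
The merge described above boils down to a map override, mirroring the example in the figure; the names here are illustrative.

import java.util.HashMap;
import java.util.Map;

// A minimal sketch (names are assumptions) of the aggregated view: step statuses from
// the current run override the base view carried over from previous runs, so skipped
// steps keep the status they earned earlier.
final class AggregatedViewSketch {
  static Map<String, String> aggregate(
      Map<String, String> baseView,         // e.g. {step1=SUCCEEDED, step2=SUCCEEDED, step3=FAILED}
      Map<String, String> currentRunView) { // e.g. {step3=SUCCEEDED, step4=SUCCEEDED, step5=SUCCEEDED}
    Map<String, String> aggregated = new HashMap<>(baseView);
    aggregated.putAll(currentRunView);
    return aggregated;
  }
}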

Rollup

Rollup provides a high-level summary of a workflow instance, detailing the status of each step and the count of steps in each status. It flattens steps across the current instance and any nested non-inline workflows like subworkflows or foreach steps. For instance, if a successful workflow has three steps, one of which is a subworkflow corresponding to a five-step workflow, the rollup will indicate that seven steps succeeded. Only leaf steps are counted in the rollup, as other steps serve merely as pointers to concrete workflows.

Rollup also retains references to any non-successful steps, offering a clear overview of step statuses and facilitating easy navigation to problematic steps, even within nested workflows. The aggregated rollup for a workflow instance is calculated by combining the current run’s runtime data with a base rollup. The current state is derived from the statuses of active steps, including aggregated rollups for foreach and subworkflow steps. The base rollup is established when the workflow instance begins and includes statuses of inline steps (excluding foreach and subworkflows) from the previous run that are not part of the current run.

For subworkflow steps, the rollup simply reflects the rollup of the subworkflow instance. For foreach steps, the rollup combines the base rollup of the foreach step with the current state rollup. The base is derived from the previous run’s aggregated rollup, excluding the iterations to be restarted in the new run. The current state is periodically updated by aggregating rollups of running iterations until all iterations reach a terminal state.

Due to these processes, the rollup model is eventually consistent. While the figure below illustrates a straightforward example of rollup, the calculations can become complex and recursive, especially with multiple levels of nested foreaches and subworkflows.

An example of the rollup model

Maestro Event Publishing

When a workflow definition, workflow instance, or step instance changes, Maestro generates an event, processes it internally, and publishes the processed event to external systems. Maestro has both internal and external events. An internal event tracks changes within the life cycle of a workflow, workflow instance, or step instance. It is published to an internal queue and processed within Maestro. After internal events are processed, some of them are transformed into external events and sent to an external queue (e.g. SNS, Kafka). The external event carries Maestro status-change information for downstream services. The event publishing flow is illustrated in the diagram below:

A diagram of the event publishing flow

As shown in the diagram, the Maestro event processor bridges the two aforementioned event types. It listens on the internal queue for published internal events. Within the processor, each internal event is processed based on its type and converted to an external event if needed. The notification publisher at the end emits the external event so that downstream services can consume it.

The downstream services are mostly event-driven. The Maestro event carries the information downstream services need to capture different changes in Maestro. In general, these changes fall into two categories: workflow changes and instance status changes. A workflow change event is associated with actions at the workflow level, i.e. the definition or properties of a workflow have changed. Instance status change tracks status transitions of a workflow instance or step instance.
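
As a rough illustration of the conversion step, the sketch below processes internal events by type and only publishes the categories that downstream services care about; the type and class names are assumptions, not Maestro’s actual event model.

import java.util.Optional;

// A minimal sketch (all names are assumptions) of internal-to-external event conversion:
// events are handled by type, and only relevant ones become external events.
final class EventProcessorSketch {
  enum InternalType {
    WORKFLOW_DEFINITION_CHANGE, WORKFLOW_PROPERTIES_CHANGE,
    WORKFLOW_INSTANCE_STATUS_CHANGE, STEP_INSTANCE_STATUS_CHANGE,
    INTERNAL_MAINTENANCE
  }

  record InternalEvent(InternalType type, String entityId, String status) {}
  record ExternalEvent(String category, String entityId, String status) {}

  static Optional<ExternalEvent> toExternal(InternalEvent event) {
    return switch (event.type()) {
      case WORKFLOW_DEFINITION_CHANGE, WORKFLOW_PROPERTIES_CHANGE ->
          Optional.of(new ExternalEvent("workflow_change", event.entityId(), event.status()));
      case WORKFLOW_INSTANCE_STATUS_CHANGE, STEP_INSTANCE_STATUS_CHANGE ->
          Optional.of(new ExternalEvent("instance_status_change", event.entityId(), event.status()));
      case INTERNAL_MAINTENANCE -> Optional.empty(); // processed internally, never published
    };
  }
}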

Get Started with Maestro

Maestro has been extensively used within Netflix, and today, we are excited to make the Maestro source code publicly available. We hope that the scalability and usability that Maestro offers can expedite workflow development outside Netflix. We invite you to try Maestro, use it within your organization, and contribute to its development.

You can find the Maestro code repository at github.com/Netflix/maestro. If you have any questions, thoughts, or comments about Maestro, please feel free to create a GitHub issue in the Maestro repository. We are eager to hear from you.

We are taking workflow orchestration to the next level and constantly solving new problems and challenges; please stay tuned for updates. If you are passionate about solving large-scale orchestration problems, please join us.

Acknowledgements

Thanks to other Maestro team members, Binbing Hou, Zhuoran Dong, Brittany Truong, Deepak Ramalingam, Moctar Ba, for their contributions to the Maestro project. Thanks to our Product Manager Ashim Pokharel for driving the strategy and requirements. We’d also like to thank Andrew Seier, Romain Cledat, Olek Gorajek, and other stunning colleagues at Netflix for their contributions to the Maestro project. We also thank Prashanth Ramdas, Eva Tse, David Noor, Charles Smith and other leaders of Netflix engineering organizations for their constructive feedback and suggestions on the Maestro project.


Maestro: Netflix’s Workflow Orchestrator was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Enhancing Netflix Reliability with Service-Level Prioritized Load Shedding

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/enhancing-netflix-reliability-with-service-level-prioritized-load-shedding-e735e6ce8f7d

Applying Quality of Service techniques at the application level

Anirudh Mendiratta, Kevin Wang, Joey Lynch, Javier Fernandez-Ivern, Benjamin Fedorka

Introduction

In November 2020, we introduced the concept of prioritized load shedding at the API gateway level in our blog post, Keeping Netflix Reliable Using Prioritized Load Shedding. Today, we’re excited to dive deeper into how we’ve extended this strategy to the individual service level, focusing on the video streaming control plane and data plane, to further enhance user experience and system resilience.

The Evolution of Load Shedding at Netflix

At Netflix, ensuring a seamless viewing experience for millions of users simultaneously is paramount. Our initial approach for prioritized load shedding was implemented at the Zuul API gateway layer. This system effectively manages different types of network traffic, ensuring that critical playback requests receive priority over less critical telemetry traffic.

Building on this foundation, we recognized the need to apply a similar prioritization logic deeper within our architecture, specifically at the service layer where different types of requests within the same service could be prioritized differently. The advantages of applying these techniques at the service level in addition to our edge API gateway are:

  1. Service teams can own their prioritization logic and can apply finer grained prioritization.
  2. This can be used for backend to backend communication, i.e. for services not sitting behind our edge API gateway.
  3. Services can use cloud capacity more efficiently by combining different request types into one cluster and shedding low priority requests when necessary instead of maintaining separate clusters for failure isolation.

Introducing Service-Level Prioritized Load Shedding

PlayAPI is a critical backend service on the video streaming control plane, responsible for handling device initiated manifest and license requests necessary to start playback. We categorize these requests into two types based on their criticality:

  1. User-Initiated Requests (critical): These requests are made when a user hits play and directly impact the user’s ability to start watching a show or a movie.
  2. Pre-fetch Requests (non-critical): These requests are made optimistically while a user browses content, before the user hits play, to reduce latency should they decide to watch a particular title. A failure in only pre-fetch requests does not result in a playback failure, but slightly increases the latency between pressing play and video appearing on screen.
Netflix on Chrome making pre-fetch requests to PlayAPI while the user is browsing content

The Problem

In order to handle large traffic spikes, high backend latency, or an under-scaled backend service, PlayAPI previously used a concurrency limiter that throttled requests and reduced the availability of user-initiated and prefetch requests equally. This was not ideal because:

  1. Spikes in pre-fetch traffic reduced availability for user-initiated requests
  2. Increased backend latency reduced availability for user-initiated and pre-fetch requests equally, even when the system had enough capacity to serve all user-initiated requests.

Sharding the critical and non-critical requests into separate clusters was an option, which addressed problem 1 and added failure isolation between the two types of requests; however, it came with a higher compute cost. Another disadvantage of sharding is that it adds operational overhead: engineers need to make sure CI/CD, auto-scaling, metrics, and alerts are enabled for the new cluster.

Option 1 — No isolation
Option 2 — Isolation but higher compute cost

Our Solution

We implemented a concurrency limiter within PlayAPI that prioritizes user-initiated requests over prefetch requests without physically sharding the two request handlers. This mechanism uses the partitioning functionality of the open source Netflix/concurrency-limits Java library. We create two partitions in our limiter:

  • User-Initiated Partition: Guaranteed 100% throughput.
  • Pre-fetch Partition: Utilizes only excess capacity.
Option 3 — Single cluster with prioritized load-shedding offers application-level isolation with lower compute cost. Each instance serves both types of requests and has a partition whose size adjusts dynamically to ensure that pre-fetch requests only get excess capacity. This allows user-initiated requests to “steal” pre-fetch capacity when necessary.

The partitioned limiter is configured as a pre-processing Servlet Filter that uses HTTP headers sent by devices to determine a request’s criticality, thus avoiding the need to read and parse the request body for rejected requests. This ensures that the limiter is not itself a bottleneck and can effectively reject requests while using minimal CPU. As an example, the filter can be initialized as

Filter filter = new ConcurrencyLimitServletFilter(
    new ServletLimiterBuilder()
        .named("playapi")
        .partitionByHeader("X-Netflix.Request-Name")
        .partition("user-initiated", 1.0)
        .partition("pre-fetch", 0.0)
        .build());

Note that in steady state, there is no throttling and the prioritization has no effect on the handling of pre-fetch requests. The prioritization mechanism only kicks in when a server is at the concurrency limit and needs to reject requests.

Testing

In order to validate that our load-shedding worked as intended, we used Failure Injection Testing to inject 2 second latency in pre-fetch calls, where the typical p99 latency for these calls is < 200 ms. The failure was injected on one baseline instance with regular load shedding and one canary instance with prioritized load shedding. Some internal services that PlayAPI calls use separate clusters for user-initiated and pre-fetch requests and run pre-fetch clusters hotter. This test case simulates a scenario where a pre-fetch cluster for a downstream service is experiencing high latency.

Baseline — Without prioritized load-shedding. Both pre-fetch and user-initiated see an equal drop in availability
Canary — With prioritized load-shedding. Only pre-fetch availability drops while user-initiated availability stays at 100%

Without prioritized load-shedding, both user-initiated and prefetch availability drop when latency is injected. However, after adding prioritized load-shedding, user-initiated requests maintain 100% availability and only prefetch requests are throttled.

We were ready to roll this out to production and see how it performed in the wild!

Real-World Application and Results

Netflix engineers work hard to keep our systems available, and it was a while before we had a production incident that tested the efficacy of our solution. A few months after deploying prioritized load shedding, we had an infrastructure outage at Netflix that impacted streaming for many of our users. Once the outage was fixed, we got a 12x spike in pre-fetch requests per second from Android devices, presumably because there was a backlog of queued requests built up.

Spike in Android pre-fetch RPS

This could have resulted in a second outage as our systems weren’t scaled to handle this traffic spike. Did prioritized load-shedding in PlayAPI help us here?

Yes! While the availability for prefetch requests dropped as low as 20%, the availability for user-initiated requests was > 99.4% due to prioritized load-shedding.

Availability of pre-fetch and user-initiated requests

At one point we were throttling more than 50% of all requests but the availability of user-initiated requests continued to be > 99.4%.

Generic service work prioritization

Based on the success of this approach, we have created an internal library to enable services to perform prioritized load shedding based on pluggable utilization measures, with multiple priority levels.

Unlike the API gateway, which needs to handle a large volume of requests with varying priorities, most microservices typically receive requests with only a few distinct priorities. To maintain consistency across different services, we have introduced four predefined priority buckets inspired by the Linux tc-prio levels:

  • CRITICAL: Affect core functionality — These will never be shed unless we are in complete failure.
  • DEGRADED: Affect user experience — These will be progressively shed as the load increases.
  • BEST_EFFORT: Do not affect the user — These will be responded to in a best effort fashion and may be shed progressively in normal operation.
  • BULK: Background work, expect these to be routinely shed.

Services can either choose the upstream client’s priority or map incoming requests to one of these priority buckets by examining various request attributes, such as HTTP headers or the request body, for more precise control. Here is an example of how services can map requests to priority buckets:

ResourceLimiterRequestPriorityProvider requestPriorityProvider() {
    return contextProvider -> {
        if (contextProvider.getRequest().isCritical()) {
            return PriorityBucket.CRITICAL;
        } else if (contextProvider.getRequest().isHighPriority()) {
            return PriorityBucket.DEGRADED;
        } else if (contextProvider.getRequest().isMediumPriority()) {
            return PriorityBucket.BEST_EFFORT;
        } else {
            return PriorityBucket.BULK;
        }
    };
}

Generic CPU based load-shedding

Most services at Netflix autoscale on CPU utilization, so it is a natural measure of system load to tie into the prioritized load shedding framework. Once a request is mapped to a priority bucket, services can determine when to shed traffic from a particular bucket based on CPU utilization. In order to preserve the signal to autoscaling that scaling is needed, prioritized shedding only starts after hitting the target CPU utilization; as system load increases further, progressively more critical traffic is shed in an attempt to maintain user experience.

For example, if a cluster targets a 60% CPU utilization for auto-scaling, it can be configured to start shedding requests when the CPU utilization exceeds this threshold. When a traffic spike causes the cluster’s CPU utilization to significantly surpass this threshold, it will gradually shed low-priority traffic to conserve resources for high-priority traffic. This approach also allows more time for auto-scaling to add additional instances to the cluster. Once more instances are added, CPU utilization will decrease, and low-priority traffic will resume being served normally.

Percentage of requests (Y-axis) being load-shed based on CPU utilization (X-axis) for different priority buckets
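
A minimal sketch of this progressive, priority-aware shedding curve is shown below; the 60% starting point matches the example above, while the other thresholds and names are illustrative assumptions.

// A minimal sketch (thresholds and names are assumptions) of CPU-based prioritized
// shedding: nothing is shed below the autoscaling target, and as utilization climbs past
// a bucket's threshold an increasing fraction of that bucket is rejected.
final class CpuShedSketch {
  enum PriorityBucket { CRITICAL, DEGRADED, BEST_EFFORT, BULK }

  // CPU utilization (0-100) at which each bucket starts shedding; lower priority sheds first.
  static double shedStart(PriorityBucket bucket) {
    return switch (bucket) {
      case BULK -> 60.0;        // begin shedding right at the autoscaling target
      case BEST_EFFORT -> 70.0;
      case DEGRADED -> 80.0;
      case CRITICAL -> 90.0;    // only shed when close to complete failure
    };
  }

  // Probability of rejecting a request of the given priority at the given CPU utilization.
  static double shedProbability(PriorityBucket bucket, double cpuUtilization) {
    double start = shedStart(bucket);
    if (cpuUtilization <= start) {
      return 0.0;
    }
    return Math.min(1.0, (cpuUtilization - start) / (100.0 - start));
  }
}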

Experiments with CPU based load-shedding

We ran a series of experiments sending a large request volume at a service which normally targets 45% CPU for auto scaling but which was prevented from scaling up for the purpose of monitoring CPU load shedding under extreme load conditions. The instances were configured to shed noncritical traffic after 60% CPU and critical traffic after 80%.

As RPS was dialed up past 6x the autoscale volume, the service was able to shed first noncritical and then critical requests. Latency remained within reasonable limits throughout, and successful RPS throughput remained stable.

Experimental behavior of CPU based load-shedding using synthetic traffic.
P99 latency stayed within a reasonable range throughout the experiment, even as RPS surpassed 6x the autoscale target.

Anti-patterns with load-shedding

Anti-pattern 1 — No shedding

In the above graphs, the limiter does a good job of keeping latency low for the successful requests. If there were no shedding here, we’d see latency increase for all requests instead of fast failures in some requests that can be retried. Worse, this can result in a death spiral where one instance becomes unhealthy, putting more load on the other instances until all instances become unhealthy before auto-scaling can kick in.

No load-shedding: In the absence of load-shedding, increased latency can degrade all requests instead of rejecting some requests (that can be retried), and can make instances unhealthy

Anti-pattern 2 — Congestive failure

Another anti-pattern to watch out for is congestive failure or shedding too aggressively. If the load-shedding is due to an increase in traffic, the successful RPS should not drop after load-shedding. Here is an example of what congestive failure looks like:

Congestive failure: After 16:57, the service starts rejecting most requests and is not able to sustain a successful 240 RPS that it was before load-shedding kicked in. This can be seen in fixed concurrency limiters or when load-shedding consumes too much CPU preventing any other work from being done

We can see in the Experiments with CPU based load-shedding section above that our load-shedding implementation avoids both these anti-patterns by keeping latency low and sustaining as much successful RPS during load-shedding as before.

Generic IO based load-shedding

Some services are not CPU-bound but instead are IO-bound by backing services or datastores that can apply back pressure via increased latency when they are overloaded either in compute or in storage capacity. For these services we re-use the prioritized load shedding techniques, but we introduce new utilization measures to feed into the shedding logic. Our initial implementation supports two forms of latency based shedding in addition to standard adaptive concurrency limiters (themselves a measure of average latency):

  1. The service can specify per-endpoint target and maximum latencies, which allow the service to shed when the service is abnormally slow regardless of backend.
  2. The Netflix storage services running on the Data Gateway return observed storage target and max latency SLO utilization, allowing services to shed when they overload their allocated storage capacity.

These utilization measures provide early warning signs that a service is generating too much load on a backend and allow it to shed low-priority work before it overwhelms that backend. The main advantage of these techniques over concurrency limits alone is that they require less tuning, as our services already must maintain tight latency service-level objectives (SLOs), for example a p50 < 10ms and p100 < 500ms. Rephrasing these existing SLOs as utilizations allows us to shed low-priority work early to prevent further latency impact on high-priority work. At the same time, the system accepts as much work as it can while maintaining SLOs.

To create these utilization measures, we count how many requests are processed slower than our target and maximum latency objectives, and emit the percentage of requests failing to meet those latency goals. For example, our KeyValue storage service offers a 10ms target with 500ms max latency for each namespace, and all clients receive utilization measures per data namespace to feed into their prioritized load shedding. These measures look like:

utilization(namespace) = {
  overall = 12
  latency = {
    slo_target = 12,
    slo_max = 0
  }
  system = {
    storage = 17,
    compute = 10,
  }
}

In this case, 12% of requests are slower than the 10ms target, 0% are slower than the 500ms max latency (timeout), and 17% of allocated storage is utilized. Different use cases consult different utilizations in their prioritized shedding, for example batches that write data daily may get shed when system storage utilization is approaching capacity as writing more data would create further instability.
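
A simplified sketch of how such utilization percentages could be computed from observed request latencies, using the 10ms target and 500ms max from the KeyValue example above; the class and method names are assumptions.

// A minimal sketch (names are assumptions) of latency SLO utilization: the percentage of
// requests slower than the target and max latency objectives, fed into prioritized shedding.
final class LatencyUtilizationSketch {
  static double percentSlowerThan(long[] latenciesMillis, long thresholdMillis) {
    if (latenciesMillis.length == 0) {
      return 0.0;
    }
    long slow = 0;
    for (long latency : latenciesMillis) {
      if (latency > thresholdMillis) {
        slow++;
      }
    }
    return 100.0 * slow / latenciesMillis.length;
  }

  public static void main(String[] args) {
    long[] observed = {5, 8, 12, 30, 7, 600}; // sample request latencies in milliseconds
    System.out.println("slo_target utilization: " + percentSlowerThan(observed, 10));  // 50.0
    System.out.println("slo_max utilization: " + percentSlowerThan(observed, 500));    // ~16.7
  }
}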

An example where the latency utilization is useful is for one of our critical file origin services which accepts writes of new files in the AWS cloud and acts as an origin (serves reads) for those files to our Open Connect CDN infrastructure. Writes are the most critical and should never be shed by the service, but when the backing datastore is getting overloaded, it is reasonable to progressively shed reads to files which are less critical to the CDN as it can retry those reads and they do not affect the product experience.

To achieve this goal, the origin service configured a KeyValue latency based limiter that starts shedding reads to files which are less critical to the CDN when the datastore reports a target latency utilization exceeding 40%. We then stress tested the system by generating over 50Gbps of read traffic, some of it to high priority files and some of it to low priority files:

In this test, there are a nominal number of critical writes and a high number of reads to both low and high priority files. In the top-left graph we ramp to 2000 read/second of ~4MiB files until we can trigger overload of the backend store at over 50Gbps in the top-center graph. When that happens, the top-right graph shows that even under significant load, the origin only sheds low priority read work to preserve high-priority writes and reads. Before this change when we hit breaking points, critical writes and reads would fail along with low priority reads. During this test the CPU load of the file serving service was nominal (<10%), so in this case only IO based limiters are able to protect the system. It is also important to note that the origin will serve more traffic as long as the backing datastore continues accepting it with low latency, preventing the problems we had with concurrency limits in the past where they would either shed too early when nothing was actually wrong or too late when we had entered congestive failure.

Conclusion and Future Directions

The implementation of service-level prioritized load shedding has proven to be a significant step forward in maintaining high availability and excellent user experience for Netflix customers, even during unexpected system stress.

Stay tuned for more updates as we innovate to keep your favorite shows streaming smoothly, no matter what SLO busters lie in wait.

Acknowledgements

We would like to acknowledge the many members of the Netflix consumer product, platform, and open connect teams who have designed, implemented, and tested these prioritization techniques. In particular: Xiaomei Liu, Raj Ummadisetty, Shyam Gala, Justin Guerra, William Schor, Tony Ghita et al.


Enhancing Netflix Reliability with Service-Level Prioritized Load Shedding was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

A Recap of the Data Engineering Open Forum at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/a-recap-of-the-data-engineering-open-forum-at-netflix-6b4d4410b88f

A summary of sessions at the first Data Engineering Open Forum at Netflix on April 18th, 2024

The Data Engineering Open Forum at Netflix on April 18th, 2024.

At Netflix, we aspire to entertain the world, and our data engineering teams play a crucial role in this mission by enabling data-driven decision-making at scale. Netflix is not the only place where data engineers are solving challenging problems with creative solutions. On April 18th, 2024, we hosted the inaugural Data Engineering Open Forum at our Los Gatos office, bringing together data engineers from various industries to share, learn, and connect.

At the conference, our speakers shared their unique perspectives on modern developments, immediate challenges, and future prospects of data engineering. We are excited to share the recordings of these talks with the rest of the world.

Opening Remarks

Recording

Speaker: Max Schmeiser (Vice President of Studio and Content Data Science & Engineering)

Summary: Max Schmeiser extends a warm welcome to all attendees, marking the beginning of our inaugural Data Engineering Open Forum.

Evolving from Rule-based Classifier: Machine Learning Powered Auto Remediation in Netflix Data Platform

Recording

Speakers:

Summary: At Netflix, hundreds of thousands of workflows and millions of jobs are running every day on our big data platform, but diagnosing and remediating job failures can impose considerable operational burdens. To handle errors efficiently, Netflix developed a rule-based classifier for error classification called “Pensive.” However, as the system has increased in scale and complexity, Pensive has been facing challenges due to its limited support for operational automation, especially for handling memory configuration errors and unclassified errors. To address these challenges, we have developed a new feature called “Auto Remediation,” which integrates the rules-based classifier with an ML service.

Automating the Data Architect: Generative AI for Enterprise Data Modeling

Recording

Speaker: Jide Ogunjobi (Founder & CTO at Context Data)

Summary: As organizations accumulate ever-larger stores of data across disparate systems, efficiently querying and gaining insights from enterprise data remain ongoing challenges. To address this, we propose developing an intelligent agent that can automatically discover, map, and query all data within an enterprise. This “Enterprise Data Model/Architect Agent” employs generative AI techniques for autonomous enterprise data modeling and architecture.

Tulika Bhatt, Senior Data Engineer at Netflix, shared how her team manages impression data at scale.

Real-Time Delivery of Impressions at Scale

Recording

Speaker: Tulika Bhatt (Senior Data Engineer at Netflix)

Summary: Netflix generates approximately 18 billion impressions daily. These impressions significantly influence a viewer’s browsing experience, as they are essential for powering video ranker algorithms and computing adaptive pages. With the evolution of user interfaces to be more responsive to in-session interactions, coupled with the growing demand for real-time adaptive recommendations, it has become highly imperative that these impressions are provided on a near real-time basis. This talk delves into the creative solutions Netflix deploys to manage this high-volume, real-time data requirement while balancing scalability and cost.

Reflections on Building a Data Platform From the Ground Up in a Post-GDPR World

Recording

Speaker: Jessica Larson (Data Engineer & Author of “Snowflake Access Control”)

Summary: The requirements for creating a new data warehouse in the post-GDPR world are significantly different from those of the pre-GDPR world, such as the need to prioritize sensitive data protection and regulatory compliance over performance and cost. In this talk, Jessica Larson shares her takeaways from building a new data platform post-GDPR.

Unbundling the Data Warehouse: The Case for Independent Storage

Recording

Speaker: Jason Reid (Co-founder & Head of Product at Tabular)

Summary: Unbundling a data warehouse means splitting it into constituent and modular components that interact via open standard interfaces. In this talk, Jason Reid discusses the pros and cons of both data warehouse bundling and unbundling in terms of performance, governance, and flexibility, and he examines how the trend of data warehouse unbundling will impact the data engineering landscape in the next 5 years.

Clark Wright, Staff Analytics Engineer at Airbnb, talked about the concept of Data Quality Score at Airbnb.

Data Quality Score: How We Evolved the Data Quality Strategy at Airbnb

Recording

Speaker: Clark Wright (Staff Analytics Engineer at Airbnb)

Summary: Recently, Airbnb published a post to their Tech Blog called Data Quality Score: The next chapter of data quality at Airbnb. In this talk, Clark Wright shares the narrative of how data practitioners at Airbnb recognized the need for higher-quality data and then proposed, conceptualized, and launched Airbnb’s first Data Quality Score.

Data Productivity at Scale

Recording

Speaker: Iaroslav Zeigerman (Co-Founder and Chief Architect at Tobiko Data)

Summary: The development and evolution of data pipelines are hindered by outdated tooling compared to software development. Creating new development environments is cumbersome: Populating them with data is compute-intensive, and the deployment process is error-prone, leading to higher costs, slower iteration, and unreliable data. SQLMesh, an open-source project born from our collective experience at companies like Airbnb, Apple, Google, and Netflix, is designed to handle the complexities of evolving data pipelines at an internet scale. In this talk, Iaroslav Zeigerman discusses challenges faced by data practitioners today and how core SQLMesh concepts solve them.

Last but not least, thank you to the organizers of the Data Engineering Open Forum: Chris Colburn, Xinran Waibel, Jai Balani, Rashmi Shamprasad, and Patricia Ho.

Until next time!

If you are interested in attending a future Data Engineering Open Forum, we highly recommend you join our Google Group to stay tuned to event announcements.


A Recap of the Data Engineering Open Forum at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Video annotator: building video classifiers using vision-language models and active learning

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/video-annotator-building-video-classifiers-using-vision-language-models-and-active-learning-8ebdda0b2db4

Video annotator: a framework for efficiently building video classifiers using vision-language models and active learning

Amir Ziai, Aneesh Vartakavi, Kelli Griggs, Eugene Lok, Yvonne Jukes, Alex Alonso, Vi Iyengar, Anna Pulido

Introduction

Problem

High-quality and consistent annotations are fundamental to the successful development of robust machine learning models. Conventional techniques for training machine learning classifiers are resource intensive. They involve a cycle where domain experts annotate a dataset, which is then transferred to data scientists to train models, review outcomes, and make changes. This labeling process tends to be time-consuming and inefficient, sometimes halting after a few annotation cycles.

Implications

Consequently, less effort is invested in annotating high-quality datasets compared to iterating on complex models and algorithmic methods to improve performance and fix edge cases. As a result, ML systems grow rapidly in complexity.

Furthermore, constraints on time and resources often result in leveraging third-party annotators rather than domain experts. These annotators perform the labeling task without a deep understanding of the model’s intended deployment or usage, often making consistent labeling of borderline or hard examples, especially in more subjective tasks, a challenge.

This necessitates multiple review rounds with domain experts, leading to unexpected costs and delays. This lengthy cycle can also result in model drift, as it takes longer to fix edge cases and deploy new models, potentially hurting usefulness and stakeholder trust.

Solution

We suggest that more direct involvement of domain experts, using a human-in-the-loop system, can resolve many of these practical challenges. We introduce a novel framework, Video Annotator (VA), which leverages active learning techniques and zero-shot capabilities of large vision-language models to guide users to focus their efforts on progressively harder examples, enhancing the model’s sample efficiency and keeping costs low.

VA seamlessly integrates model building into the data annotation process, facilitating user validation of the model before deployment, therefore helping with building trust and fostering a sense of ownership. VA also supports a continuous annotation process, allowing users to rapidly deploy models, monitor their quality in production, and swiftly fix any edge cases by annotating a few more examples and deploying a new model version.

This self-service architecture empowers users to make improvements without active involvement of data scientists or third-party annotators, allowing for fast iteration.

Video understanding

We design VA to assist in granular video understanding which requires the identification of visuals, concepts, and events within video segments. Video understanding is fundamental for numerous applications such as search and discovery, personalization, and the creation of promotional assets. Our framework allows users to efficiently train machine learning models for video understanding by developing an extensible set of binary video classifiers, which power scalable scoring and retrieval of a vast catalog of content.

Video classification

Video classification is the task of assigning a label to an arbitrary-length video clip, often accompanied by a probability or prediction score, as illustrated in Fig 1.

Fig 1- Functional view of a binary video classifier. A few-second clip from ”Operation Varsity Blues: The College Admissions Scandal” is passed to a binary classifier for detecting the ”establishing shots” label. The classifier outputs a very high score (score is between 0 and 1), indicating that the video clip is very likely an establishing shot. In filmmaking, an establishing shot is a wide shot (i.e. video clip between two consecutive cuts) of a building or a landscape that is intended for establishing the time and location of the scene.

Video understanding via an extensible set of video classifiers

Binary classification allows for independence and flexibility, letting us add or improve one model independently of the others. It also has the additional benefit of being easier for our users to understand and build. Combining the predictions of multiple models gives us a deeper understanding of the video content at various levels of granularity, as illustrated in Fig 2.

Fig 2- Three video clips and the corresponding binary classifier scores for three video understanding labels. Note that these labels are not mutually exclusive. Video clips are from Operation Varsity Blues: The College Admissions Scandal, 6 Underground, and Leave The World Behind, respectively.

Video Annotator (VA)

In this section, we describe VA’s three-step process for building video classifiers.

Step 1 — search

Users begin by finding an initial set of examples within a large, diverse corpus to bootstrap the annotation process. We leverage text-to-video search to enable this, powered by video and text encoders from a Vision-Language Model to extract embeddings. For example, an annotator working on the establishing shots model may start the process by searching for “wide shots of buildings”, illustrated in Fig 3.

Fig 3- Step 1 — Text-to-video search to bootstrap the annotation process.

Step 2 — active learning

The next stage involves a classic Active Learning loop. VA then builds a lightweight binary classifier over the video embeddings, which is subsequently used to score all clips in the corpus, and presents some examples within feeds for further annotation and refinement, as illustrated in Fig 4.

Fig 4- Step 2 — Active Learning loop. The annotator clicks on build, which initiates classifier training and scoring of all clips in a video corpus. Scored clips are organized in four feeds.

The top-scoring positive and negative feeds display examples with the highest and lowest scores respectively. Our users reported that this provided a valuable indication of whether the classifier had picked up the correct concepts in the early stages of training and helped them spot cases of bias in the training data that they were able to subsequently fix. We also include a feed of “borderline” examples that the model is not confident about. This feed helps with discovering interesting edge cases and inspires the need for labeling additional concepts. Finally, the random feed consists of randomly selected clips and helps to annotate diverse examples, which is important for generalization.

The annotator can label additional clips in any of the feeds and build a new classifier and repeat as many times as desired.

Step 3 — review

The last step simply presents the user with all annotated clips. It’s a good opportunity to spot annotation mistakes and to identify ideas and concepts for further annotation via search in step 1. From this step, users often go back to step 1 or step 2 to refine their annotations.

Experiments

To evaluate VA, we asked three video experts to annotate a diverse set of 56 labels across a video corpus of 500k shots. We compared VA to the performance of a few baseline methods, and observed that VA leads to the creation of higher quality video classifiers. Fig 5 compares VA’s performance to baselines as a function of the number of annotated clips.

Fig 5- Model quality (i.e. Average Precision) as a function of the number of annotated clips for the “establishing shots” label. We observe that all methods outperform the baseline, and that all methods benefit from additional annotated data, albeit to varying degrees.

You can find more details about VA and our experiments in this paper.

Conclusion

We presented Video Annotator (VA), an interactive framework that addresses many challenges associated with conventional techniques for training machine learning classifiers. VA leverages the zero-shot capabilities of large vision-language models and active learning techniques to enhance sample efficiency and reduce costs. It offers a unique approach to annotating, managing, and iterating on video classification datasets, emphasizing the direct involvement of domain experts in a human-in-the-loop system. By enabling these users to rapidly make informed decisions on hard samples during the annotation process, VA increases the system’s overall efficiency. Moreover, it allows for a continuous annotation process, allowing users to swiftly deploy models, monitor their quality in production, and rapidly fix any edge cases.

This self-service architecture empowers domain experts to make improvements without the active involvement of data scientists or third-party annotators, and fosters a sense of ownership, thereby building trust in the system.

We conducted experiments to study the performance of VA, and found that it yields a median 8.3 point improvement in Average Precision relative to the most competitive baseline across a wide-ranging assortment of video understanding tasks. We release a dataset with 153k labels across 56 video understanding tasks annotated by three professional video editors using VA, and also release code to replicate our experiments.


Video annotator: building video classifiers using vision-language models and active learning was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Round 2: A Survey of Causal Inference Applications at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/round-2-a-survey-of-causal-inference-applications-at-netflix-fd78328ee0bb

At Netflix, we want to ensure that every current and future member finds content that thrills them today and excites them to come back for more. Causal inference is an essential part of the value that Data Science and Engineering adds towards this mission. We rely heavily on both experimentation and quasi-experimentation to help our teams make the best decisions for growing member joy.

Building off of our last successful Causal Inference and Experimentation Summit, we held another week-long internal conference this year to learn from our stunning colleagues. We brought together speakers from across the business to learn about methodological developments and innovative applications.

We covered a wide range of topics and are excited to share five talks from that conference with you in this post. This will give you a behind the scenes look at some of the causal inference research happening at Netflix!

Metrics Projection for Growth A/B Tests

Mihir Tendulkar, Simon Ejdemyr, Dhevi Rajendran, David Hubbard, Arushi Tomar, Steve Beckett, Judit Lantos, Cody Chapman, Ayal Chen-Zion, Apoorva Lal, Ekrem Kocaguneli, Kyoko Shimada

Experimentation is in Netflix’s DNA. When we launch a new product feature, we use — where possible — A/B test results to estimate the annualized incremental impact on the business.

Historically, that estimate has come from our Finance, Strategy, & Analytics (FS&A) partners. For each test cell in an experiment, they manually forecast signups, retention probabilities, and cumulative revenue over a one-year horizon, using monthly cohorts. The process can be repetitive and time-consuming.

We decided to build out a faster, automated approach that boils down to estimating two pieces of missing data. When we run an A/B test, we might allocate users for one month, and monitor results for only two billing periods. In this simplified example, we have one member cohort, and we have two billing period treatment effects (𝜏.cohort1,period1 and 𝜏.cohort1,period2, which we will shorten to 𝜏.1,1 and 𝜏.1,2, respectively).

To measure annualized impact, we need to estimate:

  1. Unobserved billing periods. For the first cohort, we don’t have treatment effects (TEs) for their third through twelfth billing periods (𝜏.1,j , where j = 3…12).
  2. Unobserved sign up cohorts. We only observed one monthly signup cohort, and there are eleven more cohorts in a year. We need to know both the size of these cohorts, and their TEs (𝜏.i,j, where i = 2…12 and j = 1…12).

For the first piece of missing data, we used a surrogate index approach. We make a standard assumption that the causal path from the treatment to the outcome (in this case, Revenue) goes through the surrogate of retention. We leverage our proprietary Retention Model and short-term observations — in the above example, 𝜏.1,2 — to estimate 𝜏.1,j , where j = 3…12.

For the second piece of missing data, we assume transportability: that each subsequent cohort’s billing-period TE is the same as the first cohort’s TE. Note that if you have long-running A/B tests, this is a testable assumption!

Fig. 1: Monthly cohort-based activity as measured in an A/B test. In green, we show the allocation window throughout January, while blue represents the January cohort’s observation window. From this, we can directly observe 𝜏.1 and 𝜏.2, and we can project later 𝜏.j forward using the surrogate-based approach. We can transport values from observed cohorts to unobserved cohorts.

Now, we can put the pieces together. For the first cohort, we project TEs forward. For unobserved cohorts, we transport the TEs from the first cohort and collapse our notation to remove the cohort index: 𝜏.1,1 is now written as just 𝜏.1. We estimate the annualized impact by summing the values from each cohort.
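As a rough sketch of that bookkeeping, the annualized estimate is a sum over cohorts and billing periods. The numbers below are made up, and the geometric decay is only a crude stand-in for the proprietary retention-based projection of later periods.

# Illustrative only: project billing-period treatment effects forward for the
# observed cohort, transport them to unobserved cohorts, and sum.
observed_te = [0.10, 0.08]                                   # observed 𝜏.1 and 𝜏.2 for the first cohort
projected_te = [0.08 * 0.9 ** k for k in range(1, 11)]       # hypothetical surrogate-based projection of 𝜏.3 ... 𝜏.12
te = observed_te + projected_te                              # twelve billing-period treatment effects

cohort_sizes = [1.0] * 12                                    # relative sizes of the twelve monthly signup cohorts

annualized_impact = 0.0
for i, size in enumerate(cohort_sizes):                      # cohort i signs up in month i + 1
    periods_in_year = 12 - i                                 # billing periods that fall inside the year
    annualized_impact += size * sum(te[:periods_in_year])    # transportability: reuse the first cohort's TEs

print(round(annualized_impact, 3))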

We empirically validated our results from this method by comparing them to long-running A/B tests and prior results from our FS&A partners. Now we can provide quicker and more accurate estimates of the longer-term value our product features are delivering to members.

A Systematic Framework for Evaluating Game Events

Claire Willeck, Yimeng Tang

In Netflix Games DSE, we are asked many causal inference questions after an intervention has been implemented. For example, how did a product change impact a game’s performance? Or how did a player acquisition campaign impact a key metric?

While we would ideally conduct A/B tests to measure the impact of an intervention, it is not always practical to do so. In the first scenario above, A/B tests were not planned before the intervention’s launch, so we needed to use observational causal inference to assess its effectiveness. In the second scenario, the campaign operates at the country level, meaning everyone in the country is in the treatment group, which makes a traditional A/B test infeasible.

To evaluate the impacts of various game events and updates and to help our team scale, we designed a framework and package around variations of synthetic control.

For most questions in Games, we have game-level or country-level interventions and relatively little data. This means most pre-existing packages that rely on time-series forecasting, unit-level data, or instrumental variables are not useful.

Our framework utilizes a variety of synthetic control (SC) models, including Augmented SC, Robust SC, Penalized SC, and synthetic difference-in-differences, since different approaches can work best in different cases. We utilize a scale-free metric to evaluate the performance of each model and select the one that minimizes pre-treatment bias. Additionally, we conduct robustness tests like backdating and apply inference measures based on the number of control units.
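A minimal sketch of the selection step is below. It assumes each candidate synthetic control variant has already produced a counterfactual series for the treated unit, and uses MAPE over a pre-treatment validation window as a stand-in for whichever scale-free metric the framework actually uses; the series and model names are made up.

import numpy as np

def mape(actual, predicted):
    """Scale-free pre-treatment error between observed and synthetic series."""
    actual, predicted = np.asarray(actual, dtype=float), np.asarray(predicted, dtype=float)
    return float(np.mean(np.abs((actual - predicted) / actual)))

def select_model(observed, candidates, validation_window):
    """Pick the synthetic control variant with the smallest pre-treatment bias."""
    lo, hi = validation_window
    errors = {name: mape(observed[lo:hi], series[lo:hi])
              for name, series in candidates.items()}
    best = min(errors, key=errors.get)
    return best, errors

# Toy example: observed pre-treatment series for the treated unit and three candidates.
observed = np.array([100, 102, 105, 103, 108, 110])
candidates = {
    "augmented_sc": np.array([101, 101, 104, 104, 107, 111]),
    "robust_sc":    np.array([ 98, 100, 102, 101, 104, 106]),
    "penalized_sc": np.array([105, 108, 110, 109, 112, 115]),
}
best, errors = select_model(observed, candidates, validation_window=(0, 4))
print(best, round(errors[best], 4))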

Fig. 2: Example of Augmented Synthetic Control model used to reduce pre-treatment bias by fitting the model in the training period and evaluating performance in the validation period. In this example, the Augmented Synthetic Control model reduced the pre-treatment bias in the validation period more than the other synthetic control variations.

This framework and package allows our team, and other teams, to tackle a broad set of causal inference questions using a consistent approach.

Double Machine Learning for Weighing Metrics Tradeoffs

Apoorva Lal, Winston Chou, Jordan Schafer

As Netflix expands into new business verticals, we’re increasingly seeing examples of metric tradeoffs in A/B tests — for example, an increase in games metrics may occur alongside a decrease in streaming metrics. To help decision-makers navigate scenarios where metrics disagree, we developed a method to compare the relative importance of different metrics (viewed as “treatments”) in terms of their causal effect on the north-star metric (Retention) using Double Machine Learning (DML).

In our first pass at this problem, we found that ranking treatments according to their Average Treatment Effects using DML with a Partially Linear Model (PLM) could yield an incorrect ranking when treatments have different marginal distributions. The PLM ranking would be correct if treatment effects were constant and additive. However, when treatment effects are heterogeneous, PLM upweights the effects for members whose treatment values are most unpredictable. This is problematic for comparing treatments with different baselines.

Instead, we discretized each treatment into bins and fit a multiclass propensity score model. This lets us estimate multiple Average Treatment Effects (ATEs) using Augmented Inverse-Propensity-Weighting (AIPW) to reflect different treatment contrasts, for example the effect of low versus high exposure.

We then weight these treatment effects by the baseline distribution. This yields an “apples-to-apples” ranking of treatments based on their ATE on the same overall population.
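One way this weighting could look is sketched below, with made-up numbers: per-bin AIPW contrasts against the lowest-exposure bin are averaged using the baseline share of members in each bin, so every treatment is scored on the same overall population.

import numpy as np

bins = ["low", "medium", "high"]
aipw_contrast_vs_low = np.array([0.0, 0.004, 0.009])   # illustrative ATE of each bin vs. "low"
baseline_share = np.array([0.5, 0.3, 0.2])             # observed baseline distribution of the treatment

# Weight the per-bin contrasts by the baseline distribution to get a single,
# population-level number that is comparable across treatments.
weighted_ate = float(np.dot(aipw_contrast_vs_low, baseline_share))
print(weighted_ate)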

Fig. 3: Comparison of PLMs vs. AIPW in estimating treatment effects. Because PLMs do not estimate average treatment effects when effects are heterogeneous, they do not rank metrics by their Average Treatment Effects, whereas AIPW does.

In the example above, we see that PLM ranks Treatment 1 above Treatment 2, while AIPW correctly ranks the treatments in order of their ATEs. This is because PLM upweights the Conditional Average Treatment Effect for units that have more unpredictable treatment assignment (in this example, the group defined by x = 1), whereas AIPW targets the ATE.

Survey AB Tests with Heterogeneous Non-Response Bias

Andreas Aristidou, Carolyn Chu

To improve the quality and reach of Netflix’s survey research, we leverage a research-on-research program that utilizes tools such as survey A/B tests. Such experiments allow us to directly test and validate new ideas, like providing incentives for survey completion, or varying the invitation’s subject line, message design, or time of day to send, among other things.

In our experimentation program we investigate treatment effects not only on primary success metrics, but also on guardrail metrics. A challenge we face is that, in many of our tests, the intervention (e.g. providing higher incentives) and success metrics (e.g. percent of invited members who begin the survey) are upstream of guardrail metrics such as answers to specific questions designed to measure data quality (e.g. survey straightlining).

In such a case, the intervention may (and, in fact, we expect it to) distort upstream metrics (especially sample mix), the balance of which is a necessary component for the identification of our downstream guardrail metrics. This is a consequence of non-response bias, a common external validity concern with surveys that impacts how generalizable the results can be.

For example, if one group of members — group X — responds to our survey invitations at a significantly lower rate than another group — group Y — then average treatment effects will be skewed towards the behavior of group Y. Further, in a survey A/B test, the type of non-response bias can differ between control and treatment groups (e.g. different groups of members may be over- or under-represented in different cells of the test), thus threatening the internal validity of our test by introducing a covariate imbalance. We call this combination heterogeneous non-response bias.

To overcome this identification problem and investigate treatment effects on downstream metrics, we leverage a combination of several techniques. First, we look at conditional average treatment effects (CATE) for particular sub-populations of interest where confounding covariates are balanced within each stratum.

To examine the average treatment effects, we leverage a combination of propensity scores to correct for internal validity issues and iterative proportional fitting to correct for external validity issues. With these techniques, we can ensure that our surveys are of the highest quality and that they accurately represent our members’ opinions, thus helping us build products that they want to see.
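As a small illustration of the reweighting step, here is a generic iterative proportional fitting (raking) sketch that rescales respondent weights until weighted group shares match known population margins. The dimensions, levels, and targets are made up, and this is not our internal survey tooling.

import numpy as np

def rake(weights, groups, targets, n_iter=50):
    """Iterative proportional fitting: rescale respondent weights so the
    weighted share of each group matches its population target."""
    weights = np.asarray(weights, dtype=float).copy()
    for _ in range(n_iter):
        for dim, target in targets.items():
            labels = groups[dim]
            total = weights.sum()
            for level, share in target.items():
                mask = labels == level
                current = weights[mask].sum() / total
                if current > 0:
                    weights[mask] *= share / current   # levels within a dimension are disjoint
    return weights / weights.mean()

# Toy example: five respondents, two dimensions with known population margins.
groups = {
    "tenure": np.array(["new", "new", "long", "long", "long"]),
    "region": np.array(["A", "B", "A", "B", "B"]),
}
targets = {
    "tenure": {"new": 0.5, "long": 0.5},
    "region": {"A": 0.6, "B": 0.4},
}
w = rake(np.ones(5), groups, targets)
print(np.round(w, 2))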

Design: The Intersection of Humans and Technology

Rina Chang

A design talk at a causal inference conference? Why, yes! Because design is about how a product works, it is fundamentally interwoven into the experimentation platform at Netflix. Our product serves the huge variety of internal users at Netflix who run — and consume the results of — A/B tests. Thus, choosing how we enable our users to take action and how we present data in the product is critical to decision-making via experimentation.

If you were to display some numbers and text, you might opt to show them in a tabular format.

While there is nothing inherently wrong with this presentation, it is not as easily digested as something more visual.

If your goal is to illustrate that those three numbers add up to 100%, and thus are parts of a whole, then you might choose a pie chart.

If you wanted to show how these three numbers combine to illustrate progress toward a goal, then you might choose a stacked bar chart.

Alternatively, if your goal was to compare these three numbers against each other, then you might choose a bar chart instead.

All of these show the same information, but the choice of presentation changes how easily a consumer of an infographic understands the “so what?” of the point you’re trying to convey. Note that there is no “right” solution here; rather, it depends on the desired takeaway.

Thoughtful design applies not only to static representations of data, but also to interactive experiences. In this example, a single item within a long form could be represented by having a pre-filled value.

Alternatively, the same functionality could be achieved by displaying a default value in text, with the ability to edit it.

While functionally equivalent, this UI change shifts the user’s narrative from “Is this value correct?” to “Do I need to do something that is not ‘normal’?” — which is a much easier question to answer. Zooming out even more, thoughtful design addresses product-level choices, like whether a person knows where to go to accomplish a task. In general, thoughtful design influences product strategy.

Design permeates all aspects of our experimentation product at Netflix, from small choices like color to strategic choices like our roadmap. By thoughtfully approaching design, we can ensure that tools help the team learn the most from our experiments.

External Speaker: Kosuke Imai

In addition to the amazing talks by Netflix employees, we also had the privilege of hearing from Kosuke Imai, Professor of Government and Statistics at Harvard, who delivered our keynote talk. He introduced the “cram method,” a powerful and efficient approach to learning and evaluating treatment policies using generic machine learning algorithms.

Measuring causality is a large part of the data science culture at Netflix, and we are proud to have many stunning colleagues who leverage both experimentation and quasi-experimentation to drive member impact. The conference was a great way to celebrate each other’s work and highlight the ways in which causal methodology can create value for the business.

To stay up to date on our work, follow the Netflix Tech Blog, and if you are interested in joining us, we are currently looking for new stunning colleagues to help us entertain the world!


Round 2: A Survey of Causal Inference Applications at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

The Making of VES: the Cosmos Microservice for Netflix Video Encoding

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/the-making-of-ves-the-cosmos-microservice-for-netflix-video-encoding-946b9b3cd300

Liwei Guo, Vinicius Carvalho, Anush Moorthy, Aditya Mavlankar, Lishan Zhu

This is the second post in a multi-part series from Netflix. See here for Part 1 which provides an overview of our efforts in rebuilding the Netflix video processing pipeline with microservices. This blog dives into the details of building our Video Encoding Service (VES), and shares our learnings.

Cosmos is the next generation media computing platform at Netflix. Combining microservice architecture with asynchronous workflows and serverless functions, Cosmos aims to modernize Netflix’s media processing pipelines with improved flexibility, efficiency, and developer productivity. In the past few years, the video team within Encoding Technologies (ET) has been working on rebuilding the entire video pipeline on Cosmos.

This new pipeline is composed of a number of microservices, each dedicated to a single functionality. One such microservice is Video Encoding Service (VES). Encoding is an essential component of the video pipeline. At a high level, it takes an ingested mezzanine and encodes it into a video stream that is suitable for Netflix streaming or serves some studio/production use case. In the case of Netflix, there are a number of requirements for this service:

  • Given the wide range of devices from mobile phones to browsers to Smart TVs, multiple codec formats, resolutions, and quality levels need to be supported.
  • Chunked encoding is a must to meet the latency requirements of our business needs, and use cases with different levels of latency sensitivity need to be accommodated.
  • The capability of continuous release is crucial for enabling fast product innovation in both streaming and studio spaces.
  • There is a huge volume of encoding jobs every day. The service needs to be cost-efficient and make the most use of available resources.

In this tech blog, we will walk through how we built VES to achieve the above goals and will share a number of lessons we learned from building microservices. Please note that for simplicity, we have chosen to omit certain Netflix-specific details that are not integral to the primary message of this blog post.

Building Video Encoding Service on Cosmos

A Cosmos microservice consists of three layers: an API layer (Optimus) that takes in requests, a workflow layer (Plato) that orchestrates the media processing flows, and a serverless computing layer (Stratum) that processes the media. These three layers communicate asynchronously through a home-grown, priority-based messaging system called Timestone. We chose Protobuf as the payload format for its high efficiency and mature cross-platform support.

To help service developers get a head start, the Cosmos platform provides a powerful service generator. This generator features an intuitive UI. With a few clicks, it creates a basic yet complete Cosmos service: code repositories for all 3 layers are created; all platform capabilities, including discovery, logging, tracing, etc., are enabled; release pipelines are set up and dashboards are readily accessible. We can immediately start adding video encoding logic and deploy the service to the cloud for experimentation.

Optimus

As the API layer, Optimus serves as the gateway into VES, meaning service users can only interact with VES through Optimus. The defined API interface is a strong contract between VES and the external world. As long as the API is stable, users are shielded from internal changes in VES. This decoupling is instrumental in enabling faster iterations of VES internals.

As a single-purpose service, the API of VES is quite clean. We defined an endpoint encodeVideo that takes an EncodeRequest and returns an EncodeResponse (in an async way through Timestone messages). The EncodeRequest object contains information about the source video as well as the encoding recipe. All the requirements of the encoded video (codec, resolution, etc.) as well as the controls for latency (chunking directives) are exposed through the data model of the encoding recipe.

//protobuf definition

message EncodeRequest {
  VideoSource video_source = 1; // source to be encoded
  Recipe recipe = 2;            // including encoding format, resolution, etc.
}

message EncodeResponse {
  OutputVideo output_video = 1; // encoded video
  Error error = 2;              // error message (optional)
}

message Recipe {
  Codec codec = 1;              // including codec format, profile, level, etc.
  Resolution resolution = 2;
  ChunkingDirectives chunking_directives = 3;
  ...
}

Like any other Cosmos service, the platform automatically generates an RPC client based on the VES API data model, which users can use to build the request and invoke VES. Once an incoming request is received, Optimus performs validations, and (when applicable) converts the incoming data into an internal data model before passing it to the next layer, Plato.


Plato

The workflow layer, Plato, governs the media processing steps. The Cosmos platform supports two programming paradigms for Plato: forward chaining rule engine and Directed Acyclic Graph (DAG). VES has a linear workflow, so we chose DAG for its simplicity.

In a DAG, the workflow is represented by nodes and edges. Nodes represent stages in the workflow, while edges signify dependencies — a stage is only ready to execute when all its dependencies have been completed. VES requires parallel encoding of video chunks to meet its latency and resilience goals. This workflow-level parallelism is facilitated by the DAG through a MapReduce mode. Nodes can be annotated to indicate this relationship, and a Reduce node will only be triggered when all its associated Map nodes are ready.

For the VES workflow, we defined five Nodes and their associated edges, which are visualized in the following graph:

  • Splitter Node: This node divides the video into chunks based on the chunking directives in the recipe.
  • Encoder Node: This node encodes a video chunk. It is a Map node.
  • Assembler Node: This node stitches the encoded chunks together. It is a Reduce node.
  • Validator Node: This node performs the validation of the encoded video.
  • Notifier Node: This node notifies the API layer once the entire workflow is completed.

In this workflow, nodes such as the Notifier perform very lightweight operations and can be directly executed in the Plato runtime. However, resource-intensive operations need to be delegated to the computing layer (Stratum), or another service. Plato invokes Stratum functions for tasks such as encoding and assembling, where the nodes (Encoder and Assembler) post messages to the corresponding message queues. The Validator node calls another Cosmos service, the Video Validation Service, to validate the assembled encoded video.
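For intuition, the workflow structure can be pictured as the data below. This is only an illustrative representation, not Plato’s actual API: each node lists its dependencies, and the Encoder/Assembler pair is annotated as a Map/Reduce relationship so chunks can be encoded in parallel.

WORKFLOW = {
    "splitter":  {"depends_on": []},
    "encoder":   {"depends_on": ["splitter"], "mode": "map"},
    "assembler": {"depends_on": ["encoder"], "mode": "reduce"},
    "validator": {"depends_on": ["assembler"]},
    "notifier":  {"depends_on": ["validator"]},
}

def ready_nodes(completed):
    """Nodes whose dependencies have all completed and that have not yet run."""
    return [name for name, spec in WORKFLOW.items()
            if name not in completed and all(d in completed for d in spec["depends_on"])]

print(ready_nodes({"splitter"}))  # -> ['encoder']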

Stratum

The computing layer, Stratum, is where media samples can be accessed. Developers of Cosmos services create Stratum Functions to process the media. They can bring their own media processing tools, which are packaged into Docker images of the Functions. These Docker images are then published to our internal Docker registry, part of Titus. In production, Titus automatically scales instances based on the depths of job queues.

VES needs to support encoding source videos into a variety of codec formats, including AVC, AV1, and VP9, to name a few. We use different encoder binaries (referred to simply as “encoders”) for different codec formats. For AVC, a format that is now 20 years old, the encoder is quite stable. On the other hand, the newest addition to Netflix streaming, AV1, is continuously going through active improvements and experimentation, necessitating more frequent encoder upgrades. To effectively manage this variability, we decided to create multiple Stratum Functions, each dedicated to a specific codec format and released independently. This approach ensures that upgrading one encoder will not impact the VES service for other codec formats, maintaining stability and performance across the board.

Within the Stratum Function, the Cosmos platform provides abstractions for common media access patterns. Regardless of file formats, sources are uniformly presented as locally mounted frames. Similarly, for output that needs to be persisted in the cloud, the platform presents the process as writing to a local file. All details, such as streaming of bytes and retrying on errors, are abstracted away. With the platform taking care of the complexity of the infrastructure, the essential code for video encoding in the Stratum Function could be as simple as follows.

ffmpeg -i input/source%08d.j2k -vf ... -c:v libx264 ... output/encoding.264

Encoding is a resource-intensive process, and the resources required are closely related to the codec format and the encoding recipe. We conducted benchmarking to understand the resource usage pattern, particularly CPU and RAM, for different encoding recipes. Based on the results, we leveraged the “container shaping” feature from the Cosmos platform.

We defined a number of different “container shapes”, specifying the allocations of resources like CPU and RAM.

# an example definition of container shape
group: containerShapeExample1
resources:
  numCpus: 2
  memoryInMB: 4000
  networkInMbp: 750
  diskSizeInMB: 12000

Routing rules are created to assign encoding jobs to different shapes based on the combination of codec format and encoding resolution. This helps the platform perform “bin packing”, thereby maximizing resource utilization.
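A routing rule can be as simple as a lookup from the job’s codec format and resolution to a shape. The sketch below is illustrative only; the shape names and the fallback behavior are made up, not our production configuration.

# Illustrative routing table from (codec, resolution) to a container shape.
ROUTING_RULES = {
    ("AVC", "1080p"): "containerShapeExample1",
    ("AV1", "1080p"): "containerShapeLarge",
    ("AV1", "4K"):    "containerShapeXLarge",
}

def shape_for_job(codec: str, resolution: str) -> str:
    # Fall back to a default shape when no specific rule matches.
    return ROUTING_RULES.get((codec, resolution), "containerShapeDefault")

print(shape_for_job("AV1", "1080p"))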

An example of “bin-packing”. The circles represent CPU cores and the area represents the RAM. This 16-core EC2 instance is packed with 5 encoding containers (rectangles) of 3 different shapes (indicated by different colors).

Continuous Release

After we completed the development and testing of all three layers, VES was launched in production. However, this did not mark the end of our work. Quite the contrary, we believed and still do that a significant part of a service’s value is realized through iterations: supporting new business needs, enhancing performance, and improving resilience. An important piece of our vision was for Cosmos services to have the ability to continuously release code changes to production in a safe manner.

Focusing on a single functionality, code changes pertaining to a single feature addition in VES are generally small and cohesive, making them easy to review. Since callers can only interact with VES through its API, internal code is truly “implementation details” that are safe to change. The explicit API contract limits the test surface of VES. Additionally, the Cosmos platform provides a pyramid-based testing framework to guide developers in creating tests at different levels.

After testing and code review, changes are merged and are ready for release. The release pipeline is fully automated: after the merge, the pipeline checks out code, compiles, builds, runs unit/integration/end-to-end tests as prescribed, and proceeds to full deployment if no issues are encountered. Typically, it takes around 30 minutes from code merge to feature landing (a process that took 2–4 weeks in our previous generation platform!). The short release cycle provides faster feedback to developers and helps them make necessary updates while the context is still fresh.

Screenshot of a release pipeline run in our production environment

When running in production, the service constantly emits metrics and logs. They are collected by the platform to visualize dashboards and to drive monitoring/alerting systems. Metrics deviating too much from the baseline will trigger alerts and can lead to automatic service rollback (when the “canary” feature is enabled).

The Learnings:

VES was the very first microservice that our team built. We started with basic knowledge of microservices and learned a multitude of lessons along the way. These learnings deepened our understanding of microservices and have helped us improve our design choices and decisions.

Define a Proper Service Scope

A principle of microservice architecture is that a service should be built for a single functionality. This sounds straightforward, but what exactly qualifies as a “single functionality”? “Encoding video” sounds good, but wouldn’t “encode video into the AVC format” be an even more specific single functionality?

When we started building VES, we took the approach of creating a separate encoding service for each codec format. While this has advantages, such as decoupled workflows, we were quickly overwhelmed by the development overhead. Imagine that a user asked us to add watermarking capability to the encoding. We would need to make changes to multiple microservices. What is worse, the changes in all these services would be very similar, and essentially we would be adding the same code (and tests) again and again. This kind of repetitive work can easily wear out developers.

The service presented in this blog is our second iteration of VES (yes, we already went through one iteration). In this version, we consolidated encodings for different codec formats into a single service. They share the same API and workflow, while each codec format has its own Stratum Functions. So far this seems to strike a good balance: the common API and workflow reduce code repetition, while separate Stratum Functions guarantee the independent evolution of each codec format.

The changes we made are not irreversible. If someday in the future, the encoding of one particular codec format evolves into a totally different workflow, we have the option to spin it off into its own microservice.

Be Pragmatic about Data Modeling

In the beginning, we were very strict about data model separation — we had a strong belief that sharing equates to coupling, and coupling could lead to potential disasters in the future. To avoid this, for each service as well as the three layers within a service, we defined its own data model and built converters to translate between different data models.

We ended up creating multiple data models for aspects such as bit-depth and resolution across our system. To be fair, this does have some merits. For example, our encoding pipeline supports different bit-depths for AVC encoding (8-bit) and AV1 encoding (10-bit). By defining both AVC.BitDepth and AV1.BitDepth, constraints on the bit-depth can be built into the data models. However, it is debatable whether the benefits of this differentiation power outweigh the downsides, namely multiple data model translations.

Eventually, we created a library to host data models for common concepts in the video domain. Examples of such concepts include frame rate, scan type, color space, etc. As you can see, they are extremely common and stable. This “common” data model library is shared across all services owned by the video team, avoiding unnecessary duplications and data conversions. Within each service, additional data models are defined for service-specific objects.

Embrace Service API Changes

This may sound contradictory. We have been saying that an API is a strong contract between the service and its users, and keeping an API stable shields users from internal changes. This is absolutely true. However, none of us had a crystal ball when we were designing the very first version of the service API. It is inevitable that at a certain point, this API becomes inadequate. If we hold the belief that “the API cannot change” too dearly, developers would be forced to find workarounds, which are almost certainly sub-optimal.

There are many great tech articles about gracefully evolving APIs. We believe we also have a unique advantage: VES is a service internal to Netflix Encoding Technologies (ET). Our two users, the Streaming Workflow Orchestrator and the Studio Workflow Orchestrator, are owned by the workflow team within ET. Our teams share the same contexts and work towards common goals. If we believe updating the API is in the best interest of Netflix, we meet with them to seek alignment. Once a consensus to update the API is reached, teams collaborate to ensure a smooth transition.

Stay Tuned…

This is the second part of our tech blog series Rebuilding Netflix Video Pipeline with Microservices. In this post, we described the building process of the Video Encoding Service (VES) in detail as well as our learnings. Our pipeline includes a few other services that we plan to share about as well. Stay tuned for our future blogs on this topic of microservices!


The Making of VES: the Cosmos Microservice for Netflix Video Encoding was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Reverse Searching Netflix’s Federated Graph

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/reverse-searching-netflixs-federated-graph-222ac5d23576

By Ricky Gardiner, Alex Hutter, and Katie Lefevre

Since our previous posts regarding Content Engineering’s role in enabling search functionality within Netflix’s federated graph (the first post, where we identify the issue and elaborate on the indexing architecture, and the second post, where we detail how we facilitate querying) there have been significant developments. We’ve opened up Studio Search beyond Content Engineering to the entirety of the Engineering organization at Netflix and renamed it Graph Search. There are over 100 applications integrated with Graph Search and nearly 50 indices we support. We continue to add functionality to the service. As promised in the previous post, we’ll share how we partnered with one of our Studio Engineering teams to build reverse search. Reverse search inverts the standard querying pattern: rather than finding documents that match a query, it finds queries that match a document.

Intro

Tiffany is a Netflix Post Production Coordinator who oversees a slate of nearly a dozen movies in various states of pre-production, production, and post-production. Tiffany and her team work with various cross-functional partners, including Legal, Creative, and Title Launch Management, tracking the progression and health of her movies.

So Tiffany subscribes to notifications and calendar updates specific to certain areas of concern, like “movies shooting in Mexico City which don’t have a key role assigned”, or “movies that are at risk of not being ready by their launch date”.

Tiffany is not subscribing to updates of particular movies, but subscribing to queries that return a dynamic subset of movies. This poses an issue for those of us responsible for sending her those notifications. When a movie changes, we don’t know who to notify, since there’s no association between employees and the movies they’re interested in.

We could save these searches, and then repeatedly query for the results of every search, but because we’re part of a large federated graph, this would have heavy traffic implications for every service we’re connected to. We’d have to decide if we wanted timely notifications or less load on our graph.

If we could answer the question “would this movie be returned by this query”, we could re-query based on change events with laser precision and not impact the broader ecosystem.

The Solution

Graph Search is built on top of Elasticsearch, which has the exact capabilities we require:

Instead of taking a search (like “spanish-language movies shot in Mexico City”) and returning the documents that match (One for Roma, one for Familia), a percolate query takes a document (one for Roma) and returns the searches that match that document, like “spanish-language movies” and “scripted dramas”.

We’ve communicated this functionality as the ability to save a search, called SavedSearches, which is a persisted filter on an existing index.

type SavedSearch {
  id: ID!
  filter: String
  index: SearchIndex!
}

That filter, written in Graph Search DSL, is converted to an Elasticsearch query and indexed in a percolator field. To learn more about Graph Search DSL and why we created it rather than using Elasticsearch query language directly, see the Query Language section of “How Netflix Content Engineering makes a federated graph searchable (Part 2)”.

We’ve called the process of finding matching saved searches ReverseSearch. This is the most straightforward part of this offering. We added a new resolver to the Domain Graph Service (DGS) for Graph Search. It takes the index of interest and a document, and returns all the saved searches that match the document by issuing a percolate query.

"""
Query for retrieving all the registered saved searches, in a given index,
based on a provided document. The document in this case is an ElasticSearch
document that is generated based on the configuration of the index.
"""
reverseSearch(
after: String,
document: JSON!,
first: Int!,
index: SearchIndex!): SavedSearchConnection
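Under the hood, the percolate round trip against Elasticsearch looks roughly like the sketch below. The index name, fields, and saved filter are illustrative, and we use the 8.x-style Elasticsearch Python client here rather than Graph Search’s internal code.

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

# Register a saved search: the filter (already translated from Graph Search DSL
# to Elasticsearch query DSL) is indexed into the percolator field.
es.index(
    index="application_v1_percolate",
    id="saved-search-123",
    document={"percolate_query": {"term": {"isArchived": False}}},
)

# Reverse search: given a document, find every saved search that matches it.
matches = es.search(
    index="application_v1_percolate",
    query={
        "percolate": {
            "field": "percolate_query",
            "document": {"movieTitle": "Roma", "isArchived": False},
        }
    },
)
print([hit["_id"] for hit in matches["hits"]["hits"]])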

Persisting a SavedSearch is implemented as a new mutation on the Graph Search DGS. This ultimately triggers the indexing of an Elasticsearch query in a percolator field.

"""
Mutation for registering and updating a saved search. They need to be updated
any time a user adjusts their search criteria.
"""
upsertSavedSearch(input: UpsertSavedSearchInput!): UpsertSavedSearchPayload

Supporting percolator fields fundamentally changed how we provision the indexing pipelines for Graph Search (see Architecture section of How Netflix Content Engineering makes a federated graph searchable). Rather than having a single indexing pipeline per Graph Search index we now have two: one to index documents and one to index saved searches to a percolate index. We chose to add percolator fields to a separate index in order to tune performance for the two types of queries separately.

Elasticsearch requires the percolate index to have a mapping that matches the structure of the queries it stores and therefore must match the mapping of the document index. Index templates define mappings that are applied when creating new indices. By using the index_patterns functionality of index templates, we’re able to share the mapping for the document index between the two. index_patterns also gives us an easy way to add a percolator field to every percolate index we create.

Example of document index mapping

Index pattern — application_*

{
  "order": 1,
  "index_patterns": ["application_*"],
  "mappings": {
    "properties": {
      "movieTitle": {
        "type": "keyword"
      },
      "isArchived": {
        "type": "boolean"
      }
    }
  }
}

Example of percolate index mappings

Index pattern — *_percolate

{
  "order": 2,
  "index_patterns": ["*_percolate*"],
  "mappings": {
    "properties": {
      "percolate_query": {
        "type": "percolator"
      }
    }
  }
}

Example of generated mapping

Percolate index name is application_v1_percolate

{
  "application_v1_percolate": {
    "mappings": {
      "_doc": {
        "properties": {
          "movieTitle": {
            "type": "keyword"
          },
          "isArchived": {
            "type": "boolean"
          },
          "percolate_query": {
            "type": "percolator"
          }
        }
      }
    }
  }
}

Percolate Indexing Pipeline

The percolate index isn’t as simple as taking the input from the GraphQL mutation, translating it to an Elasticsearch query, and indexing it. Versioning, which we’ll talk more about shortly, reared its ugly head and made things a bit more complicated. Here is the way the percolate indexing pipeline is set up.

See Data Mesh — A Data Movement and Processing Platform @ Netflix to learn more about Data Mesh.
  1. When SavedSearches are modified, we store them in our CockroachDB, and the source connector for the Cockroach database emits CDC events.
  2. A single table is shared for the storage of all SavedSearches, so the next step is filtering down to just those that are for *this* index using a filter processor.
  3. As previously mentioned, what is stored in the database is our custom Graph Search filter DSL, which is not the same as the Elasticsearch DSL, so we cannot directly index the event to the percolate index. Instead, we issue a mutation to the Graph Search DGS. The Graph Search DGS translates the DSL to an Elasticsearch query.
  4. Then we index the Elasticsearch query as a percolate field in the appropriate percolate index.
  5. The success or failure of the indexing of the SavedSearch is returned. On failure, the SavedSearch events are sent to a Dead Letter Queue (DLQ) that can be used to address any failures, such as fields referenced in the search query being removed from the index.

Now a bit on versioning to explain why the above is necessary. Imagine we’ve started tagging movies that have animals. If we want users to be able to create views of “movies with animals”, we need to add this new field to the existing search index to flag movies as such. However, the mapping in the current index doesn’t include it, so we can’t filter on it. To solve for this we have index versions.

Dalia & Forrest from the series Baby Animal Cam

When a change is made to an index definition that necessitates a new mapping, like when we add the animal tag, Graph Search creates a new version of the Elasticsearch index and a new pipeline to populate it. This new pipeline reads from a log-compacted Kafka topic in Data Mesh — this is how we can reindex the entire corpus without asking the data sources to resend all the old events. The new pipeline and the old pipeline run side by side, until the new pipeline has processed the backlog, at which point Graph Search cuts over to the version using Elasticsearch index aliases.
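The cutover itself is an atomic alias swap. A minimal sketch, assuming the 8.x-style Elasticsearch Python client and made-up index and alias names (not our internal tooling), might look like this:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

# Point readers at the freshly backfilled version in a single atomic operation.
es.indices.update_aliases(actions=[
    {"remove": {"index": "application_v1", "alias": "application"}},
    {"add": {"index": "application_v2", "alias": "application"}},
])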

Creating a new index for our documents means we also need to create a new percolate index for our queries so they can have consistent index mappings. This new percolate index also needs to be backfilled when we change versions. This is why the pipeline works the way it does — we can again utilize the log compacted topics in Data Mesh to reindex the corpus of SavedSearches when we spin up a new percolate indexing pipeline.

We persist the user-provided filter DSL to the database rather than immediately translating it to the Elasticsearch query language. This enables us to make changes or fixes to the way we translate the saved search DSL to an Elasticsearch query. We can deploy those changes by creating a new version of the index, as the bootstrapping process will re-translate every saved search.

Another Use Case

We hoped reverse search functionality would eventually be useful for other engineering teams. We were approached almost immediately with a problem that reverse searching could solve.

The way you make a movie can be very different based on the type of movie it is. One movie might go through a set of phases that are not applicable to another, or might need to schedule certain events that another movie doesn’t require. Instead of manually configuring the workflow for a movie based on its classifications, we should be able to define the means of classifying movies and use that to automatically assign them to workflows. But determining the classification of a movie is challenging: you could define these movie classifications based on genre alone, like “Action” or “Comedy”, but you likely require more complex definitions. Maybe it’s defined by the genre, region, format, language, or some nuanced combination thereof. The Movie Matching service provides a way to classify a movie based on any combination of matching criteria. Under the hood, the matching criteria are stored as reverse searches, and to determine which criteria a movie matches against, the movie’s document is submitted to the reverse search endpoint.

In short, reverse search is powering an externalized criteria matcher. It’s being used for movie criteria now, but since every Graph Search index is now reverse-search capable, any index could use this pattern.

A Possible Future: Subscriptions

Reverse searches also look like a promising foundation for creating more responsive UIs. Rather than fetching results once as a query, the search results could be provided via a GraphQL subscription. These subscriptions could be associated with a SavedSearch and, as index changes come in, reverse search can be used to determine when to update the set of keys returned by the subscription.


Reverse Searching Netflix’s Federated Graph was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Sequential Testing Keeps the World Streaming Netflix Part 2: Counting Processes

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/sequential-testing-keeps-the-world-streaming-netflix-part-2-counting-processes-da6805341642

Michael Lindon, Chris Sanden, Vache Shirikian, Yanjun Liu, Minal Mishra, Martin Tingley

Have you ever encountered a bug while streaming Netflix? Did your title stop unexpectedly, or not start at all? In the first installment of this blog series on sequential testing, we described our canary testing methodology for continuous metrics such as play-delay. One of our readers commented

What if the new release is not related to a new play/streaming feature? For example, what if the new release includes modified login functionality? Will you still monitor the “play-delay” metric?

Netflix monitors a large suite of metrics, many of which can be classified as counts. These include metrics such as the number of logins, errors, successful play starts, and even the number of customer call center contacts. In this second installment, we describe our sequential methodology for testing count metrics, outlined in the NeurIPS paper Anytime Valid Inference for Multinomial Count Data.

Spot the Difference

Suppose we are about to deploy new code that changes the login behavior. To de-risk the software rollout we A/B test the new code, known also as a canary test. Whenever an event such as a login occurs, a log flows through our real-time backend and the corresponding timestamp is recorded. Figure 1 illustrates the sequences of timestamps generated by devices assigned to the new (treatment) and existing (control) software versions. A question that naturally concerns us is whether there are fewer login events in the treatment. Can you tell?

Figure 1: Timestamps of events occurring in control and treatment

It is not immediately obvious by simple inspection of the point processes in Figure 1. The difference becomes immediately obvious when we visualize the observed counting processes, shown in Figure 2.

Figure 2: Visualizing the counting processes — the number of events observed by time t

The counting processes are functions that increment by 1 whenever a new event arrives. Clearly, there are fewer events occurring in the treatment than in the control. If these were login events, this would suggest that the new code contains a bug that prevents some users from being able to log in successfully.

This is a common situation when dealing with event timestamps. To give another example, if events corresponded to errors or crashes, we would like to know if these are accruing faster in the treatment than in the control. Moreover, we want to answer that question as quickly as possible to prevent any further disruption to the service. This necessitates sequential testing techniques which were introduced in part 1.

Time-Inhomogeneous Poisson Process

Our data for each treatment group is a realization of a one-dimensional point process, that is, a sequence of timestamps. As the rate at which the events arrive is time-varying (in both treatment and control), we model the point process as a time-inhomogeneous Poisson point process. This point process is defined by an intensity function λ: ℝ → [0, ∞). The number of events in the interval [0,t), denoted N(t), has the following Poisson distribution

N(t) ~ Poisson(Λ(t)), where Λ(t) = ∫₀ᵗ λ(s) ds.

We seek to test the null hypothesis H₀: λᴬ(t) = λᴮ(t) for all t, i.e., the intensity functions for control (A) and treatment (B) are the same. This can be done semiparametrically without making any assumptions about the intensity functions λᴬ and λᴮ. Moreover, the novelty of the research is that this can be done sequentially, as described in section 4 of our paper. Conveniently, the only data required to test this hypothesis at time t is Nᴬ(t) and Nᴮ(t), the total number of events observed so far in control and treatment. In other words, all you need to test the null hypothesis is two integers, which can easily be updated as new events arrive. Here is an example from a simulated A/A test, in which we know by design that the intensity function is the same for the control (A) and the treatment (B), albeit nonstationary.
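For intuition, here is a small sketch of how such an A/A simulation might be generated, drawing from a time-inhomogeneous Poisson process by thinning. The intensity function is made up, and the figures in this post come from our own tooling, not this snippet.

import numpy as np

def intensity(t):
    return 50 + 30 * np.sin(2 * np.pi * t)   # nonstationary arrival rate (events per unit time)

def simulate_inhomogeneous_poisson(rate_fn, t_max, lambda_max, rng):
    """Draw event timestamps on [0, t_max) by thinning a homogeneous process."""
    t, events = 0.0, []
    while True:
        t += rng.exponential(1.0 / lambda_max)             # candidate arrival
        if t >= t_max:
            return np.array(events)
        if rng.uniform() < rate_fn(t) / lambda_max:         # accept with probability λ(t)/λ_max
            events.append(t)

rng = np.random.default_rng(0)
control = simulate_inhomogeneous_poisson(intensity, 1.0, 80, rng)
treatment = simulate_inhomogeneous_poisson(intensity, 1.0, 80, rng)   # same intensity: an A/A test
print(len(control), len(treatment))   # Nᴬ(t_max), Nᴮ(t_max): the only inputs the test needs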

Figure 3: (Left) An A/A simulation of two inhomogeneous Poisson point processes. (Right) Confidence sequence on the log-difference of intensity functions, and sequential p-value.

Figure 3 provides an illustration of an A/A setting. The left figure presents the raw data and the intensity functions, and the right figure presents the sequential statistical analysis. The blue and red rug plots indicate the observed arrival timestamps of events from the treatment and control streams, respectively. The dashed lines are the observed counting processes. As this data is simulated under the null, the intensity functions are identical and overlay each other. The left axis of the right figure visualizes the evolution of the confidence sequence on the log-difference of intensity functions. The right axis of the right figure visualizes the evolution of the sequential p-value. We can make the following two observations:

  • Under the null, the difference of log intensities is zero, which is correctly covered by the 0.95 confidence sequence at all times.
  • The sequential p-value is greater than 0.05 at all times

Now let’s consider an illustration of an A/B setting. Figure 4 shows observed arrival times for treatment and control when the intensity functions differ. As this is a simulation, the true difference between log intensities is known.

Figure 4: (Left) An A/B simulation of two inhomogeneous Poisson point processes. (Right) Confidence sequence on the difference of log of intensity functions, and sequential p-value.

We can make the following observations

  • The 0.95 confidence sequence covers the true log-difference at all times
  • The sequential p-value falls below 0.05 at the same time the 0.95 confidence sequence excludes the null value of zero

We now present a number of case studies in which this methodology rapidly detected serious problems in count metrics.

Case Study 1: Drop in Successful Title Starts

Figure 2 actually presents counts of title start events from a real canary test. Whenever a title starts successfully, an event is sent from the device to Netflix. We have a stream of title start events from treatment devices and a stream of title start events from control devices. Whenever fewer title starts are observed among treatment devices, there is usually a bug in the new client preventing playback.

In this case, the canary test detected a bug that was later determined to have prevented approximately 60% of treatment devices from being able to start their streams. The confidence sequence is shown in Figure 5, in addition to the (sequential) p-value. While the exact units of time have been omitted, this bug was detected at the sub-second level.

Figure 5: 0.99 Confidence sequence on the difference of log-intensities with sequential p-value.

Case Study 2: Increase in Abnormal Shutdowns

In addition to title start events, we also monitor whenever the Netflix client shuts down unexpectedly. As before, we have two streams of abnormal shutdown events, one from treatment devices, and one from control devices. The following screenshots are taken directly from our Lumen dashboards.

Figure 6: Counts of Abnormal Shutdowns over time, cumulative and non-cumulative. Treatment (Black) and Control (Blue)

Figure 6 illustrates two important points. There is clearly nonstationarity in the arrival of abnormal shutdown events. It is also not easy to visibly see any difference between treatment and control from the non-cumulative view. The difference is, however, much easier to see from the cumulative view by observing the counting process. There is a small but visible increase in the number of abnormal shutdowns in the treatment. Figure 7 shows how our sequential statistical methodology is even able to identify such small differences.

Figure 7: Abnormal Shutdowns. (Top Panel) Confidence sequences on λᴮ(t)/λᴬ(t) (shaded blue) with observed counting processes for treatment (black dashed) and control (blue dashed). (Bottom Panel) sequential p-values.

Case Study 3: Increase in Errors

Netflix also monitors the number of errors produced by treatment and control. This is a high cardinality metric as every error is annotated with a code indicating the type of error. Monitoring errors segmented by code helps developers diagnose issues quickly. Figure 8 shows the sequential p-values, on the log scale, for a set of error codes that Netflix monitors during client rollouts. In this example, we have detected a higher volume of 3.1.18 errors being produced by treatment devices. Devices experiencing this error are presented with the following message:

“We’re having trouble playing this title right now”

Figure 8: Sequential p-values for start play errors by error code
Figure 9: Observed error-3.1.18 timestamps and counting processes for treatment (blue) and control (red)

Knowing which errors increased can streamline the process of identifying the bug for our developers. We immediately send developers alerts through Slack integrations, such as the following

Figure 10: Notifications via Slack Integrations

The next time you are watching Netflix and encounter an error, know that we’re on it!

Try it Out!

The statistical approach outlined in our paper is remarkably easy to implement in practice. All you need are two integers, the number of events observed so far in the treatment and control. The code is available in this short GitHub gist. Here are two usage examples:

> counts = [100, 101]
> assignment_probabilities = [0.5, 0.5]
> sequential_p_value(counts, assignment_probabilities)
1

> counts = [100, 201]
> assignment_probabilities = [0.5, 0.5]
> sequential_p_value(counts, assignment_probabilities)
5.06061172163498e-06

The code generalizes to more than just two treatment groups. For full details, including hyperparameter tuning, see section 4 of the paper.
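For instance, assuming the same interface as the two-group examples above, a three-group comparison would pass one count per group along with its traffic allocation (output omitted here):

> counts = [100, 95, 210]
> assignment_probabilities = [0.25, 0.25, 0.5]
> sequential_p_value(counts, assignment_probabilities)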

Further Reading


Sequential Testing Keeps the World Streaming Netflix Part 2: Counting Processes was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Supporting Diverse ML Systems at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/supporting-diverse-ml-systems-at-netflix-2d2e6b6d205d

David J. Berg, Romain Cledat, Kayla Seeley, Shashank Srikanth, Chaoying Wang, Darin Yu

Netflix uses data science and machine learning across all facets of the company, powering a wide range of business applications from our internal infrastructure and content demand modeling to media understanding. The Machine Learning Platform (MLP) team at Netflix provides an entire ecosystem of tools around Metaflow, an open source machine learning infrastructure framework we started, to empower data scientists and machine learning practitioners to build and manage a variety of ML systems.

Since its inception, Metaflow has been designed to provide a human-friendly API for building data and ML (and today AI) applications and deploying them in our production infrastructure frictionlessly. While human-friendly APIs are delightful, it is really the integrations to our production systems that give Metaflow its superpowers. Without these integrations, projects would be stuck at the prototyping stage, or they would have to be maintained as outliers outside the systems maintained by our engineering teams, incurring unsustainable operational overhead.

Given the very diverse set of ML and AI use cases we support — today we have hundreds of Metaflow projects deployed internally — we don’t expect all projects to follow the same path from prototype to production. Instead, we provide a robust foundational layer with integrations to our company-wide data, compute, and orchestration platform, as well as various paths to deploy applications to production smoothly. On top of this, teams have built their own domain-specific libraries to support their specific use cases and needs.

In this article, we cover a few key integrations that we provide for various layers of the Metaflow stack at Netflix, as illustrated above. We will also showcase real-life ML projects that rely on them, to give an idea of the breadth of projects we support. Note that all projects leverage multiple integrations, but we highlight them in the context of the integration that they use most prominently. Importantly, all the use cases were engineered by practitioners themselves.

These integrations are implemented through Metaflow’s extension mechanism which is publicly available but subject to change, and hence not a part of Metaflow’s stable API yet. If you are curious about implementing your own extensions, get in touch with us on the Metaflow community Slack.

Let’s go over the stack layer by layer, starting with the most foundational integrations.

Data: Fast Data

Our main data lake is hosted on S3, organized as Apache Iceberg tables. For ETL and other heavy lifting of data, we mainly rely on Apache Spark. In addition to Spark, we want to support last-mile data processing in Python, addressing use cases such as feature transformations, batch inference, and training. Occasionally, these use cases involve terabytes of data, so we have to pay attention to performance.

To enable fast, scalable, and robust access to the Netflix data warehouse, we have developed a Fast Data library for Metaflow, which leverages high-performance components from the Python data ecosystem:

As depicted in the diagram, the Fast Data library consists of two main interfaces:

  • The Table object is responsible for interacting with the Netflix data warehouse: it parses Iceberg (or legacy Hive) table metadata and resolves the partitions and Parquet files to read. Recently, we added support for the write path, so tables can be updated using the library as well.
  • Once we have discovered the Parquet files to be processed, MetaflowDataFrame takes over: it downloads data using Metaflow’s high-throughput S3 client directly to the process’ memory, which often outperforms reading local files.

We use Apache Arrow to decode Parquet and to host an in-memory representation of data. The user can choose the most suitable tool for manipulating data, such as Pandas or Polars to use a dataframe API, or one of our internal C++ libraries for various high-performance operations. Thanks to Arrow, data can be accessed through these libraries in a zero-copy fashion.

We also pay attention to dependency issues: (Py)Arrow is a dependency of many ML and data libraries, so we don’t want our custom C++ extensions to depend on a specific version of Arrow, which could easily lead to unresolvable dependency graphs. Instead, in the style of nanoarrow, our Fast Data library only relies on the stable Arrow C data interface, producing a hermetically sealed library with no external dependencies.

Example use case: Content Knowledge Graph

Our knowledge graph of the entertainment world encodes relationships between titles, actors and other attributes of a film or series, supporting all aspects of business at Netflix.

A key challenge in creating a knowledge graph is entity resolution. There may be many different representations of slightly different or conflicting information about a title which must be resolved. This is typically done through a pairwise matching procedure for each entity which becomes non-trivial to do at scale.

This project leverages Fast Data and horizontal scaling with Metaflow’s foreach construct to load large amounts of title information — approximately a billion pairs — stored in the Netflix Data Warehouse, so the pairs can be matched in parallel across many Metaflow tasks.

We use metaflow.Table to resolve all input shards which are distributed to Metaflow tasks which are responsible for processing terabytes of data collectively. Each task loads the data using metaflow.MetaflowDataFrame, performs matching using Pandas, and populates a corresponding shard in an output Table. Finally, when all matching is done and data is written the new table is committed so it can be read by other jobs.
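As a rough illustration, the shape of such a flow is sketched below. The shard-listing and read/write method names (list_shards, from_shard, write_shard, commit), the table names, and the matching logic are assumptions standing in for our internal Fast Data API, not its exact interface.

from metaflow import FlowSpec, step
# metaflow.Table and metaflow.MetaflowDataFrame belong to our internal Fast Data
# library; the method and table names below are illustrative assumptions.
from metaflow import Table, MetaflowDataFrame

class EntityMatchFlow(FlowSpec):

    @step
    def start(self):
        # Resolve the input table into shards (partition/file groups) that can
        # be processed independently; 'list_shards' is a hypothetical method.
        self.shards = Table("dse", "title_pair_candidates").list_shards()
        self.next(self.match, foreach="shards")

    @step
    def match(self):
        # Load one shard straight into memory via Metaflow's S3 client,
        # decode with Arrow, and hand it to Pandas for pairwise matching.
        df = MetaflowDataFrame.from_shard(self.input).to_pandas()
        matched = df[df["similarity"] > 0.9]   # placeholder matching logic
        # Populate the corresponding shard of the output table.
        Table("dse", "title_pairs_resolved").write_shard(self.input, matched)
        self.next(self.join)

    @step
    def join(self, inputs):
        # Commit the output table only once every shard has been written,
        # so downstream jobs see a consistent table.
        Table("dse", "title_pairs_resolved").commit()
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    EntityMatchFlow()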

Compute: Titus

Whereas open-source users of Metaflow rely on AWS Batch or Kubernetes as the compute backend, we rely on our centralized compute platform, Titus. Under the hood, Titus is powered by Kubernetes, but it provides a thick layer of enhancements over off-the-shelf Kubernetes, to make it more observable, secure, scalable, and cost-efficient.

By targeting @titus, Metaflow tasks benefit from these battle-hardened features out of the box, with no in-depth technical knowledge or engineering required from ML engineers or data scientists. However, in order to benefit from scalable compute, we need to help developers package and rehydrate the whole execution environment of a project in a remote pod in a reproducible manner (preferably quickly). Specifically, we don’t want to ask developers to manage Docker images of their own manually, which quickly results in more problems than it solves.

This is why Metaflow provides support for dependency management out of the box. Originally, we supported only @conda, but based on our work on Portable Execution Environments, open-source Metaflow gained support for @pypi a few months ago as well.
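A minimal sketch of what this looks like to a flow author is below. @titus is our internal decorator and is only indicated in a comment (open-source users would use @batch or @kubernetes instead), and the resource sizes and package pins are placeholders.

from metaflow import FlowSpec, step, resources, pypi

class TrainFlow(FlowSpec):

    @resources(cpu=8, memory=32000)
    # @titus  # internal Netflix decorator; use @batch or @kubernetes elsewhere
    @pypi(python="3.10", packages={"scikit-learn": "1.4.0", "pandas": "2.2.0"})
    @step
    def start(self):
        # The execution environment (Python version plus pinned packages) is
        # resolved, packaged, and rehydrated on the remote pod automatically;
        # no hand-maintained Docker image is required.
        from sklearn.linear_model import LogisticRegression
        self.model = LogisticRegression()
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    TrainFlow()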

Example use case: Building model explainers

Here’s a fascinating example of the usefulness of portable execution environments. For many of our applications, model explainability matters. Stakeholders like to understand why models produce a certain output and why their behavior changes over time.

There are several ways to provide explainability to models but one way is to train an explainer model based on each trained model. Without going into the details of how this is done exactly, suffice to say that Netflix trains a lot of models, so we need to train a lot of explainers too.

Thanks to Metaflow, we can allow each application to choose the best modeling approach for their use cases. Correspondingly, each application brings its own bespoke set of dependencies. Training an explainer model therefore requires:

  1. Access to the original model and its training environment, and
  2. Dependencies specific to building the explainer model.

This poses an interesting challenge in dependency management: we need a higher-order training system, “Explainer flow” in the figure below, which is able to take a full execution environment of another training system as an input and produce a model based on it.

Explainer flow is event-triggered by an upstream flow, such as the Model A, B, and C flows in the illustration. The build_environment step uses the metaflow environment command provided by our portable environments to build an environment that includes both the requirements of the input model and those needed to build the explainer model itself.

The built environment is given a unique name that depends on the run identifier (to provide uniqueness) as well as the model type. Given this environment, the train_explainer step is then able to refer to this uniquely named environment and operate in an environment that can both access the input model and train the explainer model. Note that, unlike in typical flows using vanilla @conda or @pypi, the portable environments extension allows users to fetch those environments directly at execution time rather than at deploy time, which allows users to, as in this case, resolve the environment right before using it in the next step.
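As a rough sketch using open-source Metaflow conventions, the skeleton of such a flow might look like the following. The upstream flow name is hypothetical, @trigger_on_finish stands in for our internal event triggering, and the portable-environments commands are only indicated in comments because their exact invocation isn't public.

from metaflow import FlowSpec, step, trigger_on_finish, current

@trigger_on_finish(flow="ModelAFlow")   # fire whenever the upstream flow finishes
class ExplainerFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.build_environment)

    @step
    def build_environment(self):
        # Unique name: run identifier plus model type, so concurrent runs never clash.
        self.env_name = f"explainer-{current.run_id}-model-a"
        # Internally this step shells out to the `metaflow environment` command to
        # merge the upstream model's environment with the explainer's own
        # dependencies; the exact invocation is omitted here.
        self.next(self.train_explainer)

    @step
    def train_explainer(self):
        # Runs inside the uniquely named environment built above, so it can both
        # load the input model and import the explainer's own libraries.
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    ExplainerFlow()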

Orchestration: Maestro

If data is the fuel of ML and the compute layer is the muscle, then the nerves must be the orchestration layer. We have talked about the importance of a production-grade workflow orchestrator in the context of Metaflow when we released support for AWS Step Functions years ago. Since then, open-source Metaflow has gained support for Argo Workflows, a Kubernetes-native orchestrator, as well as support for Airflow which is still widely used by data engineering teams.

Internally, we use a production workflow orchestrator called Maestro. The Maestro post shares details about how the system supports scalability, high-availability, and usability, which provide the backbone for all of our Metaflow projects in production.

A hugely important detail that often goes overlooked is event-triggering: it allows a team to integrate their Metaflow flows to surrounding systems upstream (e.g. ETL workflows), as well as downstream (e.g. flows managed by other teams), using a protocol shared by the whole organization, as exemplified by the example use case below.

Example use case: Content decision making

One of the most business-critical systems running on Metaflow supports our content decision making, that is, the question of what content Netflix should bring to the service. We support a massive scale of over 260M subscribers spanning over 190 countries representing hugely diverse cultures and tastes, all of whom we want to delight with our content slate. Reflecting the breadth and depth of the challenge, the systems and models focusing on the question have grown to be very sophisticated.

We approach the question from multiple angles but we have a core set of data pipelines and models that provide a foundation for decision making. To illustrate the complexity of just the core components, consider this high-level diagram:

In this diagram, gray boxes represent integrations to partner teams downstream and upstream, green boxes are various ETL pipelines, and blue boxes are Metaflow flows. These boxes encapsulate hundreds of advanced models and intricate business logic, handling massive amounts of data daily.

Despite its complexity, the system is managed by a relatively small team of engineers and data scientists autonomously. This is made possible by a few key features of Metaflow:

The team has also developed their own domain-specific libraries and configuration management tools, which help them improve and operate the system.

Deployment: Cache

To produce business value, all our Metaflow projects are deployed to work with other production systems. In many cases, the integration might be via shared tables in our data warehouse. In other cases, it is more convenient to share the results via a low-latency API.

Notably, not all API-based deployments require real-time evaluation, which we cover in the section below. We have a number of business-critical applications where some or all predictions can be precomputed, guaranteeing the lowest possible latency and operationally simple high availability at the global scale.

We have developed an officially supported pattern to cover such use cases. While the system relies on our internal caching infrastructure, you could follow the same pattern using services like Amazon ElastiCache or DynamoDB.

Example use case: Content performance visualization

The historical performance of titles is used by decision makers to understand and improve the film and series catalog. Performance metrics can be complex and are often best understood by humans with visualizations that break down the metrics across parameters of interest interactively. Content decision makers are equipped with self-serve visualizations through a real-time web application built with metaflow.Cache, which is accessed through an API provided with metaflow.Hosting.

A daily scheduled Metaflow job computes aggregate quantities of interest in parallel. The job writes a large volume of results to an online key-value store using metaflow.Cache. A Streamlit app houses the visualization software and data aggregation logic. Users can dynamically change parameters of the visualization application and in real-time a message is sent to a simple Metaflow hosting service which looks up values in the cache, performs computation, and returns the results as a JSON blob to the Streamlit application.
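A minimal sketch of the precompute-and-publish half of this pattern is below. metaflow.Cache is an internal integration, so the Cache class and its put() method are assumptions about its interface rather than a public API, and the aggregation logic is a placeholder.

from metaflow import FlowSpec, step, schedule

@schedule(daily=True)   # run the precompute once a day
class ContentMetricsFlow(FlowSpec):

    @step
    def start(self):
        # Placeholder aggregation: the real flow fans out over titles,
        # countries, and time windows with foreach.
        self.aggregates = {("title-123", "US"): {"views_7d": 42}}
        self.next(self.publish)

    @step
    def publish(self):
        from metaflow import Cache   # hypothetical import for the internal integration
        cache = Cache("content-performance")
        for key, value in self.aggregates.items():
            cache.put(key, value)    # precomputed values are later served via a
        self.next(self.end)          # Metaflow Hosting lookup API

    @step
    def end(self):
        pass

if __name__ == "__main__":
    ContentMetricsFlow()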

Deployment: Metaflow Hosting

For deployments that require an API and real-time evaluation, we provide an integrated model hosting service, Metaflow Hosting. Although details have evolved a lot, this old talk still gives a good overview of the service.

Metaflow Hosting is specifically geared towards hosting artifacts or models produced in Metaflow. This provides an easy-to-use interface on top of Netflix’s existing microservice infrastructure, allowing data scientists to quickly move their work from experimentation to a production-grade web service that can be consumed over an HTTP REST API with minimal overhead.

Its key benefits include:

  • Simple decorator syntax to create RESTful endpoints.
  • The back-end auto-scales the number of instances used to back your service based on traffic.
  • The back-end will scale to zero if no requests are made to it after a specified amount of time, thereby saving cost, particularly if your service requires GPUs to effectively produce a response.
  • Request logging, alerts, monitoring, and tracing hooks to Netflix infrastructure.

Consider the service similar to managed model hosting services like Amazon SageMaker model hosting, but tightly integrated with our microservice infrastructure.

Example use case: Media

We have a long history of using machine learning to process media assets, for instance, to personalize artwork and to help our creatives create promotional content efficiently. Processing large amounts of media assets is technically non-trivial and computationally expensive, so over the years we have developed plenty of specialized infrastructure dedicated to this purpose in general, and infrastructure supporting media ML use cases in particular.

To demonstrate the benefits of Metaflow Hosting that provides a general-purpose API layer supporting both synchronous and asynchronous queries, consider this use case involving Amber, our feature store for media.

While Amber is a feature store, precomputing and storing all media features in advance would be infeasible. Instead, we compute and cache features on an on-demand basis, as depicted below:

When a service requests a feature from Amber, it computes the feature dependency graph and then sends one or more asynchronous requests to Metaflow Hosting, which places the requests in a queue, eventually triggering feature computations when compute resources become available. Metaflow Hosting caches the response, so Amber can fetch it after a while. We could have built a dedicated microservice just for this use case, but thanks to the flexibility of Metaflow Hosting, we were able to ship the feature faster with no additional operational burden.

Future Work

Our appetite to apply ML in diverse use cases is only increasing, so our Metaflow platform will keep expanding its footprint correspondingly and continue to provide delightful integrations to systems built by other teams at Netflix. For instance, we have plans to work on improvements in the versioning layer, which wasn’t covered by this article, by giving more options for artifact and model management.

We also plan on building more integrations with other systems that are being developed by sister teams at Netflix. As an example, Metaflow Hosting models are currently not well integrated into model logging facilities — we plan to improve this so that models developed with Metaflow are better integrated with the feedback loop critical in training new models. We hope to do this in a pluggable manner that would allow other users to integrate with their own logging systems.

Additionally, we want to supply more ways for Metaflow artifacts and models to be integrated into non-Metaflow environments and applications, e.g., JVM-based edge services, so that Python-based data scientists can contribute to non-Python engineering systems easily. This would allow us to better bridge the gap between the quick iteration that Metaflow provides (in Python) and the requirements and constraints imposed by the infrastructure serving Netflix member-facing requests.

If you are building business-critical ML or AI systems in your organization, join the Metaflow Slack community! We are happy to share experiences, answer any questions, and welcome you to contribute to Metaflow.

Acknowledgements:

Thanks to Wenbing Bai, Jan Florjanczyk, Michael Li, Aliki Mavromoustaki, and Sejal Rai for help with use cases and figures. Thanks to our OSS contributors for making Metaflow a better product.


Supporting Diverse ML Systems at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Bending pause times to your will with Generational ZGC

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/bending-pause-times-to-your-will-with-generational-zgc-256629c9386b

The surprising and not so surprising benefits of generations in the Z Garbage Collector.

By Danny Thomas, JVM Ecosystem Team

The latest long term support release of the JDK delivers generational support for the Z Garbage Collector.

More than half of our critical streaming video services are now running on JDK 21 with Generational ZGC, so it’s a good time to talk about our experience and the benefits we’ve seen. If you’re interested in how we use Java at Netflix, Paul Bakker’s talk How Netflix Really Uses Java, is a great place to start.

Reduced tail latencies

In both our GRPC and DGS Framework services, GC pauses are a significant source of tail latencies. That’s particularly true of our GRPC clients and servers, where request cancellations due to timeouts interact with reliability features such as retries, hedging and fallbacks. Each of these errors is a canceled request resulting in a retry, so this reduction in errors further reduces overall service traffic by this rate:

Errors rates per second. Previous week in white vs current cancellation rate in purple, as ZGC was enabled on a service cluster on November 15

Removing the noise of pauses also allows us to identify actual sources of latency end-to-end, which would otherwise be hidden in the noise, as maximum pause time outliers can be significant:

Maximum GC pause times by cause, for the same service cluster as above. Yes, those ZGC pauses really are usually under one millisecond

Efficiency

Even after we saw very promising results in our evaluation, we expected the adoption of ZGC to be a trade off: a little less application throughput, due to store and load barriers, work performed in thread local handshakes, and the GC competing with the application for resources. We considered that an acceptable trade off, as avoiding pauses provided benefits that would outweigh that overhead.

In fact, we’ve found for our services and architecture that there is no such trade off. For a given CPU utilization target, ZGC improves both average and P99 latencies with equal or better CPU utilization when compared to G1.

The consistency in request rates, request patterns, response time and allocation rates we see in many of our services certainly help ZGC, but we’ve found it’s equally capable of handling less consistent workloads (with exceptions of course; more on that below).

Operational simplicity

Service owners often reach out to us with questions about excessive pause times and for help with tuning. We have several frameworks that periodically refresh large amounts of on-heap data to avoid external service calls for efficiency. These periodic refreshes of on-heap data are great at taking G1 by surprise, resulting in pause time outliers well beyond the default pause time goal.

This long lived on-heap data was the major contributor to us not adopting non-generational ZGC previously. In the worst case we evaluated, non-generational ZGC caused 36% more CPU utilization than G1 for the same workload. That became a nearly 10% improvement with generational ZGC.

Half of all services required for streaming video use our Hollow library for on-heap metadata. Removing pauses as a concern allowed us to remove array pooling mitigations, freeing hundreds of megabytes of memory for allocations.

Operational simplicity also stems from ZGC’s heuristics and defaults. No explicit tuning has been required to achieve these results. Allocation stalls are rare, typically coinciding with abnormal spikes in allocation rates, and are shorter than the average pause times we saw with G1.

Memory overhead

We expected that losing compressed references on heaps < 32G, due to colored pointers requiring 64-bit object pointers, would be a major factor in the choice of a garbage collector.

We’ve found that while that’s an important consideration for stop-the-world GCs, that’s not the case for ZGC, where even on small heaps the increase in allocation rate is amortized by the efficiency and operational improvements. Our thanks to Erik Österlund at Oracle for explaining the less intuitive benefits of colored pointers when it comes to concurrent garbage collectors, which led us to evaluate ZGC more broadly than initially planned.

In the majority of cases ZGC is also able to consistently make more memory available to the application:

Used vs available heap capacity following each GC cycle, for the same service cluster as above

ZGC has a fixed overhead of 3% of the heap size, requiring more native memory than G1. Except in a couple of cases, there’s been no need to lower the maximum heap size to allow for more headroom, and those were services with greater-than-average native memory needs.

Reference processing is also only performed in major collections with ZGC. We paid particular attention to deallocation of direct byte buffers, but we haven’t seen any impact thus far. This difference in reference processing did cause a performance problem with JSON thread dump support, but that’s an unusual situation caused by a framework accidentally creating an unused ExecutorService instance for every request.

Transparent huge pages

Even if you’re not using ZGC, you probably should be using huge pages, and transparent huge pages is the most convenient way to use them.

ZGC uses shared memory for the heap and many Linux distributions configure shmem_enabled to never, which silently prevents ZGC from using huge pages with -XX:+UseTransparentHugePages.

Here we have a service deployed with no other change but shmem_enabled going from never to advise, reducing CPU utilization significantly:

Deployment moving from 4k to 2m pages. Ignore the gap, that’s our immutable deployment process temporarily doubling the cluster capacity

Our default configuration:

  • Sets heap minimum and maximums to equal size
  • Configures -XX:+UseTransparentHugePages -XX:+AlwaysPreTouch
  • Uses the following transparent_hugepage configuration:
echo madvise | sudo tee /sys/kernel/mm/transparent_hugepage/enabled
echo advise | sudo tee /sys/kernel/mm/transparent_hugepage/shmem_enabled
echo defer | sudo tee /sys/kernel/mm/transparent_hugepage/defrag
echo 1 | sudo tee /sys/kernel/mm/transparent_hugepage/khugepaged/defrag

What workloads weren’t a good fit?

There is no best garbage collector. Each trades off collection throughput, application latency and resource utilization depending on the goal of the garbage collector.

For the workloads that have performed better with G1 vs ZGC, we’ve found that they tend to be more throughput oriented, with very spiky allocation rates and long running tasks holding objects for unpredictable periods.

A notable example was a service with very spiky allocation rates and a large number of long-lived objects, which happened to be a particularly good fit for G1’s pause time goal and old-region collection heuristics. It allowed G1 to avoid unproductive work in GC cycles that ZGC couldn’t.

The switch to ZGC by default has provided the perfect opportunity for application owners to think about their choice of garbage collector. Several batch/precompute cases had been using G1 by default, where they would have seen better throughput from the parallel collector. In one large precompute workload we saw a 6–8% improvement in application throughput, shaving an hour off the batch time, versus G1.

Try it for yourself!

Left unquestioned, assumptions and expectations could have caused us to miss one of the most impactful changes we’ve made to our operational defaults in a decade. We’d encourage you to try generational ZGC for yourself. It might surprise you as much as it surprised us.


Bending pause times to your will with Generational ZGC was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Evolving from Rule-based Classifier: Machine Learning Powered Auto Remediation in Netflix Data…

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/evolving-from-rule-based-classifier-machine-learning-powered-auto-remediation-in-netflix-data-039d5efd115b

Evolving from Rule-based Classifier: Machine Learning Powered Auto Remediation in Netflix Data Platform

by Binbing Hou, Stephanie Vezich Tamayo, Xiao Chen, Liang Tian, Troy Ristow, Haoyuan Wang, Snehal Chennuru, Pawan Dixit

This is the first in a series of posts on our work at Netflix on leveraging data insights and Machine Learning (ML) to improve the operational automation around the performance and cost efficiency of big data jobs. Operational automation–including but not limited to auto diagnosis, auto remediation, auto configuration, auto tuning, auto scaling, auto debugging, and auto testing–is key to the success of modern data platforms. In this blog post, we present our project on Auto Remediation, which integrates the currently used rule-based classifier with an ML service and aims to automatically remediate failed jobs without human intervention. We have deployed Auto Remediation in production for handling memory configuration errors and unclassified errors of Spark jobs and observed its efficiency and effectiveness (e.g., automatically remediating 56% of memory configuration errors and saving 50% of the monetary costs caused by all errors), as well as its great potential for further improvements.

Introduction

At Netflix, hundreds of thousands of workflows and millions of jobs are running per day across multiple layers of the big data platform. Given the extensive scope and intricate complexity inherent to such a distributed, large-scale system, even if the failed jobs account for a tiny portion of the total workload, diagnosing and remediating job failures can cause considerable operational burdens.

For efficient error handling, Netflix developed an error classification service, called Pensive, which leverages a rule-based classifier for error classification. The rule-based classifier classifies job errors based on a set of predefined rules and provides insights for schedulers to decide whether to retry the job and for engineers to diagnose and remediate the job failure.

However, as the system has increased in scale and complexity, the rule-based classifier has been facing challenges due to its limited support for operational automation, especially for handling memory configuration errors and unclassified errors. Therefore, the operational cost increases linearly with the number of failed jobs. In some cases–for example, diagnosing and remediating job failures caused by Out-Of-Memory (OOM) errors–joint effort across teams is required, involving not only the users themselves, but also the support engineers and domain experts.

To address these challenges, we have developed a new feature, called Auto Remediation, which integrates the rule-based classifier with an ML service. Based on the classification from the rule-based classifier, it uses an ML service to predict the retry success probability and retry cost and to select the best candidate configuration as a recommendation, and an online configuration service to automatically apply the recommendation. Its major advantages are below:

  • Integrated intelligence. Instead of completely deprecating the current rule-based classifier, Auto Remediation integrates the classifier with an ML service so that it can leverage the merits of both: the rule-based classifier provides static, deterministic classification results per error class, which is based on the context of domain experts; the ML service provides performance- and cost-aware recommendations per job, which leverages the power of ML. With the integrated intelligence, we can properly meet the requirements of remediating different errors.
  • Fully automated. The pipeline of classifying errors, getting recommendations, and applying recommendations is fully automated. It provides the recommendations together with the retry decision to the scheduler, and particularly uses an online configuration service to store and apply recommended configurations. In this way, no human intervention is required in the remediation process.
  • Multi-objective optimizations. Auto Remediation generates recommendations by considering both performance (i.e., the retry success probability) and compute cost efficiency (i.e., the monetary costs of running the job) to avoid blindly recommending configurations with excessive resource consumption. For example, for memory configuration errors, it searches multiple parameters related to the memory usage of job execution and recommends the combination that minimizes a linear combination of failure probability and compute cost.

These advantages have been verified by the production deployment for remediating Spark jobs’ failures. Our observations indicate that Auto Remediation can successfully remediate about 56% of all memory configuration errors by applying the recommended memory configurations online without human intervention; and meanwhile reduce the cost of about 50% due to its ability to recommend new configurations to make memory configurations successful and disable unnecessary retries for unclassified errors. We have also noted a great potential for further improvement by model tuning (see the section of Rollout in Production).

Rule-based Classifier: Basics and Challenges

Basics

Figure 1 illustrates the error classification service, i.e., Pensive, in the data platform. It leverages the rule-based classifier and is composed of three components:

  • Log Collector is responsible for pulling logs from different platform layers for error classification (e.g., the scheduler, job orchestrator, and compute clusters).
  • Rule Execution Engine is responsible for matching the collected logs against a set of predefined rules. A rule includes (1) the name, source, log, and summary of the error, and whether the error is restartable; and (2) the regex to identify the error from the log. For example, the rule with the name SparkDriverOOM includes the information indicating that if the stdout log of a Spark job matches the regex SparkOutOfMemoryError:, then this error is classified as a user error and is not restartable (a sketch of such a rule is shown after this list).
  • Result Finalizer is responsible for finalizing the error classification result based on the matched rules. If one or multiple rules are matched, then the classification of the first matched rule determines the final classification result (the rule priority is determined by the rule ordering, and the first rule has the highest priority). On the other hand, if no rules are matched, then this error will be considered unclassified.
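To make the rule structure concrete, here is a minimal sketch of what the SparkDriverOOM rule described above could look like. The field names and layout are illustrative assumptions, not Pensive's actual schema.

# Illustrative only: field names are assumptions, not Pensive's actual schema.
SPARK_DRIVER_OOM_RULE = {
    "name": "SparkDriverOOM",
    "source": "spark",                   # platform layer the log comes from
    "log": "stdout",                     # which log stream to scan
    "summary": "Spark driver ran out of memory",
    "classification": "USER_ERROR",
    "restartable": False,                # non-transient: a plain retry won't help
    "regex": r"SparkOutOfMemoryError:",  # pattern matched against the log
}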

Challenges

While the rule-based classifier is simple and has been effective, it is facing challenges due to its limited ability to handle the errors caused by misconfigurations and classify new errors:

  • Memory configuration errors. The rule-based classifier provides error classification results indicating whether to restart the job; however, for non-transient errors, it still relies on engineers to manually remediate the job. The most notable example is memory configuration errors. Such errors are generally caused by the misconfiguration of job memory. Setting an excessively small memory can result in Out-Of-Memory (OOM) errors, while setting an excessively large memory can waste cluster memory resources. What’s more challenging is that some memory configuration errors require changing the configurations of multiple parameters. Thus, setting a proper memory configuration requires not only manual operation but also expertise in Spark job execution. In addition, even if a job’s memory configuration is initially well tuned, changes such as data size and job definition can cause performance to degrade. Given that about 600 memory configuration errors per month are observed in the data platform, timely remediation of memory configuration errors alone requires non-trivial engineering efforts.
  • Unclassified errors. The rule-based classifier relies on data platform engineers to manually add rules for recognizing errors based on the known context; otherwise, the errors will be unclassified. Due to the migrations of different layers of the data platform and the diversity of applications, existing rules can be invalid, and adding new rules requires engineering efforts and also depends on the deployment cycle. More than 300 rules have been added to the classifier, yet about 50% of all failures remain unclassified. For unclassified errors, the job may be retried multiple times with the default retry policy. If the error is non-transient, these failed retries incur unnecessary job running costs.

Evolving to Auto Remediation: Service Architecture

Methodology

To address the above-mentioned challenges, our basic methodology is to integrate the rule-based classifier with an ML service to generate recommendations, and use a configuration service to apply the recommendations automatically:

  • Generating recommendations. We use the rule-based classifier as the first pass to classify all errors based on predefined rules, and the ML service as the second pass to provide recommendations for memory configuration errors and unclassified errors.
  • Applying recommendations. We use an online configuration service to store and apply the recommended configurations. The pipeline is fully automated, and the services used to generate and apply recommendations are decoupled.

Service Integrations

Figure 2 illustrates the integration of the services generating and applying the recommendations in the data platform. The major services are as follows:

  • Nightingale is a service running the ML model trained using Metaflow and is responsible for generating a retry recommendation. The recommendation includes (1) whether the error is restartable; and (2) if so, the recommended configurations to restart the job.
  • ConfigService is an online configuration service. The recommended configurations are saved in ConfigService as a JSON patch with a scope defined to specify the jobs that can use the recommended configurations. When Scheduler calls ConfigService to get recommended configurations, Scheduler passes the original configurations to ConfigService and ConfigService returns the mutated configurations by applying the JSON patch to the original configurations. Scheduler can then restart the job with the mutated configurations (including the recommended configurations).
  • Pensive is an error classification service that leverages the rule-based classifier. It calls Nightingale to get recommendations and stores the recommendations to ConfigService so that it can be picked up by Scheduler to restart the job.
  • Scheduler is the service scheduling jobs (our current implementation is with Netflix Maestro). Each time a job fails, it calls Pensive to get the error classification to decide whether to restart the job and calls ConfigService to get the recommended configurations for restarting the job.

Figure 3 illustrates the sequence of service calls with Auto Remediation:

  1. Upon a job failure, Scheduler calls Pensive to get the error classification.
  2. Pensive classifies the error based on the rule-based classifier. If the error is identified to be a memory configuration error or an unclassified error, it calls Nightingale to get recommendations.
  3. With the obtained recommendations, Pensive updates the error classification result and saves the recommended configurations to ConfigService; and then returns the error classification result to Scheduler.
  4. Based on the error classification result received from Pensive, Scheduler determines whether to restart the job.
  5. Before restarting the job, Scheduler calls ConfigService to get the recommended configuration and retries the job with the new configuration.

Evolving to Auto Remediation: ML Service

Overview

The ML service, i.e., Nightingale, aims to generate a retry policy for a failed job that trades off between retry success probability and job running costs. It consists of two major components:

  • A prediction model that jointly estimates a) probability of retry success, and b) retry cost in dollars, conditional on properties of the retry.
  • An optimizer which explores the Spark configuration parameter space to recommend a configuration which minimizes a linear combination of retry failure probability and cost.

The prediction model is retrained offline daily, and is called by the optimizer to evaluate each candidate set of configuration parameter values. The optimizer runs in a RESTful service which is called upon job failure. If there is a feasible configuration solution from the optimization, the response includes this recommendation, which ConfigService uses to mutate the configuration for the retry. If there is no feasible solution–in other words, it is unlikely the retry will succeed by changing Spark configuration parameters alone–the response includes a flag to disable retries and thus eliminate wasted compute cost.

Prediction Model

Given that we want to explore how retry success and retry cost might change under different configuration scenarios, we need some way to predict these two values using the information we have about the job. Data Platform logs both retry success outcome and execution cost, giving us reliable labels to work with. Since we use a shared feature set to predict both targets, have good labels, and need to run inference quickly online to meet SLOs, we decided to formulate the problem as a multi-output supervised learning task. In particular, we use a simple Feedforward Multilayer Perceptron (MLP) with two heads, one to predict each outcome.

Training: Each record in the training set represents a potential retry which previously failed due to memory configuration errors or unclassified errors. The labels are: a) did retry fail, b) retry cost. The raw feature inputs are largely unstructured metadata about the job such as the Spark execution plan, the user who ran it, and the Spark configuration parameters and other job properties. We split these features into those that can be parsed into numeric values (e.g., Spark executor memory parameter) and those that cannot (e.g., user name). We used feature hashing to process the non-numeric values because they come from a high cardinality and dynamic set of values. We then create a lower dimensionality embedding which is concatenated with the normalized numeric values and passed through several more layers.
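For concreteness, a minimal sketch of such a two-headed network is below (in PyTorch). The layer widths, hashed-feature dimension, and embedding size are placeholders, not the values used in production.

import torch
import torch.nn as nn

class RetryOutcomeModel(nn.Module):
    """Shared trunk with two heads: retry failure probability and retry cost."""
    def __init__(self, num_hashed_features=4096, num_numeric_features=16,
                 embed_dim=64, hidden_dim=128):
        super().__init__()
        # Hashed categorical features (user, execution-plan tokens, ...) are
        # projected into a lower-dimensional embedding.
        self.embed = nn.Linear(num_hashed_features, embed_dim)
        self.trunk = nn.Sequential(
            nn.Linear(embed_dim + num_numeric_features, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )
        self.failure_head = nn.Linear(hidden_dim, 1)
        self.cost_head = nn.Linear(hidden_dim, 1)

    def forward(self, hashed, numeric):
        # Concatenate the embedding with the normalized numeric features.
        x = torch.cat([self.embed(hashed), numeric], dim=-1)
        h = self.trunk(x)
        p_fail = torch.sigmoid(self.failure_head(h))   # probability in [0, 1]
        cost = torch.relu(self.cost_head(h))           # non-negative dollars
        return p_fail, cost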

Inference: Upon passing validation audits, each new model version is stored in Metaflow Hosting, a service provided by our internal ML Platform. The optimizer makes several calls to the model prediction function for each incoming configuration recommendation request, described in more detail below.

Optimizer

When a job attempt fails, it sends a request to Nightingale with a job identifier. From this identifier, the service constructs the feature vector to be used in inference calls. As described previously, some of these features are Spark configuration parameters which are candidates to be mutated (e.g., spark.executor.memory, spark.executor.cores). The set of Spark configuration parameters was based on distilled knowledge of domain experts who work on Spark performance tuning extensively. We use Bayesian Optimization (implemented via Meta’s Ax library) to explore the configuration space and generate a recommendation. At each iteration, the optimizer generates a candidate parameter value combination (e.g., spark.executor.memory=7192 mb, spark.executor.cores=8), then evaluates that candidate by calling the prediction model to estimate retry failure probability and cost using the candidate configuration (i.e., mutating their values in the feature vector). After a fixed number of iterations is exhausted, the optimizer returns the “best” configuration solution (i.e., that which minimized the combined retry failure and cost objective) for ConfigService to use if it is feasible. If no feasible solution is found, we disable retries.
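Below is a minimal sketch of this loop using Ax's Service API. The parameter bounds, the cost weight, the iteration budget, and the predict callback (standing in for the prediction model) are placeholders, not our production settings.

from ax.service.ax_client import AxClient
from ax.service.utils.instantiation import ObjectiveProperties

def recommend_configuration(predict, cost_weight=0.01, iterations=20):
    """predict(params) -> (retry_failure_probability, retry_cost_dollars)."""
    ax = AxClient()
    ax.create_experiment(
        name="spark_retry_config",
        parameters=[
            {"name": "spark.executor.memory", "type": "range", "bounds": [2048, 16384]},  # MB
            {"name": "spark.executor.cores", "type": "range", "bounds": [1, 8]},
        ],
        objectives={"combined_objective": ObjectiveProperties(minimize=True)},
    )
    for _ in range(iterations):
        # Propose a candidate configuration, score it with the prediction model,
        # and report the combined objective back to the optimizer.
        params, trial_index = ax.get_next_trial()
        p_fail, cost = predict(params)
        ax.complete_trial(trial_index=trial_index,
                          raw_data=p_fail + cost_weight * cost)
    best_params, _ = ax.get_best_parameters()
    return best_params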

One downside of the iterative design of the optimizer is that any bottleneck can block completion and cause a timeout, which we initially observed in a non-trivial number of cases. Upon further profiling, we found that most of the latency came from the candidate generation step (i.e., figuring out which directions to step in the configuration space after the previous iteration’s evaluation results). We found that this issue had been raised to Ax library owners, who added GPU acceleration options in their API. Leveraging this option decreased our timeout rate substantially.

Rollout in Production

We have deployed Auto Remediation in production to handle memory configuration errors and unclassified errors for Spark jobs. Besides the retry success probability and cost efficiency, the impact on user experience is the major concern:

  • For memory configuration errors: Auto remediation improves user experience because the job retry is rarely successful without a new configuration for memory configuration errors. This means that a successful retry with the recommended configurations can reduce the operational loads and save job running costs, while a failed retry does not make the user experience worse.
  • For unclassified errors: Auto remediation recommends whether to restart the job if the error cannot be classified by existing rules in the rule-based classifier. In particular, if the ML model predicts that the retry is very likely to fail, it will recommend disabling the retry, which can save the job running costs for unnecessary retries. For cases in which the job is business-critical and the user prefers always retrying the job even if the retry success probability is low, we can add a new rule to the rule-based classifier so that the same error will be classified by the rule-based classifier next time, skipping the recommendations of the ML service. This presents the advantages of the integrated intelligence of the rule-based classifier and the ML service.

The deployment in production has demonstrated that Auto Remediation can provide effective configurations for memory configuration errors, successfully remediating about 56% of all memory configuration errors without human intervention. It also decreases the compute cost of these jobs by about 50%, because it can either recommend new configurations to make the retry successful or disable unnecessary retries. As tradeoffs between performance and cost efficiency are tunable, we can decide to achieve a higher success rate or more cost savings by tuning the ML service.

It is worth noting that the ML service currently adopts a conservative policy for disabling retries. As discussed above, this is to avoid impacting cases where users prefer to always retry the job upon failure. Although these cases are expected and can be addressed by adding new rules to the rule-based classifier, we believe that tuning the objective function incrementally, to gradually disable more retries, will help us provide a desirable user experience. Given that the current policy to disable retries is conservative, Auto Remediation presents a great potential to eventually bring much more cost savings without affecting the user experience.

Beyond Error Handling: Towards Right Sizing

Auto Remediation is our first step in leveraging data insights and Machine Learning (ML) for improving user experience, reducing the operational burden, and improving cost efficiency of the data platform. It focuses on automating the remediation of failed jobs, but also paves the path to automate operations other than error handling.

One of the initiatives we are taking, called Right Sizing, is to reconfigure scheduled big data jobs to request the proper resources for job execution. For example, we have noted that the average requested executor memory of Spark jobs is about four times their max used memory, indicating a significant overprovision. In addition to the configurations of the job itself, the resource overprovision of the container that is requested to execute the job can also be reduced for cost savings. With heuristic- and ML-based methods, we can infer the proper configurations of job execution to minimize resource overprovisions and save millions of dollars per year without affecting the performance. Similar to Auto Remediation, these configurations can be automatically applied via ConfigService without human intervention. Right Sizing is in progress and will be covered with more details in a dedicated technical blog post later. Stay tuned.

Acknowledgements

Auto Remediation is a joint work of engineers from different teams and organizations. This work would not have been possible without the solid, in-depth collaborations. We would like to thank all the folks, including Spark experts, data scientists, ML engineers, the scheduler and job orchestrator engineers, data engineers, and support engineers, for sharing context and providing constructive suggestions and valuable feedback (e.g., John Zhuge, Jun He, Holden Karau, Samarth Jain, Julian Jaffe, Batul Shajapurwala, Michael Sachs, Faisal Siddiqi).


Evolving from Rule-based Classifier: Machine Learning Powered Auto Remediation in Netflix Data… was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Announcing bpftop: Streamlining eBPF performance optimization

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/announcing-bpftop-streamlining-ebpf-performance-optimization-6a727c1ae2e5

By Jose Fernandez

Today, we are thrilled to announce the release of bpftop, a command-line tool designed to streamline the performance optimization and monitoring of eBPF applications. As Netflix increasingly adopts eBPF [1, 2], applying the same rigor to these applications as we do to other managed services is imperative. Striking a balance between eBPF’s benefits and system load is crucial, ensuring it enhances rather than hinders our operational efficiency. This tool enables Netflix to embrace eBPF’s potential.

Introducing bpftop

bpftop provides a dynamic real-time view of running eBPF programs. It displays the average execution runtime, events per second, and estimated total CPU % for each program. This tool minimizes overhead by enabling performance statistics only while it is active.

bpftop simplifies the performance optimization process for eBPF programs by enabling an efficient cycle of benchmarking, code refinement, and immediate feedback. Without bpftop, optimization efforts would require manual calculations, adding unnecessary complexity to the process. With bpftop, users can quickly establish a baseline, implement improvements, and verify enhancements, streamlining the process.

A standout feature of this tool is its ability to display the statistics in time series graphs. This approach can uncover patterns and trends that could be missed otherwise.

How it works

bpftop uses the BPF_ENABLE_STATS syscall command to enable global eBPF runtime statistics gathering, which is disabled by default to reduce performance overhead. It collects these statistics every second, calculating the average runtime, events per second, and estimated CPU utilization for each eBPF program within that sample period. This information is displayed in a top-like tabular format or a time series graph over a 10s moving window. Once bpftop terminates, it turns off the statistics-gathering function. The tool is written in Rust, leveraging the libbpf-rs and ratatui crates.
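To make the arithmetic concrete, here is a rough Python sketch (bpftop itself is written in Rust) of how the displayed statistics can be derived from two snapshots of a program's cumulative run_time_ns and run_cnt counters, taken one sample period apart. The normalization by CPU count is our assumption about one plausible way to express the estimate, not necessarily bpftop's exact formula.

def program_stats(prev, curr, sample_period_s=1.0, num_cpus=1):
    """prev/curr: dicts holding cumulative 'run_time_ns' and 'run_cnt' counters."""
    delta_ns = curr["run_time_ns"] - prev["run_time_ns"]
    delta_events = curr["run_cnt"] - prev["run_cnt"]
    # Average time spent per invocation of the eBPF program in this period.
    avg_runtime_ns = delta_ns / delta_events if delta_events else 0.0
    events_per_second = delta_events / sample_period_s
    # Share of total available CPU time spent inside this eBPF program
    # (dividing by num_cpus is one plausible normalization).
    estimated_cpu_pct = delta_ns / (sample_period_s * 1e9 * num_cpus) * 100
    return avg_runtime_ns, events_per_second, estimated_cpu_pct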

Getting started

Visit the project’s GitHub page to learn more about using the tool. We’ve open-sourced bpftop under the Apache 2 license and look forward to contributions from the community.


Announcing bpftop: Streamlining eBPF performance optimization was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Sequential A/B Testing Keeps the World Streaming Netflix Part 1: Continuous Data

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/sequential-a-b-testing-keeps-the-world-streaming-netflix-part-1-continuous-data-cba6c7ed49df

Michael Lindon, Chris Sanden, Vache Shirikian, Yanjun Liu, Minal Mishra, Martin Tingley

Using sequential anytime-valid hypothesis testing procedures to safely release software

1. Spot the Difference

Can you spot any difference between the two data streams below? Each observation is the time interval between a Netflix member hitting the play button and playback commencing, i.e., play-delay. These observations are from a particular type of A/B test that Netflix runs called a software canary or regression-driven experiment. More on that below — for now, what’s important is that we want to quickly and confidently identify any difference in the distribution of play-delay — or conclude that, within some tolerance, there is no difference.

In this blog post, we will develop a statistical procedure to do just that, and describe the impact of these developments at Netflix. The key idea is to switch from a “fixed time horizon” to an “any-time valid” framing of the problem.

Figure 1. An example data stream for an A/B test where each observation represents play-delay for the control (left) and treatment (right). Can you spot any differences in the statistical distributions between the two data streams?

2. Safe software deployment, canary testing, and play-delay

Software engineering readers of this blog are likely familiar with unit, integration and load testing, as well as other testing practices that aim to prevent bugs from reaching production systems. Netflix also performs canary tests — software A/B tests between current and newer software versions. To learn more, see our previous blog post on Safe Updates of Client Applications.

The purpose of a canary test is twofold: to act as a quality-control gate that catches bugs prior to full release, and to measure performance of the new software in the wild. This is carried out by performing a randomized controlled experiment on a small subset of users, where the treatment group receives the new software update and the control group continues to run the existing software. If any bugs or performance regressions are observed in the treatment group, then the full-scale release can be prevented, limiting the “impact radius” among the user base.

One of the metrics Netflix monitors in canary tests is how long it takes for the video stream to start when a title is requested by a user. Monitoring this “play-delay” metric throughout releases ensures that the streaming performance of Netflix only ever improves as we release newer versions of the Netflix client. In Figure 1, the left side shows a real-time stream of play-delay measurements from users running the existing version of the Netflix client, while the right side shows play-delay measurements from users running the updated version. We ask ourselves: Are users of the updated client experiencing longer play-delays?

We consider any increase in play-delay to be a serious performance regression and would prevent the release if we detect an increase. Critically, testing for differences in means or medians is not sufficient and does not provide a complete picture. For example, one situation we might face is that the median or mean play-delay is the same in treatment and control, but the treatment group experiences an increase in the upper quantiles of play-delay. This corresponds to the Netflix experience being degraded for those who already experience high play delays — likely our members on slow or unstable internet connections. Such changes should not be ignored by our testing procedure.

For a complete picture, we need to be able to reliably and quickly detect an upward shift in any part of the play-delay distribution. That is, we must do inference on and test for any differences between the distributions of play-delay in treatment and control.

To summarize, here are the design requirements of our canary testing system:

  1. Identify bugs and performance regressions, as measured by play-delay, as quickly as possible. Rationale: To minimize member harm, if there is any problem with the streaming quality experienced by users in the treatment group we need to abort the canary and roll back the software change as quickly as possible.
  2. Strictly control false positive (false alarm) probabilities. Rationale: This system is part of a semi-automated process for all client deployments. A false positive test unnecessarily interrupts the software release process, reducing the velocity of software delivery and sending developers looking for bugs that do not exist.
  3. This system should be able to detect any change in the distribution. Rationale: We care not only about changes in the mean or median, but also about changes in tail behaviour and other quantiles.

We now build out a sequential testing procedure that meets these design requirements.

3. Sequential Testing: The Basics

Standard statistical tests are fixed-n or fixed-time horizon: the analyst waits until some pre-set amount of data is collected, and then performs the analysis a single time. The classic t-test, the Kolmogorov-Smirnov test, and the Mann-Whitney test are all examples of fixed-n tests. A limitation of fixed-n tests is that they can only be performed once — yet in situations like the above, we want to be testing frequently to detect differences as soon as possible. If you apply a fixed-n test more than once, then you forfeit the Type-I error or false positive guarantee.

Here’s a quick illustration of how fixed-n tests fail under repeated analysis. In the following figure, each red line traces out the p-value when the Mann-Whitney test is repeatedly applied to a data set as 10,000 observations accrue in both treatment and control. Each red line shows an independent simulation, and in each case, there is no difference between treatment and control: these are simulated A/A tests.

The black dots mark where the p-value falls below the standard 0.05 rejection threshold. An alarming 70% of simulations declare a significant difference at some point in time, even though, by construction, there is no difference: the actual false positive rate is much higher than the nominal 0.05. Exactly the same behaviour would be observed for the Kolmogorov-Smirnov test.

Figure 2. 100 Sample paths of the p-value process simulated under the null hypothesis shown in red. The dotted black line indicates the nominal alpha=0.05 level. Black dots indicate where the p-value process dips below the alpha=0.05 threshold, indicating a false rejection of the null hypothesis. A total of 66 out of 100 A/A simulations falsely rejected the null hypothesis.
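The simulation behind Figure 2 is easy to reproduce in spirit. In the sketch below, the distributions, the peeking schedule, and the number of simulations are arbitrary choices rather than the exact setup used for the figure.

import numpy as np
from scipy.stats import mannwhitneyu

# A/A peeking simulation: repeatedly apply a fixed-n test as data accrues and
# count how many runs ever dip below alpha = 0.05.
rng = np.random.default_rng(0)
n_sims, n_max, alpha = 100, 10_000, 0.05
checkpoints = range(500, n_max + 1, 500)   # "peek" every 500 observations

false_positives = 0
for _ in range(n_sims):
    control = rng.exponential(scale=1.0, size=n_max)    # same distribution...
    treatment = rng.exponential(scale=1.0, size=n_max)  # ...in both groups
    for n in checkpoints:
        p = mannwhitneyu(control[:n], treatment[:n], alternative="two-sided").pvalue
        if p < alpha:
            false_positives += 1
            break

print(f"{false_positives}/{n_sims} A/A runs falsely rejected at least once")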

This is a manifestation of “peeking”, and much has been written about the downside risks of this practice (see, for example, Johari et al. 2017). If we restrict ourselves to correctly applied fixed-n statistical tests, where we analyze the data exactly once, we face a difficult tradeoff:

  • Perform the test early on, after a small amount of data has been collected. In this case, we will only be powered to detect larger regressions. Smaller performance regressions will not be detected, and we run the risk of steadily eroding the member experience as small regressions accrue.
  • Perform the test later, after a large amount of data has been collected. In this case, we are powered to detect small regressions — but in the case of large regressions, we expose members to a bad experience for an unnecessarily long period of time.

Sequential, or “any-time valid”, statistical tests overcome these limitations. They permit peeking (in fact, they can be applied after every new data point arrives) while providing false positive, or Type-I error, guarantees that hold throughout time. As a result, we can continuously monitor data streams as in the figure above, using confidence sequences or sequential p-values, and rapidly detect large regressions while eventually detecting small ones.

Despite relatively recent adoption in the context of digital experimentation, these methods have a long academic history, with initial ideas dating back to Abraham Wald’s Sequential Tests of Statistical Hypotheses from 1945. Research in this area remains active, and Netflix has made a number of contributions over the last few years (see the references in those papers for a more complete literature review).

In this and the following posts, we will describe both the methods we’ve developed and their applications at Netflix. The remainder of this post discusses one of those papers, which was published at KDD ’22 (and is available on arXiv). We will keep it high level; readers interested in the technical details can consult the paper.

4. A sequential testing solution

Differences in Distributions

At any point in time, we can estimate the empirical quantile functions for both treatment and control, based on the data observed so far.

Figure 3: Empirical quantile function for control (left) and treatment (right) at a snapshot in time after starting the canary experiment. This is from actual Netflix data, so we’ve suppressed numerical values on the y-axis.

These two plots look pretty close, but we can do better than an eyeball comparison — and we want the computer to be able to continuously evaluate if there is any significant difference between the distributions. Per the design requirements, we also wish to detect large effects early, while preserving the ability to detect small effects eventually — and we want to maintain the false positive probability at a nominal level while permitting continuous analysis (aka peeking).

That is, we need a sequential test on the difference in distributions.

Fixed-horizon confidence bands for the quantile function can be obtained using the DKWM (Dvoretzky–Kiefer–Wolfowitz–Massart) inequality. To obtain time-uniform confidence bands, however, we use the anytime-valid confidence sequences from Howard and Ramdas (2022) [arxiv version]. As the coverage guarantee from these confidence bands holds uniformly across time, we can watch them become tighter without being concerned about peeking. As more data points stream in, these sequential confidence bands continue to shrink in width, which means any difference in the distribution functions — if it exists — will eventually become apparent.
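
As a rough illustration of the fixed-n version of this idea, the sketch below builds a DKWM-style confidence band around an empirical quantile function. The function names and the synthetic data are assumptions made for the example; the time-uniform bands shown in Figure 4 come from the Howard and Ramdas construction, not from this fixed-n bound.

// Sketch (fixed-n only): a DKWM-style confidence band on the quantile function.
// With probability at least 1 - alpha, the empirical CDF is everywhere within
// eps = sqrt(ln(2 / alpha) / (2n)) of the true CDF; inverting that band gives
// conservative lower and upper curves for the quantile function.
function dkwQuantileBand(samples: number[], alpha = 0.05) {
  const n = samples.length;
  const sorted = [...samples].sort((a, b) => a - b);
  const eps = Math.sqrt(Math.log(2 / alpha) / (2 * n));
  // Empirical quantile: smallest order statistic whose empirical CDF reaches p.
  const quantile = (p: number) =>
    sorted[Math.min(n - 1, Math.max(0, Math.ceil(p * n) - 1))];
  return {
    estimate: quantile,
    // Shifting p by +/- eps before inverting yields the band; the edges are
    // clamped to [0, 1] here for simplicity.
    lower: (p: number) => quantile(Math.max(0, p - eps)),
    upper: (p: number) => quantile(Math.min(1, p + eps)),
  };
}

// Illustrative use on synthetic, play-delay-like values (milliseconds).
const playDelayMs = Array.from({ length: 5000 }, () => 200 * Math.exp(2 * Math.random()));
const band = dkwQuantileBand(playDelayMs);
console.log('median and band:', band.lower(0.5), band.estimate(0.5), band.upper(0.5));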

Figure 4: 97.5% time-uniform confidence bands on the quantile function for control (left) and treatment (right).

Note that each frame corresponds to a point in time after the experiment began, not to a sample size. In fact, there is no requirement that the treatment groups have the same sample size.

Differences are easier to see by visualizing the difference between the treatment and control quantile functions.

Figure 5: 95% time-uniform confidence band on the quantile difference function Q_b(p) − Q_a(p) (left). The sequential p-value (right).

As the sequential confidence band on the treatment effect quantile function is anytime-valid, the inference procedure becomes rather intuitive. We can continue to watch these confidence bands tighten, and if at any point the band no longer covers zero at any quantile, we can conclude that the distributions are different and stop the test. In addition to the sequential confidence bands, we can also construct a sequential p-value for testing that the distributions differ. Note from the animation that the moment the 95% confidence band over quantile treatment effects excludes zero is the same moment that the sequential p-value falls below 0.05: as with fixed-n tests, there is consistency between confidence intervals and p-values.
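
In code, that stopping rule is tiny. The sketch below is illustrative only: the band type and the grid of probed quantiles are assumptions, and in the real system the band is the time-uniform construction described above, which holds at all quantiles simultaneously.

// Sketch of the stopping rule: stop as soon as the time-uniform band on the
// quantile treatment effect Q_b(p) - Q_a(p) excludes zero at some quantile.
type QuantileBand = { lower: (p: number) => number; upper: (p: number) => number };

function shouldStopCanary(
  diffBand: QuantileBand,
  probes: number[] = [0.05, 0.25, 0.5, 0.75, 0.95, 0.99],
): boolean {
  // Because the band is anytime-valid, this check can run after every new batch
  // of data without inflating the false positive rate.
  return probes.some(p => diffBand.lower(p) > 0 || diffBand.upper(p) < 0);
}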

Multiple testing is a genuine concern in this application. Our solution controls the Type-I error across all quantiles, all treatment groups, and all joint sample sizes simultaneously (see our paper, or Howard and Ramdas, for details): the results hold for all quantiles and at all times.

5. Impact at Netflix

Releasing new software always carries risk, and we want to reduce the risk of service interruptions or degradation of the member experience. Our canary testing approach provides another layer of protection against bugs and performance regressions slipping into production. It’s fully automated and has become an integral part of the software delivery process at Netflix. Developers can push to production with peace of mind, knowing that bugs and performance regressions will be rapidly caught. The additional confidence empowers developers to push to production more frequently, reducing the time to market for upgrades to the Netflix client and increasing our rate of software delivery.

So far this system has successfully prevented a number of serious bugs from reaching our end users. We detail one example.

Case study: Safe Rollout of Netflix Client Application

Figures 3–5 are taken from a canary test in which the behaviour of the client application was modified (actual numerical values of play-delay have been suppressed). As we can see, the canary test revealed that the new version of the client increased a number of quantiles of play-delay, with the median and 75th percentile experiencing relative increases of at least 0.5% and 1% respectively. The time series of the sequential p-value shows that, in this case, we were able to reject the null hypothesis of no change in distribution at the 0.05 level after about 60 seconds. This provides rapid feedback in the software delivery process, allowing developers to test the performance of new software and quickly iterate.

6. What’s next?

If you are curious about the technical details of the sequential tests for quantiles developed here, you can learn all about the math in our KDD paper (also available on arXiv).

You might also be wondering what happens if the data are not continuous measurements. Errors and exceptions are critical metrics to log when deploying software, as are many other metrics which are best defined in terms of counts. Stay tuned — our next post will develop sequential testing procedures for count data.


Sequential A/B Testing Keeps the World Streaming Netflix Part 1: Continuous Data was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Introducing SafeTest: A Novel Approach to Front End Testing

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/introducing-safetest-a-novel-approach-to-front-end-testing-37f9f88c152d

by Moshe Kolodny

In this post, we’re excited to introduce SafeTest, a revolutionary library that offers a fresh perspective on End-To-End (E2E) tests for web-based User Interface (UI) applications.

The Challenges of Traditional UI Testing

Traditionally, UI tests have been conducted through either unit testing or integration testing (also referred to as End-To-End (E2E) testing). However, each of these methods presents a unique trade-off: you have to choose between controlling the test fixture and setup, or controlling the test driver.

For instance, when using react-testing-library, a unit testing solution, you maintain complete control over what to render and how the underlying services and imports should behave. However, you lose the ability to interact with an actual page, which can lead to a myriad of pain points:

  • Difficulty in interacting with complex UI elements like <Dropdown /> components.
  • Inability to test CORS setup or GraphQL calls.
  • Lack of visibility into z-index issues affecting click-ability of buttons.
  • Complex and unintuitive authoring and debugging of tests.

Conversely, using integration testing tools like Cypress or Playwright provides control over the page, but sacrifices the ability to instrument the bootstrapping code for the app. These tools operate by remotely controlling a browser to visit a URL and interact with the page. This approach has its own set of challenges:

  • Difficulty in making calls to an alternative API endpoint without implementing custom network layer API rewrite rules.
  • Inability to make assertions on spies/mocks or execute code within the app.
  • Testing something like dark mode entails clicking the theme switcher or knowing the localStorage mechanism to override.
  • Inability to test segments of the app in isolation; for example, if a component only becomes visible after clicking a button and waiting for a 60-second countdown, the test must perform those actions and will take at least a minute to run.

Recognizing these challenges, solutions like E2E Component Testing have emerged, with offerings from Cypress and Playwright. While these tools attempt to rectify the shortcomings of traditional integration testing methods, they have other limitations due to their architecture. They start a dev server with bootstrapping code to load the component and/or setup code you want, which limits their ability to handle complex enterprise applications that might have OAuth or a complex build pipeline. Moreover, updating TypeScript usage could break your tests until the Cypress/Playwright team updates their runner.

Welcome to SafeTest

SafeTest aims to address these issues with a novel approach to UI testing. The main idea is to have a snippet of code in our application bootstrapping stage that injects hooks to run our tests (see the How SafeTest Works section for more info on what this is doing). Note that how this works has no measurable impact on the regular usage of your app, since SafeTest leverages lazy loading to dynamically load the tests only when they are run (in the README example, the tests aren’t in the production bundle at all). Once that’s in place, we can use Playwright to run regular tests, thereby achieving the ideal browser control we want for our tests.
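
The snippet below is a conceptual sketch of that idea only, not the actual SafeTest bootstrap API (the README covers the real wiring). It illustrates the mechanism described above: the test hooks sit behind a dynamic import guarded by a test-only flag, so they never load, or ship, during normal use. The ./test-hooks module and the test_name query parameter are hypothetical.

// Conceptual sketch only; see the SafeTest README for the real bootstrap API.
import React from 'react';
import { createRoot } from 'react-dom/client';
import { App } from './App';

async function start() {
  // Only a test runner would set this flag; regular visitors never take this path.
  const underTest = new URLSearchParams(window.location.search).has('test_name');
  if (underTest) {
    // Lazy loading keeps the test wiring out of the production bundle entirely.
    const { loadTestHooks } = await import('./test-hooks'); // hypothetical module
    await loadTestHooks();
  }
  createRoot(document.getElementById('root')!).render(<App />);
}

void start();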

This approach also unlocks some exciting features:

  • Deep linking to a specific test without needing to run a node test server.
  • Two-way communication between the browser and test (node) context.
  • Access to all the DX features that come with Playwright (excluding the ones that come with @playwright/test).
  • Video recording of tests, trace viewing, and pause page functionality for trying out different page selectors/actions.
  • Ability to make assertions in node on spies that live in the browser, including matching snapshots of the calls made within the browser.

Test Examples with SafeTest

SafeTest is designed to feel familiar to anyone who has conducted UI tests before, as it leverages the best parts of existing solutions. Here’s an example of how to test an entire application:

import { describe, it, expect } from 'safetest/jest';
import { render } from 'safetest/react';

describe('my app', () => {
  it('loads the main page', async () => {
    const { page } = await render();

    await expect(page.getByText('Welcome to the app')).toBeVisible();
    expect(await page.screenshot()).toMatchImageSnapshot();
  });
});

We can just as easily test a specific component:

import { describe, it, expect, browserMock } from 'safetest/jest';
import { render } from 'safetest/react';

describe('Header component', () => {
  it('has a normal mode', async () => {
    const { page } = await render(<Header />);

    await expect(page.getByText('Admin')).not.toBeVisible();
  });

  it('has an admin mode', async () => {
    const { page } = await render(<Header admin={true} />);

    await expect(page.getByText('Admin')).toBeVisible();
  });

  it('calls the logout handler when signing out', async () => {
    const spy = browserMock.fn();
    const { page } = await render(<Header handleLogout={spy} />);

    await page.getByText('logout').click();
    expect(await spy).toHaveBeenCalledWith();
  });
});

Leveraging Overrides

SafeTest utilizes React Context to allow for value overrides during tests. For an example of how this works, let’s assume we have a fetchPeople function used in a component:

import React from 'react';
import { useAsync } from 'react-use';
import { fetchPeople } from './api/people';

export const People: React.FC = () => {
  const { value: people, loading, error } = useAsync(fetchPeople);

  if (loading) return <Loader />;
  if (error) return <ErrorPage error={error} />;
  return <Table data={people} rows={[/* ... */]} />;
};

We can modify the People component to use an Override:

 import { fetchPeople } from './api/people';
+import { createOverride } from 'safetest/react';

+const FetchPeople = createOverride(fetchPeople);

 export const People: React.FC = () => {
+  const fetchPeople = FetchPeople.useValue();
   const { value: people, loading, error } = useAsync(fetchPeople);

   if (loading) return <Loader />;
   if (error) return <ErrorPage error={error} />;
   return <Table data={people} rows={[/* ... */]} />;
 };

Now, in our test, we can override the response for this call:

const pending = new Promise(() => { /* Do nothing */ });
const resolved = [{ name: 'Foo', age: 23 }, { name: 'Bar', age: 32 }];
const error = new Error('Whoops');

describe('People', () => {
  it('has a loading state', async () => {
    const { page } = await render(
      <FetchPeople.Override with={() => () => pending}>
        <People />
      </FetchPeople.Override>
    );

    await expect(page.getByText('Loading')).toBeVisible();
  });

  it('has a loaded state', async () => {
    const { page } = await render(
      <FetchPeople.Override with={() => async () => resolved}>
        <People />
      </FetchPeople.Override>
    );

    await expect(page.getByText('User: Foo, age: 23')).toBeVisible();
  });

  it('has an error state', async () => {
    const { page } = await render(
      <FetchPeople.Override with={() => async () => { throw error; }}>
        <People />
      </FetchPeople.Override>
    );

    await expect(page.getByText('Error getting users: "Whoops"')).toBeVisible();
  });
});

The render function also accepts a function that will be passed the initial app component, allowing for the injection of any desired elements anywhere in the app:

it('has a people loaded state', async () => {
  const { page } = await render(app => (
    <FetchPeople.Override with={() => async () => resolved}>
      {app}
    </FetchPeople.Override>
  ));
  await expect(page.getByText('User: Foo, age: 23')).toBeVisible();
});

With overrides, we can write complex test cases, such as ensuring that a service method which combines API requests to /foo, /bar, and /baz retries only the failed requests and still maps the return value correctly. So if /bar takes 3 attempts to resolve, the method will make a total of 5 API calls (one each to /foo and /baz, plus three to /bar).

Overrides aren’t limited to API calls (for those we could also use page.route); we can also override app-level values like feature flags or some static value:

+const UseFlags = createOverride(useFlags);
 export const Admin = () => {
+  const useFlags = UseFlags.useValue();
   const { isAdmin } = useFlags();
   if (!isAdmin) return <div>Permission error</div>;
   // ...
 };

+const Language = createOverride(navigator.language);
 export const LanguageChanger = () => {
-  const language = navigator.language;
+  const language = Language.useValue();
   return <div>Current language is {language}</div>;
 };

describe('Admin', () => {
  it('works with admin flag', async () => {
    const { page } = await render(
      <UseFlags.Override with={oldHook => () => {
        const oldFlags = oldHook();
        return { ...oldFlags, isAdmin: true };
      }}>
        <Admin />
      </UseFlags.Override>
    );

    await expect(page.getByText('Permission error')).not.toBeVisible();
  });
});

describe('Language', () => {
  it('displays', async () => {
    const { page } = await render(
      <Language.Override with={old => 'abc'}>
        <LanguageChanger />
      </Language.Override>
    );

    await expect(page.getByText('Current language is abc')).toBeVisible();
  });
});

Overrides are a powerful feature of SafeTest, and the examples here only scratch the surface. For more information and examples, refer to the Overrides section of the README.

Reporting

SafeTest comes out of the box with powerful reporting capabilities, such as automatic linking of video replays, the Playwright trace viewer, and even deep links directly to the mounted tested component. The SafeTest repo README links to all the example apps as well as their reports.

Image of SafeTest report showing a video of a test run

SafeTest in Corporate Environments

Many large corporations need a form of authentication to use the app. Typically, navigating to localhost:3000 just results in a perpetually loading page. You need to go to a different port, like localhost:8000, which has a proxy server to check and/or inject auth credentials into underlying service calls. This limitation is one of the main reasons that Cypress/Playwright Component Tests aren’t suitable for use at Netflix.

However, there’s usually a service that can generate test users whose credentials we can use to log in and interact with the application. This facilitates creating a light wrapper around SafeTest to automatically generate and assume that test user. For instance, here’s basically how we do it at Netflix:

import { setup } from 'safetest/setup';
import { createTestUser, addCookies, UserOptions } from 'netflix-test-helper';

type Setup = Parameters<typeof setup>[0] & {
  extraUserOptions?: UserOptions;
};

export const setupNetflix = (options: Setup) => {
  setup({
    ...options,
    hooks: { beforeNavigate: [async page => addCookies(page)] },
  });

  beforeAll(async () => {
    await createTestUser(options.extraUserOptions);
  });
};

After setting this up, we simply import the above package in place of where we would have used safetest/setup.

Beyond React

While this post focused on how SafeTest works with React, it’s not limited to React. SafeTest also works with Vue, Svelte, and Angular, and can even run on NextJS or Gatsby. It also runs with either Jest or Vitest, based on which test runner your scaffolding started you off with. The examples folder demonstrates how to use SafeTest with different tooling combinations, and we encourage contributions to add more cases.

At its core, SafeTest is an intelligent glue for a test runner, a UI library, and a browser runner. Though the most common usage at Netflix employs Jest/React/Playwright, it’s easy to add more adapters for other options.

Conclusion

SafeTest is a powerful testing framework that’s being adopted within Netflix. It allows for easy authoring of tests and provides comprehensive reports of when and how any failures occurred, complete with links to view a playback video or manually run the test steps to see what broke. We’re excited to see how it will revolutionize UI testing and look forward to your feedback and contributions.


Introducing SafeTest: A Novel Approach to Front End Testing was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Rebuilding Netflix Video Processing Pipeline with Microservices

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/rebuilding-netflix-video-processing-pipeline-with-microservices-4e5e6310e359

Liwei Guo, Anush Moorthy, Li-Heng Chen, Vinicius Carvalho, Aditya Mavlankar, Agata Opalach, Adithya Prakash, Kyle Swanson, Jessica Tweneboah, Subbu Venkatrav, Lishan Zhu

This is the first blog in a multi-part series on how Netflix rebuilt its video processing pipeline with microservices, so we can maintain our rapid pace of innovation and continuously improve the system for member streaming and studio operations. This introductory blog focuses on an overview of our journey. Future blogs will provide deeper dives into each service, sharing insights and lessons learned from this process.

The Netflix video processing pipeline went live with the launch of our streaming service in 2007. Since then, the video pipeline has undergone substantial improvements and broad expansions:

  • Starting with Standard Dynamic Range (SDR) at standard definition, we expanded the encoding pipeline to 4K and High Dynamic Range (HDR), which enabled support for our premium offering.
  • We moved from centralized linear encoding to distributed chunk-based encoding. This architecture shift greatly reduced the processing latency and increased system resiliency.
  • Moving away from the use of dedicated instances that were constrained in quantity, we tapped into Netflix’s internal trough created due to autoscaling microservices, leading to significant improvements in computation elasticity as well as resource utilization efficiency.
  • We rolled out encoding innovations such as per-title and per-shot optimizations, which provided significant quality-of-experience (QoE) improvement to Netflix members.
  • By integrating with studio content systems, we enabled the pipeline to leverage rich metadata from the creative side and create more engaging member experiences like interactive storytelling.
  • We expanded pipeline support to serve our studio/content-development use cases, which had different latency and resiliency requirements as compared to the traditional streaming use case.

Our experience of the last decade-and-a-half has reinforced our conviction that an efficient, flexible video processing pipeline that allows us to innovate and support our streaming service, as well as our studio partners, is critical to the continued success of Netflix. To that end, the Video and Image Encoding team in Encoding Technologies (ET) has spent the last few years rebuilding the video processing pipeline on our next-generation microservice-based computing platform Cosmos.

From Reloaded to Cosmos

Reloaded

Starting in 2014, we developed and operated the video processing pipeline on our third-generation platform Reloaded. Reloaded was well-architected, providing good stability, scalability, and a reasonable level of flexibility. It served as the foundation for numerous encoding innovations developed by our team.

When Reloaded was designed, we focused on a single use case: converting high-quality media files (also known as mezzanines) received from studios into compressed assets for Netflix streaming. Reloaded was created as a single monolithic system, where developers from various media teams in ET and our platform partner team Content Infrastructure and Solutions (CIS)¹ worked on the same codebase, building a single system that handled all media assets. Over the years, the system expanded to support various new use cases. This led to a significant increase in system complexity, and the limitations of Reloaded began to show:

  • Coupled functionality: Reloaded was composed of a number of worker modules and an orchestration module. The setup of a new Reloaded module and its integration with the orchestration required a non-trivial amount of effort, which led to a bias towards augmentation rather than creation when developing new functionalities. For example, in Reloaded the video quality calculation was implemented inside the video encoder module. With this implementation, it was extremely difficult to recalculate video quality without re-encoding.
  • Monolithic structure: Since Reloaded modules were often co-located in the same repository, it was easy to overlook code-isolation rules and there was quite a bit of unintended reuse of code across what should have been strong boundaries. Such reuse created tight coupling and reduced development velocity. The tight coupling among modules further forced us to deploy all modules together.
  • Long release cycles: The joint deployment meant that there was increased fear of unintended production outages as debugging and rollback can be difficult for a deployment of this size. This drove the approach of the “release train”. Every two weeks, a “snapshot” of all modules was taken, and promoted to be a “release candidate”. This release candidate then went through exhaustive testing which attempted to cover as large a surface area as possible. This testing stage took about two weeks. Thus, depending on when the code change was merged, it could take anywhere between two and four weeks to reach production.

As time progressed and functionalities grew, the rate of new feature contributions in Reloaded dropped. Several promising ideas were abandoned owing to the outsized work needed to overcome architectural limitations. The platform that had once served us well was now becoming a drag on development.

Cosmos

As a response, in 2018 the CIS and ET teams started developing the next-generation platform, Cosmos. In addition to the scalability and the stability that the developers already enjoyed in Reloaded, Cosmos aimed to significantly increase system flexibility and feature development velocity. To achieve this, Cosmos was developed as a computing platform for workflow-driven, media-centric microservices.

The microservice architecture provides strong decoupling between services. Per-microservice workflow support eases the burden of implementing complex media workflow logic. Finally, relevant abstractions allow media algorithm developers to focus on the manipulation of video and audio signals rather than on infrastructural concerns. A comprehensive list of benefits offered by Cosmos can be found in the linked blog.

Building the Video Processing Pipeline in Cosmos

Service Boundaries

In the microservice architecture, a system is composed of a number of fine-grained services, with each service focusing on a single functionality. So the first (and arguably the most important) thing is to identify boundaries and define services.

In our pipeline, as media assets travel through creation to ingest to delivery, they go through a number of processing steps such as analyses and transformations. We analyzed these processing steps to identify “boundaries” and grouped them into different domains, which in turn became the building blocks of the microservices we engineered.

As an example, in Reloaded, the video encoding module bundles 5 steps:

  1. divide the input video into small chunks
  2. encode each chunk independently
  3. calculate the quality score (VMAF) of each chunk
  4. assemble all the encoded chunks into a single encoded video
  5. aggregate quality scores from all chunks

From a system perspective, the assembled encoded video is of primary concern, while the internal chunking and separate chunk encodings exist to fulfill certain latency and resiliency requirements. Further, as alluded to above, the video quality calculation provides an entirely separate functionality from the encoding service.

Thus, in Cosmos, we created two independent microservices: Video Encoding Service (VES) and Video Quality Service (VQS), each of which serves a clear, decoupled function. As implementation details, the chunked encoding and the assembling were abstracted away into the VES.
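
As a purely illustrative sketch (these are not Netflix’s actual service contracts), the split can be pictured as two narrow interfaces. Keeping them independent is exactly what makes it possible to, say, recompute quality without re-encoding.

// Illustrative only; the names and shapes below are assumptions, not the real
// Cosmos service definitions.
interface EncodingRecipe {
  codec: 'H.264' | 'AV1';
  resolution: string; // e.g. "1920x1080"
  bitrateKbps: number;
}

// Video Encoding Service: mezzanine plus recipe in, encoded video out.
// Chunking and assembly are internal details hidden behind this boundary.
interface VideoEncodingService {
  encode(mezzanineUrl: string, recipe: EncodingRecipe): Promise<{ encodedVideoUrl: string }>;
}

// Video Quality Service: compare an encode to its mezzanine, return a score.
// Because it is independent of VES, quality can be recomputed without re-encoding.
interface VideoQualityService {
  score(mezzanineUrl: string, encodedVideoUrl: string): Promise<{ vmaf: number }>;
}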

Video Services

The approach outlined above was applied to the rest of the video processing pipeline to identify functionalities and hence service boundaries, leading to the creation of the following video services².

  1. Video Inspection Service (VIS): This service takes a mezzanine as the input and performs various inspections. It extracts metadata from different layers of the mezzanine for downstream services. In addition, the inspection service flags issues if invalid or unexpected metadata is observed and provides actionable feedback to the upstream team.
  2. Complexity Analysis Service (CAS): The optimal encoding recipe is highly content-dependent. This service takes a mezzanine as the input and performs analysis to understand the content complexity. It calls Video Encoding Service for pre-encoding and Video Quality Service for quality evaluation. The results are saved to a database so they can be reused.
  3. Ladder Generation Service (LGS): This service creates an entire bitrate ladder for a given encoding family (H.264, AV1, etc.). It fetches the complexity data from CAS and runs the optimization algorithm to create encoding recipes. The CAS and LGS cover many of the innovations that we have previously presented in our tech blogs (per-title, mobile encodes, per-shot, optimized 4K encoding, etc.). By wrapping ladder generation into a separate microservice (LGS), we decouple the ladder optimization algorithms from the creation and management of complexity analysis data (which resides in CAS). We expect this to give us greater freedom for experimentation and a faster rate of innovation.
  4. Video Encoding Service (VES): This service takes a mezzanine and an encoding recipe and creates an encoded video. The recipe includes the desired encoding format and properties of the output, such as resolution, bitrate, etc. The service also provides options that allow fine-tuning latency, throughput, etc., depending on the use case.
  5. Video Validation Service (VVS): This service takes an encoded video and a list of expectations about the encode. These expectations include attributes specified in the encoding recipe as well as conformance requirements from the codec specification. VVS analyzes the encoded video and compares the results against the indicated expectations. Any discrepancy is flagged in the response to alert the caller.
  6. Video Quality Service (VQS): This service takes the mezzanine and the encoded video as input, and calculates the quality score (VMAF) of the encoded video.

Service Orchestration

Each video service provides a dedicated piece of functionality, and together they generate the needed video assets. Currently, the two main use cases of the Netflix video pipeline are producing assets for member streaming and for studio operations. For each use case, we created a dedicated workflow orchestrator so the service orchestration can be customized to best meet the corresponding business needs.

For the streaming use case, the generated videos are deployed to our content delivery network (CDN) for Netflix members to consume. These videos can easily be watched millions of times. The Streaming Workflow Orchestrator utilizes almost all video services to create streams for an impeccable member experience. It leverages VIS to detect and reject non-conformant or low-quality mezzanines, invokes LGS for encoding recipe optimization, encodes video using VES, and calls VQS for quality measurement where the quality data is further fed to Netflix’s data pipeline for analytics and monitoring purposes. In addition to video services, the Streaming Workflow Orchestrator uses audio and timed text services to generate audio and text assets, and packaging services to “containerize” assets for streaming.

For the studio use case, some example video assets are marketing clips and daily production editorial proxies. The requests from the studio side are generally latency-sensitive. For example, someone from the production team may be waiting for the video to review so they can decide the shooting plan for the next day. Because of this, the Studio Workflow Orchestrator optimizes for fast turnaround and focuses on core media processing services. At this time, the Studio Workflow Orchestrator calls VIS to extract metadata of the ingested assets and calls VES with predefined recipes. Compared to member streaming, studio operations have different and unique requirements for video processing. Therefore, the Studio Workflow Orchestrator is the exclusive user of some encoding features like forensic watermarking and timecode/text burn-in.

Where we are now

We have had the new video pipeline running alongside Reloaded in production for a few years now. During this time, we completed the migration of all necessary functionalities from Reloaded, began gradually shifting over traffic one use case at a time, and completed the switchover in September of 2023.

While it is still early days, we have already seen the benefits of the new platform, specifically the ease of feature delivery. Notably, Netflix launched the Advertising-supported plan in November 2022. Processing Ad creatives posed some new challenges: media formats of Ads are quite different from movie and TV mezzanines that the team was familiar with, and there was a new set of media processing requirements related to the business needs of Ads. With the modularity and developer productivity benefits of Cosmos, we were able to quickly iterate the pipeline to keep up with the changing requirements and support a successful product launch.

Summary

Rebuilding the video pipeline was a huge undertaking for the team. We are very proud of what we have achieved, and also eager to share our journey with the technical community. This blog has focused on providing an overview: a brief history of our pipeline and the platforms, why the rebuilding was necessary, what these new services look like, and how they are being used for Netflix businesses. In the next blog, we are going to delve into the details of the Video Encoding Service (VES), explaining step-by-step the service creation, and sharing lessons learned (we have A LOT!). We also plan to cover other video services in future tech blogs. Follow the Netflix Tech Blog to stay up to date.

Acknowledgments

A big shout out to the CIS team for their outstanding work in building the Cosmos platform and their receptiveness to feedback from service developers.

We want to express our appreciation to our users, the Streaming Encoding Pipeline team, and the Video Engineering team. Just like our feedback helps iron out the platform, the feedback from our users has been instrumental in building high-quality services.

We also want to thank Christos Bampis and Zhi Li for their significant contributions to video services, and our two former team members, Chao Chen and Megha Manohara, for contributing to the early development of this project.

Footnotes

  1. Formerly known as Media Cloud Engineering/MCE team.
  2. The actual number of video services is more than listed here. Some of them are Netflix-specific and thus omitted from this blog.


Rebuilding Netflix Video Processing Pipeline with Microservices was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Our First Netflix Data Engineering Summit

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/our-first-netflix-data-engineering-summit-f326b0589102

Holden Karau, Elizabeth Stone, Pedro Duarte, Chris Stephens, Pallavi Phadnis, Lee Woodridge, Mark Cho, Guil Pires, Sujay Jain, Tristan Reid, Senthilnathan Athinarayanan, Bharath Mummadisetty, Abhinaya Shetty, Judit Lantos, Amanuel Kahsay, Dao Mi, Mick Dreeling, Chris Colburn, and Agata Gryzbek

Introduction

Earlier this summer Netflix held our first-ever Data Engineering Forum. Engineers from across the company came together to share best practices on everything from Data Processing Patterns to Building Reliable Data Pipelines. The result was a series of talks which we are now sharing with the rest of the Data Engineering community!

You can find each of the talks below with a short description of each, or you can go straight to the playlist on YouTube here.

The Talks

The Netflix Data Engineering Stack

Chris Stephens, Data Engineer, Content & Studio, and Pedro Duarte, Software Engineer, Consolidated Logging, walk engineers new to Netflix through the building blocks of the Netflix Data Engineering stack. Learn more about how batch and streaming data pipelines are built at Netflix.

Data Processing Patterns

Lee Woodridge and Pallavi Phadnis, Data Engineers at Netflix, talk about how you can apply different processing strategies for your batch pipelines by implementing generic abstractions to help scale, be more efficient, handle late-arriving data, and be more fault tolerant.

Streaming SQL on Data Mesh using Apache Flink

Mark Cho, Guil Pires, and Sujay Jain, engineers from the Netflix Data Platform, talk about how managed Streaming SQL using Apache Flink can help unlock new stream processing use cases at Netflix. You can read more about Data Mesh, Netflix’s next-generation stream processing platform, here.

Building Reliable Data Pipelines

Holden Karau, OSS Engineer, Data Platform Engineering, talks about the importance of reliable data pipelines and how to build them covering tools from testing to validation and auditing. The talk uses Apache Spark as an example, but the concepts generalize regardless of your specific tools.

Knowledge Management — Leveraging Institutional Data

Tristan Reid, software engineer, shares experiences about the Knowledge Management project at Netflix, which seeks to leverage language modeling techniques and metadata from internal systems to improve the impact of the >100K memos that circulate within the company.

Psyberg, An Incremental ETL Framework Using Iceberg

Abhinaya Shetty and Bharath Mummadisetty, Data Engineers from Netflix’s Membership Data Engineering team, introduce Psyberg, an incremental ETL framework. Learn about how Psyberg leverages Iceberg metadata to handle late-arriving data, and improves data pipelines while simplifying on-call life!

Start/Stop/Continue for optimizing complex ETL jobs

Judit Lantos, Data Engineer, Member Experience Data Engineering, shares a case study to demonstrate an effective approach for optimizing complex ETL jobs.

Media Data for ML Studio Creative Production

In the last two decades, Netflix has revolutionized the way video content is consumed; however, there is significant work to be done in revolutionizing how movies and TV shows are made. In this video, Sr. Data Engineers Amanuel Kahsay and Dao Mi showcase how data and insights are being utilized to accomplish such a vision.

We hope that our fellow members of the Data Engineering Community find these videos useful and engaging. Please follow our Netflix Data Twitter account for updates and notifications of future Data Engineering Summits!

Mick Dreeling, Chris Colburn


Our First Netflix Data Engineering Summit was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.