
Machine Learning for Fraud Detection in Streaming Services

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/machine-learning-for-fraud-detection-in-streaming-services-b0b4ef3be3f6

By Soheil Esmaeilzadeh, Negin Salajegheh, Amir Ziai, Jeff Boote

Introduction

Streaming services serve content to millions of users all over the world. These services allow users to stream or download content across a broad category of devices including mobile phones, laptops, and televisions. However, some restrictions are in place, such as the number of active devices, the number of streams, and the number of downloaded titles. Many users across many platforms make for a uniquely large attack surface that includes content fraud, account fraud, and abuse of terms of service. Detection of fraud and abuse at scale and in real-time is highly challenging.

Data analysis and machine learning techniques are great candidates to help secure large-scale streaming platforms. Even though such techniques can scale security solutions proportional to the service size, they bring their own set of challenges such as requiring labeled data samples, defining effective features, and finding appropriate algorithms. In this work, by relying on the knowledge and experience of streaming security experts, we define features based on the expected streaming behavior of the users and their interactions with devices. We present a systematic overview of the unexpected streaming behaviors together with a set of model-based and data-driven anomaly detection strategies to identify them.

Background on Anomaly Detection

Anomalies (also known as outliers) are defined as certain patterns (or incidents) in a set of data samples that do not conform to an agreed-upon notion of normal behavior in a given context.

There are two main anomaly detection approaches, namely, (i) rule-based, and (ii) model-based. Rule-based anomaly detection approaches use a set of rules which rely on the knowledge and experience of domain experts. Domain experts specify the characteristics of anomalous incidents in a given context and develop a set of rule-based functions to discover the anomalous incidents. As a result of this reliance, the deployment and use of rule-based anomaly detection methods become prohibitively expensive and time-consuming at scale, and cannot be used for real-time analyses. Furthermore, the rule-based anomaly detection approaches require constant supervision by experts in order to keep the underlying set of rules up-to-date for identifying novel threats. Reliance on experts can also make rule-based approaches biased or limited in scope and efficacy.

On the other hand, in model-based anomaly detection approaches, models are built and used to detect anomalous incidents in a fairly automated manner. Although model-based approaches are more scalable and better suited to real-time analysis, they rely heavily on the availability of (often labeled) context-specific data. Model-based anomaly detection approaches are, in general, of three kinds: (i) supervised, (ii) semi-supervised, and (iii) unsupervised. Given a labeled dataset, a supervised anomaly detection model can be built to distinguish between anomalous and benign incidents. Semi-supervised anomaly detection models require only a set of benign examples for training. These models learn the distribution of benign samples and use that knowledge to identify anomalous samples at inference time. Unsupervised anomaly detection models do not require any labeled data samples, but their efficacy is not straightforward to evaluate reliably.

Figure 1. Schematic of a streaming service platform: (a) illustrates device types that can be used for streaming, (b) designates the set of authentication and authorization systems, such as license and manifest servers, that provide encrypted content as well as decryption keys and manifests, and (c) shows the streaming service provider, a surrogate entity for digital content providers, which interacts with the other two components.

Streaming Platforms

Commercial streaming platforms such as the one shown in Figure 1 mainly rely on Digital Rights Management (DRM) systems. DRM is a collection of access control technologies used to protect the copyrights of digital media such as movies and music tracks. DRM helps the owners of digital products prevent illegal access, modification, and distribution of their copyrighted work. DRM systems provide continuous protection against unauthorized actions on digital content and restrict it to streaming and in-time consumption. The backbone of DRM is the digital license, which specifies a set of usage rights for the digital content and contains the owner's permission to stream the content via an on-demand streaming service.

On the client side, a request is sent to the streaming server to obtain the protected, encrypted digital content. To stream the content, the user requests a license from a clearinghouse that verifies the user’s credentials. Once a license is assigned to a user, a Content Decryption Module (CDM) decrypts the protected content and makes it ready for playback according to the usage rights enforced by the license. The license is used to generate a decryption key, which is specific to a certain movie title, can only be used by a particular account on a given device, has a limited lifetime, and enforces a limit on how many concurrent streams are allowed.
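The usage rights that the key enforces can be pictured as a simple check. The sketch below is a hypothetical Python model, not a real DRM or CDM API: the `License` fields and the `may_decrypt` helper are illustrative names, and a real system enforces these rights cryptographically inside the CDM and license server rather than with a boolean function.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class License:
    """Hypothetical license record; field names are illustrative only."""
    title_id: str
    account_id: str
    device_id: str
    expires_at: float             # Unix timestamp (seconds)
    max_concurrent_streams: int


def may_decrypt(lic: License, title_id: str, account_id: str,
                device_id: str, now: float, active_streams: int) -> bool:
    """Return True only if every usage right in the license holds.

    active_streams counts the account's streams already playing
    before this one starts.
    """
    return (
        lic.title_id == title_id                          # one title
        and lic.account_id == account_id                  # one account
        and lic.device_id == device_id                    # one device
        and now < lic.expires_at                          # limited lifetime
        and active_streams < lic.max_concurrent_streams   # concurrency cap
    )


lic = License("title-1", "acct-1", "dev-1", expires_at=1_000.0,
              max_concurrent_streams=2)
print(may_decrypt(lic, "title-1", "acct-1", "dev-1", now=500.0, active_streams=1))  # True
print(may_decrypt(lic, "title-1", "acct-1", "dev-2", now=500.0, active_streams=0))  # False
```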

Another component involved in the streaming experience is the manifest. The manifest lists the available video, audio, subtitle, and other streams in the form of a few Uniform Resource Locators (URLs) that clients use to fetch the movie streams. The client requests the manifest, and it is delivered to the player before the license request.

Data

Data Labeling

For the task of anomaly detection in streaming platforms, we have neither an already trained model nor any labeled data samples, so we rely on structural, a priori, domain-specific, rule-based assumptions for data labeling. Accordingly, we define a set of rule-based heuristics that identify anomalous streaming behaviors of clients and label them as anomalous or benign. The fraud categories that we consider in this work are (i) content fraud, (ii) service fraud, and (iii) account fraud. With the help of security experts, we have designed and developed heuristic functions to discover a wide range of suspicious behaviors, and we use these functions to label the data samples automatically. To label a set of benign (non-anomalous) accounts, we use a group of vetted users who are highly trusted to be free of any form of fraud.

Next, we share three examples as a subset of our in-house heuristics that we have used for tagging anomalous accounts:

  • (i) Rapid license acquisition: a heuristic based on the fact that benign users usually watch one title at a time and take a while to move on to another, resulting in a relatively low rate of license acquisition. Based on this reasoning, we tag all accounts that acquire licenses very quickly as anomalous.
  • (ii) Too many failed attempts at streaming: a heuristic based on the fact that most devices stream without errors, while a device in trial-and-error mode, searching for the “right” parameters, leaves a long trail of errors behind. Abnormally high error levels are an indicator of a fraud attempt.
  • (iii) Unusual combinations of device types and DRMs: a heuristic that is based on the fact that a device type (e.g., a browser) is normally matched with a certain DRM system (e.g., Widevine). Unusual combinations could be a sign of compromised devices that attempt to bypass security enforcements.
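The three heuristics can be sketched as simple predicates over per-account daily aggregates. Everything here is an assumption for illustration: the `DailyActivity` fields, the thresholds (`MAX_LICENSES_PER_VIEWING_HOUR`, `MAX_ERROR_RATE`), and the device-to-DRM pairing table are hypothetical placeholders, not the values or schema used in this work. Note also that here benign labels come from a vetted set of trusted users, so an account that triggers no heuristic is merely untagged, not benign.

```python
from dataclasses import dataclass, field

# Illustrative thresholds; real values would come from security experts.
MAX_LICENSES_PER_VIEWING_HOUR = 10.0
MAX_ERROR_RATE = 0.5
# Hypothetical "normal" pairing of device type to DRM system.
EXPECTED_DRM = {"browser": "widevine"}


@dataclass
class DailyActivity:
    """Hypothetical per-account daily aggregates."""
    licenses_acquired: int
    viewing_hours: float
    stream_attempts: int
    stream_errors: int
    device_drm_pairs: set = field(default_factory=set)  # {(device_type, drm)}


def rapid_license_acquisition(a: DailyActivity) -> bool:
    hours = max(a.viewing_hours, 1e-6)        # avoid division by zero
    return a.licenses_acquired / hours > MAX_LICENSES_PER_VIEWING_HOUR


def too_many_failed_attempts(a: DailyActivity) -> bool:
    if a.stream_attempts == 0:
        return False
    return a.stream_errors / a.stream_attempts > MAX_ERROR_RATE


def unusual_device_drm(a: DailyActivity) -> bool:
    return any(dev in EXPECTED_DRM and EXPECTED_DRM[dev] != drm
               for dev, drm in a.device_drm_pairs)


HEURISTICS = (rapid_license_acquisition, too_many_failed_attempts,
              unusual_device_drm)


def triggered_heuristics(a: DailyActivity) -> list:
    """Names of all heuristics that tag this account-day as anomalous."""
    return [h.__name__ for h in HEURISTICS if h(a)]
```

Keeping each heuristic as a separate named function makes it easy to record which rule fired, which is the kind of information a multi-label tagging scheme needs.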

It should be noted that the heuristics, even though they work as a great proxy for embedding the knowledge of security experts in tagging anomalous accounts, may not be completely accurate and might wrongly tag accounts as anomalous (i.e., false positives), for example in the case of a buggy client or device. It is then up to the machine learning model to discover and avoid such false positives.

Data Featurization

A complete list of features used in this work is presented in Table 1. The features mainly belong to two distinct classes. One class accounts for the number of distinct occurrences of a certain parameter/activity/usage in a day. For instance, the dist_title_cnt feature characterizes the number of distinct movie titles streamed by an account. The second class of features on the other hand captures the percentage of a certain parameter/activity/usage in a day.
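The two feature classes (distinct counts and daily percentages) can be computed by grouping raw events per account and day. The sketch below assumes a hypothetical event schema (`account`, `day`, `title`, `device_id`, `device_type`) and expresses percentages on a 0 to 100 scale; the real pipeline, field names, and scale are not described in this post.

```python
from collections import defaultdict


def daily_features(events):
    """Compute per-(account, day) features from hypothetical event dicts."""
    groups = defaultdict(list)
    for e in events:
        groups[(e["account"], e["day"])].append(e)

    features = {}
    for key, evs in groups.items():
        n = len(evs)
        features[key] = {
            # "cnt" class: number of distinct occurrences in the day
            "dist_title_cnt": len({e["title"] for e in evs}),
            "dist_dev_id_cnt": len({e["device_id"] for e in evs}),
            # "pct" class: share of the day's events with a given property
            "dev_type_a_pct": 100.0 * sum(e["device_type"] == "a" for e in evs) / n,
        }
    return features


events = [
    {"account": "x", "day": "d1", "title": "t1", "device_id": "p", "device_type": "a"},
    {"account": "x", "day": "d1", "title": "t1", "device_id": "q", "device_type": "b"},
    {"account": "x", "day": "d1", "title": "t2", "device_id": "p", "device_type": "a"},
    {"account": "x", "day": "d1", "title": "t3", "device_id": "p", "device_type": "a"},
]
print(daily_features(events))
```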

For confidentiality reasons, we have partially obfuscated the features. For instance, dev_type_a_pct, drm_type_a_pct, and end_frmt_a_pct are intentionally obfuscated, and we do not explicitly name the devices, DRM types, and encoding formats.

Table 1. The list of streaming related features with the suffixes pct and cnt respectively referring to percentage and count

Data Statistics

In this part, we present statistics for the features listed in Table 1. Over 30 days, we gathered 1,030,005 benign and 28,045 anomalous accounts. The anomalous accounts were identified (labeled) using the heuristic-aware approach. Figure 2(a) shows the number of anomalous samples per fraud category, with 8,741 (31%), 13,299 (47%), and 6,005 (21%) data samples tagged as content fraud, service fraud, and account fraud, respectively. Figure 2(b) shows that, of the 28,045 data samples tagged as anomalous by the heuristic functions, 23,838 (85%), 3,365 (12%), and 842 (3%) are tagged with one, two, and three fraud categories, respectively.
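The quoted percentages can be reproduced from the raw counts. The snippet below uses only numbers from the text; the last line also shows how small the anomalous share of the labeled data is, which is the imbalance that the label imbalance treatment addresses.

```python
by_category = {"content": 8_741, "service": 13_299, "account": 6_005}
by_num_tags = {1: 23_838, 2: 3_365, 3: 842}
TOTAL_ANOMALOUS, TOTAL_BENIGN = 28_045, 1_030_005


def percent_shares(counts, total):
    """Whole-number percentage of `total` for each count."""
    return {k: round(100 * v / total) for k, v in counts.items()}


print(percent_shares(by_category, TOTAL_ANOMALOUS))  # {'content': 31, 'service': 47, 'account': 21}
print(percent_shares(by_num_tags, TOTAL_ANOMALOUS))  # {1: 85, 2: 12, 3: 3}
anomalous_rate = TOTAL_ANOMALOUS / (TOTAL_ANOMALOUS + TOTAL_BENIGN)
print(f"{100 * anomalous_rate:.2f}% of labeled accounts are anomalous")  # 2.65%
```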

Figure 3 presents the correlation matrix of the 23 data features described in Table 1 for clean and anomalous data samples. As Figure 3 shows, there are positive correlations between features that correspond to device signatures, e.g., dist_cdm_cnt and dist_dev_id_cnt, and between features that refer to title acquisition activities, e.g., dist_title_cnt and license_cnt.

Figure 2. Number of anomalous samples as a function of (a) fraud categories and (b) number of tagged categories.
Figure 3. Correlation matrix of the features presented in Table 1 for (a) clean and (b) anomalous data samples.

Label Imbalance Treatment

It is well known that class imbalance can compromise the accuracy and robustness of the classification models. Accordingly, in this work, we use the Synthetic Minority Over-sampling Technique (SMOTE) to over-sample the minority classes by creating a set of synthetic samples.

Figure 4 shows a high-level schematic of SMOTE with two classes, shown in green and red, where the red class has fewer samples (i.e., it is the minority class) and gets synthetically upsampled.
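The core of SMOTE is interpolation: each synthetic point lies on the segment between a minority sample and one of its k nearest minority-class neighbors. The NumPy function below is a minimal sketch of that idea (brute-force neighbor search, no handling of categorical features), not the implementation used in this work; in practice a library such as imbalanced-learn (`imblearn.over_sampling.SMOTE`) is the usual choice.

```python
import numpy as np


def smote(X_min, n_synthetic, k=5, seed=None):
    """Minimal SMOTE sketch: interpolate between minority samples and
    randomly chosen members of their k nearest minority-class neighbors."""
    rng = np.random.default_rng(seed)
    X_min = np.asarray(X_min, dtype=float)
    n = len(X_min)
    k = min(k, n - 1)
    # Brute-force pairwise Euclidean distances within the minority class
    dist = np.linalg.norm(X_min[:, None, :] - X_min[None, :, :], axis=-1)
    np.fill_diagonal(dist, np.inf)                 # a sample is not its own neighbor
    neighbors = np.argsort(dist, axis=1)[:, :k]

    base = rng.integers(0, n, size=n_synthetic)                   # seed samples
    mate = neighbors[base, rng.integers(0, k, size=n_synthetic)]  # one neighbor each
    gap = rng.random((n_synthetic, 1))                            # uniform in [0, 1)
    return X_min[base] + gap * (X_min[mate] - X_min[base])


minority = np.array([[0.0, 0.0], [1.0, 1.0], [2.0, 0.0]])
synthetic = smote(minority, n_synthetic=5, k=2, seed=0)
print(synthetic.shape)  # (5, 2)
```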

Figure 4. Synthetic Minority Over-sampling Technique

Evaluation Metrics

For evaluating the performance of the anomaly detection models we consider a set of evaluation metrics and report their values. For the one-class as well as binary anomaly detection task, such metrics are accuracy, precision, recall, f0.5, f1, and f2 scores, and area under the curve of the receiver operating characteristic (ROC AUC). For the multi-class multi-label task we consider accuracy, precision, recall, f0.5, f1, and f2 scores together with a set of additional metrics, namely, exact match ratio (EMR) score, Hamming loss, and Hamming score.
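The multi-label metrics are less standard than accuracy or F1, so the sketch below spells out common definitions, treating each sample's labels as a Python set. Definitions vary across the literature (in particular for the Hamming score, taken here as the mean per-sample intersection-over-union of label sets), and the post does not state which variants were used, so treat these as assumptions. The `f_beta` helper shows how f0.5 and f2 shift the balance between precision and recall.

```python
def exact_match_ratio(y_true, y_pred):
    """Share of samples whose predicted label set equals the true set."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def hamming_loss(y_true, y_pred, n_labels):
    """Share of (sample, label) decisions that are wrong."""
    wrong = sum(len(t ^ p) for t, p in zip(y_true, y_pred))  # symmetric difference
    return wrong / (len(y_true) * n_labels)


def hamming_score(y_true, y_pred):
    """Mean per-sample size(intersection) / size(union); 1.0 if both empty."""
    scores = [len(t & p) / len(t | p) if (t | p) else 1.0
              for t, p in zip(y_true, y_pred)]
    return sum(scores) / len(scores)


def f_beta(precision, recall, beta):
    """beta < 1 favors precision (e.g. f0.5); beta > 1 favors recall (f2)."""
    b2 = beta ** 2
    denom = b2 * precision + recall
    return (1 + b2) * precision * recall / denom if denom else 0.0


y_true = [{"content"}, {"service", "account"}, {"account"}]
y_pred = [{"content"}, {"service"}, {"content"}]
print(round(exact_match_ratio(y_true, y_pred), 3))   # 0.333
print(round(hamming_loss(y_true, y_pred, 3), 3))     # 0.333
print(hamming_score(y_true, y_pred))                 # 0.5
```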

Model Based Anomaly Detection

In this section, we briefly describe the modeling approaches that are used in this work for anomaly detection. We consider two model-based anomaly detection approaches, namely, (i) semi-supervised, and (ii) supervised as presented in Figure 5.

Figure 5. Model-based anomaly detection approaches: (a) semi-supervised and (b) supervised.

Semi-Supervised Anomaly Detection

The key point about the semi-supervised model is that, at the training step, the model learns the distribution of the benign data samples so that at inference time it can distinguish between benign samples (which it has been trained on) and anomalous samples (which it has not observed). At the inference stage, anomalous samples are simply those that fall outside the distribution of the benign samples. The performance of One-Class methods can become sub-optimal on complex, high-dimensional datasets; however, the literature suggests that deep neural autoencoders can perform better than One-Class methods on such complex, high-dimensional anomaly detection tasks.
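The semi-supervised recipe (fit on benign data only, then flag samples the model reconstructs poorly) can be illustrated without a deep-learning framework. As an explicitly swapped-in technique, the sketch below uses a linear autoencoder, which is equivalent to projecting onto the top principal components, instead of the deep autoencoder used in this work. The class name, `n_components`, the synthetic data, and the 99th-percentile threshold rule are illustrative assumptions; the per-sample reconstruction MSE is the same kind of quantity whose distributions Figure 6(a) compares.

```python
import numpy as np


class LinearAutoencoder:
    """PCA-based linear autoencoder, used here as a stand-in for a deep one."""

    def __init__(self, n_components):
        self.n_components = n_components

    def fit(self, X_benign):
        X = np.asarray(X_benign, dtype=float)
        self.mean_ = X.mean(axis=0)
        # Top right-singular vectors serve as tied encoder/decoder weights
        _, _, vt = np.linalg.svd(X - self.mean_, full_matrices=False)
        self.W_ = vt[: self.n_components].T          # shape (d, k)
        return self

    def reconstruction_mse(self, X):
        Xc = np.asarray(X, dtype=float) - self.mean_
        recon = Xc @ self.W_ @ self.W_.T             # encode, then decode
        return ((Xc - recon) ** 2).mean(axis=1)      # per-sample MSE


rng = np.random.default_rng(0)
# Benign data near a 2-D subspace of a 5-D feature space, plus small noise
benign = rng.normal(size=(1000, 2)) @ rng.normal(size=(2, 5))
benign += 0.01 * rng.normal(size=benign.shape)
anomalous = 3.0 * rng.normal(size=(50, 5))           # synthetic off-subspace points

model = LinearAutoencoder(n_components=2).fit(benign[:800])   # benign only
threshold = np.quantile(model.reconstruction_mse(benign[800:]), 0.99)
flag_rate = (model.reconstruction_mse(anomalous) > threshold).mean()
false_alarms = (model.reconstruction_mse(benign[800:]) > threshold).mean()
print(f"flagged anomalies: {flag_rate:.2f}, benign false alarms: {false_alarms:.2f}")
```

A deep autoencoder replaces the linear projection with nonlinear encoder and decoder networks, which is what lets it capture more complex, high-dimensional structure.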

In addition to a deep autoencoder, we use the One-Class SVM, Isolation Forest, Elliptic Envelope, and Local Outlier Factor approaches for one-class anomaly detection.
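All four one-class models are available in scikit-learn and share the same fit-on-benign, predict-on-new-data pattern. The sketch below runs them on synthetic Gaussian data with well-separated synthetic anomalies; the data, `nu`, and `contamination` values are arbitrary assumptions, and the numbers it prints say nothing about performance on streaming data.

```python
import numpy as np
from sklearn.covariance import EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM

rng = np.random.default_rng(0)
X_train = rng.normal(size=(500, 5))                      # benign only
X_test = np.vstack([rng.normal(size=(100, 5)),           # benign
                    rng.normal(loc=8.0, size=(20, 5))])  # synthetic anomalies
y_test = np.r_[np.zeros(100), np.ones(20)]               # 1 = anomalous

models = {
    "one_class_svm": OneClassSVM(nu=0.05, gamma="scale"),
    "isolation_forest": IsolationForest(contamination=0.05, random_state=0),
    "elliptic_envelope": EllipticEnvelope(contamination=0.05, random_state=0),
    # novelty=True enables predict() on unseen data after fitting on benign
    "local_outlier_factor": LocalOutlierFactor(novelty=True, contamination=0.05),
}

results = {}
for name, model in models.items():
    model.fit(X_train)
    flagged = model.predict(X_test) == -1                # -1 marks outliers
    recall = flagged[y_test == 1].mean()
    false_alarm_rate = flagged[y_test == 0].mean()
    results[name] = (recall, false_alarm_rate)
    print(f"{name:22s} recall={recall:.2f} false_alarms={false_alarm_rate:.2f}")
```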

Supervised Anomaly Detection

Binary Classification: In the anomaly detection task using binary classification, we consider only two classes of samples, namely benign and anomalous, and do not distinguish between the types of anomalous samples, i.e., the three fraud categories. For the binary classification task we use multiple supervised classification approaches, namely, (i) Support Vector Classification (SVC), (ii) K-Nearest Neighbors classification, (iii) Decision Tree classification, (iv) Random Forest classification, (v) Gradient Boosting, (vi) AdaBoost, (vii) Nearest Centroid classification, (viii) Quadratic Discriminant Analysis (QDA) classification, (ix) Gaussian Naive Bayes classification, (x) Gaussian Process Classifier, (xi) Label Propagation classification, and (xii) XGBoost. Finally, using stratified k-fold cross-validation, we carry out an efficient grid search to tune the hyper-parameters of each of these models for the binary classification task, and report the performance metrics only for the optimally tuned hyper-parameters.
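Hyper-parameter tuning with stratified k-fold cross-validation maps directly onto scikit-learn's `GridSearchCV` with a `StratifiedKFold` splitter. The sketch below tunes a random forest on a synthetic, imbalanced dataset from `make_classification`; the grid, fold count, and F1 scoring choice are illustrative assumptions rather than the settings used in this work.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, StratifiedKFold

# Synthetic, imbalanced stand-in for the 23-feature labeled table
X, y = make_classification(n_samples=600, n_features=23, weights=[0.9, 0.1],
                           random_state=0)

param_grid = {"n_estimators": [50, 100], "max_depth": [None, 8]}
search = GridSearchCV(
    RandomForestClassifier(random_state=0),
    param_grid,
    scoring="f1",                     # one of the reported metrics
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=0),  # keeps class ratio per fold
)
search.fit(X, y)
print(search.best_params_, round(search.best_score_, 3))
```

If SMOTE is part of training, it belongs inside the cross-validation loop (for example via imbalanced-learn's `Pipeline`) so that each fold is oversampled from its own training portion only and synthetic points never leak into validation folds.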

Multi-Class Multi-Label Classification: In the anomaly detection task using multi-class multi-label classification, we consider the three fraud categories as the possible anomalous classes (hence multi-class), and each data sample is assigned one or more of the fraud categories as its set of labels (hence multi-label) using the heuristic-aware data labeling strategy presented earlier. For the multi-class multi-label classification task we use multiple supervised classification techniques, namely, (i) K-Nearest Neighbors, (ii) Decision Tree, (iii) Extra Trees, (iv) Random Forest, and (v) XGBoost.
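In the multi-label setting, each fraud category becomes one column of a 0/1 indicator matrix, and several of the listed scikit-learn estimators (k-nearest neighbors, decision trees, extra trees, random forests) accept such a two-dimensional target directly. The sketch below fits `ExtraTreesClassifier` on synthetic data whose labels come from a made-up threshold rule; the data and rule are assumptions for illustration only.

```python
import numpy as np
from sklearn.ensemble import ExtraTreesClassifier

rng = np.random.default_rng(0)
X = rng.normal(size=(300, 4))
# Hypothetical rule: one column per category (content, service, account);
# a sample may carry several categories at once.
Y = np.column_stack([X[:, 0] > 0.5, X[:, 1] > 0.5, X[:, 2] > 0.5]).astype(int)

clf = ExtraTreesClassifier(n_estimators=100, random_state=0)
clf.fit(X[:250], Y[:250])             # 2-D indicator target => multi-label fit
pred = clf.predict(X[250:])           # shape (50, 3), one 0/1 column per category
exact_match = (pred == Y[250:]).all(axis=1).mean()
print(pred.shape, round(exact_match, 2))
```

For estimators without native multi-output support, `sklearn.multioutput.MultiOutputClassifier` fits one model per label column.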

Results and Discussion

Table 2 shows the values of the evaluation metrics for the semi-supervised anomaly detection methods. As we see from Table 2, the deep auto-encoder model performs the best among the semi-supervised anomaly detection approaches with an accuracy of around 96% and f1 score of 94%. Figure 6(a) shows the distribution of the Mean Squared Error (MSE) values for the anomalous and benign samples at the inference stage.

Table 2. The values of the evaluation metrics for a set of semi-supervised anomaly detection models.
Figure 6. For the deep auto-encoder model: (a) distribution of the Mean Squared Error (MSE) values for anomalous and benign samples at the inference stage; (b) confusion matrix across benign and anomalous samples; (c) MSE values averaged across the anomalous and benign samples for each of the 23 features.
Table 3. The values of the evaluation metrics for a set of supervised binary anomaly detection classifiers.
Table 4. The values of the evaluation metrics for a set of supervised multi-class multi-label anomaly detection approaches. The values in parentheses refer to the performance of the models trained on the original (not upsampled) dataset.

Table 3 shows the values of the evaluation metrics for a set of supervised binary anomaly detection models. Table 4 shows the values of the evaluation metrics for a set of supervised multi-class multi-label anomaly detection models.

In Figure 7(a), for the content fraud category, the three most important features are the count of distinct encoding formats (dist_enc_frmt_cnt), the count of distinct devices (dist_dev_id_cnt), and the count of distinct DRMs (dist_drm_cnt). This implies that for content fraud, the use of multiple devices and encoding formats stands out from the other features. For the service fraud category in Figure 7(b), the three most important features are the count of content licenses associated with an account (license_cnt), the count of distinct devices (dist_dev_id_cnt), and the percentage use of type (a) devices by an account (dev_type_a_pct). This shows that in the service fraud category, the number of content licenses, the number of distinct devices, and the use of type (a) devices stand out from the other features. Finally, for the account fraud category in Figure 7(c), the count of distinct devices (dist_dev_id_cnt) dominates all other features.

Figure 7. The normalized feature importance values (NFIV) for the multi-class multi-label anomaly detection task using the XGBoost approach in Table 4 across the three anomaly classes, i.e., (a) content fraud, (b) service fraud, and (c) account fraud.

You can find more technical details in our paper here.

Are you interested in solving challenging problems at the intersection of machine learning and security? We are always looking for great people to join us.


Machine Learning for Fraud Detection in Streaming Services was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Seeing through hardware counters: a journey to threefold performance increase

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/seeing-through-hardware-counters-a-journey-to-threefold-performance-increase-2721924a2822

By Vadim Filanovsky and Harshad Sane

In one of our previous blog posts, A Microscope on Microservices, we outlined three broad domains of observability (or “levels of magnification,” as we referred to them): Fleet-wide, Microservice, and Instance. We described the tools and techniques we use to gain insight within each domain. There is, however, a class of problems that requires an even stronger level of magnification, going deeper down the stack to introspect the CPU microarchitecture. In this blog post we describe one such problem and the tools we used to solve it.

The problem

It started off as a routine migration. At Netflix, we periodically reevaluate our workloads to optimize utilization of available capacity. We decided to move one of our Java microservices, let’s call it GS2, to a larger AWS instance size, from m5.4xl (16 vCPUs) to m5.12xl (48 vCPUs). GS2’s workload is computationally heavy, and CPU is the limiting resource. While we understand it’s virtually impossible to achieve a linear increase in throughput as the number of vCPUs grows, a near-linear increase is attainable. Consolidating on larger instances reduces the amortized cost of background tasks, freeing up additional resources for serving requests and potentially offsetting the sub-linear scaling. Thus, we expected to roughly triple throughput per instance from this migration, as 12xl instances have three times as many vCPUs as 4xl instances. A quick canary test was free of errors and showed lower latency, which is expected given that our standard canary setup routes an equal amount of traffic to both the baseline running on 4xl and the canary on 12xl. As GS2 relies on AWS EC2 Auto Scaling to target-track CPU utilization, we thought we just had to redeploy the service on the larger instance type and wait for the ASG (Auto Scaling Group) to settle on the CPU target. Unfortunately, the initial results were far from our expectations:

The first graph above represents average per-node throughput overlaid with average CPU utilization, while the second graph shows average request latency. We can see that as we reached roughly the same CPU target of 55%, the throughput increased only by ~25% on average, falling far short of our desired goal. What’s worse, average latency degraded by more than 50%, with both CPU and latency patterns becoming more “choppy.” GS2 is a stateless service that receives traffic through a flavor of round-robin load balancer, so all nodes should receive nearly equal amounts of traffic. Indeed, the RPS (Requests Per Second) data shows very little variation in throughput between nodes:

But as we started looking at the breakdown of CPU and latency by node, a strange pattern emerged:

Although we confirmed fairly equal traffic distribution between nodes, CPU and latency metrics surprisingly showed a very different, bimodal distribution. There is a “lower band” of nodes exhibiting much lower CPU and latency with hardly any variation, and an “upper band” of nodes with significantly higher CPU and latency and wide variation. We noticed that only ~12% of the nodes fell into the lower band, a figure that was suspiciously consistent over time. In both bands, performance characteristics remained consistent for the entire uptime of the JVM on the node, i.e., nodes never jumped between bands. This was our starting point for troubleshooting.

First attempt at solving it

Our first (and rather obvious) step was to compare flame graphs for the “slow” and “fast” nodes. While the flame graphs clearly reflected the difference in CPU utilization in the number of collected samples, the distribution across the stacks remained the same, leaving us with no additional insight. We turned to JVM-specific profiling, starting with basic hotspot stats and then switching to more detailed JFR (Java Flight Recorder) captures to compare the distribution of events. Again, we came away empty-handed, as there was no noticeable difference in the amount or distribution of events between the “slow” and “fast” nodes. Still suspecting that something might be off with JIT behavior, we ran some basic stats against symbol maps obtained with perf-map-agent, only to hit another dead end.

False Sharing

Convinced we weren’t missing anything at the application, OS, and JVM levels, we felt the answer might be hidden at a lower level. Luckily, the m5.12xl instance type exposes a set of core PMCs (Performance Monitoring Counters, a.k.a. PMU counters), so we started by collecting a baseline set of counters using PerfSpect:

In the table above, the nodes showing low CPU and low latency represent a “fast node”, while the nodes with higher CPU/latency represent a “slow node”. Aside from the obvious CPU differences, we can see that the slow node has almost 3x the CPI (Cycles Per Instruction) of the fast node. We also see much higher L1 cache activity combined with a 4x higher count of MACHINE_CLEARS. One common cause of these symptoms is so-called “false sharing”: a usage pattern that occurs when two cores read from or write to unrelated variables that happen to share the same L1 cache line. A cache line is a concept similar to a memory page: a contiguous chunk of data (typically 64 bytes on x86 systems) transferred to and from the cache. This diagram illustrates it:

Each core in this diagram has its own private cache. Since both cores access the same memory space, the caches have to be consistent. This consistency is ensured by a so-called “cache coherency protocol.” As Thread 0 writes to the “red” variable, the coherency protocol marks the whole cache line as “modified” in Thread 0’s cache and as “invalidated” in Thread 1’s cache. Later, when Thread 1 reads the “blue” variable, even though the “blue” variable is not modified, the coherency protocol forces the entire cache line to be reloaded from the cache that had the last modification, Thread 0’s cache in this example. Resolving coherency across private caches takes time and causes CPU stalls. Additionally, ping-ponging coherency traffic has to be monitored through the controller of the shared last-level cache, which leads to even more stalls. We take CPU cache consistency for granted, but this “false sharing” pattern illustrates that there is a huge performance penalty for simply reading a variable that neighbors some other, unrelated data.
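The invalidation behavior described above can be reproduced with a toy model that tracks which cache lines each core holds. This Python sketch is a deliberately simplified, MSI-like model (no timing, no shared last-level cache, only “modified” and “shared” states) meant only to show why line granularity matters: a core that only reads “blue” still misses every time the other core writes “red”, as long as both live in the same 64-byte line.

```python
LINE_SIZE = 64  # bytes, typical on x86


class ToyCoherentCaches:
    """Greatly simplified invalidation-based coherence model (MSI-like)."""

    def __init__(self, n_cores):
        # Per core: cache-line index -> "M" (modified) or "S" (shared)
        self.lines = [{} for _ in range(n_cores)]
        self.read_misses = [0] * n_cores

    def read(self, core, addr):
        line = addr // LINE_SIZE
        if line not in self.lines[core]:
            self.read_misses[core] += 1          # line must be fetched
            for cache in self.lines:             # a modified copy is downgraded
                if cache.get(line) == "M":
                    cache[line] = "S"
            self.lines[core][line] = "S"

    def write(self, core, addr):
        line = addr // LINE_SIZE
        for other, cache in enumerate(self.lines):
            if other != core:
                cache.pop(line, None)            # invalidate every other copy
        self.lines[core][line] = "M"


def reader_misses(red_addr, blue_addr, rounds=1_000):
    """Thread 0 keeps writing "red"; Thread 1 only ever reads "blue"."""
    caches = ToyCoherentCaches(n_cores=2)
    for _ in range(rounds):
        caches.write(0, red_addr)
        caches.read(1, blue_addr)
    return caches.read_misses[1]


print(reader_misses(red_addr=0, blue_addr=8))   # 1000: same line, false sharing
print(reader_misses(red_addr=0, blue_addr=64))  # 1: separate lines
```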

Armed with this knowledge, we used Intel vTune to run microarchitecture profiling. Drilling down into “hot” methods and further into the assembly code showed us blocks of code with some instructions exceeding 100 CPI, which is extremely slow. This is the summary of our findings:

Numbered markers from 1 to 6 denote the same code/variables across the sources and the vTune assembly view. The red arrow indicates that the CPI value likely belongs to the previous instruction; this is due to profiling skid in the absence of PEBS (Processor Event-Based Sampling), and it is usually off by a single instruction. Based on the fact that (5) “repne scan” is a rather rare operation in the JVM codebase, we were able to link this snippet to the routine for subclass checking (the same code exists in JDK mainline as of the writing of this blog post). Going into the details of subtype checking in HotSpot is far beyond the scope of this blog post, but curious readers can learn more about it from the 2002 publication Fast Subtype Checking in the HotSpot JVM. Due to the nature of the class hierarchy used in this particular workload, we kept hitting the code path that updates (6) the “_secondary_super_cache” field, a single-element cache for the last-found secondary superclass. Note how this field is adjacent to “_secondary_supers”, which is a list of all secondary superclasses and is read (1) at the beginning of the scan. Multiple threads perform these read-write operations, and if fields (1) and (6) fall into the same cache line, we hit a false sharing case. We highlighted these fields in red and blue to connect them to the false sharing diagram above.
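The hot path can be summarized with a simplified Python model of the logic described above: check the single-element cache, otherwise scan the secondary supers and write the hit back into the cache. This is not HotSpot code; `ToyKlass`, `is_secondary_subtype`, and the `update_cache` switch are hypothetical names, and the alternating-interface workload is an assumed example of a class hierarchy that keeps missing the cache.

```python
class ToyKlass:
    """Hypothetical stand-in for a class's secondary-supertype metadata."""

    def __init__(self, name, secondary_supers=()):
        self.name = name
        self.secondary_supers = tuple(secondary_supers)  # read (1) by every scan
        self.secondary_super_cache = None                # written (6) on a hit
        self.cache_writes = 0


def is_secondary_subtype(klass, target, update_cache=True):
    if klass.secondary_super_cache is target:   # fast path: cached answer
        return True
    for candidate in klass.secondary_supers:    # slow path: linear scan
        if candidate is target:
            if update_cache:                    # remember the last hit
                klass.secondary_super_cache = target
                klass.cache_writes += 1
            return True
    return False


# Hypothetical workload: checks alternate between two interfaces of one class
iface_a, iface_b = ToyKlass("IfaceA"), ToyKlass("IfaceB")
impl = ToyKlass("Impl", [iface_a, iface_b])
for _ in range(1_000):
    is_secondary_subtype(impl, iface_a)
    is_secondary_subtype(impl, iface_b)
print(impl.cache_writes)  # 2000: every check misses the cache and rewrites it
```

Each of those writes lands on a field adjacent to the one that every scanning thread reads first, which is what turns a harmless memoization into cross-core coherence traffic.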

Note that since the cache line size is 64 bytes and the pointer size is 8 bytes, there is a 1 in 8 chance of these fields falling on separate cache lines, and a 7 in 8 chance of them sharing a cache line. This 1-in-8 chance is 12.5%, closely matching our earlier observation of the proportion of “fast” nodes. Fascinating!
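The 1-in-8 figure follows from enumerating where the first field can start within a cache line. The sketch assumes both fields are 8-byte aligned and adjacent, and that every aligned starting slot is equally likely; the `gap` parameter shows that inserting a full line of padding between the fields rules out sharing for every placement.

```python
LINE_SIZE = 64   # bytes per cache line
PTR_SIZE = 8     # bytes per pointer field


def same_line(first_field_offset, gap=0):
    """True if two adjacent pointer fields share a cache line.

    first_field_offset: byte offset of the first field within its line.
    gap: extra padding bytes inserted between the two fields.
    """
    second = first_field_offset + PTR_SIZE + gap
    return first_field_offset // LINE_SIZE == second // LINE_SIZE


offsets = range(0, LINE_SIZE, PTR_SIZE)            # 8 possible aligned slots
shared = sum(same_line(o) for o in offsets)
padded = sum(same_line(o, gap=LINE_SIZE) for o in offsets)
print(f"{shared}/{len(offsets)} placements share a line")                 # 7/8
print(f"{padded}/{len(offsets)} with {LINE_SIZE} bytes of padding")      # 0/8
```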

Although the fix involved patching the JDK, it was a simple change. We inserted padding between “_secondary_super_cache” and “_secondary_supers” fields to ensure they never fall into the same cache line. Note that we did not change the functional aspect of JDK behavior, but rather the data layout:

The results of deploying the patch were immediately noticeable. The graph below is a breakdown of CPU by node. Here we can see a red-black deployment happening at noon, and the new ASG with the patched JDK taking over by 12:15:

Both CPU and latency (graph omitted for brevity) showed a similar picture — the “slow” band of nodes was gone!

True Sharing

We didn’t have much time to marvel at these results, however. As autoscaling reached our CPU target, we noticed that we still couldn’t push more than ~150 RPS per node, well short of our goal of ~250 RPS. Another round of vTune profiling on the patched JDK version showed the same bottleneck around the secondary superclass cache lookup. It was puzzling at first to see seemingly the same problem come back right after we put in a fix, but upon closer inspection we realized we were now dealing with “true sharing.” Unlike “false sharing,” where two independent variables share a cache line, “true sharing” refers to the same variable being read and written by multiple threads/cores. In this case, CPU-enforced memory ordering is the cause of the slowdown. We reasoned that removing the false sharing obstacle and increasing overall throughput resulted in more frequent execution of the same JVM superclass caching code path. Essentially, we had higher execution concurrency, causing excessive pressure on the superclass cache due to CPU-enforced memory ordering protocols. The common way to resolve this is to avoid writing to the shared variable altogether, effectively bypassing the JVM’s secondary superclass cache. Since this change altered the behavior of the JDK, we gated it behind a command line flag. This is the entirety of our patch:

And here are the results of running with disabled superclass cache writes:

Our fix pushed the throughput to ~350 RPS at the same CPU autoscaling target of 55%. To put this in perspective, that’s a 3.5x improvement over the throughput we initially reached on m5.12xl, along with a reduction in both average and tail latency.

Future work

Disabling writes to the secondary superclass cache worked well in our case, and even though this might not be a desirable solution in all cases, we wanted to share our methodology, toolset, and fix in the hope that they help others encountering similar symptoms. While working through this problem, we came across JDK-8180450, a bug that had been dormant for more than five years and describes exactly the problem we were facing. It seems ironic that we could not find this bug until we had actually figured out the answer. We believe our findings complement the great work that has been done in diagnosing and remediating it.

Conclusion

We tend to think of modern JVMs as highly optimized runtime environments, in many cases rivaling more “performance-oriented” languages like C++. While this holds true for the majority of workloads, we were reminded that the performance of certain workloads running within JVMs can be affected not only by the design and implementation of the application code, but also by the implementation of the JVM itself. In this blog post we described how we leveraged PMCs to find a bottleneck in the JVM’s native code, patch it, and subsequently realize better than a threefold increase in throughput for the workload in question. For this class of performance issues, the ability to introspect execution at the level of the CPU microarchitecture proved to be the only solution. Intel vTune provides valuable insight even with the core set of PMCs, such as those exposed by the m5.12xl instance type. Exposing a more comprehensive set of PMCs, along with PEBS, across all instance types and sizes in the cloud would pave the way for deeper performance analysis and potentially even larger performance gains.

Special thanks to Sandhya Viswanathan, Jennifer Dimatteo, Brendan Gregg, Susie Xia, Jason Koch, Mike Huang, Amer Ather, Chris Berry, Chris Sanden, and Guy Cirino.


Seeing through hardware counters: a journey to threefold performance increase was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Consistent caching mechanism in Titus Gateway

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/consistent-caching-mechanism-in-titus-gateway-6cb89b9ce296

by Tomasz Bak and Fabio Kung

Introduction

Titus is the Netflix cloud container runtime that runs and manages containers at scale. In the time since it was first presented as an advanced Mesos framework, Titus has transparently evolved from being built on top of Mesos to Kubernetes, handling an ever-increasing volume of containers. As the number of Titus users increased over the years, the load and pressure on the system increased substantially. The original assumptions and architectural choices were no longer viable. This blog post presents how our current iteration of Titus deals with high API call volumes by scaling out horizontally.

We introduce a caching mechanism in the API gateway layer, allowing us to offload processing from singleton leader-elected controllers without giving up the strict data consistency guarantees that clients observe. Titus API clients always see the latest (not stale) version of the data, regardless of which gateway node serves their request and in which order.

Overview

The figure below depicts a simplified high-level architecture of a single Titus cluster (a.k.a. cell):

Titus Job Coordinator is a leader elected process managing the active state of the system. Active data includes jobs and tasks that are currently running. When a new leader is elected it loads all data from external storage. Mutations are first persisted to the active data store before in-memory state is changed. Data for completed jobs and tasks is moved to the archive store first, and only then removed from the active data store and from the leader memory.

Titus Gateway handles user requests. A user request could be a job creation request, a query to the active data store, or a query to the archive store (the latter handled directly in Titus Gateway). Requests are load balanced across all Titus Gateway nodes. All reads are consistent, so it does not matter which Titus Gateway instance is serving a query. For example, it is OK to send writes through one instance, and do reads from another one with full data read consistency guarantees. Titus Gateways always connect to the current Titus Job Coordinator leader. During leader failovers, all writes and reads of the active data are rejected until a connection to the active leader is re-established.

In the original version of the system, all queries to the active data set were forwarded to a singleton Titus Job Coordinator. The freshest data was served to all requests, and clients never observed read-your-write or monotonic-read consistency issues¹:

Data consistency on the Titus API is highly desirable as it simplifies client implementation. Causal consistency, which includes read-your-writes and monotonic reads, frees clients from implementing client-side synchronization mechanisms. In PACELC terms, we choose PC/EC and keep the same level of write availability as our previous system, while improving our theoretical availability for reads.

For example, a batch workflow orchestration system may create multiple jobs that are part of a single workflow execution. After the jobs are created, it monitors their execution progress. If the system creates a new job and immediately queries its status while there is a data propagation lag, it might conclude that the job was lost and that a replacement must be created. In that scenario, the system would need to deal with the data propagation latency directly, for example, by using timeouts or client-originated update tracking mechanisms. Because Titus API reads always reflect the up-to-date state, such workarounds are not needed.

With traffic growth, a single leader node handling all request volume started becoming overloaded. We started seeing increased response latencies and leader servers running at dangerously high utilization. To mitigate this issue we decided to handle all query requests directly from Titus Gateway nodes but still preserve the original consistency guarantees:

The state from Titus Job Coordinator is replicated over a persistent stream connection, with low event propagation latencies. A new wire protocol provided by Titus Job Coordinator allows monitoring of the cache consistency level and guarantees that clients always receive the latest data version. The cache is kept in sync with the current leader process. When there is a failover (because of node failures with the current leader or a system upgrade), a new snapshot from the freshly elected leader is loaded, replacing the previous cache state. Titus Gateways handling client requests can now be horizontally scaled out. The details and workings of these mechanisms are the primary topics of this blog post.

How do I know that my cache is up to date?

The answer is easy for systems that were built from the beginning with a consistent data versioning scheme and can depend on clients to follow the established protocol. Kubernetes is a good example here. Each object and each collection read from the Kubernetes cluster has a unique revision, which is a monotonically increasing number. A user may request all changes since the last received revision. For more details, see Kubernetes API Concepts and the Shared Informer Pattern.
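To make the contrast concrete, here is a toy Python store in the spirit of that revision scheme. `RevisionedStore`, `snapshot`, and `changes_since` are hypothetical names, not the Kubernetes client API; the sketch only shows why a client that remembers the last revision it saw can ask for exactly the changes it missed.

```python
from dataclasses import dataclass, field


@dataclass
class RevisionedStore:
    """Toy key-value store where every write bumps a global revision counter."""

    revision: int = 0
    objects: dict = field(default_factory=dict)
    log: list = field(default_factory=list)  # (revision, key, value) in commit order

    def put(self, key: str, value: str) -> int:
        self.revision += 1
        self.objects[key] = value
        self.log.append((self.revision, key, value))
        return self.revision

    def snapshot(self) -> tuple:
        # A full read plus the revision it reflects (compare a LIST call).
        return dict(self.objects), self.revision

    def changes_since(self, revision: int) -> list:
        # Replay only what the caller has not seen yet (compare a WATCH from a revision).
        return [entry for entry in self.log if entry[0] > revision]


store = RevisionedStore()
store.put("job-1", "Accepted")
state, seen = store.snapshot()
store.put("job-1", "Running")
print(store.changes_since(seen))  # [(2, 'job-1', 'Running')]
```

The catch, as the next paragraph explains, is that this only works if every client is written to track and resend its revision.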

In our case, we did not want to change the API contract and impose additional constraints and requirements on our users. Doing so would require a substantial migration effort to move all clients off the old API with questionable value to the affected teams (except for helping us solve Titus' internal scalability problems). In our experience, such migrations require a nontrivial amount of work, particularly with the migration timeline not fully in our control.

To fulfill the existing API contract, we had to guarantee that for a request received at a time T₀, the data returned to the client is read from a cache that contains all state updates in Titus Job Coordinator up to time T₀.

The path over which data travels from Titus Job Coordinator to a Titus Gateway cache can be described as a sequence of event queues with different processing speeds:

A message generated by the event source may be buffered at any stage. Furthermore, as each event stream subscription from Titus Gateway to Titus Job Coordinator establishes a different instance of the processing pipeline, the state of the cache in each gateway instance may be vastly different.

Let’s assume a sequence of events E₁…E₁₀, and their location within the pipeline of two Titus Gateway instances at time T₁:

If a client makes a call to Titus Gateway 2 at time T₁, it will read version E₈ of the data. If it immediately makes a request to Titus Gateway 1, whose cache is behind that of Titus Gateway 2, the client might read an older version of the data (a monotonic-read violation).

In both cases, the data in the caches is not up to date. If a client created a new object at time T₀, and the object value is captured by event update E₁₀, this object will be missing in both gateways at time T₁. This would surprise a client that successfully completed a create request, only to have the follow-up query return a not-found error (a read-your-write consistency violation).

The solution is to flush all the events created up to time T₁ and force clients to wait for the cache to receive them all. This work can be split into two steps, each with its own solution.

Implementation details

We solved the cache synchronization problem (as stated above) with a combination of two strategies:

  • Titus Gateway <-> Titus Job Coordinator synchronization protocol over the wire.
  • Usage of high-resolution monotonic time sources like Java’s nano time (System.nanoTime()) within a single server process. Java’s nano time is used as a logical time within a JVM to define an order for events happening in the JVM process. An alternative based on an atomic integer generator would suffice to order the events as well. Having a local logical time source avoids issues with distributed clock synchronization.
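A minimal Python sketch of both logical time sources, assuming a single process. `time.monotonic_ns()` plays the role of Java's `System.nanoTime()`, and the class names are hypothetical rather than taken from Titus:

```python
import itertools
import threading
import time


class NanoLogicalClock:
    """Strictly increasing timestamps derived from a monotonic nanosecond source.

    Like Java's System.nanoTime(), the values are only comparable within one process.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._last = 0

    def now(self) -> int:
        with self._lock:
            # monotonic_ns() never goes backwards, but two calls may return the
            # same value, so bump by one to keep every timestamp distinct.
            self._last = max(self._last + 1, time.monotonic_ns())
            return self._last


class CounterLogicalClock:
    """The atomic-counter alternative: a total order with no notion of elapsed time."""

    def __init__(self):
        self._counter = itertools.count(1)
        self._lock = threading.Lock()

    def now(self) -> int:
        with self._lock:
            return next(self._counter)
```

The `max(...)` step matters because a monotonic clock only promises never to go backwards, not to return a new value on every call.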

If Titus Gateways subscribed to the Titus Job Coordinator event stream without synchronization steps, the amount of data staleness would be impossible to estimate. To guarantee that a Titus Gateway has received all state updates that happened up to some time Tₙ, an explicit synchronization between the two services must happen. Here is what the protocol we implemented looks like:

  1. Titus Gateway receives a client request (queryₐ).
  2. Titus Gateway makes a request to the local cache to fetch the latest version of the data.
  3. The local cache in Titus Gateway records the local logical time and sends it to Titus Job Coordinator in a keep-alive message (keep-aliveₐ).
  4. Titus Job Coordinator saves the keep-alive request together with the local logical time Tₐ of the request arrival in a local queue (KAₐ, Tₐ).
  5. Titus Job Coordinator sends state updates to Titus Gateway until it observes a state update (event) with a timestamp past the recorded local logical time (E₁, E₂).
  6. At that time, Titus Job Coordinator sends an acknowledgment event for the keep-alive message (KAₐ keep-alive ACK).
  7. Titus Gateway receives the keep-alive acknowledgment and consequently knows that its local cache contains all state changes that happened up to the time when the keep-alive request was sent.
  8. At this point the original client request can be handled from the local cache, guaranteeing that the client will get a fresh enough version of the data (responseₐ).
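The eight steps can be simulated in a single Python process. This is an illustrative sketch, not Titus code: `Coordinator`, `GatewayCache`, and their methods are hypothetical, the coordinator's logical clock is a plain integer counter (the atomic-counter option mentioned earlier), and an in-order list stands in for the persistent stream connection.

```python
import itertools
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Coordinator:
    """Hypothetical stand-in for Titus Job Coordinator (steps 4 to 6)."""

    clock: itertools.count = field(default_factory=lambda: itertools.count(1))
    keep_alives: deque = field(default_factory=deque)  # (ka_id, arrival time)
    outbox: list = field(default_factory=list)  # ordered stream to one gateway

    def on_keep_alive(self, ka_id: str) -> None:
        # Step 4: queue the keep-alive with its local logical arrival time.
        self.keep_alives.append((ka_id, next(self.clock)))

    def emit_event(self, key: str, value: str) -> None:
        # Step 5: every state update is stamped with the coordinator's local time.
        ts = next(self.clock)
        self.outbox.append(("event", ts, (key, value)))
        # Step 6: ACK each keep-alive that arrived before this event.
        while self.keep_alives and self.keep_alives[0][1] < ts:
            ka_id, _ = self.keep_alives.popleft()
            self.outbox.append(("ack", ts, ka_id))

    def drain(self) -> list:
        sent, self.outbox = self.outbox, []
        return sent


@dataclass
class GatewayCache:
    """Hypothetical Titus Gateway cache fed by the coordinator stream (step 7)."""

    state: dict = field(default_factory=dict)
    acked: set = field(default_factory=set)

    def consume(self, messages: list) -> None:
        # Messages arrive in order, so an ACK implies all earlier events are applied.
        for kind, _ts, body in messages:
            if kind == "event":
                key, value = body
                self.state[key] = value
            else:
                self.acked.add(body)

    def serve(self, ka_id: str, key: str):
        # Step 8: answer only after this request's keep-alive has been ACKed.
        if ka_id not in self.acked:
            raise RuntimeError("cache not yet synchronized; keep waiting")
        return self.state.get(key)


coordinator, cache = Coordinator(), GatewayCache()
coordinator.emit_event("job-1", "Accepted")  # a client's create is committed
coordinator.on_keep_alive("ka-a")            # steps 1-3: a query arrives
cache.consume(coordinator.drain())           # job-1 applied, but no ACK yet
coordinator.emit_event("job-2", "Accepted")  # first event past T_a triggers the ACK
cache.consume(coordinator.drain())
print(cache.serve("ka-a", "job-1"))          # prints Accepted
```

Because the stream is ordered, receiving the ACK for `ka-a` is enough to know that every event emitted before it, including the create for `job-1`, is already in the cache.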

This process is illustrated by the figure below:

The procedure above explains how to synchronize a Titus Gateway cache with the source of truth in Titus Job Coordinator, but it does not address how the internal queues in Titus Job Coordinator are drained to the point where all relevant messages are processed. The solution here is to add a logical timestamp to each event and guarantee a minimum time interval between messages emitted inside the event stream. If not enough events are created because of data updates, a dummy message is generated and inserted into the stream. Dummy messages guarantee that each keep-alive request is acknowledged within a bounded time, and does not wait indefinitely until some change in the system happens. For example:

Ta, Tb, Tc, Td, and Te are high-resolution monotonic logical timestamps. At time Td a dummy message is inserted, so the interval between two consecutive events in the event stream is always below a configurable threshold. These timestamp values are compared with keep-alive request arrival timestamps to know when a keep-alive acknowledgment can be sent.
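The dummy-message rule can be sketched in Python with integer ticks in place of nanosecond timestamps. `emit_stream` and `ack_time` are hypothetical helpers; the point is that an idle stream still advances, so the acknowledgment for a keep-alive never waits more than `max_gap` ticks after its arrival.

```python
def emit_stream(real_events: dict, horizon: int, max_gap: int) -> list:
    """Simulate an event emitter ticking from 1 to horizon (integer logical time).

    real_events maps a logical time to a payload. Whenever max_gap ticks pass
    without a real event, a dummy event is emitted, so consecutive events are
    never more than max_gap apart.
    """
    stream, last = [], 0
    for now in range(1, horizon + 1):
        if now in real_events:
            stream.append((now, real_events[now]))
            last = now
        elif now - last >= max_gap:
            stream.append((now, "DUMMY"))
            last = now
    return stream


def ack_time(stream: list, keep_alive_arrival: int):
    """First event time strictly after the keep-alive arrival: when the ACK can go out."""
    return next((t for t, _ in stream if t > keep_alive_arrival), None)


stream = emit_stream({3: "job-1 updated"}, horizon=10, max_gap=2)
print(stream)
# [(2, 'DUMMY'), (3, 'job-1 updated'), (5, 'DUMMY'), (7, 'DUMMY'), (9, 'DUMMY')]
print(ack_time(stream, keep_alive_arrival=5))  # 7, within arrival + max_gap
```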

There are a few optimization techniques that can be used. Here are those implemented in Titus:

  • Before sending a keep-alive request for each new client request, wait a fixed interval and send a single keep-alive request for all requests that arrived during that time. The maximum rate of keep-alive requests is thus bounded by 1 / max_interval. For example, if max_interval is set to 5ms, the maximum keep-alive request rate is 200 req/sec.
  • Collapse multiple keep-alive requests in Titus Job Coordinator, responding only to the latest one whose arrival timestamp is earlier than the timestamp of the last event sent over the network. On the Titus Gateway side, a keep-alive response with a given timestamp acknowledges all pending requests with keep-alive timestamps earlier than or equal to the received one.
  • Do not wait for cache synchronization on requests that do not have ordering requirements, serving data from the local cache on each Titus Gateway. Clients that can tolerate eventual consistency can opt into this new API for lower response times and increased availability.
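The first two optimizations can be sketched in Python as follows. `batch_keep_alives` and `PendingKeepAlives` are hypothetical names, times are integer milliseconds, and only the gateway-side release is shown; the coordinator-side collapsing would correspond to replying with a single ACK that carries the newest eligible timestamp.

```python
import bisect


def batch_keep_alives(arrivals_ms: list, max_interval_ms: int) -> list:
    """Group client requests so that one keep-alive covers every request in a window.

    The first request opens a window; the keep-alive is sent when the window
    closes, max_interval_ms later. Returns (send_time, covered_arrivals) pairs.
    """
    batches = []
    for t in sorted(arrivals_ms):
        if batches and t <= batches[-1][0]:
            batches[-1][1].append(t)
        else:
            batches.append((t + max_interval_ms, [t]))
    return batches


class PendingKeepAlives:
    """Gateway side: one ACK releases every keep-alive at or before its timestamp."""

    def __init__(self):
        self._timestamps = []  # kept sorted

    def add(self, ts: int) -> None:
        bisect.insort(self._timestamps, ts)

    def on_ack(self, acked_ts: int) -> list:
        cut = bisect.bisect_right(self._timestamps, acked_ts)
        released, self._timestamps = self._timestamps[:cut], self._timestamps[cut:]
        return released


max_interval_ms = 5
print(1000 / max_interval_ms)  # 200.0 keep-alives per second at most
print(batch_keep_alives([0, 1, 4, 6, 12], max_interval_ms))
# [(5, [0, 1, 4]), (11, [6]), (17, [12])]
```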

Given the mechanism described so far, let’s try to estimate the maximum wait time of a client request that arrived at Titus Gateway for different scenarios. Let’s assume that the maximum keep-alive interval is 5ms, and the maximum interval between events emitted in Titus Job Coordinator is 2ms.

Assuming that the system runs idle (no changes made to the data), and the client request arrives at a time when a new keep-alive wait period starts, the cache update latency is equal to 7 milliseconds + network propagation delay + processing time. If we ignore the processing time and assume that the network propagation delay is <1ms, given that we only have to send back a small keep-alive response, we should expect a delay of about 8ms in this scenario. If the client request does not have to wait for the keep-alive to be sent, and the keep-alive request is acknowledged immediately in Titus Job Coordinator, the delay is equal to network propagation delay + processing time, which we estimated to be <1ms. The average delay introduced by cache synchronization is around 4ms.
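The same estimate written out as a back-of-envelope sketch. Δ_KA (maximum keep-alive interval, 5ms), Δ_E (maximum interval between emitted events, 2ms), and d_net (network propagation plus processing time) are symbols introduced here. The average line rests on our own simplifying assumption, not a measured result: that requests and events fall uniformly within both intervals and that d_net is about 0.5ms on average.

```latex
\begin{aligned}
d_{\max} &\approx \Delta_{\mathrm{KA}} + \Delta_{\mathrm{E}} + d_{\mathrm{net}}
          \approx 5 + 2 + 1 = 8\ \mathrm{ms} \\
d_{\min} &\approx d_{\mathrm{net}} < 1\ \mathrm{ms} \\
\mathbb{E}[d] &\approx \tfrac{1}{2}\Delta_{\mathrm{KA}} + \tfrac{1}{2}\Delta_{\mathrm{E}} + d_{\mathrm{net}}
          \approx 2.5 + 1 + 0.5 = 4\ \mathrm{ms}
\end{aligned}
```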

Network propagation delays and stream processing times start to become a more important factor as the number of state change events and client requests increases. However, Titus Job Coordinator can now dedicate its capacity to serving high-bandwidth streams to a finite number of Titus Gateways, relying on the gateway instances to serve client requests, instead of serving payloads to all client requests itself. Titus Gateways can then be scaled out to match client request volumes.

We ran empirical tests for scenarios of low and high request volumes, and the results are presented in the next section.

Performance test results

To show how the system performs with and without the caching mechanism, we ran two tests:

  • A test with a low/moderate load showing a median latency increase due to overhead from the cache synchronization mechanism, but better 99th percentile latencies.
  • A test with load close to the peak of Titus Job Coordinator capacity, above which the original system collapses. Previous results hold, showing better scalability with the caching solution.

A single request in the tests below consists of one query. The query is of moderate size: a collection of 100 records, with a serialized response size of ~256KB. The total payload (response size times the query rate) requires a network bandwidth of ~2Gbps in the first test and ~8Gbps in the second one.
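A quick Python check of those bandwidth figures, assuming ~256KB means 256 KiB per response and using the query rates of the two tests described below (1K and 4K queries/second); `required_gbps` is a hypothetical helper:

```python
def required_gbps(requests_per_second: float, response_kib: float) -> float:
    """Bandwidth needed to ship every response: rate times size, in Gbit/s."""
    bits_per_response = response_kib * 1024 * 8
    return requests_per_second * bits_per_response / 1e9


print(round(required_gbps(1_000, 256), 1))  # 2.1, the "~2Gbps" of the moderate-load test
print(round(required_gbps(4_000, 256), 1))  # 8.4, the "~8Gbps" of the near-peak test
```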

Moderate load level

This test shows the impact of cache synchronization on query latency in a moderately loaded system. The query rate in this test is set to 1K requests/second.

Median latency without caching is half of what we observe with the introduction of the caching mechanism, due to the added synchronization delays. In exchange, the worst-case 99th percentile latencies are 90% lower, dropping from 292 milliseconds without a cache to 30 milliseconds with the cache.

Load level close to Titus Job Coordinator maximum

If Titus Job Coordinator has to handle all query requests (when the cache is not enabled), it handles the traffic well up to 4K test queries/second, and breaks down (sharp latency increase and a rapid drop in throughput) at around 4.5K queries/second. The maximum load test is thus kept at 4K queries/second.

Without caching enabled, the 99th percentile hovers around 1000ms and the 80th percentile is around 336ms, compared with a cache-enabled 99th percentile of 46ms and 80th percentile of 22ms. The median still looks better on the setup with no cache, at 17ms vs 19ms when the cache is enabled. It should be noted, however, that the system with caching enabled scales out linearly to more request load while keeping the same latency percentiles, whereas the no-cache setup collapses with a mere ~15% additional load.

Doubling the load when the caching is enabled does not increase the latencies at all. Here are latency percentiles when running 8K query requests/second:

Conclusion

After reaching the limit of vertical scaling of our previous system, we were pleased to implement a real solution that provides (in a practical sense) unlimited scalability of the Titus read-only API. We were able to achieve better tail latencies with a minor sacrifice in median latencies when traffic is low, and gained the ability to horizontally scale out our API gateway processing layer to handle growth in traffic without changes to API clients. The upgrade process was completely transparent, and no single client observed any abnormalities or changes in API behavior during and after the migration.

The mechanism described here can be applied to any system relying on a singleton leader elected component as the source of truth for managed data, where the data fits in memory and latency is low.

As for prior art, there is ample coverage of cache coherence protocols in the literature, both in the context of multiprocessor architectures (Adve & Gharachorloo, 1996)² and distributed systems (Gwertzman & Seltzer, 1996)³. Our work fits within the mechanisms of client polling and invalidation protocols explored by Gwertzman and Seltzer (1996) in their survey paper. Central timestamping to facilitate linearizability in read replicas is similar to the Calvin system (with real-world implementations in systems such as FoundationDB), as well as to the replica watermarking in AWS Aurora.

¹ Designing Data-Intensive Applications is an excellent book that goes into detail about consistency models discussed in this blog post.

² Adve, S. V., & Gharachorloo, K. (1996). Shared memory consistency models: A tutorial. Computer, 29(12), 66–76.

³ Gwertzman, J., & Seltzer, M. I. (1996, January). World Wide Web cache consistency. In USENIX Annual Technical Conference (Vol. 141, p. 152).


Consistent caching mechanism in Titus Gateway was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Orchestrating Data/ML Workflows at Scale With Netflix Maestro

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/orchestrating-data-ml-workflows-at-scale-with-netflix-maestro-aaa2b41b800c

by Jun He, Akash Dwivedi, Natallia Dzenisenka, Snehal Chennuru, Praneeth Yenugutala, Pawan Dixit

At Netflix, Data and Machine Learning (ML) pipelines are widely used and have become central to the business, representing diverse use cases that go beyond recommendations, predictions, and data transformations. A large number of batch workflows run daily to serve various business needs. These include ETL pipelines, ML model training workflows, batch jobs, etc. As big data and ML became more prevalent and impactful, the scalability, reliability, and usability of the orchestrating ecosystem have become increasingly important for our data scientists and the company.

In this blog post, we introduce and share learnings on Maestro, a workflow orchestrator that can schedule and manage workflows at a massive scale.

Motivation

Scalability and usability are essential to enable large-scale workflows and support a wide range of use cases. Our existing orchestrator (Meson) has worked well for several years. It schedules around 70 thousand workflows and half a million jobs per day. Due to its popularity, the number of workflows managed by the system has grown exponentially. We started seeing signs of scale issues, like:

  • Slowness during peak traffic moments like 12 AM UTC, leading to increased operational burden. The scheduler on-call has to closely monitor the system during non-business hours.
  • Meson was based on a single leader architecture with high availability. As the usage increased, we had to vertically scale the system to keep up and were approaching AWS instance type limits.

With the high growth of workflows in the past few years (increasing at > 100% a year), the need for a scalable data workflow orchestrator has become paramount for Netflix’s business needs. After perusing the current landscape of workflow orchestrators, we decided to develop a next-generation system that can scale horizontally to spread the jobs across a cluster consisting of hundreds of nodes. It addresses the key challenges we face with Meson and achieves operational excellence.

Challenges in Workflow Orchestration

Scalability

The orchestrator has to schedule hundreds of thousands of workflows and millions of jobs every day, and operate with a strict SLO of less than 1 minute of scheduler-introduced delay, even when there are spikes in traffic. At Netflix, the peak traffic load can be a few orders of magnitude higher than the average load. For example, a lot of our workflows are run around midnight UTC. Hence, the system has to withstand bursts in traffic while still meeting the SLO requirements. Additionally, we would like to have a single scheduler cluster to manage most user workflows for operational and usability reasons.

Another dimension of scalability to consider is the size of the workflow. In the data domain, it is common to have a very large number of jobs within a single workflow. For example, a workflow to backfill hourly data for the past five years can lead to 43800 jobs (24 * 365 * 5), each of which processes data for an hour. Similarly, ML model training workflows usually consist of tens of thousands of training jobs within a single workflow. Those large-scale workflows might create hotspots and overwhelm the orchestrator and downstream systems. Therefore, the orchestrator has to manage a workflow consisting of hundreds of thousands of jobs in a performant way, which is also quite challenging.

Usability

Netflix is a data-driven company, where key decisions are driven by data insights, from the pixel color used on the landing page to the renewal of a TV-series. Data scientists, engineers, non-engineers, and even content producers all run their data pipelines to get the necessary insights. Given the diverse backgrounds, usability is a cornerstone of a successful orchestrator at Netflix.

We would like our users to focus on their business logic and let the orchestrator solve cross-cutting concerns like scheduling, processing, error handling, security, etc. It needs to provide different levels of abstraction for solving similar problems: high-level to cater to non-engineers and low-level for engineers to solve their specific problems. It should also provide all the knobs for configuring workflows to suit users’ needs. In addition, it is critical for the system to be debuggable and to surface all errors for users to troubleshoot, as this improves the UX and reduces the operational burden.

Providing abstractions for the users is also needed to save valuable time on creating workflows and jobs. We want users to rely on shared templates and reuse their workflow definitions across their team, saving time and effort on creating the same functionality. Using job templates across the company also helps with upgrades and fixes: when the change is made in a template it’s automatically updated for all workflows that use it.

However, usability is challenging as it is often opinionated. Different users have different preferences and might ask for different features. Sometimes, the users might ask for the opposite features or ask for some niche cases, which might not necessarily be useful for a broader audience.

Introducing Maestro

Maestro is the next generation Data Workflow Orchestration platform to meet the current and future needs of Netflix. It is a general-purpose workflow orchestrator that provides a fully managed workflow-as-a-service (WAAS) to the data platform at Netflix. It serves thousands of users, including data scientists, data engineers, machine learning engineers, software engineers, content producers, and business analysts, for various use cases.

Maestro is highly scalable and extensible to support existing and new use cases and offers enhanced usability to end users. Figure 1 shows the high-level architecture.

Figure 1. Maestro high level architecture

In Maestro, a workflow is a DAG (directed acyclic graph) of individual units of job definition called Steps. Steps can have dependencies, triggers, workflow parameters, metadata, step parameters, configurations, and branches (conditional or unconditional). In this blog, we use step and job interchangeably. A workflow instance is an execution of a workflow; similarly, an execution of a step is called a step instance. Instance data include the evaluated parameters and other information collected at runtime to provide different kinds of execution insights. The system consists of three main microservices, which we expand upon in the following sections.
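To illustrate what a valid DAG of steps means, here is a Python sketch that orders steps by their dependencies with Kahn's algorithm and rejects cycles. It is a generic technique, not Maestro's or Conductor's implementation, and the step names are made up.

```python
from collections import deque


def execution_order(steps: dict) -> list:
    """Topologically sort a workflow given as {step: [upstream steps]}.

    Raises ValueError for unknown dependencies or a cycle (i.e. not a DAG).
    """
    indegree = {step: 0 for step in steps}
    downstream = {step: [] for step in steps}
    for step, upstreams in steps.items():
        for up in upstreams:
            if up not in steps:
                raise ValueError(f"{step} depends on unknown step {up}")
            indegree[step] += 1
            downstream[up].append(step)

    # Start with steps that have no dependencies; release others as inputs finish.
    ready = deque(step for step, degree in indegree.items() if degree == 0)
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for nxt in downstream[step]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(steps):
        raise ValueError("workflow definition contains a cycle")
    return order


print(execution_order({"extract": [], "train": ["extract"], "report": ["extract", "train"]}))
# ['extract', 'train', 'report']
```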

Maestro ensures the business logic is run in isolation. Maestro launches a unit of work (a.k.a. a Step) in a container and ensures the container is launched with the user’s or application’s identity. Launching with identity ensures the work is performed on behalf of the user or application; the identity is later used by downstream systems to validate whether an operation is allowed. For example, the user/application identity is checked by the data warehouse to validate whether a table read/write is allowed.

Workflow Engine

Workflow engine is the core component, which manages workflow definitions, the lifecycle of workflow instances, and step instances. It provides rich features to support:

  • Any valid DAG patterns
  • Popular data flow constructs like sub workflow, foreach, conditional branching etc.
  • Multiple failure modes to handle step failures with different error retry policies
  • Flexible concurrency control to throttle the number of executions at workflow/step level
  • Step templates for common job patterns like running a Spark query or moving data to Google sheets
  • Support parameter code injection using customized expression language
  • Workflow definition and ownership management
  • Timeline including all state changes and related debug info

We use the Netflix open source project Conductor as a library to manage the workflow state machine in Maestro. It ensures that each step defined in a workflow is enqueued and dequeued with an at-least-once guarantee.

Time-Based Scheduling Service

The time-based scheduling service starts new workflow instances at the scheduled time specified in workflow definitions. Users can define the schedule using a cron expression or using periodic schedule templates like hourly, weekly, etc. This service is lightweight and provides an at-least-once scheduling guarantee. The Maestro engine service deduplicates the triggering requests to achieve an exactly-once guarantee when scheduling workflows.

Time-based triggering is popular due to its simplicity and ease of management. But sometimes, it is not efficient. For example, the daily workflow should process the data when the data partition is ready, not always at midnight. Therefore, on top of manual and time-based triggering, we also provide event-driven triggering.

Signal Service

Maestro supports event-driven triggering over signals, which are messages carrying information such as parameter values. Signal triggering is efficient and accurate because we don’t waste resources checking whether the workflow is ready to run; instead, we only execute the workflow when a condition is met.

Signals are used in two ways:

  • A trigger to start new workflow instances
  • A gating function to conditionally start a step (e.g., data partition readiness)

The signal service’s goals are to:

  • Collect and index signals
  • Register and handle workflow trigger subscriptions
  • Register and handle the step gating functions
  • Capture the lineage of workflow triggers and of steps unblocked by a signal
Figure 2. Signal service high level architecture

The Maestro signal service consumes all the signals from different sources (e.g., all the warehouse table updates, S3 events, or a workflow releasing a signal) and then generates the corresponding triggers by correlating a signal with its subscribed workflows. In addition to the transformation between external signals and workflow triggers, this service is also responsible for step dependencies, by looking up the received signals in the history. Like the scheduling service, the signal service together with the Maestro engine achieves exactly-once triggering guarantees.
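A toy Python version of the two signal uses described above: correlating a published signal with subscribed workflows, and answering a step-gating check from the signal history. `SignalService`, its methods, and the signal naming scheme are hypothetical; lineage tracking and persistence are left out.

```python
from collections import defaultdict


class SignalService:
    """Toy signal index: produces workflow triggers and answers gating checks."""

    def __init__(self):
        self.history = defaultdict(list)        # signal name -> list of params dicts
        self.subscriptions = defaultdict(list)  # signal name -> workflow ids

    def subscribe(self, signal: str, workflow_id: str) -> None:
        self.subscriptions[signal].append(workflow_id)

    def publish(self, signal: str, params: dict) -> list:
        """Index the signal and return the (workflow_id, params) triggers it produces."""
        self.history[signal].append(params)
        return [(wf, params) for wf in self.subscriptions[signal]]

    def gate_open(self, signal: str, **required) -> bool:
        """Step gating: has a signal with matching parameters already been received?"""
        return any(
            all(p.get(k) == v for k, v in required.items())
            for p in self.history[signal]
        )


svc = SignalService()
svc.subscribe("table_updated:db.playback", "demo.pipeline")
print(svc.publish("table_updated:db.playback", {"partition": "20220101"}))
# [('demo.pipeline', {'partition': '20220101'})]
print(svc.gate_open("table_updated:db.playback", partition="20220102"))  # False
```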

The signal service also provides signal lineage, which is useful in many cases. For example, a table updated by a workflow could lead to a chain of downstream workflow executions. Because these workflows are usually owned by different teams, signal lineage helps upstream and downstream workflow owners see who depends on whom.

Orchestration at Scale

All services in the Maestro system are stateless and can be horizontally scaled out. All requests are processed via distributed queues for message passing. With a shared-nothing architecture, Maestro can horizontally scale to manage the states of millions of workflow and step instances at the same time.

CockroachDB is used for persisting workflow definitions and instance state. We chose CockroachDB as it is an open-source distributed SQL database that provides strong consistency guarantees that can be scaled horizontally without much operational overhead.

It is hard to support very large workflows in general. For example, a workflow definition could explicitly define a DAG consisting of millions of nodes. With that number of nodes in a DAG, the UI cannot render it well. We have to enforce some constraints while still supporting valid use cases consisting of hundreds of thousands (or even millions) of step instances in a workflow instance.

Based on our findings and user feedback, we found that in practice

  • Users don’t want to manually write the definitions for thousands of steps in a single workflow definition, which is hard to manage and navigate over UI. When such a use case exists, it is always feasible to decompose the workflow into smaller sub workflows.
  • Users expect to repeatedly run a certain part of the DAG hundreds of thousands (or even millions of) times with different parameter settings in a given workflow instance. So at runtime, a workflow instance might include millions of step instances.

Therefore, we enforce a workflow DAG size limit (e.g., 1K) and provide a foreach pattern that allows users to define a sub DAG within a foreach block and iterate the sub DAG with a larger limit (e.g., 100K). Note that a foreach can be nested inside another foreach, so users can run millions or billions of steps in a single workflow instance.

In Maestro, foreach itself is a step in the original workflow definition. Foreach is internally treated as another workflow, which scales like any other Maestro workflow based on the number of step executions in the foreach loop. The execution of the sub DAG within foreach is delegated to separate workflow instances, each of which manages the execution of one iteration. The foreach step then monitors and collects the status of those foreach workflow instances.

Figure 3. Maestro’s scalable foreach design to support super large iterations

With this design, the foreach pattern supports sequential and nested loops with high scalability. It is easy to manage and troubleshoot, as users can see the overall loop status at the foreach step or view each iteration separately.
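The delegation can be sketched in Python, with a thread pool standing in for separate child workflow instances and a `Counter` as the status rollup that the foreach step exposes. `run_foreach`, `backfill`, and the concurrency limit are hypothetical:

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def run_foreach(values, run_iteration, max_concurrency=4) -> Counter:
    """Run one child 'workflow instance' per iteration and roll up their statuses.

    run_iteration(value) stands in for launching the sub DAG as a separate
    workflow instance; it returns that instance's terminal status string.
    """
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        statuses = list(pool.map(run_iteration, values))
    return Counter(statuses)


def backfill(date: int) -> str:  # hypothetical iteration body
    return "FAILED" if date == 20220102 else "SUCCEEDED"


print(run_foreach([20220101, 20220102, 20220103], backfill))
# Counter({'SUCCEEDED': 2, 'FAILED': 1})
```

A nested foreach would simply pass an iteration function that itself calls `run_foreach`.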

Workflow Platform for Everyone

We aim to make Maestro user-friendly and easy to learn for users with different backgrounds. We made some assumptions about user proficiency in programming languages, and users can bring their business logic in multiple ways, including but not limited to a bash script, a Jupyter notebook, a Java jar, a Docker image, a SQL statement, or a few clicks in the UI using parameterized workflow templates.

User Interfaces

Maestro provides multiple domain-specific languages (DSLs), including YAML, Python, and Java, for end users to define their workflows, which are decoupled from their business logic. Users can also talk directly to the Maestro API to create workflows using the JSON data model. We found that human-readable DSLs are popular and play an important role in supporting different use cases. The YAML DSL is the most popular one due to its simplicity and readability.

Here is an example workflow defined by different DSLs.

Figure 4. An example workflow defined by YAML, Python, and Java DSLs

Additionally, users can generate certain types of workflows in the UI or with other libraries, e.g.:

  • In the Notebook UI, users can directly schedule the chosen notebook to run periodically.
  • In the Maestro UI, users can directly schedule data to be moved periodically from one source (e.g., a data table or a spreadsheet) to another.
  • Users can use the Metaflow library to create workflows in Maestro to execute DAGs consisting of arbitrary Python code.

Parameterized Workflows

Users often want to define a dynamic workflow that adapts to different scenarios. In our experience, a completely dynamic workflow is less favorable and hard to maintain and troubleshoot. Instead, Maestro provides three features to help users define a parameterized workflow:

  • Conditional branching
  • Sub-workflow
  • Output parameters

Instead of dynamically changing the workflow DAG at runtime, users can define those changes as sub-workflows and invoke the appropriate one at runtime, because the sub-workflow id is a parameter that is evaluated at runtime. Additionally, using output parameters, users can produce different results from an upstream job step and then iterate through them within a foreach, pass them to a sub-workflow, or use them in downstream steps.
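To illustrate how a fixed parent DAG can run different sub-workflows, here is a minimal Python sketch in which the sub-workflow id is computed from a parameter only when the step runs. The registry, the ids `demo.backfill_daily` and `demo.backfill_hourly`, and the `granularity` parameter are hypothetical; this is not Maestro’s API.

```python
from typing import Callable, Dict

# Hypothetical registry of pre-defined sub-workflows, keyed by sub-workflow id.
SUBWORKFLOWS: Dict[str, Callable[[dict], str]] = {
    "demo.backfill_daily": lambda p: f"daily backfill for {p['date']}",
    "demo.backfill_hourly": lambda p: f"hourly backfill for {p['date']}",
}


def run_subworkflow_step(params: dict) -> str:
    """Resolve the sub-workflow id at run time, then invoke that sub-workflow.

    The parent DAG never changes; only the id parameter decides which
    pre-defined sub-DAG executes.
    """
    subworkflow_id = f"demo.backfill_{params['granularity']}"  # evaluated at run time
    if subworkflow_id not in SUBWORKFLOWS:
        raise KeyError(f"unknown sub-workflow {subworkflow_id!r}")
    return SUBWORKFLOWS[subworkflow_id](params)


print(run_subworkflow_step({"granularity": "hourly", "date": 20220101}))
```

The design point is that the DAG stays static and reviewable, while the runtime variation is confined to the value of a single parameter.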

Here is an example (using the YAML DSL) of a backfill workflow with two steps. First, step1 computes the backfill range and returns the dates. Next, the foreach step (step2) uses the dates from step1 to create one foreach iteration per date. Finally, each backfill job gets its date from the foreach and backfills the data for that date.

```yaml
Workflow:
  id: demo.pipeline
  jobs:
    - job:
        id: step1
        type: NoOp
        '!dates': return new int[]{20220101,20220102,20220103}; #SEL
    - foreach:
        id: step2
        params:
          date: ${dates@step1} #reference an upstream step parameter
        jobs:
          - job:
              id: backfill
              type: Notebook
              notebook:
                input_path: s3://path/to/notebook.ipynb
                arg1: $date #pass the foreach parameter into the notebook
```
Figure 5. An example of using parameterized workflow for backfill data
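The data flow of this backfill can be simulated in a few lines of Python. This is a hypothetical sketch, not Maestro’s implementation: `step1` stands in for the SEL expression, `resolve` is a toy resolver for whole-string `${param@step}` references, and the notebook run is represented as a string.

```python
import re
from typing import Dict, List

# Hypothetical resolver for references of the form ${param@step}.
REF = re.compile(r"\$\{(\w+)@(\w+)\}")


def step1() -> Dict[str, List[int]]:
    """Upstream step: returns the backfill dates as an output parameter."""
    return {"dates": [20220101, 20220102, 20220103]}


def resolve(expr: str, outputs: Dict[str, Dict[str, object]]) -> object:
    """Replace a whole-string ${param@step} reference with that step's output."""
    match = REF.fullmatch(expr)
    if match is None:
        return expr  # not a reference: treat it as a literal
    name, step = match.groups()
    return outputs[step][name]


def run_backfill() -> List[str]:
    outputs = {"step1": step1()}
    dates = resolve("${dates@step1}", outputs)
    # One foreach iteration per date; each passes its date to the notebook as arg1.
    return [f"run s3://path/to/notebook.ipynb arg1={date}" for date in dates]


for run in run_backfill():
    print(run)
```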

The parameter system in Maestro is completely dynamic, with code injection support: users can write code in Java syntax as the parameter definition. To keep this safe, we developed our own secured expression language (SEL). It exposes only limited functionality and includes additional checks in the language parser (e.g. the number of iterations in a loop statement).
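SEL’s parser is not shown in the post, and SEL uses Java syntax. Purely as an analogy, the Python sketch below illustrates the two ideas named above with the standard `ast` module: it exposes only whitelisted functionality and enforces a resource check. The step budget stands in for SEL’s loop-iteration check, and `safe_eval` and `max_steps` are hypothetical names.

```python
import ast
import operator

# Integer-only binary operators the toy language exposes.
_BINOPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
}


def safe_eval(expr: str, params: dict, max_steps: int = 10_000):
    """Evaluate a tiny whitelisted expression language over `params`.

    Anything outside the whitelist (calls, attributes, comprehensions, ...)
    is rejected, and a step budget caps the work one expression can do.
    """
    steps = 0

    def ev(node):
        nonlocal steps
        steps += 1
        if steps > max_steps:
            raise ValueError("expression exceeds step budget")
        if isinstance(node, ast.Expression):
            return ev(node.body)
        if isinstance(node, ast.Constant) and type(node.value) in (int, str):
            return node.value
        if isinstance(node, ast.Name):
            if node.id not in params:
                raise NameError(f"unknown parameter {node.id!r}")
            return params[node.id]
        if isinstance(node, ast.BinOp) and type(node.op) in _BINOPS:
            left, right = ev(node.left), ev(node.right)
            if type(left) is not int or type(right) is not int:
                raise ValueError("arithmetic is only defined on integers")
            return _BINOPS[type(node.op)](left, right)
        if isinstance(node, ast.List):
            return [ev(elt) for elt in node.elts]
        raise ValueError(f"disallowed syntax: {type(node).__name__}")

    return ev(ast.parse(expr, mode="eval"))


print(safe_eval("[date, date + 1]", {"date": 20220101}))
```

Whitelisting syntax nodes, rather than blacklisting dangerous ones, keeps the attack surface small. Restricting arithmetic to integers also blocks cheap memory blow-ups such as repeating a string.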

Execution Abstractions

Maestro provides multiple levels of execution abstraction. Users can choose a provided step type and set its parameters. This encapsulates the business logic of commonly used operations, making it very easy for users to create jobs. For example, for the Spark step type, users only need to specify the required parameters, such as the Spark SQL query and memory requirements, and Maestro does everything behind the scenes to create the step. If we need to change the business logic of a certain step type, we can do so seamlessly for all users of that step type.

If the provided step types are not enough, users can also develop their own business logic in a Jupyter notebook and pass it to Maestro. Advanced users can develop their own well-tuned Docker image and let Maestro handle the scheduling and execution.

Additionally, we abstract common functions and reusable patterns from various use cases and add them to Maestro in a loosely coupled way by introducing job templates, which are parameterized notebooks. This differs from step types, as templates provide a combination of various steps. Advanced users also leverage this feature to ship common patterns for their own teams. When creating a new template, users can define the list of required and optional parameters with their types and register the template with Maestro. Maestro validates the parameters and types at push time and at run time. In the future, we plan to extend this functionality to make it very easy for users to define templates for their teams and for all employees. In some cases, sub-workflows are also used to define common sub-DAGs to achieve multi-step functions.
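Push-time and run-time validation of template parameters might look roughly like this. `ParamSpec`, `validate_params`, and the example parameters `input_table` and `memory_gb` are hypothetical; this is not Maestro’s actual template API.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping


@dataclass(frozen=True)
class ParamSpec:
    name: str
    kind: type  # expected Python type of the value
    required: bool = True


def validate_params(specs: List[ParamSpec], params: Mapping[str, Any]) -> Dict[str, Any]:
    """Check presence and types of template parameters; raise on any problem."""
    errors = []
    by_name = {spec.name: spec for spec in specs}
    for spec in specs:
        if spec.name not in params:
            if spec.required:
                errors.append(f"missing required parameter {spec.name!r}")
            continue
        value = params[spec.name]
        # bool is a subclass of int in Python, so reject it explicitly for int params.
        if not isinstance(value, spec.kind) or (spec.kind is int and isinstance(value, bool)):
            errors.append(f"{spec.name!r} should be {spec.kind.__name__}, got {type(value).__name__}")
    for name in params:
        if name not in by_name:
            errors.append(f"unknown parameter {name!r}")
    if errors:
        raise ValueError("; ".join(errors))
    return dict(params)


specs = [ParamSpec("input_table", str), ParamSpec("memory_gb", int, required=False)]
print(validate_params(specs, {"input_table": "db.events", "memory_gb": 8}))
```

Collecting all errors before raising lets a user fix every problem in one push instead of one at a time.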

Moving Forward

We are taking Big Data Orchestration to the next level and constantly solving new problems and challenges, so please stay tuned. If you are motivated to solve large-scale orchestration problems, please join us; we are hiring.


Orchestrating Data/ML Workflows at Scale With Netflix Maestro was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How Product Teams Can Build Empathy Through Experimentation

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-product-teams-can-build-empathy-through-experimentation-6253603880a6

A conversation between Travis Brooks, Netflix Product Manager for Experimentation Platform, and George Khachatryan, OfferFit CEO

Note: I’ve known George for a little while now, and as we’ve talked a lot about the philosophy of experimentation, he kindly invited me to their (virtual) office for their speaker series. We had a fun conversation with his team, and we realized that some parts of it might make a good blog post as well. So we jointly edited it a bit for length and clarity, and are posting it here as well as on OfferFit’s blog. Hope you enjoy the result. — Travis B.

George Khachatryan: Travis, could you tell us a bit about your background and how you came to your current role?

Travis Brooks: I’m the product manager (PM) for the experimentation platform. So my job is to make sure that all the tooling and infrastructure we have at Netflix for experimentation does what it needs to do, and to set the road map for the next year or more for what we’re building.

I started out in physics, but ended up not doing that. Instead, I started leading an information resource for particle physics literature. One of the things we ran up against was we didn’t really have enough users to run experiments. We were all experimental physicists at heart and we wanted to make decisions on some sort of principled basis, but we didn’t actually have enough users to get statistical significance.

At the same time, I had an opportunity to go join Yelp as the first product manager there for search, where there were many more users. And so I did that and spent some time building out search algorithms and recommendation engines at Yelp.

I came to Netflix about three years ago, and first led a team of data scientists responsible for front end experimentation — basically everything you see on the Netflix platform. And then in the last year, I’ve been the PM for all of our experimentation infrastructure and platform.

George Khachatryan: So over the last decade, a lot of tech companies have been increasingly embracing user centric design — it’s kind of become the accepted wisdom. And a lot of non-tech companies also are increasingly trying to be customer centric in their thinking. How would you define user centric design and what role do you think experimentation plays in it?

Travis Brooks: Let me say first that I’m talking here about my own experiences. I’m not speaking for Netflix.

But what I can say is that broadly, I think user centric design is really about empathy. And as a person who’s been both a user facing PM and a tools PM, having empathy for your user is one of the core traits that defines good product management. So when we say “user centric”, we’re just saying, “Hey, really lean into empathy.”

When you’re building things, whether you’re a visual designer, or a designer of an API, or a PM, or anybody who’s building something, lean into trying to put yourself in the shoes of the user. And if you can do that, not just at the beginning when you write down the specs, but all the way through the process, you make a better product in the end.

In the act of building, we tend to get really entranced by the technical problem and solving that problem. And in fact, it’s probably necessary to lose sight of what the end user is going to experience in order to build the best technical solution. So to build products that are effective for the user, we need that user perspective brought back into focus pretty regularly: “Oh, wait, here’s what the end user is going to experience.” Or, “Oh yeah, actually we don’t even need to solve that really challenging, interesting technical problem over there, because the end user is only going to experience this part over here.” To me, that’s what user centric design means.

How do we make sure that in all aspects, whether it’s an API, or the front-end visual design, we’re centering the user? How are they going to experience this product? What are their pain points? Is what we’re doing actually connected to that end user?

George Khachatryan: And what role does experimentation play, if any, in building empathy?

Travis Brooks: This is really a PM’s role — to ensure that the team that’s building something is maintaining that level of user empathy. But then you have to ask, “How does the PM know what users want?” Right? They’re not magic. A good PM doesn’t spring fully formed from the head of Zeus with all the knowledge of what users want. How do they get that knowledge? I think there are four ways.

1. One way is if you’re PM-ing a product that you yourself use. It’s the cheapest and maybe the lowest fidelity way of building empathy. “Okay, well, I’m a user so I know what users feel because I use the product.” It’s low fidelity because it’s an N of 1, and you’re certainly not a typical user. You’re a PM. You interact with products very differently from most people.

2. Typically the next thing people do is they start talking to users. And if they’re smart, they start talking to people who are not like them. “Hey, how do you use this product? What do you value? What do you find painful about it? How often do you use it? Why don’t you use it more? When was the last time you used it? What were you trying to do? Did you achieve that?” — all those typical user research questions that PMs ask. Really good user researchers get into this sort of qualitative research, and that’s a great way to build broader empathy, at a higher fidelity level, than just, “I use my product.”

3. Then you get to a scale where you have a lot of users, and talking to them becomes an art of “How do I get a representative sample from this broad population?” And you start to worry that maybe their memory isn’t quite perfect. Users are self-reporting how they use things, but that’s not actually how they use things. We know people have a lot of cognitive biases in that way. So then you start getting into observational data, and you say, “well, okay, people report that they use the product once a week. If I go look at data, I can see people use the product three times a week, so I can tell that what they report isn’t quite what happened.” Adding this observational data layer makes user research much higher fidelity. Of course, it’s higher cost and may take some time and some effort and investment.

4. But even that observational data layer doesn’t really help you understand how people use the product at the level of a deep causal connection. The end game of trying to understand the user is, “if I do X, users respond this way”. And the only way to establish that causal connection — maybe not the only way, but the most reliable way, the highest fidelity way — is to show a random sample of your users X and see how they differ from the rest of your users who didn’t see X. That’s the core of experimentation: a high cost, high fidelity, arguably lower speed, way to build empathy. It’s probably not the first place you’re going to turn to build empathy, but you’re going to get there and you’ll eventually need to have it in your arsenal.

Different scales of usage demand different methods of learning
Different methods of learning work optimally at different scales; having all of them in your arsenal is useful.

George Khachatryan: Yeah. So you talk about the importance of building an experimentation culture. Can you explain what the main elements of such a culture would be, in your view?

Travis Brooks: I think having a sense of humility is super important. If you read our blog posts, or posts from anybody who does massive scale testing such as Microsoft, you see that they test, and most of their treatments fail. And those are treatments from expert designers and PMs and engineers who have the best context, the best user research. Especially as your product matures, it is hard to improve upon. Even a less mature product is hard to improve upon, because it turns out our intuition is pretty good. We understand what users need. It’s just not a very reliable mechanism. So most treatments that we come up with fail, which means you have to have a lot of humility.

You can’t get married to your ideas and say, “I’m going to do an experiment; this is going to blow the world away.” If you do, you end up wasting a lot of time and effort trying to show that your treatment is good, even if it’s not. You miss the bigger picture, which is, “Hey, you tried something. It didn’t work. What can you learn from that experience to inform the next treatment?”

The cultures where people can be really successful in experimentation involve a lot of humility, which encourages that sort of iterative approach. “I’m guessing this is not going to work because I can see from history most of these things don’t work. What I’m going to do is put it out there and I’m going to learn from it. Maybe I’ll get lucky and it’ll work right off the bat, but maybe I won’t. I’ll learn from the next two tests, and I’ll get to someplace where I can actually solve this problem.”

The other thing I think is important is having a culture of open debate, where decisions are made out in the open. The more open your decision-making, the louder a voice data has. When decision-making gets closed off, into one person’s office or one person’s head, it’s hard for data to get a voice. Often when people debate and they can’t agree, they turn to data, because it’s a lot harder to disagree with that. And so if you want an experimentation culture, if you want data, have open debate. Have open decision making. Then people more clearly see that they need data, that they need to experiment.

So yes, I think that humility and open decision-making are really important.


How Product Teams Can Build Empathy Through Experimentation was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support…

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/timestone-netflixs-high-throughput-low-latency-priority-queueing-system-with-built-in-support-1abf249ba95f

Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support for Non-Parallelizable Workloads

by Kostas Christidis

Introduction

Timestone is a high-throughput, low-latency priority queueing system we built in-house to support the needs of Cosmos, our media encoding platform. Over the past 2.5 years, its usage has increased, and Timestone is now also the priority queueing engine backing Conductor, our general-purpose workflow orchestration engine, and BDP Scheduler, the scheduler for large-scale data pipelines. All in all, millions of critical workflows within Netflix now flow through Timestone on a daily basis.

Timestone clients can create queues, enqueue messages with user-defined deadlines and metadata, and then dequeue these messages in an earliest-deadline-first (EDF) fashion. Dequeueing the EDF message that matches given criteria (e.g. “messages that belong to queue X and have metadata Y”) is also supported.
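As a rough single-process illustration of EDF dequeueing with a metadata filter, here is a heap-based sketch. It is not Timestone’s design, which keeps sorted sets and secondary indexes in Redis. Here, non-matching entries are simply popped and pushed back, and the class and method names are hypothetical.

```python
import heapq
import itertools
from typing import Dict, List, Optional, Tuple


class EdfQueue:
    """In-memory earliest-deadline-first queue with metadata filters (illustration only)."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []  # (deadline, tie-breaker, message id)
        self._metadata: Dict[str, Dict[str, str]] = {}
        self._seq = itertools.count()  # FIFO order among equal deadlines

    def enqueue(self, msg_id: str, deadline: int, metadata: Optional[Dict[str, str]] = None) -> None:
        self._metadata[msg_id] = dict(metadata or {})
        heapq.heappush(self._heap, (deadline, next(self._seq), msg_id))

    def dequeue(self, filters: Optional[Dict[str, str]] = None) -> Optional[str]:
        """Remove and return the earliest-deadline message whose metadata matches every filter."""
        filters = filters or {}
        skipped: List[Tuple[int, int, str]] = []
        found: Optional[str] = None
        while self._heap:
            entry = heapq.heappop(self._heap)
            metadata = self._metadata[entry[2]]
            if all(metadata.get(k) == v for k, v in filters.items()):
                found = entry[2]
                del self._metadata[found]
                break
            skipped.append(entry)  # not a match: put it back afterwards
        for entry in skipped:
            heapq.heappush(self._heap, entry)
        return found


q = EdfQueue()
q.enqueue("m1", deadline=300, metadata={"type": "x"})
q.enqueue("m2", deadline=100, metadata={"type": "y"})
q.enqueue("m3", deadline=200, metadata={"type": "x"})
print(q.dequeue({"type": "x"}), q.dequeue())  # m3 m2
```

The pop-and-restore scan is linear in the worst case. Per-filter indexes, like the ones described later in this post, avoid that cost.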

One of the things that make Timestone different from other priority queues is its support for a construct we call exclusive queues — this is a means to mark chunks of work as non-parallelizable, without requiring any locking or coordination on the consumer side; everything is taken care of by the exclusive queue in the background. We explain the concept in detail in the sections that follow.

Why Timestone

When designing the successor to Reloaded — our media encoding system — back in 2018 (see “Background” section in The Netflix Cosmos Platform), we needed a priority queueing system that would provide queues between the three components in Cosmos (Figure 1):

  1. the API framework (Optimus),
  2. the forward chaining rule engine (Plato), and
  3. the serverless computing layer (Stratum)
Figure 1. A video encoding application built on top of Cosmos. Notice the three Cosmos subsystems: Optimus, an API layer mapping external requests to internal business models; Plato, a workflow layer for business rule modeling; and Stratum, the serverless layer for running stateless and computationally intensive functions. Source: The Netflix Cosmos Platform

Some of the key requirements this priority queueing system would need to satisfy:

1. A message can only be assigned to one worker at any given time. The work that tends to happen in Cosmos is resource-intensive and can fan out to thousands of actions. Suppose there is replication lag between the replicas in our data store, and we present to worker B, as dequeueable, a message that worker A just dequeued via a different node. We then waste significant compute cycles. This requirement therefore rules out eventually consistent solutions and means we want linearizable consistency at the queue level.

2. Allow for non-parallelizable work.

Given that Plato is continuously polling all workflow queues for more work to execute —

While Plato is executing a workflow for a given project (a request for work on a given service) —

Then Plato should not be able to dequeue additional requests for work for that project on that workflow. Otherwise Plato’s inference engine will evaluate the workflow prematurely, and may move the workflow to an incorrect state.

There exists, then, a certain type of work in Cosmos that should not be parallelized, and the ask is for the queueing system to support this access pattern natively. This requirement gave birth to the exclusive queue concept. We explain how exclusive queues work in Timestone in the “Key Concepts” section.

3. Allow for dequeueing and queue depth querying using filters (metadata key-value pairs)

4. Allow for the automatic creation of a queue upon message ingestion

5. Render a message dequeueable within a second of ingestion

We built Timestone because we could not find an off-the-shelf solution that meets these requirements.

System Architecture

Timestone is a gRPC-based service. We use protocol buffers to define the interface of our service and the structure of our request and response messages. The system diagram for the application is shown in Figure 2.

Figure 2. Timestone system diagram. Arrows link all the components touched during a typical Timestone client-server interaction. Numbers in red indicate sequence steps. Identical numbers identify concurrent steps.

System of record

The system of record is a durable Redis cluster. Every write request (see Step 1 — note that this includes dequeue requests since they alter the state of the queue) that reaches the cluster (Step 2) is persisted to a transaction log before a response is sent back to the server (Step 3).

Inside the database, we represent each queue with a sorted set where we rank message IDs (see “Message” section) according to priority. We persist messages and queue configurations (see “Queues” section) in Redis as hashes. All data structures related to a queue — from the messages it contains to the in-memory secondary indexes needed to support dequeue-by-filter — are placed in the same Redis shard. We achieve this by having them share a common prefix, specific to the queue in question. We then codify this prefix as a Redis hash tag. Each message carries a payload (see “Message” section) that can weigh up to 32 KiB.
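Co-locating a queue’s structures relies on Redis Cluster’s hash tags. The cluster maps each key to one of 16384 slots using CRC16 of the key, but if the key contains a non-empty `{...}` section, only that section is hashed. The Python sketch below reimplements that rule to show that keys sharing a tag land in the same slot. The key layout `{queue:encode}:...` is hypothetical, not Timestone’s actual naming.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC-16/XMODEM (poly 0x1021, init 0), the variant Redis Cluster uses."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def hash_slot(key: str) -> int:
    """Redis Cluster hash slot, honoring {hash tags}."""
    raw = key.encode()
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        if end > start + 1:  # only a non-empty tag replaces the key
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384


# Hypothetical key layout: every structure of queue "encode" shares the tag {queue:encode}.
keys = ["{queue:encode}:pending", "{queue:encode}:msg:42", "{queue:encode}:config"]
print({k: hash_slot(k) for k in keys})  # all three map to the same slot
```

Because Lua scripts in Redis Cluster may only touch keys in a single slot, this co-location is also what lets one script update all of a queue’s structures atomically.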

Almost all of the interactions between Timestone and Redis (see “Message States” section) are codified as Lua scripts. In most of these Lua scripts, we tend to update a number of data structures. Since Redis guarantees that each script is executed atomically, a successful script execution is guaranteed to leave the system in a consistent (in the ACID sense) state.

All API operations are queue-scoped. All API operations that modify state are idempotent.

Secondary indexes

For observability purposes, we capture information about incoming messages and their transition between states in two secondary indexes maintained on Elasticsearch.

When we get a write response from Redis, we concurrently (a) return the response to the client, and (b) convert this response into an event that we post to a Kafka cluster, as shown in Step 4. Two Flink jobs — one for each type of index we maintain — consume the events from the corresponding Kafka topics, and update the indexes in Elasticsearch.

One index (“current”) gives users a best-effort view into the current state of the system, while the other index (“historic”) gives users a best-effort longitudinal view of messages, allowing them to trace the messages as they flow through Timestone and answer questions such as the time spent in a state and the number of processing errors. We maintain a version counter for each message; every write operation increments that counter. We rely on that version counter to order the events in the historic index.
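Ordering by the per-message version counter, rather than by arrival order, can be sketched as follows, since events may reach the indexer out of order. The `Event` type and both view functions are hypothetical. The post only describes using the version for the historic index, so using it to discard stale events in the current view is our assumption.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Event:
    message_id: str
    version: int  # incremented by every write operation on the message
    state: str


def historic_view(events: List[Event]) -> Dict[str, List[str]]:
    """Per-message state history, ordered by version rather than arrival order."""
    grouped: Dict[str, List[Event]] = {}
    for event in events:
        grouped.setdefault(event.message_id, []).append(event)
    return {
        msg: [e.state for e in sorted(evs, key=lambda e: e.version)]
        for msg, evs in grouped.items()
    }


def current_view(events: List[Event]) -> Dict[str, str]:
    """Latest known state per message; stale (lower-version) events are ignored."""
    latest: Dict[str, Event] = {}
    for event in events:
        seen = latest.get(event.message_id)
        if seen is None or event.version > seen.version:
            latest[event.message_id] = event
    return {msg: e.state for msg, e in latest.items()}


arrived = [Event("m1", 2, "running"), Event("m1", 1, "pending"), Event("m1", 3, "completed")]
print(historic_view(arrived))  # {'m1': ['pending', 'running', 'completed']}
```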

Events are stored in the Elasticsearch cluster for a finite number of days.

Current Usage at Netflix

The system is dequeue heavy. We see 30K dequeue requests per second (RPS) with a P99 latency of 45ms. In comparison, we see 1.2K enqueue RPS at 25ms P99 latency. We regularly see 5K RPS enqueue bursts at 85ms P99 latency.

15B messages have been enqueued to Timestone since the beginning of the year; these messages have been dequeued 400B times. Pending messages regularly reach 10M.

Usage is expected to double next year, as we migrate the rest of Reloaded, our legacy media encoding system, to Cosmos.

Key Concepts

Message

A message carries an opaque payload, a user-defined priority (see “Priority” section), an optional (mandatory for exclusive queues) set of metadata key-value pairs that can be used for filter-based dequeueing, and an optional invisibility duration.

Any message that is placed into a queue can be dequeued a finite number of times. We call these attempts; each dequeue invocation on a message decreases the attempts left on it.

Priority

The priority of a message is expressed as an integer value; the lower the value, the higher the priority. While an application is free to use whatever range it sees fit, the norm is to use Unix timestamps in milliseconds (e.g. 1661990400000 for 9/1/2022 midnight UTC).

Figure 3. A snippet from the PriorityClass enum used by a streaming encoding pipeline in Cosmos. The values in parentheses indicate the offset in days.

It is also entirely up to the application to define its own priority levels. For instance, a streaming encoding pipeline within Cosmos uses the priority classes shown in Figure 3. Messages belonging to the standard class use the time of enqueue as their priority, while all other classes have their priority values adjusted in multiples of ∼10 years. The priority is set at the workflow rule level, but can be overridden if the request carries a studio tag, such as DAY_OF_BROADCAST.
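The post does not list the actual values in Figure 3, so the class names and day offsets below are made up. The sketch only shows the mechanism: take the enqueue time in Unix milliseconds and shift it by a whole number of ~10-year offsets (3650 days here, ignoring leap days), so that a lower value is dequeued first.

```python
from datetime import datetime, timezone
from enum import Enum

MS_PER_DAY = 86_400_000


class PriorityClass(Enum):
    """Hypothetical classes; values are offsets in days (~10-year multiples)."""
    URGENT = -7300
    HIGH = -3650
    STANDARD = 0
    LOW = 3650


def priority(enqueued_at: datetime, cls: PriorityClass) -> int:
    """Unix-ms enqueue time shifted by the class offset; lower means dequeued sooner."""
    if enqueued_at.tzinfo is None:
        raise ValueError("use a timezone-aware datetime")
    return round(enqueued_at.timestamp() * 1000) + cls.value * MS_PER_DAY


now = datetime(2022, 9, 1, tzinfo=timezone.utc)
print(priority(now, PriorityClass.STANDARD))  # 1661990400000
```

With offsets this large, a HIGH message enqueued today still sorts ahead of a STANDARD message enqueued years later, while messages within the same class keep their FIFO-by-time order.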

Message States

Within a queue, a Timestone message belongs to one of six states (Figure 4):

  1. invisible
  2. pending
  3. running
  4. completed
  5. canceled
  6. errored

In general, a message can be enqueued with or without invisibility, which makes the message invisible or pending respectively. Invisible messages become pending when their invisibility window elapses.

A worker can dequeue the earliest-deadline pending message from a queue by specifying the amount of time (lease duration) it will spend processing it. Dequeueing messages in batches is also supported. Dequeueing moves the message to the running state.

The same worker can then issue a complete call to Timestone within the allotted lease window to move the message to the completed state, or issue a lease extension call if it wants to maintain control of the message. (A worker can also move a message, typically a running one, to the canceled state to signal that it no longer needs processing.)

If none of these calls are issued on time, the message becomes dequeueable again, and this attempt on the message is spent. If there are no attempts left on the message, it is moved automatically to the errored state.

The terminal states (completed, errored, and canceled) are garbage-collected periodically by a background process.

Messages can move states either when a worker invokes an API operation, or when Timestone runs its background processes (Figure 4, marked in red — these run periodically). Figure 4 shows the complete state transition diagram.

Figure 4. State transition diagram for Timestone messages.
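The transitions above can be sketched for a single message as a small in-memory state machine. This is a hypothetical Python illustration, not Timestone’s code, which, per the post, runs as Lua scripts inside Redis. In the sketch, times are plain numbers in seconds, `tick` stands in for the periodic background processes, and the method names are our own.

```python
from dataclasses import dataclass
from typing import Optional

TERMINAL = {"completed", "canceled", "errored"}


@dataclass
class Message:
    attempts_left: int
    state: str = "pending"
    visible_at: float = 0.0  # when an invisible message becomes pending
    lease_expires_at: Optional[float] = None

    @classmethod
    def enqueue(cls, now: float, attempts: int, invisibility: float = 0.0) -> "Message":
        if invisibility > 0:
            return cls(attempts, state="invisible", visible_at=now + invisibility)
        return cls(attempts)

    def _holds_lease(self, now: float) -> bool:
        return (self.state == "running" and self.lease_expires_at is not None
                and now <= self.lease_expires_at)

    def dequeue(self, now: float, lease: float) -> None:
        if self.state != "pending":
            raise ValueError(f"cannot dequeue a {self.state} message")
        self.attempts_left -= 1  # every dequeue spends one attempt
        self.state = "running"
        self.lease_expires_at = now + lease

    def extend_lease(self, now: float, lease: float) -> None:
        if not self._holds_lease(now):
            raise ValueError("lease not held")
        self.lease_expires_at = now + lease

    def complete(self, now: float) -> None:
        if self.state == "completed":
            return  # idempotent: repeating a successful complete is a no-op
        if not self._holds_lease(now):
            raise ValueError("lease not held")
        self.state, self.lease_expires_at = "completed", None

    def cancel(self) -> None:
        if self.state not in TERMINAL:
            self.state, self.lease_expires_at = "canceled", None

    def tick(self, now: float) -> None:
        """Background process: reveal invisible messages, reclaim expired leases."""
        if self.state == "invisible" and now >= self.visible_at:
            self.state = "pending"
        elif self.state == "running" and not self._holds_lease(now):
            self.lease_expires_at = None
            self.state = "pending" if self.attempts_left > 0 else "errored"
```

For example, a message enqueued with two attempts whose lease lapses twice ends up errored after the second `tick` past its lease.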

Queues

All incoming messages are stored in queues. Within a queue, messages are sorted by their priority. Timestone can host an arbitrary number of user-created queues and offers a set of API operations for queue management, all revolving around a queue configuration object. Data we store in this object includes the queue’s type (see rest of section), the lease duration that applies to dequeued messages, the invisibility duration that applies to enqueued messages, the number of times a message can be dequeued, and whether enqueueing or dequeueing is temporarily blocked. Note that a message producer can override the default lease or invisibility duration by setting it at the message level during enqueue.

All queues in Timestone fall into two types: simple and exclusive.

When an exclusive queue is created, it is associated with a user-defined exclusivity key, for example project. All messages posted to that queue must carry this key in their metadata. For instance, a message with project=foo will be accepted into the queue; a message without the project key will not be. In this example, we call foo, the value that corresponds to the exclusivity key, the message’s exclusivity value.

The contract for exclusive queues is that at any point in time, there can be at most one consumer per exclusivity value. Therefore, if the project-based exclusive queue in our example has two messages with the key-value pair project=foo in it, and one of them is already leased out to a worker, the other one is not dequeueable. This is depicted in Figure 5.

Figure 5. When worker_2 issues a dequeue call, they lease msg_2 instead of msg_1, even though msg_1 has a higher priority. That happens because the queue is exclusive, and the exclusivity value foo is already leased out.

In a simple queue no such contract applies, and there is no tight coupling with message metadata keys. A simple queue works as your typical priority queue, simply ordering messages in an earliest-deadline-first fashion.
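The exclusive-queue contract can be sketched in memory as follows: skip any message whose exclusivity value is already leased, and reject messages that lack the exclusivity key. This is a hypothetical illustration in which class and method names are ours, leases never expire, and nothing is persisted. The usage lines reproduce the Figure 5 scenario.

```python
import heapq
import itertools
from typing import Dict, List, Optional, Tuple

Entry = Tuple[int, int, str, str]  # (priority, tie-breaker, message id, exclusivity value)


class ExclusiveQueue:
    """Illustrative exclusive queue: at most one leased message per exclusivity value."""

    def __init__(self, exclusivity_key: str) -> None:
        self.exclusivity_key = exclusivity_key
        self._heap: List[Entry] = []
        self._seq = itertools.count()
        self._leased: Dict[str, str] = {}  # message id -> exclusivity value

    def enqueue(self, msg_id: str, priority: int, metadata: Dict[str, str]) -> None:
        if self.exclusivity_key not in metadata:
            raise ValueError(f"message metadata must contain {self.exclusivity_key!r}")
        value = metadata[self.exclusivity_key]
        heapq.heappush(self._heap, (priority, next(self._seq), msg_id, value))

    def dequeue(self) -> Optional[str]:
        """Lease the highest-priority message whose exclusivity value is free."""
        busy = set(self._leased.values())
        skipped: List[Entry] = []
        chosen: Optional[Entry] = None
        while self._heap:
            entry = heapq.heappop(self._heap)
            if entry[3] in busy:
                skipped.append(entry)  # blocked: another message with this value is leased
                continue
            chosen = entry
            break
        for entry in skipped:
            heapq.heappush(self._heap, entry)
        if chosen is None:
            return None
        self._leased[chosen[2]] = chosen[3]
        return chosen[2]

    def complete(self, msg_id: str) -> None:
        self._leased.pop(msg_id, None)  # frees the exclusivity value


q = ExclusiveQueue("project")
q.enqueue("msg_0", 1, {"project": "foo"})
q.enqueue("msg_1", 2, {"project": "foo"})
q.enqueue("msg_2", 3, {"project": "bar"})
print(q.dequeue(), q.dequeue())  # msg_0 msg_2: msg_1 waits while foo is leased
```

Consumers never lock anything themselves. The queue alone decides which message is eligible, which matches the “no coordination on the consumer side” goal described earlier.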

What We Are Working On

Some of the things we’re working on:

  1. As the usage of Timestone within Cosmos grows, so does the need to support a range of queue depth queries. To solve this, we are building a dedicated query service that uses a distinct query model.
  2. As noted above (see “System of record” section), a queue and its contents can currently only occupy one Redis shard. Hot queues, however, can grow big, especially when compute capacity is scarce. We want to support arbitrarily large queues, which has us building support for queue sharding.
  3. Messages can carry up to 4 key-value pairs. We currently use all of these key-value pairs to populate the secondary indexes used during dequeue-by-filter. This operation is exponentially complex in both time and space (O(2^n)). We are switching to lexicographical ordering on sorted sets to halve the number of indexes and handle metadata in a more cost-efficient manner.
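To see where the O(2^n) in item 3 comes from, assume, hypothetically, one index entry per non-empty combination of a message’s metadata pairs, so that a filter on any subset of pairs can be served by a single index. With n pairs that is 2^n - 1 entries, or 15 for the 4-pair maximum. The key layout below is illustrative only.

```python
from itertools import combinations
from typing import Dict, List


def filter_index_keys(queue: str, metadata: Dict[str, str]) -> List[str]:
    """One hypothetical index key per non-empty combination of metadata pairs."""
    pairs = sorted(metadata.items())  # canonical order so equal filters share a key
    keys = []
    for size in range(1, len(pairs) + 1):
        for subset in combinations(pairs, size):
            suffix = ",".join(f"{k}={v}" for k, v in subset)
            keys.append(f"{{{queue}}}:idx:{suffix}")  # {queue} hash tag keeps it on one shard
    return keys


meta = {"project": "foo", "type": "encode", "region": "us", "tier": "gold"}
print(len(filter_index_keys("encode", meta)))  # 15 == 2**4 - 1
```

Every enqueue and state change has to touch all of these entries, which is why the per-message cost grows exponentially with the number of pairs.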

We may be covering our work on the above in follow-up posts. If these kinds of problems sound interesting to you, and if you like the challenges of building distributed systems for the Netflix Content and Studio ecosystem at scale in general, you should consider joining us.

Acknowledgements

Poorna Reddy, Kostas Christidis, Aravindan Ramkumar, Surafel Korse, Jiaofen Xu, Anoop Panicker, and Kishore Banala have contributed to this project. We thank Charles Zhao, Olof Johansson, Frank San Miguel, Dmitry Vasilyev, Prudhvi Chaganti, and the rest of the Cosmos team for their constructive feedback while developing and operating Timestone.


Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support… was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Reinforcement Learning for Budget Constrained Recommendations

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/reinforcement-learning-for-budget-constrained-recommendations-6cbc5263a32a

by Ehtsham Elahi
with
James McInerney, Nathan Kallus, Dario Garcia Garcia and Justin Basilico

Introduction

This writeup is about using reinforcement learning to construct an optimal list of recommendations when the user has a finite time budget to make a decision from the list of recommendations. Working within the time budget introduces an extra resource constraint for the recommender system. It is similar to many other decision problems (e.g. in economics and operations research) where the entity making the decision has to find trade-offs in the face of finite resources and multiple (possibly conflicting) objectives. Although time is the most important and finite resource, we think that it is an often-ignored aspect of recommendation problems.

In addition to relevance of the recommendations, time budget also determines whether users will accept a recommendation or abandon their search. Consider the scenario that a user comes to the Netflix homepage looking for something to watch. The Netflix homepage provides a large number of recommendations and the user has to evaluate them to choose what to play. The evaluation process may include trying to recognize the show from its box art, watching trailers, reading its synopsis or in some cases reading reviews for the show on some external website. This evaluation process incurs a cost that can be measured in units of time. Different shows will require different amounts of evaluation time. If it’s a popular show like Stranger Things then the user may already be aware of it and may incur very little cost before choosing to play it. Given the limited time budget, the recommendation model should construct a slate of recommendations by considering both the relevance of the items to the user and their evaluation cost. Balancing both of these aspects can be difficult as a highly relevant item may have a much higher evaluation cost and it may not fit within the user’s time budget. Having a successful slate therefore depends on the user’s time budget, relevance of each item as well as their evaluation cost. The goal for the recommendation algorithm therefore is to construct slates that have a higher chance of engagement from the user with a finite time budget. It is important to point out that the user’s time budget, like their preferences, may not be directly observable and the recommender system may have to learn that in addition to the user’s latent preferences.

A typical slate recommender system

We are interested in settings where the user is presented with a slate of recommendations. Many recommender systems rely on a bandit-style approach to slate construction. A bandit recommender system constructing a slate of K items may look like the following:

A bandit style recommender system for slate construction

To insert an element at slot k in the slate, the item scorer scores all of the available N items and may make use of the slate constructed so far (slate above) as additional context. The scores are then passed through a sampler (e.g. Epsilon-Greedy) to select an item from the available items. The item scorer and the sampling step are the main components of the recommender system.
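Here is a minimal sketch of this loop with an epsilon-greedy sampler. Every remaining item is scored with the partial slate as context. The sampler then either explores, picking a uniformly random item with probability epsilon, or exploits by picking the top-scoring item. The toy scorer and item names are placeholders; a real item scorer would be a learned model.

```python
import random
from typing import Callable, List, Sequence


def build_slate(
    items: Sequence[str],
    k: int,
    score: Callable[[str, List[str]], float],
    epsilon: float,
    rng: random.Random,
) -> List[str]:
    """Fill slots 1..k one at a time with an epsilon-greedy sampler."""
    slate: List[str] = []
    available = list(items)
    for _ in range(min(k, len(available))):
        # Score every remaining item, using the slate built so far as context.
        scores = [score(item, slate) for item in available]
        if rng.random() < epsilon:
            pick = rng.randrange(len(available))  # explore: uniform random item
        else:
            pick = max(range(len(available)), key=scores.__getitem__)  # exploit
        slate.append(available.pop(pick))
    return slate


BASE = {"a": 0.1, "b": 0.9, "c": 0.5}


def score(item: str, slate: List[str]) -> float:
    # Toy scorer: fixed base score that ignores the slate context.
    return BASE[item]


print(build_slate(list(BASE), k=2, score=score, epsilon=0.0, rng=random.Random(0)))  # ['b', 'c']
```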

Problem formulation

Let’s make the problem of budget-constrained recommendations more concrete by considering the following (simplified) setting. The recommender system presents a one-dimensional slate (a list) of K items, and the user examines the slate sequentially from top to bottom.

A user with a fixed time budget evaluating a slate of recommendations with K items. Item 2 gets the click/response from the user. The item shaded in red falls outside of the user’s time budget.

The user has a time budget, which is some positive real-valued number. Let’s assume that each item has two features: relevance (a scalar; a higher value means the item is more relevant) and cost (measured in units of time). Evaluating each recommendation consumes part of the user’s time budget, and the user can no longer browse the slate once the time budget has been exhausted. For each item examined, the user makes a probabilistic decision to consume the recommendation by flipping a coin with probability of success proportional to the relevance of the item. Since we want to model the user’s probability of consumption using the relevance feature, it is helpful to think of relevance directly as a probability (between 0 and 1). Clearly, the probability of choosing something from the slate depends not only on the relevance of the items but also on the number of items the user is able to examine. A recommender system trying to maximize the user’s engagement with the slate therefore needs to pack in as many relevant items as possible within the user’s budget, trading off relevance against cost.
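The user model above can be simulated directly. This sketch assumes that browsing stops at the first item whose cost exceeds the remaining budget (the red item in the figure) and that the coin for each examined item succeeds with probability equal to its relevance; the function and type names are illustrative.

```python
import random
from typing import List, Optional, Tuple

Item = Tuple[float, float]  # (relevance in [0, 1], cost in time units)


def simulate_user(slate: List[Item], budget: float, rng: random.Random) -> Optional[int]:
    """Return the position of the item the user plays, or None if they abandon the slate."""
    remaining = budget
    for position, (relevance, cost) in enumerate(slate):
        if cost > remaining:
            # Assumption: the next item does not fit, so browsing stops here.
            return None
        remaining -= cost  # examining the item consumes time
        # Coin flip: play with probability equal to the item's relevance.
        if rng.random() < relevance:
            return position
    return None
```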

Connection with the 0/1 Knapsack problem

Let’s look at it from another perspective. Consider the following definitions for the slate recommendation problem described above:

The abandonment probability is small if the items are highly relevant or the list of items the user can examine is long, since the abandonment probability is a product of probabilities. The abandonment option is sometimes referred to as the null choice/arm in the bandit literature.
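Under the same user model (independent coin flips with probability equal to relevance, strict top-to-bottom browsing, positive costs), the abandonment probability is the product of (1 − relevance) over the items that fit in the budget. A small sketch of that quantity and of the number of items that fit; this is our reading of the definitions, whose original figure is not reproduced here.

```python
from itertools import accumulate
from math import prod
from typing import List, Tuple


def effective_slate_size(costs: List[float], budget: float) -> int:
    """Number of leading items whose cumulative cost fits within the budget.

    Assumes positive costs, so the cumulative sums are increasing and the
    count below is exactly the length of the prefix that fits.
    """
    return sum(1 for total in accumulate(costs) if total <= budget)


def abandonment_probability(slate: List[Tuple[float, float]], budget: float) -> float:
    """P(no play) = product of (1 - relevance) over the items the user can examine."""
    n = effective_slate_size([cost for _, cost in slate], budget)
    return prod(1.0 - relevance for relevance, _ in slate[:n])
```

For example, two items of relevance 0.5 that both fit give an abandonment probability of 0.25, while a single item of relevance 0.6 that uses the whole budget gives 0.4: the less relevant but cheaper pair is the better slate.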

This problem has clear connections with the 0/1 Knapsack problem in theoretical computer science. The goal is to find the subset of items with the highest total utility such that the total cost of the subset is not greater than the user budget. If β_i and c_i are the utility and cost of the i-th item and u is the user budget, then the budget constrained recommendation problem can be formulated as

0/1 Knapsack formulation for Budget constrained recommendations
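The formulation itself appeared as an image in the original post. Written in the standard 0/1 Knapsack form with the symbols defined above (utility β_i, cost c_i, budget u, and a chosen subset S of the N candidate items), it reads as follows; this is a reconstruction of the standard form rather than a copy of the original figure.

$$
\max_{S \subseteq \{1, \dots, N\}} \; \sum_{i \in S} \beta_i
\quad \text{subject to} \quad
\sum_{i \in S} c_i \le u
$$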

There is an additional requirement that the optimal subset S be sorted in descending order of item relevance.

The 0/1 Knapsack problem is well studied: its decision version is known to be NP-complete, and many approximate solutions exist. In this writeup, we propose to model the budget constrained recommendation problem as a Markov Decision Process and use algorithms from reinforcement learning (RL) to find a solution. It will become clear that the RL-based solution fits well within the recommender system architecture for slate construction. To begin, we formulate the budget constrained recommendation problem as a Markov Decision Process.
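When costs are small integers (for example, costs rounded to whole seconds, an assumption made here), the 0/1 Knapsack problem can be solved exactly with the classic pseudo-polynomial dynamic program. It is a useful baseline to compare against, not the approach taken in this post. The sketch also applies the ordering requirement stated above by sorting the chosen items by descending relevance.

```python
from typing import List


def knapsack_slate(utility: List[float], cost: List[int], budget: int,
                   relevance: List[float]) -> List[int]:
    """Exact 0/1 Knapsack by dynamic programming over integer budgets.

    Returns the chosen item indices, sorted by descending relevance.
    """
    n = len(utility)
    # best[i][b]: max total utility using items 0..i-1 with total cost <= b.
    best = [[0.0] * (budget + 1) for _ in range(n + 1)]
    for i in range(1, n + 1):
        u_i, c_i = utility[i - 1], cost[i - 1]
        for b in range(budget + 1):
            best[i][b] = best[i - 1][b]  # option 1: skip item i-1
            if c_i <= b:                 # option 2: take item i-1 if it fits
                best[i][b] = max(best[i][b], best[i - 1][b - c_i] + u_i)
    # Walk the table backwards: a changed value means item i-1 was taken.
    chosen, b = [], budget
    for i in range(n, 0, -1):
        if best[i][b] != best[i - 1][b]:
            chosen.append(i - 1)
            b -= cost[i - 1]
    return sorted(chosen, key=lambda j: relevance[j], reverse=True)
```

The running time is O(N·u), so this only stays practical while the budget, measured in cost units, is small.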

Budget constrained recommendations as a Markov Decision Process

In a Markov decision process, the key component is the state evolution of the environment as a function of the current state and the action taken by the agent. In the MDP formulation of this problem, the agent is the recommender system and the environment is the user interacting with the recommender system. The agent constructs a slate of K items by repeatedly selecting actions it deems appropriate at each slot in the slate. The state of the environment/user is characterized by the available time budget and the items examined in the slate at a particular step in the slate browsing process. Specifically, the following table defines the Markov Decision Process for the budget constrained recommendation problem,

Markov Decision Process for Budget constrained recommendations

In real-world recommender systems, the user budget may not be observable. This can be addressed by estimating the user budget from historical data (e.g., how long the user scrolled before abandoning, according to historical data logs). In this writeup, we assume for the sake of simplicity that the recommender system/agent has access to the user budget.

The slate generation task above is an episodic task, i.e., the recommender agent is tasked with choosing K items for the slate. The user provides feedback by choosing one or zero items from the slate, which can be viewed as a binary reward r per item in the slate. Letting π be the recommender policy generating the slate and γ the reward discount factor, we can define the discounted return for each (state, action) pair as follows:

State, Action Value function estimation
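The return was shown as an image in the original post. One standard way to write it, with r_{t+1} the reward received after filling slot t and the episode ending after K slots, is:

$$
q_\pi(s, a) = \mathbb{E}_\pi\!\left[\, \sum_{k=0}^{K-t-1} \gamma^{k}\, r_{t+k+1} \;\middle|\; s_t = s,\; a_t = a \right]
$$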

The reinforcement learning algorithm we employ is based on estimating this return using a model. Specifically, we use Temporal Difference learning, TD(0), to estimate the value function. Temporal difference learning uses Bellman’s equation to define the value function of the current state and action in terms of the value function of the next state and action.

Bellman’s equation for state, action value function
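In the same notation, the Bellman equation for q_π relates the value at slot t to the immediate reward and the value at slot t+1 (standard form, reconstructed rather than copied from the original figure):

$$
q_\pi(s_t, a_t) = \mathbb{E}_\pi\!\left[\, r_{t+1} + \gamma\, q_\pi(s_{t+1}, a_{t+1}) \,\right]
$$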

Based on this Bellman equation, a squared loss for TD learning is:

Loss Function for TD(0) Learning
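With a parametric model q(s, a; θ) fitted on sampled transitions, the standard TD(0) squared loss is shown below (a reconstruction in standard notation; at the last slot, or once the user has played an item, the bootstrap term is dropped):

$$
\mathcal{L}(\theta) = \Big( r_{t+1} + \gamma\, q(s_{t+1}, a_{t+1}; \theta) - q(s_t, a_t; \theta) \Big)^2
$$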

The loss function can be minimized using semi-gradient methods, which treat the bootstrapped target as a constant when taking the gradient. Once we have a model for q, we can use it as the item scorer in the slate recommender system architecture above. If the discount factor γ = 0, the return for each (state, action) pair is simply the immediate user feedback r. Therefore q with γ = 0 corresponds to the item scorer of a contextual bandit agent, whereas for γ > 0 the recommender corresponds to a (value-function-based) RL agent. Simply using the value function model as the item scorer in this architecture therefore makes it very easy to deploy an RL-based solution.
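A minimal sketch of one semi-gradient TD(0) update, assuming a linear model q(s, a) = w · φ(s, a) over a hypothetical feature vector φ; the post does not specify its model class. Setting `gamma=0.0` turns the update into plain regression on the immediate feedback r, which is the contextual bandit scorer described above.

```python
from typing import List


def dot(w: List[float], x: List[float]) -> float:
    return sum(wi * xi for wi, xi in zip(w, x))


def td0_update(w: List[float], phi_sa: List[float], reward: float,
               phi_next: List[float], gamma: float, alpha: float,
               terminal: bool = False) -> List[float]:
    """One semi-gradient TD(0) step for a linear model q(s, a) = w . phi(s, a).

    The target r + gamma * q(s', a') is treated as a constant, so the gradient
    of the squared error only flows through q(s, a) = w . phi_sa.
    """
    target = reward if terminal else reward + gamma * dot(w, phi_next)
    td_error = target - dot(w, phi_sa)
    return [wi + alpha * td_error * xi for wi, xi in zip(w, phi_sa)]
```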

Budget constrained Recommendation Simulation

As in other applications of RL, we find simulations to be a helpful tool for studying this problem. Below we describe the generative process for the simulated data:

Generative model for simulated data

Note that, instead of sampling a per-item Bernoulli, we can alternatively sample once from a categorical distribution with the relative relevances of the items and a fixed weight for the null arm. The generative process above depends on many hyper-parameters (loc, scale, etc.). Each setting of these hyper-parameters results in a different simulated dataset, and it’s easy to realize many simulated datasets in parallel. For the experiments below, we fix the hyper-parameters of the cost and relevance distributions and sweep over the location parameter of the initial user budget distribution. The attached notebook contains the exact hyper-parameter settings used for the simulations.
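The exact distributions and hyper-parameters live in the notebook accompanying the post and are not reproduced here. The sketch below assumes, purely for illustration, Beta-distributed relevances, lognormal costs and a normally distributed user budget clipped to stay positive; `SimConfig` and its fields are hypothetical names, with `budget_loc` playing the role of the location parameter swept in the experiments. It also includes the categorical alternative mentioned above, with a fixed weight on the null arm.

```python
import random
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class SimConfig:
    # All distribution families and default values are illustrative assumptions.
    num_items: int = 20
    rel_alpha: float = 2.0     # relevance ~ Beta(rel_alpha, rel_beta)
    rel_beta: float = 5.0
    cost_mu: float = 0.0       # cost ~ LogNormal(cost_mu, cost_sigma)
    cost_sigma: float = 0.5
    budget_loc: float = 3.0    # location of the user budget distribution (swept)
    budget_scale: float = 1.0


def sample_user(cfg: SimConfig, rng: random.Random) -> Tuple[float, List[Tuple[float, float]]]:
    """Draw one user: a positive time budget plus (relevance, cost) for every candidate item."""
    budget = max(1e-3, rng.gauss(cfg.budget_loc, cfg.budget_scale))
    items = [(rng.betavariate(cfg.rel_alpha, cfg.rel_beta),
              rng.lognormvariate(cfg.cost_mu, cfg.cost_sigma))
             for _ in range(cfg.num_items)]
    return budget, items


def categorical_response(relevances: List[float], null_weight: float,
                         rng: random.Random) -> Optional[int]:
    """Alternative response model: a single categorical draw over the examined items plus a null arm."""
    weights = list(relevances) + [null_weight]
    choice = rng.choices(range(len(weights)), weights=weights, k=1)[0]
    return None if choice == len(relevances) else choice
```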

Metric

A slate recommendation algorithm generates slates, and the user model is then used to predict the success or failure of each slate. Given the simulation data, we can train various recommendation algorithms and compare their performance using a simple metric: the average number of successes of the generated slates (referred to as play-rate below). In addition to play-rate, we look at the effective-slate-size, which we define as the number of items in the slate that fit within the user’s time budget. As mentioned earlier, one way to improve play-rate is to construct larger effective slates (with relevant items, of course), so looking at this metric helps us understand the mechanism behind the recommendation algorithms.

On-policy learning results

Given the flexibility of working in the simulation setting, we can learn to construct optimal slates in an on-policy manner. For this, we start with some initial random model for the value function, generate slates from it, get user feedback (using the user model), update the value function model using the feedback, and repeat this loop until the value function model converges. This is known as the SARSA algorithm.
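A self-contained toy version of this on-policy loop, under several simplifying assumptions: a tabular q instead of a learned model, integer costs, a state of (slot, remaining budget, items already placed), and slots filled one at a time while the simulated user browses. All names are hypothetical; this illustrates SARSA on the MDP above rather than reproducing the post’s experiments.

```python
import random
from collections import defaultdict
from typing import Dict, FrozenSet, List, Tuple

Item = Tuple[float, int]                  # (relevance, integer cost): toy assumption
State = Tuple[int, int, FrozenSet[int]]   # (slot, remaining budget, items already placed)


def env_step(state: State, action: int, items: List[Item], rng: random.Random):
    """Toy user model: examine the item if it fits, then play it with probability = relevance."""
    slot, budget, used = state
    relevance, cost = items[action]
    next_state = (slot + 1, budget - cost, used | {action})
    if cost > budget:
        return next_state, 0.0, True      # item falls outside the budget: browsing ends
    if rng.random() < relevance:
        return next_state, 1.0, True      # the user plays this item: reward 1, episode ends
    return next_state, 0.0, False


def choose(q: Dict, state: State, n_items: int, epsilon: float, rng: random.Random) -> int:
    """Epsilon-greedy over the items not yet placed in the slate."""
    actions = [a for a in range(n_items) if a not in state[2]]
    if rng.random() < epsilon:
        return rng.choice(actions)
    return max(actions, key=lambda a: q[(state, a)])


def sarsa(items: List[Item], k: int, budget: int, episodes: int = 2000,
          alpha: float = 0.1, gamma: float = 0.8, epsilon: float = 0.1, seed: int = 0) -> Dict:
    rng = random.Random(seed)
    q: Dict = defaultdict(float)          # tabular q(s, a), initialised to 0
    for _ in range(episodes):
        state: State = (0, budget, frozenset())
        action = choose(q, state, len(items), epsilon, rng)
        while True:
            next_state, reward, done = env_step(state, action, items, rng)
            # The episode also ends when the slate is full or every item is placed.
            done = done or next_state[0] == k or len(next_state[2]) == len(items)
            if done:
                target = reward
            else:
                next_action = choose(q, next_state, len(items), epsilon, rng)
                target = reward + gamma * q[(next_state, next_action)]
            q[(state, action)] += alpha * (target - q[(state, action)])
            if done:
                break
            state, action = next_state, next_action
    return q
```

Because the remaining budget is part of the state, a policy learned with `gamma > 0` can prefer a cheaper item at the top of the slate if that leaves room for more relevant items further down.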

The following set of results shows how the learned recommender policies behave in terms of the success metric, play-rate, for different settings of the user budget distribution’s location parameter and the discount factor. In addition to the play rate, we also show the effective slate size, i.e., the average number of items that fit within the user budget. While the play rate changes are statistically insignificant (the shaded areas are 95% confidence intervals estimated by bootstrapping the simulations 100 times), we see a clear trend of increasing effective slate size for γ > 0 compared to the contextual bandit (γ = 0).

Play-Rate and Effective slate sizes for different User Budget distributions. The user budget distribution’s location is on the same scale as the item cost, and we look for changes in the metrics as the user budget distribution changes.

We can get a more statistically sensitive result by comparing the contextual bandit with an RL model for each simulation setting (similar to the pairing in a paired t-test). Below we show the change in play rate (delta play rate) between an RL model (shown with γ = 0.8 as an example) and a contextual bandit (γ = 0), for different user budget distributions. With this paired comparison, we see a statistically significant lift in play rate for small to medium user budget ranges. This makes intuitive sense: we would expect both approaches to work equally well when the user budget is very large (so the item’s cost is irrelevant), and the RL algorithm to outperform the contextual bandit only when the user budget is limited and finding the trade-off between relevance and cost matters. The increase in the effective slate size is even more dramatic. This result clearly shows that the RL agent performs better by packing more items within the user budget, thereby minimizing the abandonment probability.
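The paired comparison can be sketched as a paired bootstrap: for each simulation setting, take the difference between the RL and bandit play rates, resample those differences with replacement, and read a percentile confidence interval from the resampled means. The percentile method and the defaults below are assumptions; the post only states that bootstrapping was used.

```python
import random
from statistics import mean
from typing import List, Tuple


def paired_bootstrap_ci(a: List[float], b: List[float], n_boot: int = 1000,
                        level: float = 0.95, seed: int = 0) -> Tuple[float, float, float]:
    """Mean of the paired differences a[i] - b[i] and a percentile bootstrap CI for it.

    Resampling the differences (rather than each arm independently) keeps the
    pairing, which removes the variance both arms share within a setting.
    """
    if len(a) != len(b) or not a:
        raise ValueError("need two equal-length, non-empty samples")
    rng = random.Random(seed)
    diffs = [x - y for x, y in zip(a, b)]
    boots = sorted(mean(rng.choices(diffs, k=len(diffs))) for _ in range(n_boot))
    lo_idx = int((1 - level) / 2 * n_boot)
    hi_idx = int((1 + level) / 2 * n_boot) - 1
    return mean(diffs), boots[lo_idx], boots[hi_idx]
```

A lift is called significant when the whole interval lies above zero.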

Paired comparison between RL and Contextual bandit. For limited user budget settings, we see statistically significant lift in play rate for the RL algorithm.

Off-policy learning results

So far, the results have shown that in the budget constrained setting, reinforcement learning outperforms the contextual bandit. These results are for the on-policy learning setting, which is very easy to simulate but difficult to execute in realistic recommender settings. In a realistic recommender, we have data generated by a different policy (called the behavior policy), and we want to learn a new and better policy (called the target policy) from this data. This is called the off-policy setting. Q-Learning is one well-known technique that allows us to learn the optimal value function in an off-policy setting. The loss function for Q-Learning is very similar to the TD(0) loss, except that it uses Bellman’s optimality equation instead:

Loss function for Q-Learning
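The Q-Learning loss has the same shape as the TD(0) loss, with the bootstrap term replaced by a maximum over next actions, following Bellman’s optimality equation (standard form, reconstructed rather than copied from the original figure):

$$
\mathcal{L}(\theta) = \Big( r_{t+1} + \gamma \max_{a'} q(s_{t+1}, a'; \theta) - q(s_t, a_t; \theta) \Big)^2
$$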

This loss can again be minimized using semi-gradient techniques. We estimate the optimal value function using Q-Learning and compare its performance with the optimal policy learned using the on-policy SARSA setup. For this, we generate slates using the Q-Learning-based optimal value function model and compare their play-rate with that of the slates generated using the optimal policy learned with SARSA. Below is the result of the paired comparison between SARSA and Q-Learning:

Paired comparison between Q-Learning and SARSA. Play rates are similar between the two approaches but effective slate sizes are very different.

In this result, the change in play-rate between the on-policy and off-policy models is close to zero (note the error bars crossing the zero axis). This is a favorable result, as it shows that Q-Learning achieves performance similar to the on-policy algorithm. However, the effective slate size differs considerably between Q-Learning and SARSA: Q-Learning seems to generate very large effective slates without much difference in play rate. This intriguing result needs more investigation to be fully understood, and we hope to spend more time on it in the future.

Conclusion

To conclude, in this writeup we presented the budget constrained recommendation problem and showed that, in order to generate slates with higher chances of success, a recommender system has to balance both the relevance and the cost of items so that more of the slate fits within the user’s time budget. We showed that the budget constrained recommendation problem can be modeled as a Markov Decision Process, and that a solution for optimal slate construction under budget constraints can be found using reinforcement learning methods. We showed that RL outperforms contextual bandits in this problem setting. Moreover, we compared the performance of on-policy and off-policy approaches and found the results to be comparable in terms of success metrics.

Code

GitHub repo


Reinforcement Learning for Budget Constrained Recommendations was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Virtual Production — A Validation Framework For Unreal Engine

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/virtual-production-a-validation-framework-for-unreal-engine-aab780b2f8c8

By Adam Davis, Jimmy Fusil, Bhanu Srikanth and Girish Balakrishnan

Game Engines in Virtual Production

The use of Virtual Production and real time technologies has markedly accelerated in the past few years. At Netflix, we are always thrilled to see technology enable new ways of telling stories, and the use of these techniques on some of our shows like 1899 and Super Giant Robot Brothers has given us a front row seat to this exciting evolution in filmmaking. Each production that deploys these methods is an opportunity for the crew, tech manufacturers and us–the Netflix Production Innovation team–to learn, innovate and collaborate towards a common goal: universally accessible workflows that will enable creative opportunities and technical success for all filmmakers regardless of the size, location or scope of their project.

Game engines have been a major part of driving the democratization of advanced virtual production techniques such as pre-visualization and in-camera visual effects. While these platforms were initially developed to build games, their open and versatile design and development environments offer other capabilities. In the hands of film and TV artists and technicians–combined with techniques and technologies such as LED panels and tracking systems–they’ve become powerful creation tools for filmmakers.

The incredible depth and flexibility of game engines also means there are infinite permutations and combinations of configurations and settings available to the operator. But not getting the configuration right can have a significant impact on the results. Could we harness the benefits of this depth and flexibility while also supporting the predictability and repeatability expected from high pressure shooting environments? Or put more simply: could we offer productions a shortcut to achieving the right configuration for their scenario every time?

The Netflix Production Innovation Team is always asking itself versions of this question: “How do we ensure a high level of creative opportunity and flexibility, while achieving technical consistency and quality? How do we reduce the potential for accidents and errors, while making sure artists and technicians still have the agency to stay true to their vision and their practice?” One past approach to enabling excellence while mitigating risk has been creating knowledge resources for our productions and partners, while also working with the industry to develop standards and best practices. But in this instance, the open development framework of the game engine itself, as well as its central place in Virtual Production workflows, offered another compelling solution.

The Validation Framework as Helper, Teacher and Enabler

Many large scale operations use quality control methods and validation frameworks to ensure successful outcomes. Validation frameworks can act as a flexible and unobtrusive assistant, while offering a safety net for the operator. We’ve had success with this type of validation in the past with Photon, which acted as a library, reference implementation and validation tool for IMF packages, the delivery format for all of our Netflix original programming.

An excellent validation framework is designed with the following characteristics:

  1. It is informative and instructive, providing useful information as to why an issue has arisen, what problems will be caused if not resolved, and what can be done to resolve it.
  2. It acts as dynamic documentation, a checklist, and a co-pilot to allow the operator to focus on the creative aspects.
  3. It promotes standards and consistency across individuals and teams, regardless of how bespoke the workflow is, and allows them to operate along a common and agreed set of expectations and checks.
  4. It enhances efficiency in complex interconnected workflows by looking across multiple components, roles and responsibilities and reporting on the system as a whole.

With such a validation framework in place, we can avoid:

  • Locking users behind layers of confining procedures and tooling, which can limit the evolution of new ways of working by constraining the options available to the operator and creator to adapt to new scenarios, as well as preventing them from learning what’s really going on under the hood.
  • Creating piles of documentation, sticky notes, runbooks… While all of that material is often useful as learning aids or during prep, it’s rarely practical to be flicking through pages of documentation to solve a problem during production.

Furthermore, by incorporating the validation framework within the game engine itself, we could ensure that everything is set up for success, maintaining flexibility while minimizing the risk of error. When the camera rolls, creators don’t want to assume everything is OK; they need to know that all possible steps were taken to prevent errors.

Netflix’s Unreal Engine Validation Framework

Image of the Netflix Unreal Engine Validation Framework User Interface

Functionality

Our Validation Framework, which we have developed as a plugin to Unreal Engine, is extensible and customizable. It hosts and manages automated validation checks and fixes, which help identify and address technical problems within a given workflow.

The Validation Framework builds upon Unreal’s EditorUtility functionality and provides a simple base–the Validation Base–from which all other validations are built. The core also provides a registration mechanism that finds all validations built atop this base within the Unreal project itself. This allows us to execute the validations either from blueprints or from C++, serving a variety of users: from developers extending the execution into a CI/CD pipeline on the C++ side, to an artist executing them via a widget or the UI in the editor.

Regardless of how many validations there are or where in the project they live, they are always accessible and can be run from either entry point thanks to the registration mechanics. This gives teams a lot of freedom and flexibility as to how they extend validations into their workflows. For example, a core library of validations can be shared across projects within a company, or across a production–including with other vendors. Or project-specific validations can be deployed, encapsulated within an Unreal project until it is delivered.

While we aim to keep as much exposed in blueprints as possible, having a thin underlying C++ layer is a convenient way to grant the framework access to some of the objects, settings and parameters which are inaccessible via blueprinting alone.

All of the Validations provide two simple hooks: the Validation itself, which checks something, and a Fix, which can apply a correction. What the validation framework checks for, and what is fixed and how, is entirely up to the artists and developers!

Validations can be assigned to a dedicated Scope, either Level or Project. This creates a hierarchy in the assignment of validations, as some will be applied to Project settings and configurations, while others will inspect the content of the Levels.

A second layer of organizational tagging can then be applied to create Workflows. While the validations can only apply to a single Scope (either Level or Project), they can belong to multiple workflows, each with its own set of validations. Several workflow “presets” are available in the initial release, along with the capability for users to define their own workflows.
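The plugin itself is written against Unreal Engine (blueprints plus a thin C++ layer), and its API is not shown in this post. The Python sketch below only illustrates the pattern described above: a base class with a validation hook and a fix hook, mandatory descriptions, a single Scope per validation, membership in several workflows, and a registry that discovers every validation. All class names, method names and the frame-rate check are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Set, Type


class Scope(Enum):
    LEVEL = "Level"
    PROJECT = "Project"


@dataclass
class Result:
    name: str
    passed: bool
    message: str


class ValidationBase:
    """Hypothetical base class: a check (validate) plus an optional correction (fix)."""
    scope: Scope = Scope.PROJECT      # exactly one Scope per validation
    workflows: Set[str] = set()       # may belong to several workflows
    description: str = ""             # what is checked, and why it matters
    fix_description: str = ""         # what the fix changes, and how
    registry: List[Type["ValidationBase"]] = []

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Refuse validations that do not explain themselves, then register them.
        if not cls.description or not cls.fix_description:
            raise TypeError(f"{cls.__name__} must describe its check and its fix")
        ValidationBase.registry.append(cls)

    def validate(self, target: Dict) -> Result:
        raise NotImplementedError

    def fix(self, target: Dict) -> Result:
        raise NotImplementedError


class FrameRateMatches(ValidationBase):
    # Hypothetical example check on a dict standing in for project settings.
    scope = Scope.PROJECT
    workflows = {"ICVFX"}
    description = "Project frame rate should match the camera frame rate."
    fix_description = "Sets the project frame rate to the camera frame rate."

    def validate(self, target: Dict) -> Result:
        ok = target["project_fps"] == target["camera_fps"]
        return Result(type(self).__name__, ok, "ok" if ok else "frame rate mismatch")

    def fix(self, target: Dict) -> Result:
        target["project_fps"] = target["camera_fps"]
        return self.validate(target)


def run(workflow: str, scope: Scope, target: Dict, apply_fixes: bool = False) -> List[Result]:
    """Run every registered validation in the given workflow and scope, optionally fixing failures."""
    results = []
    for cls in ValidationBase.registry:
        if cls.scope is scope and workflow in cls.workflows:
            check = cls()
            result = check.validate(target)
            if not result.passed and apply_fixes:
                result = check.fix(target)
            results.append(result)
    return results
```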

Users can also define new validations, but the tool requires that they provide a description of what they are checking for, as well as a description of what and how non-conformities (invalid settings or configurations) can be corrected. While we can’t police it, we encourage users to describe not only what the new validation checks, but also why, as well as how failing to resolve non-conformities will impact the workflow results. This will help ensure that the validation framework is useful not only as a risk management tool but also as an educational resource.

The UI is built almost entirely in blueprints, and utilizes a few helper blueprint nodes implemented in C++. Again, this allows users to use what’s there, replace it with their own, or integrate it into their existing UIs.

Finally, the framework generates validation reports, in either JSON or CSV format. Each time it runs, it writes a validation report for the project/level that was just inspected. This allows users to share results with others, such as support teams, or create a record of configurations for reporting or archival purposes.
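A sketch of writing a report in either format from a list of result records, using only the Python standard library; the column names are hypothetical, since the post does not describe the plugin’s report schema.

```python
import csv
import io
import json
from typing import Dict, List


def write_report(results: List[Dict], fmt: str = "json") -> str:
    """Serialize validation results (one dict per check) to JSON or CSV text."""
    if fmt == "json":
        return json.dumps(results, indent=2)
    if fmt == "csv":
        buf = io.StringIO()
        fieldnames = list(results[0].keys()) if results else []
        writer = csv.DictWriter(buf, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
        return buf.getvalue()
    raise ValueError(f"unsupported report format: {fmt}")
```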

We focused our initial development on a validation tool within Epic’s Unreal Engine because of its rapid adoption across our global slate of movies and shows, and the vendors that support them. This plug-in approach is also well aligned with Epic Games’ toolkit philosophy: being open to productions of all shapes, sizes and experience levels. Given this wide applicability, we saw a lot of benefit in an easy-to-implement utility that addresses the most prevalent “gotchas” in Unreal project set-ups for Virtual Production projects.

In-Camera VFX Validations

The plugin ships with a set of validations for the most commonly required checks and fixes when inspecting configurations aimed at executing In-Camera VFX (ICVFX) techniques. This set of validations is the result of a collaborative effort between the Netflix Production Innovation team and Epic Games; it is built from our collective knowledge and experience on ICVFX productions.

We initially focused only on LED ICVFX applications. The goal is to help production teams catch and resolve common issues which can cause unexpected render results, data outputs, and potential performance or resource allocation challenges when working with Unreal Engine. Many of the issues this tool mitigates can be hard to spot, particularly in the complex and high-pressure environment of a virtual production set.

That said, the Validation Framework is not solely for use on-set; it can also be utilized during the prep and content build phases, ensuring issues are identified and handled early, before moving further along the production pipeline. The framework can also easily be extended by users to encompass bespoke workflows with custom validations and fixes, allowing teams to create and perform their own unique checks.

Current Usage and Next Steps

The Validation Framework is currently being used on a number of Netflix shows, and has already significantly reduced the amount of time needed for system setup and troubleshooting. We have also received great feedback from our crews, which has led us to add new checks and fixes. Some of our partners have even begun adapting and extending this framework to better suit their proprietary workflows.

In the future, we would like to see such frameworks become commonly used in virtual production environments in two ways:

  1. Continued implementation and extension of this particular validation framework by Virtual Production crews and service providers;
  2. Adaptation of this automated validation approach to assist with the entire stage environment, even where Unreal is not part of the workflow: checking the settings on media servers, LED processors, workstations, tracking systems… all of which play a critical part in a Virtual Production system. In order for that to happen, we need to ensure that this wider ecosystem of devices can also be queried and monitored procedurally, which in turn will free crews to focus on the creative work, and not worry so much about the tech.

Availability

The validation framework can be found at: https://github.com/Netflix-Skunkworks/UnrealValidationFramework

The system can be integrated into pipelines and additional tooling around CI/CD to generate validation reports.

We look forward to receiving your feedback and suggestions via the “Issues” tab on our git repo (above).


Virtual Production — A Validation Framework For Unreal Engine was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data Mesh — A Data Movement and Processing Platform @ Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-mesh-a-data-movement-and-processing-platform-netflix-1288bcab2873

By Bo Lei, Guilherme Pires, James Shao, Kasturi Chatterjee, Sujay Jain, Vlad Sydorenko

Background

Real-time processing technologies (a.k.a. stream processing) are one of the key factors that enable Netflix to maintain its leading position in the competition to entertain our users. Our previous-generation streaming pipeline solution, Keystone, has a proven track record of serving several of our key business needs. However, as we expand our offerings and try out new ideas, there’s a growing need to unlock other emerging use cases that were not yet covered by Keystone. After evaluating the options, the team decided to create Data Mesh as our next-generation data pipeline solution.

Last year we wrote a blog post about how Data Mesh helped our Studio team enable data movement use cases. A year has passed, and Data Mesh has reached its first major milestone while its scope keeps increasing. As a growing number of use cases onboard to it, we have a lot more to share. We will deliver a series of articles that cover different aspects of Data Mesh and what we have learned from our journey. This article gives an overview of the system; the following ones will dive deeper into different aspects of it.

Data Mesh Overview

A New Definition Of Data Mesh

Previously, we defined Data Mesh as a fully managed, streaming data pipeline product used for enabling Change Data Capture (CDC) use cases. As the system evolves to solve more and more use cases, we have expanded its scope to handle not only the CDC use cases but also more general data movement and processing use cases such that:

  • Events can be sourced from more generic applications (not only databases).
  • The catalog of available DB connectors is growing (CockroachDB and Cassandra, for example).
  • More processing patterns are supported, such as filter, projection, union and join.

As a result, today we define Data Mesh as a general purpose data movement and processing platform for moving data between Netflix systems at scale.

Overall Architecture

The Data Mesh system can be divided into the control plane (Data Mesh Controller) and the data plane (Data Mesh Pipeline). The controller receives user requests, and deploys and orchestrates pipelines. Once deployed, the pipeline performs the actual heavy-lifting data processing work. Provisioning a pipeline involves different resources, and the controller delegates the responsibility of managing each resource’s life cycle to the corresponding microservice.

Pipelines

A Data Mesh pipeline reads data from various sources, applies transformations to the incoming events and eventually sinks them into the destination data store. A pipeline can be created from the UI or via our declarative API. On a creation or update request, the controller figures out the resources associated with the pipeline and calculates the proper configuration for each of them.
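To make the controller’s role concrete, here is a hypothetical declarative pipeline spec and a function that expands it into the resources a controller might provision: a Kafka topic between each pair of processors and one Flink job per processor, with the last job writing to the sink. The field names, naming scheme and resource kinds are illustrative assumptions, not the Data Mesh API.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class PipelineSpec:
    # Hypothetical declarative spec; field names are illustrative only.
    name: str
    source: str             # a Source from the catalog, e.g. a CDC stream
    processors: List[str]   # ordered processor types, e.g. ["filter", "projection"]
    sink: str               # e.g. an Iceberg table or an Elasticsearch index


@dataclass
class Resource:
    kind: str
    name: str
    config: Dict[str, str] = field(default_factory=dict)


def plan_resources(spec: PipelineSpec) -> List[Resource]:
    """Expand a spec into topics and jobs, wiring each processor through Kafka topics."""
    upstream = f"{spec.source}.source"  # the source-fronting topic
    resources = [Resource("kafka_topic", upstream)]
    for i, proc in enumerate(spec.processors):
        out = f"{spec.name}.{i}.{proc}"  # intermediate transport
        resources.append(Resource("kafka_topic", out))
        resources.append(Resource("flink_job", f"{spec.name}-{i}-{proc}",
                                  {"input": upstream, "output": out}))
        upstream = out
    # The sink processor writes to the external system instead of another topic.
    resources.append(Resource("flink_job", f"{spec.name}-sink",
                              {"input": upstream, "output": spec.sink}))
    return resources
```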

Connectors

A source connector is a Data Mesh managed producer. It monitors the source database’s binlog and produces CDC events to the Data Mesh source-fronting Kafka topic. It is able to talk to the Data Mesh controller to automatically create/update the sources.

Previously, we only had RDS source connectors to listen to MySQL and Postgres using the DBLog library; now we have added CockroachDB source connectors and Cassandra source connectors. They use different mechanisms to stream events out of the source databases, and we’ll publish blog posts that dive deeper into them.

In addition to managed connectors, application owners can emit events via a common library, which can be used in circumstances where a DB connector is not yet available or there is a preference to emit domain events without coupling with a DB schema.

Sources

Application developers can expose their domain data in a centralized catalog of Sources. This allows data sharing, as multiple teams at Netflix may be interested in receiving changes for an entity. In addition, a Source can be defined as the result of a series of processing steps, for example an enriched Movie entity with several dimensions (such as the list of Talents) that can further be indexed to fulfill search use cases.

Processors

A processor is a Flink job. It contains a reusable unit of data processing logic. It reads events from the upstream transports and applies some business logic to each of them. An intermediate processor writes data to another transport. A sink processor writes data to an external system such as Iceberg, ElasticSearch, or a separate discoverable Kafka topic.

We have provided a Processor SDK to help advanced users develop their own processors. Processors developed by Netflix developers outside our team can also be registered to the platform and work with other processors in a pipeline. Once a processor is registered, the platform also automatically sets up a default alert UI and metrics dashboard.

Transports

We use Kafka as the transportation layer for the interconnected processors to communicate. The output events of the upstream processor are written to a Kafka topic, and the downstream processors read their input events from there.

Kafka topics can also be shared across pipelines. A topic in pipeline #1 that holds the output of its upstream processor can be used as the source in pipeline #2. We frequently see use cases where some intermediate output data is needed by different consumers. This design enables us to reuse and share data as much as possible. We have also implemented features to track data lineage so that our users can get a better picture of overall data usage.
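The topic-sharing idea can be illustrated with an in-memory toy, where a dictionary of lists stands in for Kafka topics (no partitions, offsets or retention) and plain functions stand in for Flink processors. Here a second pipeline reads the intermediate output of the first instead of re-filtering the source; topic names and fields are made up for the example.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Optional

Event = Dict[str, object]
# Toy stand-in for Kafka: topic name -> list of events.
topics: Dict[str, List[Event]] = defaultdict(list)


def run_processor(src: str, dst: str, fn: Callable[[Event], Optional[Event]]) -> None:
    """Read every event from src, apply fn, and write the non-None results to dst."""
    for event in topics[src]:
        out = fn(event)
        if out is not None:
            topics[dst].append(out)


topics["movies.cdc"] = [
    {"id": 1, "title": "A", "country": "US", "runtime_min": 100},
    {"id": 2, "title": "B", "country": "FR", "runtime_min": 120},
]
# Pipeline #1: filter into an intermediate topic, then project.
run_processor("movies.cdc", "movies.us", lambda e: e if e["country"] == "US" else None)
run_processor("movies.us", "movies.us.titles", lambda e: {"id": e["id"], "title": e["title"]})
# Pipeline #2 reuses pipeline #1's intermediate topic as its source.
run_processor("movies.us", "movies.us.runtimes",
              lambda e: {"id": e["id"], "runtime_min": e["runtime_min"]})
```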

Schema

Data Mesh enforces schema on all the pipelines, meaning we require all the events passing through the pipelines to conform to a predefined template. We’re using Avro as a shared format for all our schemas, as it’s simple, powerful, and widely adopted by the community.

We make schema as the first class citizen in Data Mesh due to the following reasons:

  • Better data quality: Only events that comply with the schema can be encoded. Gives the consumer more confidence.
  • Finer granularity of data lineage: The platform is able to track how fields are consumed by different consumers and surface it on the UI.
  • Data discovery: Schema describes data sets and enables the users to browse different data sets and find the dataset of interest.
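The "only conforming events can be encoded" guarantee can be illustrated with a small validator. This is a simplified sketch, not an Avro library: it handles flat records whose field types are primitives or unions of primitives, and the `Movie` schema is hypothetical. A production system would rely on a real Avro implementation.

```python
from typing import Any, Dict, List, Union

AvroType = Union[str, List[str]]  # a primitive name or a union of primitive names

# Simplified mapping from Avro primitive names to Python types.
PRIMITIVES = {"null": type(None), "boolean": bool, "int": int, "long": int,
              "float": float, "double": float, "string": str}


def matches(avro_type: AvroType, value: Any) -> bool:
    if isinstance(avro_type, list):  # union such as ["null", "double"]
        return any(matches(t, value) for t in avro_type)
    py_type = PRIMITIVES[avro_type]
    if isinstance(value, bool) and py_type is not bool:
        return False  # bool subclasses int in Python; don't accept it as a number
    if py_type is float:
        return isinstance(value, (int, float))  # ints are acceptable floats
    return isinstance(value, py_type)


def conforms(schema: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """True if `event` fits a flat record schema (no nested records)."""
    known = {f["name"] for f in schema["fields"]}
    if set(event) - known:
        return False  # fields the schema does not declare
    for field in schema["fields"]:
        if field["name"] in event:
            if not matches(field["type"], event[field["name"]]):
                return False
        elif "default" not in field:
            return False  # a required field is missing
    return True


MOVIE_V1 = {
    "type": "record", "name": "Movie",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "title", "type": "string"},
        {"name": "rating", "type": ["null", "double"], "default": None},
    ],
}
```

For example, `conforms(MOVIE_V1, {"id": 1, "title": "Roma"})` is accepted, while an event with `"id": "1"` or an undeclared `genre` field is rejected before it ever reaches a consumer.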

On pipeline creation, each processor in that pipeline needs to define what schema it consumes and produces. The platform handles the schema validation and compatibility check. We have also built automation around handling schema evolution. If the schema is changed at the source, the platform tries to upgrade the consuming pipelines automatically without human intervention.
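As an illustration of what a compatibility check involves, the sketch below applies a simplified version of Avro's schema-resolution rules to decide whether a new (reader) schema can read data written with an old (writer) schema. It is an assumption about the kind of check involved, not the platform's actual implementation: it covers flat records, the primitive promotions listed in the Avro specification, and exact matching for unions.

```python
from typing import Any, Dict, List

# Avro primitive promotions: data written as the first type can be read as the second.
PROMOTIONS = {
    ("int", "long"), ("int", "float"), ("int", "double"),
    ("long", "float"), ("long", "double"), ("float", "double"),
    ("string", "bytes"), ("bytes", "string"),
}


def type_readable(writer_type: Any, reader_type: Any) -> bool:
    if writer_type == reader_type:
        return True
    # Only plain primitive names are checked for promotion; unions must match exactly here.
    return (isinstance(writer_type, str) and isinstance(reader_type, str)
            and (writer_type, reader_type) in PROMOTIONS)


def backward_problems(reader: Dict[str, Any], writer: Dict[str, Any]) -> List[str]:
    """Reasons the new (reader) schema cannot read data written with the old schema.

    Fields present only in the writer are ignored, as Avro resolution allows.
    """
    old_fields = {f["name"]: f for f in writer["fields"]}
    problems = []
    for field in reader["fields"]:
        old = old_fields.get(field["name"])
        if old is None:
            if "default" not in field:
                problems.append(f"new field {field['name']!r} has no default")
        elif not type_readable(old["type"], field["type"]):
            problems.append(f"field {field['name']!r} changed {old['type']} -> {field['type']}")
    return problems


V1 = {"fields": [{"name": "id", "type": "int"}, {"name": "title", "type": "string"}]}
V2 = {"fields": [{"name": "id", "type": "long"}, {"name": "title", "type": "string"},
                 {"name": "rating", "type": ["null", "double"], "default": None}]}
V3 = {"fields": [{"name": "id", "type": "int"}, {"name": "country", "type": "string"}]}
```

Here V2 (a promoted `id` plus a defaulted `rating`) can safely replace V1, while V3 introduces a field with no default and would be flagged, which is the kind of change that cannot be rolled out to consuming pipelines automatically.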

Future

Data Mesh initially started as a project to solve our Change Data Capture needs. Over the past year, we have observed increasing demand from other domains such as Machine Learning, Logging, etc. Today, Data Mesh is still in its early stages, and there are many interesting problems yet to be solved. Below are the highlights of some of the high-priority tasks on our roadmap.

Making Data Mesh The Paved Path (Recommended Solution) For Data Movement And Processing

As mentioned above, Data Mesh is meant to be the next generation of Netflix’s real-time data pipeline solution. As of now, we still have several specialized internal systems serving their own use cases. To streamline our offering, it makes sense to gradually migrate those use cases onto Data Mesh. We are currently working hard to make sure that Data Mesh achieves feature parity with Delta and Keystone. In addition, we also want to add support for more sources and sinks to unlock a wide range of data integration use cases.

More Processing Patterns And Better Efficiency

People use Data Mesh not only to move data; they often also want to process or transform their data along the way. Another high-priority task for us is to make more common processing patterns available to our users. Since by default a processor is a Flink job, running each simple processor in its own Flink job can be inefficient. We are also exploring ways to merge multiple processing patterns into a single Flink job.
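The idea of merging several simple processing patterns into one job can be sketched as function composition: each step maps an event to a new event or drops it, and the fused function applies all steps in a single pass. This is similar in spirit to Flink's operator chaining, but it is only an illustration; the step names are hypothetical and this is not how Data Mesh implements it.

```python
from typing import Callable, Iterable, List, Optional

Step = Callable[[dict], Optional[dict]]  # returns None to drop the event


def fuse(steps: List[Step]) -> Step:
    """Combine several per-event steps into one function applied in a single pass."""
    def fused(event: dict) -> Optional[dict]:
        for step in steps:
            event = step(event)
            if event is None:  # an earlier filter dropped it; skip the rest
                return None
        return event
    return fused


def run(step: Step, events: Iterable[dict]) -> List[dict]:
    return [out for out in map(step, events) if out is not None]


# Three "simple processors" that would otherwise each be a separate job.
def drop_deletes(event: dict) -> Optional[dict]:
    return None if event.get("op") == "delete" else event


def project(event: dict) -> Optional[dict]:
    return {"id": event["id"], "title": event["title"]}


def normalize(event: dict) -> Optional[dict]:
    return {**event, "title": event["title"].strip()}


pipeline = fuse([drop_deletes, project, normalize])
events = [
    {"op": "insert", "id": 1, "title": " Roma ", "extra": True},
    {"op": "delete", "id": 2, "title": "Okja"},
]
result = run(pipeline, events)
```

Fusing avoids a transport hop (and serialization) between each pair of steps, at the cost of scaling and deploying the steps together.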

Broader support for Connectors

We are frequently asked by our users whether Data Mesh can get data out of datastore X and land it in datastore Y. Today we support certain sources and sinks, but it’s far from enough. The demand for more types of connectors is enormous, and we see a big opportunity ahead of us that we definitely want to invest in.

Data Mesh is a complex yet powerful system. We believe that as it matures, it will be instrumental in Netflix’s future success. Again, we are still at the beginning of our journey, and we are excited about the upcoming opportunities. In the following months, we’ll publish more articles discussing different aspects of Data Mesh. Please stay tuned!

The Team

Data Mesh wouldn’t be possible without the hard work and great contributions from the team. Special thanks should go to our stunning colleagues:

Bronwyn Dunn, Jordan Hunt, Kevin Zhu, Pradeep Kumar Vikraman, Santosh Kalidindi, Satyajit Thadeshwar, Tom Lee, Wei Liu


Data Mesh — A Data Movement and Processing Platform @ Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Formulating ‘Out of Memory Kill’ Prediction on the Netflix App as a Machine Learning Problem

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/formulating-out-of-memory-kill-prediction-on-the-netflix-app-as-a-machine-learning-problem-989599029109

by Aryan Mehra
with
Farnaz Karimdady Sharifabad, Prasanna Vijayanathan, Chaïna Wade, Vishal Sharma and Mike Schassberger

Aim and Purpose — Problem Statement

The purpose of this article is to give insights into analyzing and predicting “out of memory” (OOM) kills on the Netflix App. Unlike devices with ample compute resources, TVs and set-top boxes usually have tighter memory constraints. More importantly, low resource availability, the “out of memory” scenario, is one of the common reasons for crashes/kills. As a streaming service running on millions of devices, Netflix has a tremendous amount of data about device capabilities/characteristics, as well as runtime data, in our big data platform. Large data brings the opportunity to leverage it for predictive and classification-based analysis. Specifically, if we are able to predict or analyze OOM kills, we can take device-specific actions to pre-emptively lower performance in favor of not crashing, aiming to give the user the best possible Netflix experience within the limits of the “performance vs. pre-emptive action” tradeoff. A major advantage of prediction is that it lets us act before a crash happens to improve the user experience.

We do this by first elaborating on the dataset curation stage, focusing especially on device capabilities and OOM kill-related memory readings. We also highlight steps and guidelines for exploratory analysis and prediction to understand OOM kills on a sample set of devices. Since memory management is not something one usually associates with classification problems, this blog focuses on formulating the problem as an ML problem and on the data engineering that goes along with it. We also explore graphical analysis of the labeled dataset and suggest some feature engineering and accuracy measures for future exploration.

Challenges of Dataset Curation and Labeling

Unlike many other Machine Learning tasks, OOM kill prediction is tricky because the dataset must be pulled from different sources: device characteristics come from our on-field knowledge, and runtime memory data comes from real-time user data pushed to our servers.

Second, and more importantly, the volume of runtime data is enormous. Many devices running Netflix log memory usage at fixed intervals. Since the Netflix App does not get killed very often (fortunately!), most of these entries represent normal, expected runtime states. The dataset will thus be heavily imbalanced. We will soon see how we actually label which entries are erroneous and which are not.

Dataset Features and Components

The schema figure above describes the two components of the dataset: device capabilities/characteristics and runtime memory data. The two are joined on attributes that uniquely match each memory entry with its device’s capabilities. These attributes may differ across streaming services; for us at Netflix, they are a combination of the device type, app session ID, and software development kit version (SDK version). We now explore each of these components individually, highlighting the nuances of the data pipeline and pre-processing.

Device Capabilities

Device capabilities may not all reside in one source table, so gathering them can require several joins. While creating the device capability table, we decided to index it by a composite primary key of (device type ID, SDK version). Given these two attributes, Netflix can uniquely identify several of the device capabilities. Some nuances in creating this dataset come from the on-field domain knowledge of our engineers. Example features include Device Type ID, SDK Version, Buffer Sizes, Cache Capacities, UI Resolution, Chipset Manufacturer, and Brand.
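A minimal sketch of attaching capabilities to runtime readings through the (device type ID, SDK version) composite key. The field names and values are hypothetical, and a plain dictionary stands in for the capability table; in practice this would be a join in the big data platform.

```python
from typing import Dict, List, Tuple

CapKey = Tuple[int, str]  # (device type ID, SDK version)


def join_with_capabilities(readings: List[dict],
                           capabilities: Dict[CapKey, dict]) -> List[dict]:
    """Attach device capabilities to each runtime memory reading (inner join)."""
    joined = []
    for reading in readings:
        caps = capabilities.get((reading["device_type_id"], reading["sdk_version"]))
        if caps is None:
            continue  # no capability row for this device/SDK pair; drop the reading
        joined.append({**reading, **caps})
    return joined


capabilities = {
    (101, "2023.1"): {"buffer_mb": 64, "ui_resolution": "1080p"},
    (202, "2023.1"): {"buffer_mb": 32, "ui_resolution": "720p"},
}
readings = [
    {"device_type_id": 101, "sdk_version": "2023.1", "session_id": "a", "memory_mb": 410},
    {"device_type_id": 303, "sdk_version": "2023.1", "session_id": "b", "memory_mb": 120},
]
rows = join_with_capabilities(readings, capabilities)
```

Dropping unmatched readings (inner-join semantics) is a design choice; keeping them with NULL capabilities would instead push the problem into the missing-data handling described below.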

Major Milestones in Data Engineering for Device Characteristics

Structuring the data in an ML-consumable format: The device capability data needed for the prediction was distributed in over three different schemas across the Big Data Platform. Joining them together and building a single indexable schema that can directly become a part of a bigger data pipeline is a big milestone.

Dealing with ambiguities and missing data: Sometimes the entries in the Big Data Platform (BDP) are contaminated with test entries and NULL values, along with ambiguous values that have no meaning or simply contradictory values due to unrealistic test environments. We deal with all of this by simple majority voting (statistical mode) on the view indexed by device type ID and SDK version from the user query. This relies on the hypothesis that actual device characteristics are always in the majority in the data lake.
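Majority voting per (device type ID, SDK version) can be sketched with `collections.Counter`. NULLs get no vote; the `is_test_entry` flag is a hypothetical marker, since the post does not say how test entries are identified.

```python
from collections import Counter, defaultdict
from typing import Any, Dict, Iterable, Tuple


def majority_value(rows: Iterable[dict], attribute: str) -> Dict[Tuple[Any, Any], Any]:
    """Statistical mode of `attribute` per (device_type_id, sdk_version), ignoring NULLs."""
    votes: Dict[Tuple[Any, Any], Counter] = defaultdict(Counter)
    for row in rows:
        value = row.get(attribute)
        if value is None or row.get("is_test_entry", False):
            continue  # NULLs and rows flagged as test data get no vote
        votes[(row["device_type_id"], row["sdk_version"])][value] += 1
    # most_common(1) picks the highest count; the majority hypothesis assumes no close ties.
    return {key: counter.most_common(1)[0][0] for key, counter in votes.items()}


rows = [
    {"device_type_id": 101, "sdk_version": "2023.1", "buffer_mb": 64},
    {"device_type_id": 101, "sdk_version": "2023.1", "buffer_mb": 64},
    {"device_type_id": 101, "sdk_version": "2023.1", "buffer_mb": 4096},  # contradictory value
    {"device_type_id": 101, "sdk_version": "2023.1", "buffer_mb": None},
    {"device_type_id": 101, "sdk_version": "2023.1", "buffer_mb": 1, "is_test_entry": True},
]
modes = majority_value(rows, "buffer_mb")
```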

Incorporating on-site and field knowledge of devices and engineers: This is probably the single most important achievement of the task, because some of the features mentioned above (and some redacted ones) had to be engineered manually. For example, a missing or NULL value might mean the absence of a flag or feature in some attributes, while in others it requires extra work. So a missing value for a feature flag might mean “False”, whereas a missing value for a buffer size feature might mean that we need subqueries to fetch and fill in the missing data.
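The per-feature meaning of a missing value can be encoded as separate imputation rules. In this sketch, the feature names are hypothetical, and `lookup_from_secondary` stands in for the follow-up subquery that fetches a missing size.

```python
from typing import Callable, Optional

# Hypothetical feature groups; the real feature list is partly redacted in the post.
FLAG_FEATURES = ("supports_hdr", "has_hw_decoder")
SIZE_FEATURES = ("video_buffer_mb", "image_cache_mb")

Lookup = Callable[[dict, str], Optional[float]]


def fill_missing(row: dict, lookup: Lookup) -> dict:
    """Apply per-feature meaning to missing values instead of one global rule."""
    filled = dict(row)
    for name in FLAG_FEATURES:
        if filled.get(name) is None:
            filled[name] = False  # missing flag: treat as "feature absent"
    for name in SIZE_FEATURES:
        if filled.get(name) is None:
            filled[name] = lookup(filled, name)  # missing size: fetch it elsewhere
    return filled


# Stand-in for the follow-up subquery, keyed by (device type ID, feature name).
secondary_source = {(101, "image_cache_mb"): 48.0}


def lookup_from_secondary(row: dict, name: str) -> Optional[float]:
    return secondary_source.get((row["device_type_id"], name))


row = {"device_type_id": 101, "supports_hdr": True, "has_hw_decoder": None,
       "video_buffer_mb": 64.0, "image_cache_mb": None}
filled = fill_missing(row, lookup_from_secondary)
```

A size that the lookup cannot resolve stays `None`, so it remains visible as genuinely missing rather than being silently coerced to zero.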

Runtime Memory, OOM Kill Data and ground truth labeling

Runtime data is always growing and constantly evolving. The tables and views we use are refreshed every 24 hours, and joining any two such tables consumes tremendous compute and time resources. To curate this part of the dataset, we suggest the tips below (written from the point of view of SparkSQL-like distributed query processors):

  • Filter entries (conditions) before the JOIN, using WHERE and LEFT JOIN clauses carefully for this purpose. Conditions that eliminate entries after the join operation are much more expensive than eliminating them before the join. Filtering early also prevents the system from running out of memory while executing the query.
  • Restrict testing and analysis to one day and one device at a time. It is good practice to pick a single high-usage day, like New Year’s Day or Memorial Day, to increase frequency counts and get normalized distributions across various features.
  • Strike a balance between driver and executor memory configurations in SparkSQL-like systems. Allocations that are too high may fail and restrict other system processes; allocations that are too low may fail during a local collect or when the driver accumulates the results.
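The benefit of filtering before the join can be seen in a toy, single-machine hash join. Query optimizers such as Spark's can often push simple predicates below a join on their own, so this is an illustration of the work being avoided, not a Spark recipe. All table contents and field names are made up.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Row = dict
Predicate = Callable[[Row], bool]


def hash_join(left: List[Row], right: List[Row], key: str) -> List[Row]:
    """Inner join on `key` using an in-memory index of the right side."""
    index: Dict[object, List[Row]] = defaultdict(list)
    for rrow in right:
        index[rrow[key]].append(rrow)
    return [{**lrow, **rrow} for lrow in left for rrow in index.get(lrow[key], [])]


def join_then_filter(left, right, key, keep: Predicate) -> Tuple[List[Row], int]:
    joined = hash_join(left, right, key)  # every matching pair is built first
    return [row for row in joined if keep(row)], len(joined)


def filter_then_join(left, right, key, keep: Predicate) -> Tuple[List[Row], int]:
    # Valid only because `keep` looks at left-side columns alone.
    joined = hash_join([row for row in left if keep(row)], right, key)
    return joined, len(joined)


def on_new_years_day(row: Row) -> bool:
    return row["day"] == "2023-01-01"


memory = [{"session_id": s, "day": d, "memory_mb": 100 + s}
          for s in range(1000) for d in ("2023-01-01", "2023-01-02")]
kills = [{"session_id": s, "oom_kill": True} for s in range(0, 1000, 10)]

late, late_rows = join_then_filter(memory, kills, "session_id", on_new_years_day)
early, early_rows = filter_then_join(memory, kills, "session_id", on_new_years_day)
```

Both orders return the same rows here, but the late filter materializes twice as many joined rows. With a LEFT JOIN, placement also changes meaning: a WHERE condition that rejects NULLs on the right-hand table, applied after the join, discards the unmatched rows and effectively turns the LEFT JOIN into an inner join.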

Labeling the data — Ground Truth

An important aspect of the dataset is understanding which features will be available to us at inference time. Memory data (which contains the navigational level and memory reading) can be labeled using the OOM kill data, but the latter cannot be reflected in the input features. The best way to do this is a sliding window approach: we label the memory readings of sessions in a fixed window before the OOM kill as erroneous, and the rest of the entries as non-erroneous. To make the labeling more granular and bring more variation to a binary classification model, we propose a graded window approach, as explained by the image below. It assigns higher levels to memory readings closer to the OOM kill, making this a multi-class classification problem. Level 4 is the closest to the OOM kill (within 2 minutes), whereas Level 0 covers readings more than 5 minutes before any OOM kill that follows them. Note that the device and session of the OOM kill instance and the memory reading need to match for the labeling to be sound. The confusion matrix and model results can later be reduced to binary if needed.
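A sketch of the graded window labeling. The post fixes Level 4 (within 2 minutes) and the 5-minute cutoff for Level 0; the 3- and 4-minute boundaries for Levels 3 to 1 below are assumptions, as are the field names and timestamps in seconds. Only kills on the same device and session, occurring at or after a reading, affect its label.

```python
from bisect import bisect_left
from collections import defaultdict
from typing import Dict, List, Tuple

# (upper bound in seconds before the kill, level). Level 4 (<= 2 min) and the
# 5-minute cutoff come from the post; the 3- and 4-minute boundaries are assumptions.
GRADES = [(120, 4), (180, 3), (240, 2), (300, 1)]


def grade(seconds_before_kill: float) -> int:
    for bound, level in GRADES:
        if seconds_before_kill <= bound:
            return level
    return 0  # more than 5 minutes before the next kill


def label_readings(readings: List[dict], kills: List[dict]) -> List[int]:
    """Graded label per reading, matching kills on the same device and session."""
    kill_times: Dict[Tuple[str, str], List[float]] = defaultdict(list)
    for kill in kills:
        kill_times[(kill["device_id"], kill["session_id"])].append(kill["ts"])
    for times in kill_times.values():
        times.sort()

    labels = []
    for reading in readings:
        times = kill_times.get((reading["device_id"], reading["session_id"]), [])
        i = bisect_left(times, reading["ts"])  # first kill at or after this reading
        labels.append(grade(times[i] - reading["ts"]) if i < len(times) else 0)
    return labels


def to_binary(level: int, threshold: int = 1) -> int:
    """Collapse graded labels: any level >= threshold counts as erroneous."""
    return int(level >= threshold)


kills = [{"device_id": "d1", "session_id": "s1", "ts": 1000}]
readings = [{"device_id": "d1", "session_id": sid, "ts": ts}
            for sid, ts in [("s1", 990), ("s1", 850), ("s1", 700),
                            ("s1", 600), ("s1", 1010), ("s2", 990)]]
labels = label_readings(readings, kills)
```

The reading at `ts=1010` comes after the kill and the `s2` reading belongs to another session, so both get Level 0 even though they are close in time to the kill.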

Summary of OOM Prediction — Problem Formulation

The dataset now consists of several entries — each of which has certain runtime features (navigational level and memory reading in our case) and device characteristics (a mix of over 15 features that may be numerical, boolean or categorical). The output variable is the graded or ungraded classification variable which is labeled in accordance with the section above — primarily based on the nearness of the memory reading stamp to the OOM kill. Now we can use any multi-class classification algorithm — ANNs, XGBoost, AdaBoost, ElasticNet with softmax etc. Thus we have successfully formulated the problem of OOM kill prediction for a device streaming Netflix.

Data Analysis and Observations

Without diving very deep into the actual devices and results of the classification, we now show some examples of how we could use the structured data for some preliminary analysis and make observations. We do so by just looking at the peak of OOM kills in a distribution over the memory readings within 5 minutes prior to the kill.

Different device types

From the graph above, we show how, even without any modeling, the structured data can give us immense knowledge about the memory domain. For example, the early peaks (marked in red) are mostly crashes not visible to users that were erroneously marked as user-facing crashes. The peaks marked in green are real user-facing crashes. Device 2 is an example of a sharp peak toward the higher memory range, with a sharp decline and almost no entries after the peak ends. Hence, for Devices 1 and 2, the task of OOM prediction is relatively easy, after which we can start taking pre-emptive action to lower our memory usage. In the case of Device 3, we see a roughly Gaussian distribution, indicating that OOM kills occur across the whole memory range without a sharp decline.

Feature Engineering, Accuracy Measures and Future Work Directions

We leave the reader with some ideas to engineer more features and accuracy measures specific to the memory usage context in a streaming environment for a device.

  • We could manually engineer features on memory to utilize the time-series nature of the memory value when aggregated over a user’s session. Suggestions include a running mean of the last 3 values, or the difference between the current entry and a running exponential average. Analyzing how memory grows over a user’s session could indicate whether a kill was caused by in-app streaming demand or by external factors.
  • Another feature could be the time spent in different navigational levels. Internally, the app caches pre-fetched data, images, descriptions, etc., and the time spent in a level could indicate whether or not those caches have been cleared.
  • When deciding on accuracy measures for the problem, it is important to analyze the distinction between false positives and false negatives. The dataset (fortunately for Netflix!) will be highly imbalanced: as an example, over 99.1% of entries are not kill-related. In general, false negatives (not predicting a kill when the app is actually killed) are more detrimental than false positives (predicting a kill even though the app would have survived). Because kills happen rarely (0.9% in this example), even if we lower memory and performance 2% of the time and catch almost all of the 0.9% OOM kills, we will have eliminated approximately all OOM kills at the cost of lowering performance/clearing the cache an extra 1.1% of the time (false positives).
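The two suggested memory features can be computed in one pass over a session's time-ordered readings. The smoothing factor `alpha` is an assumption (the post does not specify one), and the exponential average is taken over previous readings so that the difference measures how far the current value jumps above recent history.

```python
from collections import deque
from typing import Deque, Dict, Iterable, List, Optional


def session_memory_features(memory_mb: Iterable[float],
                            alpha: float = 0.5) -> List[Dict[str, float]]:
    """Per-reading features over one session's time-ordered memory values."""
    window: Deque[float] = deque(maxlen=3)  # last 3 readings, including the current one
    ewma: Optional[float] = None
    features = []
    for value in memory_mb:
        window.append(value)
        # Compare against the average of previous readings, then update it.
        delta = 0.0 if ewma is None else value - ewma
        ewma = value if ewma is None else alpha * value + (1 - alpha) * ewma
        features.append({
            "running_mean_3": sum(window) / len(window),
            "delta_from_ewma": delta,
        })
    return features


feats = session_memory_features([100, 110, 130, 160], alpha=0.5)
```

A steadily increasing `delta_from_ewma` within a session is the kind of growth signal that could help separate in-app demand from external memory pressure.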

Summary

This post has focused on shedding light on dataset curation and engineering when dealing with memory and low-resource crashes for streaming services on devices. We also covered the distinction between non-changing attributes and runtime attributes, and strategies to join them into one cohesive dataset for OOM kill prediction. We covered labeling strategies that involved graded window-based approaches and explored some graphical analysis on the structured dataset. Finally, we ended with some future directions and possibilities for feature engineering and accuracy measurements in the memory context.

Stay tuned for further posts on memory management and the use of ML modeling to deal with systemic and low-latency data collected at the device level. We will try to post the results of our models on the dataset we have created soon.

Acknowledgements
I would like to thank the members of various teams — Partner Engineering (Mihir Daftari, Akshay Garg), TVUI team (Andrew Eichacker, Jason Munning), Streaming Data Team, Big Data Platform Team, Device Ecosystem Team and Data Science Engineering Team (Chris Pham), for all their support.


Formulating ‘Out of Memory Kill’ Prediction on the Netflix App as a Machine Learning Problem was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How Netflix Content Engineering makes a federated graph searchable (Part 2)

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-netflix-content-engineering-makes-a-federated-graph-searchable-part-2-49348511c06c

By Alex Hutter, Falguni Jhaveri, and Senthil Sayeebaba

In a previous post, we described the indexing architecture of Studio Search and how we scaled the architecture by building a config-driven self-service platform that allowed teams in Content Engineering to spin up search indices easily.

This post will discuss how Studio Search supports querying the data available in these indices.

Data consumption from Studio Search DGS

Introduction

When we say Content Engineering teams are interested in searching against the federated graph, the use case is mainly focused on known-item search (a user has one or more items in mind that they are trying to view or navigate to, but needs an external information system to locate them) and data retrieval (the data is typically structured, and there is no ambiguity as to whether a particular record matches the given search criteria, except for textual fields, where there is limited ambiguity), within a vertical search experience (focused on enabling search over a specific sub-graph within the big federated graph).

Query Language

Given the above scope of the search (vertical search experience with a focus on known-item search and data retrieval), one of the first things we had to design was a language that users can use to easily express their search criteria. With a goal of abstracting users away from the complexity of interacting with Elasticsearch directly, we landed on a custom Studio Search DSL reminiscent of SQL.

The DSL supports specifying the search criteria as comparison expressions or inclusion/exclusion filters. The filter expressions can be combined together through logical operators (AND, OR, NOT) and grouped together through parentheses.

Sample Syntax

For example, to find all comedies from France or Spain, the query would be:

(genre == 'comedy') AND (country ANY ['FR', 'SP'])

We used ANTLR to build the grammar for the Query DSL. From the grammar, ANTLR generates a parser that produces a parse tree, along with a visitor for walking it. By extending the ANTLR-generated parse tree visitor, we were able to implement an Elasticsearch Query Builder component with the logic to generate the Elasticsearch query corresponding to the custom search query.
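To show the kind of translation a query builder performs, here is a sketch that turns an already-parsed expression into Elasticsearch's Query DSL. The tuple-based AST is a hypothetical stand-in for the ANTLR parse tree, and the fields are assumed to be keyword-mapped so that `term`/`terms` match exact values. AND is placed in `bool.filter` because these criteria are yes/no matches that do not need relevance scoring.

```python
from typing import Any, Dict, Tuple

# Hypothetical AST: ("eq", field, value), ("any", field, values),
# ("and", *nodes), ("or", *nodes), ("not", node)
Node = Tuple[Any, ...]


def to_es(node: Node) -> Dict[str, Any]:
    op = node[0]
    if op == "eq":
        _, field, value = node
        return {"term": {field: value}}
    if op == "any":
        _, field, values = node
        return {"terms": {field: list(values)}}
    if op == "and":
        return {"bool": {"filter": [to_es(child) for child in node[1:]]}}
    if op == "or":
        return {"bool": {"should": [to_es(child) for child in node[1:]],
                         "minimum_should_match": 1}}
    if op == "not":
        return {"bool": {"must_not": [to_es(node[1])]}}
    raise ValueError(f"unknown operator: {op!r}")


# (genre == 'comedy') AND (country ANY ['FR', 'SP'])
query = ("and", ("eq", "genre", "comedy"), ("any", "country", ["FR", "SP"]))
es_body = {"query": to_es(query)}
```

Keeping this translation in one place is what lets users write the short DSL form while the verbose `bool`/`term` structure stays an implementation detail.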

If you are familiar with Elasticsearch, you know how complicated it can be to build the correct Elasticsearch query for complex queries, especially if the index includes nested JSON documents, which add another layer of complexity with respect to building nested queries (incorrectly constructed nested queries can lead to Elasticsearch quietly returning wrong results). By exposing just a generic query language to users and isolating the complexity in our Elasticsearch Query Builder, we have been able to empower users to write search queries without requiring familiarity with Elasticsearch. This also leaves open the possibility of swapping Elasticsearch for a different search engine in the future.

One other challenge for users writing search queries is understanding which fields are available in the index and their associated types. Since we index the data as-is from the federated graph, the indexing query itself acts as self-documentation. For example, given the indexing query:

Sample GraphQL query

To find movies based on the actors’ roles, the query filter is simply

`actors.role == 'actor'`
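A filter like this is where nested mappings matter. Assuming `actors` is mapped with `"type": "nested"` (an assumption; the index mapping is not shown), a plain `term` query on `actors.role` against the root document generally matches nothing, so the builder has to wrap it in a `nested` query. The sketch below handles a single level of nesting; `NESTED_PATHS` is hypothetical.

```python
from typing import Any, Dict, Iterable, Optional

# Hypothetical: object fields mapped with "type": "nested" in the index mapping.
NESTED_PATHS = frozenset({"actors"})


def nested_path_for(field: str, nested_paths: Iterable[str] = NESTED_PATHS) -> Optional[str]:
    """Longest nested-mapped prefix of a dotted field name, if any."""
    paths = set(nested_paths)
    parts = field.split(".")
    for i in range(len(parts) - 1, 0, -1):
        prefix = ".".join(parts[:i])
        if prefix in paths:
            return prefix
    return None


def eq_query(field: str, value: Any) -> Dict[str, Any]:
    """Translate `field == value`, wrapping it in a nested query when required."""
    leaf = {"term": {field: value}}
    path = nested_path_for(field)
    if path is None:
        return leaf
    # Single level of nesting only; deeper hierarchies need more care.
    return {"nested": {"path": path, "query": leaf}}


actor_filter = eq_query("actors.role", "actor")
```

A subtler trap: two conditions about the same actor (say, name and role) must share one `nested` clause. Two separate nested clauses can be satisfied by two different actors, which is exactly the kind of quietly wrong result the builder is meant to prevent.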

Text Search

While the search DSL provides a powerful way to help narrow the scope of the search queries, users can also find documents in the index through free form text — either with just the input text or in combination with a filter expression in the search DSL. Behind the scenes during the indexing process, we have configured the Elasticsearch index with the appropriate analyzers to ensure that the most relevant matches for the input text are returned in the results.

Hydration through Federation

Given the wide adoption of the federated gateway within Content Engineering, we decided to implement the Studio Search service as a DGS (Domain Graph Service) that integrates with the federated gateway. The search APIs (besides search, we have other APIs to support faceted search, typeahead suggestions, etc.) are exposed as GraphQL queries within the federated graph.

This integration with the federation gateway allows the search DGS to just return the matching entity keys from the search index instead of the whole matching document(s). Through the power of federation, users are then able to hydrate the search results with any data available in the federated graph. This allows the search indices to be lean by indexing only the fields necessary for the search experience and at the same time provides complete flexibility for the users to fetch any data available in the federated graph instead of being restricted to just the data available in the search index.

Example

Sample Search query

In the above example, users are able to fetch the production schedule as part of the search results even though the search index doesn’t hold that data.

Authorization

With the API to query the data in the search indices in place, the next thing we needed to tackle was figuring out how to secure access to the data in the indices. With several of the indices including sensitive data, and the source teams already having restrictive access policies in place to secure the data they own, the search indices which hosted a secondary copy of the source data needed to be secured as well.

We chose to apply “late binding” (or “query time”) security: on every incoming search query, we make an API call to the centralized access policy server with context including the identity of the caller making the request and the search index they are trying to access. The policy server evaluates the access policies defined by the source teams and returns a set of constraints. For example, the caller may have access to Movies only where the type is ‘licensed’ (the caller does not have access to Netflix-produced content, just the licensed content). The constraints are then translated into a set of filter expressions in the search query DSL format (e.g., movie.type == 'licensed') and combined with the user-specified search filter using a logical AND operator to form a new search query, which is then executed against the index.
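A sketch of the query-time rewrite. The policy decision shape (`allowed`, `constraints`) and the tuple AST are hypothetical. Combining constraints at the AST level, rather than by concatenating DSL strings, is a design choice made here so that a crafted user query with unbalanced parentheses cannot escape the AND. The sketch also fails closed when the caller is not allowed at all.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical AST: ("eq", field, value), ("any", field, values), ("and", *nodes), ("match_all",)
Node = Tuple[Any, ...]


def constraint_to_node(constraint: Dict[str, Any]) -> Node:
    values = constraint["values"]
    if len(values) == 1:
        return ("eq", constraint["field"], values[0])
    return ("any", constraint["field"], list(values))


def secure_query(user_query: Optional[Node], decision: Dict[str, Any]) -> Node:
    """AND the policy server's constraints onto the caller's own filter."""
    if not decision.get("allowed", False):
        raise PermissionError("caller may not query this index")  # fail closed
    parts: List[Node] = [user_query] if user_query is not None else []
    parts += [constraint_to_node(c) for c in decision.get("constraints", [])]
    if not parts:
        return ("match_all",)  # allowed, unconstrained, and no user filter
    return parts[0] if len(parts) == 1 else ("and", *parts)


decision = {"allowed": True,
            "constraints": [{"field": "movie.type", "values": ["licensed"]}]}
secured = secure_query(("eq", "genre", "comedy"), decision)
```

Because the constraints are appended on every request, a source team that tightens its policy affects the very next query without any change to the index.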

By adding on the access constraints as additional filters before executing the query, we ensure that the user gets back only the data they have access to from the underlying search index. This also allows source teams to evolve their access policies independently knowing that the correct constraints will be applied at query time.

Customizing Search

With the decision to build Studio Search as a GraphQL service using the DGS framework and relying on federation for hydrating results, onboarding new search indices required updating various portions of the GraphQL schema (the enum of available indices, the union of all federated result types, etc.) manually and registering the updated schema with the federated gateway schema registry before the new index was available for querying through the GraphQL API.

Additionally, users can provide further configuration while onboarding a new index to customize the search behavior for their applications, including scripts to tune the relevance scoring algorithm, fields for faceted search, and settings that control the behavior of typeahead suggestions. These configurations were initially stored in our source control repository, which meant any change to an index’s configuration required a deployment to take effect.

Recently, we automated this process as well by moving all the configurations to a persistence store and leveraging the power of dynamic schemas in the DGS framework. Users can now use an API to create/update search index configuration and we are able to validate the provided configuration, generate the updated DGS schema dynamically and register the updated schema with the federated gateway schema registry immediately. All configuration changes are reflected immediately in subsequent search queries.
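The schema portions that change per index (the enum of available indices and the union of result types) can be regenerated from stored configuration. This Python sketch only renders GraphQL SDL text; it does not use the DGS framework's dynamic schema API, and the config keys (`index`, `result_type`) and type names (`SearchIndex`, `SearchResult`) are hypothetical.

```python
import re
from typing import Dict, List

GRAPHQL_NAME = re.compile(r"[_A-Za-z][_0-9A-Za-z]*")  # GraphQL Name grammar


def validate(config: Dict[str, str]) -> List[str]:
    errors = []
    for key in ("index", "result_type"):
        value = config.get(key)
        if not isinstance(value, str) or not GRAPHQL_NAME.fullmatch(value):
            errors.append(f"{key} must be a valid GraphQL name, got {value!r}")
    return errors


def render_schema(configs: List[Dict[str, str]]) -> str:
    """Regenerate the index enum and result union from the stored configs."""
    errors = [e for c in configs for e in validate(c)]
    if errors:
        raise ValueError("; ".join(errors))  # reject before touching the registry
    values = "\n".join(f"  {c['index'].upper()}" for c in configs)
    members = " | ".join(sorted({c["result_type"] for c in configs}))
    return f"enum SearchIndex {{\n{values}\n}}\n\nunion SearchResult = {members}\n"


sdl = render_schema([
    {"index": "movie", "result_type": "Movie"},
    {"index": "production", "result_type": "Production"},
])
```

Validating before rendering means a bad configuration is rejected at the API call instead of producing a schema the gateway's registry would refuse.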

Example configuration:

Sample Search configuration

UI Components

While the primary goal of Studio Search was to build an easy-to-use self-service platform to enable searching against the federated graph, another important goal was to help the Content Engineering teams deliver a visually consistent search experience to the users of their tools and workflows. To that end, we partnered with our UI/UX teams to build a robust set of opinionated presentational components. Studio Search’s drop-in UI components for typeahead suggestions, faceted search, and extensive filtering, based on our Hawkins design system, ensure visual and behavioral consistency across the suite of applications within Content Engineering. Below are a couple of examples.

Typeahead Search Component

Faceted Search Component

What’s Next?

As a config-driven, self-serve platform, Studio Search has already been able to empower Content Engineering teams to quickly enable the functionality to search against the Content federated graph within their suite of applications. But we are not quite done yet! There are several upcoming features in various stages of development, including:

  • Leveraging the percolate query functionality in Elasticsearch to support a notifications feature (users save their search criteria and are notified when documents matching those criteria are updated in the index)
  • Adding support for metrics aggregation in our APIs
  • Leveraging the managed delivery functionality in Spinnaker to move to a declarative model for onboarding search indices
  • And, plenty more

If this sounds interesting to you, connect with us on LinkedIn.

Credits

Thanks to Anoop Panicker, Bo Lei, Charles Zhao, Chris Dhanaraj, Hemamalini Kannan, Jim Isaacs, Johnny Chang, Kasturi Chatterjee, Kishore Banala, Kevin Zhu, Tom Lee, Tongliang Liu, Utkarsh Shrivastava, Vince Bello, Vinod Viswanathan, Yucheng Zeng


How Netflix Content Engineering makes a federated graph searchable (Part 2) was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Scaling Appsec at Netflix (Part 2)

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/scaling-appsec-at-netflix-part-2-c9e0f1488bc5

By Astha Singhal, Lakshmi Sudheer, Julia Knecht

The Application Security teams at Netflix are responsible for securing the software footprint that we create to run the Netflix product, the Netflix studio, and the business. Our customers are product and engineering teams at Netflix that build these software services and platforms. The Netflix cultural values of ‘Context not Control’ and ‘Freedom and Responsibility’ strongly influence how we do Security at Netflix. Our goal is to manage security risks to Netflix via clear, opinionated security guidance, and by providing risk context to Netflix engineering teams to make pragmatic risk decisions at scale.

A few years ago, we published this blog post about how we had organized our team to focus our bandwidth on scalable investments as opposed to just traditional Appsec functions, which were not scaling well in our rapidly growing environment. We leaned into the idea of strategic security partnerships and automation investments to create more leverage for application security. This became the foundation for our current org structure with teams focused on Appsec Partnerships and Appsec Engineering. In this operating model, we provided critical Appsec operational services to Netflix — including bug bounty, pentesting, PSIRT (product security incident response), security reviews, and developer security education — via a shared on-call rotation.

Team Structure v1

Over the past few years, this model has allowed us to focus on investments like Secure by Default for baseline security controls, Security Self-Service for clear actionable guidance and Vulnerability Scanning at scale for software supply chain security. We wanted to share an update on learnings from this model, how our needs have evolved, and where we expect to go from here.

Among the most notable wins, we have been able to utilize this scale focused approach to productize application security for our rapidly growing studio engineering ecosystem, standardize security baseline for all Enterprise apps, and build paved roads to provide Secure by Default Authentication & Authorization capabilities for central data engineering tools. Our focus has been on improving overall security assurance as opposed to just vulnerability prevention. We are now expanding this approach to more parts of our ecosystem. This mindset has also allowed us to invest our capacity for white-glove service towards reasonable residual risk and standard guidance so we can reduce the need for white-glove engagements in the long term (e.g., investment in an API proxy that provides baseline security controls for free as opposed to pentesting all applications that would eventually sit behind that API proxy). This approach has also allowed us to build strong relationships with central engineering teams at Netflix (Data Platform, Developer Tools, Cloud Infrastructure, IAM Product Engineering) that will continue to serve as central points of leverage for security in the long term.

However, it has not been all sunshine and rainbows. On the partnership side, the bespoke nature of each partnership means that there isn’t consistency and redundancy built into the operating model and the related partnership artifacts (e.g., Security Strategy and Roadmap, Threat Model, Deliverable Tracking, Residual Risk Criteria, etc.). This leads to insufficient context sharing and high operational churn every time we have personnel changes. The partnership charter has also grown laterally into the infrastructure space as we stack our leverage bets on infrastructure components (like Service Mesh, Container Platform, etc.). The skill sets and domain depth required by those partnerships have further diversified the skills on the team, but this comes at the cost of our ability to serve generalized Appsec on-call needs, like bug bounty triage, with high consistency. Given that partnerships focus on long-running strategic initiatives, wins can be few and far between, which can be difficult for team motivation. We also found various areas in which security partnership work bleeds into security product solutioning, and it can be difficult to identify the appropriate handoff points.

Additionally, as the complexity of our ecosystem grows, the goal of a “single PoC (point of contact) into information security” becomes increasingly difficult to maintain. The team is now investing in consistency and scalability of partnership artifacts and communication channels, better redundancy and context sharing on the team through squad operating models, crisper engagement criteria, and definition of done for partnership engagements.

Our Appsec Engineering team builds products to help us scale, such as a dynamic Asset Inventory that understands the nuances of our bespoke engineering ecosystem and how our applications and data relate to each other. This has shifted the team’s identity toward being a software engineering team that focuses on security problems, as opposed to a security engineering team that writes code/software. Our hiring has reflected that shift, and we’ve added more dedicated software engineers (SWEs) to the team to help us build out software. With this shift, we’ve incorporated engineering best practices, and our products have appropriate investments toward reliability and sustainability. As the team skews towards more software engineering-focused talent, ramping up to support the shared Appsec-focused on-call has been challenging.

While originally built to support AppSec use cases around providing guidance to developers in a self-service way, interest in the rich data and relationships we have in our tools, especially our Asset Inventory, has grown. As a result, we’ve continued to invest in making our solutions scalable and accessible, so security engineers can get the data they need more easily to drive security use cases. We’ve also discovered, through interviews with engineers, that self-service guidance doesn’t stand on its own. Moving forward, the team is investing in understanding our customer use cases better, and shifting our self-service story toward higher-context, more opinionated automated guidance to ensure developers have everything they need to make truly informed decisions about the security of their applications (similar to how they might make resiliency or other product decisions).

As the Netflix business and engineering workforce have grown, our software footprint has also grown and become more heterogeneous. At the same time, partnerships have become more and more strategic, and engineering has become more and more software-focused. As our team specialized, our AppSec Professional Services charter lost strategic focus. These services now need more dedicated strategic investment as their volume and support needs have grown. So we are now building a dedicated capability focused on these critical services, which are important investments and can no longer be served effectively via a shared Appsec on-call. This will be our “Appsec Reviews and Assessments” function, and we are hiring passionate, early-career Appsec engineers to join this group.

Team Structure v2

We will continue to learn as we go through this next phase of evolution of our program. We hope to continue to share these learnings with the broader community interested in scalable product and application security.


Scaling Appsec at Netflix (Part 2) was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

A Survey of Causal Inference Applications at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/a-survey-of-causal-inference-applications-at-netflix-b62d25175e6f

At Netflix, we want to entertain the world through creating engaging content and helping members discover the titles they will love. Key to that is understanding causal effects that connect changes we make in the product to indicators of member joy.

To measure causal effects we rely heavily on AB testing, but we also leverage quasi-experimentation in cases where AB testing is limited. Many scientists across Netflix have contributed to the way that Netflix analyzes these causal effects.

To celebrate that impact and learn from each other, Netflix scientists recently came together for an internal Causal Inference and Experimentation Summit. The weeklong conference brought speakers from across the content, product, and member experience teams to learn about methodological developments and applications in estimating causal effects. We covered a wide range of topics including difference-in-difference estimation, double machine learning, Bayesian AB testing, and causal inference in recommender systems among many others.

We are excited to share a sneak peek of the event with you in this blog post through selected examples of the talks, giving a behind the scenes look at our community and the breadth of causal inference at Netflix. We look forward to connecting with you through a future external event and additional blog posts!

Incremental Impact of Localization

Yinghong Lan, Vinod Bakthavachalam, Lavanya Sharan, Marie Douriez, Bahar Azarnoush, Mason Kroll

At Netflix, we are passionate about connecting our members with great stories that can come from anywhere and be loved everywhere. In fact, we stream in more than 30 languages and 190 countries and strive to localize, through subtitles and dubs, the content our members will enjoy the most. Understanding the heterogeneous incremental value of localization to member viewing is key to these efforts!

In order to estimate the incremental value of localization, we turned to causal inference methods using historical data. Running large scale, randomized experiments has both technical and operational challenges, especially because we want to avoid withholding localization from members who might need it to access the content they love.

Conceptual overview of using double machine learning to control for confounders and compare similar titles to estimate incremental impact of localization

We analyzed the data across various languages and applied double machine learning methods to properly control for measured confounders. We not only studied the impact of localization on overall title viewing but also investigated how localization adds value at different parts of the member journey. As a robustness check, we explored various simulations to evaluate the consistency and variance of our incrementality estimates. These insights have played a key role in our decisions to scale localization and delight our members around the world.
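To make the double machine learning idea concrete, here is a minimal sketch of the partialling-out estimator with two-fold cross-fitting. It assumes a single numeric confounder and uses plain least-squares lines as the nuisance models, where a real analysis would use flexible learners over many confounders; the synthetic data, coefficients, and function names are invented for illustration and are not the team's implementation.

```python
import random


def fit_line(xs, ys):
    """Ordinary least squares fit of y = a + b * x; returns a predict function."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return lambda x: mean_y + slope * (x - mean_x)


def dml_effect(x, t, y, folds=2):
    """Partialling-out DML estimate of the effect of treatment t on outcome y.

    Nuisance models E[t | x] and E[y | x] are fit on the other folds
    (cross-fitting); the effect is the slope of outcome residuals on
    treatment residuals.
    """
    n = len(x)
    t_res = [0.0] * n
    y_res = [0.0] * n
    for k in range(folds):
        train = [i for i in range(n) if i % folds != k]
        test = [i for i in range(n) if i % folds == k]
        predict_t = fit_line([x[i] for i in train], [t[i] for i in train])
        predict_y = fit_line([x[i] for i in train], [y[i] for i in train])
        for i in test:
            t_res[i] = t[i] - predict_t(x[i])
            y_res[i] = y[i] - predict_y(x[i])
    return sum(a * b for a, b in zip(t_res, y_res)) / sum(a * a for a in t_res)


# Synthetic data: confounder x (think title popularity) drives both the
# treatment t (localization investment) and the outcome y (viewing).
# The true effect of t on y is 2.0 by construction.
rng = random.Random(0)
x = [rng.gauss(0, 1) for _ in range(5000)]
t = [0.8 * xi + rng.gauss(0, 1) for xi in x]
y = [2.0 * ti + 3.0 * xi + rng.gauss(0, 1) for ti, xi in zip(t, x)]

naive = fit_line(t, y)
print("naive slope:", naive(1.0) - naive(0.0))
print("DML estimate:", dml_effect(x, t, y))
```

The naive regression of y on t absorbs the confounder and overshoots the true effect, while the residual-on-residual slope lands close to 2.0.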

A related application of causal inference methods to localization arose when some dubs were delayed due to pandemic-related shutdowns of production studios. To understand the impact of these dub delays on title viewing, we simulated viewing in the absence of delays using the method of synthetic control. We compared simulated viewing to observed viewing at title launch (when dubs were missing) and after title launch (when dubs were added back).
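A synthetic control builds the counterfactual for an affected title as a weighted blend of unaffected titles, with non-negative weights that sum to one, chosen so the blend tracks the affected title over a window where it is not impacted. The sketch below fits those weights with projected gradient descent; the post does not say which solver or fitting window was used, and the viewing numbers and function names are made up.

```python
def project_to_simplex(v):
    """Euclidean projection of v onto {w : w >= 0, sum(w) = 1} (sort-based)."""
    u = sorted(v, reverse=True)
    cumsum = 0.0
    theta = 0.0
    for j, uj in enumerate(u, start=1):
        cumsum += uj
        candidate = (cumsum - 1.0) / j
        if uj - candidate > 0:
            theta = candidate
    return [max(vi - theta, 0.0) for vi in v]


def synthetic_control_weights(treated_pre, donors_pre, iters=20000):
    """Fit simplex weights so that sum_j w[j] * donors_pre[j] tracks treated_pre."""
    n_donors, n_periods = len(donors_pre), len(treated_pre)
    # 1 / (2 * squared Frobenius norm) is a safe step size for this least-squares loss.
    step = 1.0 / (2.0 * sum(v * v for donor in donors_pre for v in donor))
    w = [1.0 / n_donors] * n_donors
    for _ in range(iters):
        resid = [
            treated_pre[p] - sum(w[j] * donors_pre[j][p] for j in range(n_donors))
            for p in range(n_periods)
        ]
        grad = [
            -2.0 * sum(donors_pre[j][p] * resid[p] for p in range(n_periods))
            for j in range(n_donors)
        ]
        w = project_to_simplex([w[j] - step * grad[j] for j in range(n_donors)])
    return w


# Fitting window: periods where the affected title is not impacted.
donors_pre = [[10, 12, 14, 13, 15, 16], [5, 4, 6, 7, 6, 5], [20, 18, 25, 22, 19, 24]]
treated_pre = [8.5, 9.6, 11.6, 11.2, 12.3, 12.7]  # equals 0.7 * donor 1 + 0.3 * donor 2
# Impacted window: donors unaffected, treated title observed with its dub missing.
donors_post = [[17, 18], [6, 6], [23, 25]]
treated_post = [11.0, 12.0]

w = synthetic_control_weights(treated_pre, donors_pre)
synthetic_post = [sum(w[j] * donors_post[j][p] for j in range(3)) for p in range(2)]
gap = [obs - syn for obs, syn in zip(treated_post, synthetic_post)]
print(w, gap)
```

The gap between observed and synthetic viewing is the estimated impact of the delay; rerunning the same fit with an unaffected title in the treated slot yields the placebo gaps to compare against.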

To control for confounders, we used a placebo test to repeat the analysis for titles that were not affected by dub delays. In this way, we were able to estimate the incremental impact of delayed dub availability on member viewing for impacted titles. Should there be another shutdown of dub productions, this analysis enables our teams to make informed decisions about delays with greater confidence.

Holdback Experiments for Product Innovation

Travis Brooks, Cassiano Coria, Greg Nettles, Molly Jackman, Claire Lackner

At Netflix, there are many examples of holdback AB tests, which show some users an experience without a specific feature. They have substantially improved the member experience by measuring long term effects of new features or re-examining old assumptions. However, when the topic of holdback tests is raised, it can seem too complicated in terms of experimental design and/or engineering costs.

We aimed to share best practices we have learned about holdback test design and execution in order to create more clarity around holdback tests at Netflix, so they can be used more broadly across product innovation teams by:

  1. Defining the types of holdbacks and their use cases with past examples
  2. Suggesting future opportunities where holdback testing may be valuable
  3. Enumerating the challenges that holdback tests pose
  4. Identifying future investments that can reduce the cost of deploying and maintaining holdback tests for product and engineering teams

Holdback tests have clear value in many product areas to confirm learnings, understand long term effects, retest old assumptions on newer members, and measure cumulative value. They can also serve as a way to test simplifying the product by removing unused features, creating a more seamless user experience. In many areas at Netflix they are already commonly used for these purposes.

Overview of how holdback tests work where we keep the current experience for a subset of members over the long term in order to gain valuable insights for improving the product
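Long-running holdbacks depend on members staying in the same cell for months. One common way to get that stability is deterministic hash bucketing, sketched below; the salting scheme, the 10,000-bucket granularity, the test name, and the function name are assumptions for illustration, not a description of Netflix's allocation system.

```python
import hashlib


def in_holdback(member_id: str, test_name: str, holdback_fraction: float) -> bool:
    """Return True if the member keeps the old experience for this holdback test.

    Hashing test_name together with member_id gives each member a stable bucket
    for the life of the test, while different tests get independent splits.
    """
    key = f"{test_name}:{member_id}".encode("utf-8")
    bucket = int.from_bytes(hashlib.sha256(key).digest()[:8], "big") % 10_000
    return bucket < holdback_fraction * 10_000


members = [f"member-{i}" for i in range(20_000)]
held_back = [m for m in members if in_holdback(m, "row-simplification-holdback", 0.05)]
print(len(held_back) / len(members))  # roughly 0.05
```

Because assignment is a pure function of the member and the test name, no assignment table is needed to keep the holdback cell fixed as the test runs for months.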

We believe by unifying best practices and providing simpler tools, we can accelerate our learnings and create the best product experience for our members to access the content they love.

Causal Ranker: A Causal Adaptation Framework for Recommendation Models

Jeong-Yoon Lee, Sudeep Das

Most machine learning algorithms used in personalization and search, including deep learning algorithms, are purely associative. They learn from the correlations between features and outcomes how to best predict a target.

In many scenarios, going beyond the purely associative nature to understanding the causal mechanism between taking a certain action and the resulting incremental outcome becomes key to decision making. Causal inference gives us a principled way of learning such relationships, and when coupled with machine learning, becomes a powerful tool that can be leveraged at scale.

Compared to machine learning, causal inference allows us to build a robust framework that controls for confounders in order to estimate the true incremental impact to members

At Netflix, many surfaces today are powered by recommendation models, like the personalized rows you see on your homepage. We believe that many of these surfaces can benefit from additional algorithms that focus on making each recommendation as useful to our members as possible, beyond just identifying the title or feature someone is most likely to engage with. Adding this new model on top of existing systems can help steer recommendations toward those that are right in the moment, helping members find the exact title they are looking to stream now.

This led us to create a framework that applies a light, causal adaptive layer on top of the base recommendation system called the Causal Ranker Framework. The framework consists of several components: impression (treatment) to play (outcome) attribution, true negative label collection, causal estimation, offline evaluation, and model serving.

We are building this framework in a generic way with reusable components so that any interested team within Netflix can adopt this framework for their use case, improving our recommendations throughout the product.
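To illustrate the difference between associative and causal ranking, the sketch below estimates each title's incremental play probability as the play rate with an impression minus the play rate without one, then reorders the base model's candidates by that uplift. This simple difference in means is only unbiased if impressions were randomized (for example, by an exploration policy); it stands in for the causal estimation component, whose actual estimator the post does not describe, and all names and data are hypothetical.

```python
from collections import defaultdict


def uplift_by_title(logs):
    """Estimate incremental play probability per title from logged outcomes.

    logs: iterable of (title_id, impressed, played) records, where the
    not-impressed records play the role of collected true negatives.
    Returns {title_id: P(play | impressed) - P(play | not impressed)}.
    """
    counts = defaultdict(lambda: [0, 0, 0, 0])  # [imp_n, imp_play, ctl_n, ctl_play]
    for title, impressed, played in logs:
        c = counts[title]
        if impressed:
            c[0] += 1
            c[1] += played
        else:
            c[2] += 1
            c[3] += played
    uplift = {}
    for title, (imp_n, imp_play, ctl_n, ctl_play) in counts.items():
        if imp_n and ctl_n:  # both arms are needed to estimate an incremental effect
            uplift[title] = imp_play / imp_n - ctl_play / ctl_n
    return uplift


def causal_rerank(candidates, uplift):
    """Reorder the base model's candidates by estimated uplift (ties keep base order)."""
    return sorted(candidates, key=lambda title: -uplift.get(title, 0.0))


logs = (
    [("A", True, i < 10) for i in range(20)]    # 50% play when impressed
    + [("A", False, i < 9) for i in range(20)]  # 45% play without an impression
    + [("B", True, i < 6) for i in range(20)]   # 30% play when impressed
    + [("B", False, i < 1) for i in range(20)]  # 5% play without an impression
)
uplift = uplift_by_title(logs)
print(causal_rerank(["A", "B"], uplift))  # ['B', 'A']
```

An associative model would rank title A first because it is played most often, but most of those plays would have happened anyway; B gains far more from being shown.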

Bellmania: Incremental Account Lifetime Valuation at Netflix and its Applications

Reza Badri, Allen Tran

Understanding the value of acquiring or retaining subscribers is crucial for any subscription business like Netflix. While customer lifetime value (LTV) is commonly used to value members, simple measures of LTV likely overstate the true value of acquisition or retention because there is always a chance that potential members may join in the future on their own without any intervention.

We establish a methodology and the necessary assumptions to estimate the monetary value of acquiring or retaining subscribers based on a causal interpretation of incremental LTV. This requires us to estimate both on-Netflix and off-Netflix LTV.

To overcome the lack of data for off-Netflix members, we use an approach based on Markov chains that recovers off-Netflix LTV from minimal data on non-subscriber transitions between being a subscriber and canceling over time.

Through Markov chains we can estimate the incremental value of a member and non member that appropriately captures the value of potential joins in the future
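As a toy version of this idea (not the Bellmania model itself), consider a two-state chain where a household is either a subscriber, earning monthly revenue r, or a non-subscriber, moving between the states with monthly cancel probability c and join probability j. With discount factor γ, the state values satisfy V = R + γPV, and subtracting the two equations gives the incremental value of a subscriber over a non-subscriber: V_sub - V_non = r / (1 - γ(1 - c - j)). The sketch below solves the general system numerically; all rates and revenue figures are hypothetical.

```python
def state_values(P, rewards, gamma):
    """Solve V = R + gamma * P V for a finite Markov reward process.

    Builds (I - gamma * P) V = R and solves it by Gauss-Jordan elimination.
    """
    n = len(P)
    A = [
        [(1.0 if i == k else 0.0) - gamma * P[i][k] for k in range(n)] + [float(rewards[i])]
        for i in range(n)
    ]
    for col in range(n):
        pivot = max(range(col, n), key=lambda row: abs(A[row][col]))
        A[col], A[pivot] = A[pivot], A[col]
        for row in range(n):
            if row != col:
                factor = A[row][col] / A[col][col]
                A[row] = [a - factor * b for a, b in zip(A[row], A[col])]
    return [A[i][n] / A[i][i] for i in range(n)]


# Hypothetical monthly numbers: revenue 10, cancel 5%, (re)join 2%, discount 0.99.
r, cancel, join, gamma = 10.0, 0.05, 0.02, 0.99
P = [[1 - cancel, cancel],  # subscriber -> (subscriber, non-subscriber)
     [join, 1 - join]]      # non-subscriber -> (subscriber, non-subscriber)
v_sub, v_non = state_values(P, [r, 0.0], gamma)
incremental = v_sub - v_non

# Naive LTV ignores that a non-subscriber might join on their own later.
naive_sub, _ = state_values([[1 - cancel, cancel], [0.0, 1.0]], [r, 0.0], gamma)
print(round(incremental, 1), round(naive_sub, 1))  # 126.1 168.1
```

The gap between the two numbers is the overstatement the post describes: valuing an acquisition at simple LTV ignores that the household had some chance of joining anyway.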

Furthermore, we demonstrate how this methodology can be used to (1) forecast aggregate subscriber numbers that respect both addressable market constraints and account-level dynamics, (2) estimate the impact of price changes on revenue and subscription growth, and (3) provide optimal policies, such as price discounting, that maximize expected lifetime revenue of members.

Measuring causality is a large part of the data science culture at Netflix, and we are proud to have so many stunning colleagues leverage both experimentation and quasi-experimentation to drive member impact. The conference was a great way to celebrate each other’s work and highlight the ways in which causal methodology can create value for the business.

We look forward to sharing more about our work with the community in upcoming posts. To stay up to date on our work, follow the Netflix Tech Blog, and if you are interested in joining us, we are currently looking for new stunning colleagues to help us entertain the world!


A Survey of Causal Inference Applications at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Evolution of ML Fact Store

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/evolution-of-ml-fact-store-5941d3231762

by Vivek Kaushal

At Netflix, we aim to provide recommendations that match our members’ interests. To achieve this, we rely on Machine Learning (ML) algorithms. ML algorithms can only be as good as the data we provide to them. This post focuses on the large volume of high-quality data stored in Axion, our fact store that is leveraged to compute ML features offline. We built Axion primarily to remove any training-serving skew and make offline experimentation faster. We will share how its design has evolved over the years and the lessons learned while building it.

Terminology

Axion fact store is part of our Machine Learning Platform, the platform that serves machine learning needs across Netflix. Figure 1 below shows how Axion interacts with Netflix’s ML platform. The overall ML platform has tens of components, and the diagram below only shows the ones that are relevant to this post. To understand Axion’s design, we need to know the various components that interact with it.

Figure 1: Netflix ML Architecture
  • Fact: A fact is data about our members or videos. An example of data about members is the videos they have watched or added to their My List. An example of video data is video metadata, like the length of a video. Time is a critical component of Axion: when we talk about facts, we talk about facts at a moment in time. These facts are managed and made available by services outside of Axion, like the viewing history or video metadata services.
  • Compute application: These applications generate recommendations for our members. They fetch facts from respective data services, run feature encoders to generate features and score the ML models to eventually generate recommendations.
  • Offline feature generator: We regenerate the values of the features that were generated for inferencing in the compute application. Offline Feature Generator is a Spark application that enables on-demand generation of features using new, existing, or updated feature encoders.
  • Shared feature encoders: Feature encoders are shared between compute applications and offline feature generators. We make sure there is no training/serving skew by using the same data and code for online and offline feature generation.

Motivation

Five years ago, we posted and talked about the need for an ML fact store. The motivation has not changed since then; the design has. This post focuses on the new design, but here is a summary of why we built this fact store.

Our machine learning models train on several weeks of data. Thus, if we want to run an experiment with a new or modified feature encoder, we need to build several weeks of feature data with this new or modified feature encoder. We have two options to collect features using this updated feature encoder.

The first is to log features from the compute applications, popularly known as feature logging. We can deploy updated feature encoders in our compute applications and then wait for them to log the feature values. Since we train our models on several weeks of data, this method is slow for us as we will have to wait for several weeks for the data collection.

An alternative to feature logging is to regenerate the features with updated feature encoders. If we can access the historical facts, we can regenerate the features using updated feature encoders. Regeneration takes hours compared to weeks taken by the feature logging. Thus, we decided to go this route and started storing facts to reduce the time it takes to run an experiment with new or modified features.

Design evolution

Axion fact store has four components — fact logging client, ETL, query client, and data quality infrastructure. We will describe how the design evolved in these components.

Evolution of Fact Logging Client

Compute applications access facts (members’ viewing history, their likes and My List information, etc.) from various gRPC services that power the whole Netflix experience. These facts are used to generate features using shared feature encoders, which in turn are used by ML models to generate recommendations. After generating the recommendations, compute applications use Axion’s fact logging client to log these facts.

At a later stage in the offline pipelines, the offline feature generator uses these logged facts to regenerate temporally accurate features. Temporal accuracy, in this context, is the ability to regenerate the exact set of features that were generated for the recommendations. This temporal accuracy of features is key to removing the training-serving skew.
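The key property is that the offline feature generator reruns the shared encoder code on exactly the facts the compute application saw, so regenerated features match the served ones, and new encoders can be backfilled over logged history. A minimal sketch, with a hypothetical fact record shape and two made-up encoders:

```python
from dataclasses import dataclass
from typing import Callable, Dict, Tuple


@dataclass(frozen=True)
class LoggedFacts:
    """Facts a compute application used for one recommendation request (hypothetical shape)."""
    member_id: int
    request_time_ms: int
    viewing_history: Tuple[str, ...]  # titles watched as of request_time_ms
    my_list: Tuple[str, ...]          # My List contents as of request_time_ms


Encoder = Callable[[LoggedFacts, str], float]


# Shared feature encoders: the same functions run online and offline.
def watched_count(facts: LoggedFacts, title: str) -> float:
    return float(len(facts.viewing_history))


def in_my_list(facts: LoggedFacts, title: str) -> float:
    return 1.0 if title in facts.my_list else 0.0


def generate_features(facts: LoggedFacts, title: str, encoders: Dict[str, Encoder]) -> Dict[str, float]:
    return {name: encode(facts, title) for name, encode in encoders.items()}


# Online: features computed at request time; the facts themselves are logged.
facts = LoggedFacts(member_id=7, request_time_ms=1_650_000_000_000,
                    viewing_history=("t1", "t2"), my_list=("t3",))
served = generate_features(facts, "t3", {"watched_count": watched_count})

# Offline, later: regenerate from the logged facts, adding a new encoder.
regenerated = generate_features(facts, "t3", {"watched_count": watched_count,
                                              "in_my_list": in_my_list})
assert regenerated["watched_count"] == served["watched_count"]  # no training/serving skew
print(regenerated)
```

Because the facts are frozen at request time, regenerating a feature weeks later cannot pick up data that arrived after the recommendation was made.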

The first version of our logger library optimized for storage by deduplicating facts and optimized for network I/O by using a different compression method for each fact. Then we started hitting roadblocks while optimizing query performance. Because we were optimizing for storage and performance at the logging level, we had less data and metadata to work with when optimizing query performance.

Eventually, we decided to simplify the logger. Now we asynchronously collect all the facts and metadata into a protobuf, compress it, and send it to the Keystone messaging service.
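The simplified logger can be sketched as below. JSON stands in for the protobuf envelope and the `send` callable stands in for the Keystone producer; both substitutions, and the class name, are assumptions, and a production client would also need batching, backpressure, and error handling.

```python
import gzip
import json
import queue
import threading
import time


class AsyncFactLogger:
    """Hypothetical 'simple' fact logger: no dedup, no per-fact codecs, one compressed envelope."""

    def __init__(self, send):
        self._send = send
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def log(self, facts, metadata):
        # Called on the request path: only enqueue, never block on network I/O.
        self._queue.put({"facts": facts, "metadata": metadata,
                         "logged_at_ms": int(time.time() * 1000)})

    def _run(self):
        while True:
            record = self._queue.get()
            if record is None:  # shutdown sentinel
                break
            # Serialize everything together and compress once per record.
            payload = gzip.compress(json.dumps(record).encode("utf-8"))
            self._send(payload)

    def close(self):
        self._queue.put(None)
        self._worker.join()


sent = []
logger = AsyncFactLogger(send=sent.append)
logger.log({"viewing_history": ["t1", "t2"]}, {"app": "homepage-ranker", "request_id": "r-1"})
logger.close()
record = json.loads(gzip.decompress(sent[0]))
print(record["facts"])
```

Keeping the client this thin pushes storage and query optimizations downstream into the ETL, where more data and metadata are available to work with.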

Evolution of ETL and Query Client

ETL and Query Client are intertwined, as any ETL changes could directly impact the query performance. ETL is the component where we experiment for query performance, improving data quality, and storage optimization. Figure 2 shows components of Axion’s ETL and its interaction with the query client.

Fig 2: Internal components of Axion

Axion’s fact logging client logs facts to the Keystone real-time stream processing platform, which outputs data to an Iceberg table. We use Keystone because it is easy to use, reliable, and scalable, and it aggregates facts from different cloud regions into a single AWS region. Having all data in a single AWS region exposes us to a single point of failure, but it significantly reduces the operational overhead of our ETL pipelines, which we believe makes it a worthwhile trade-off. We currently send all the facts into a single Keystone stream, which we have configured to write to a single Iceberg table. We plan to split this into multiple Keystone streams for horizontal scalability.

The Iceberg table created by Keystone contains large blobs of unstructured data. These large unstructured blobs are not efficient for querying, so we need to transform and store this data in a different format to allow efficient queries. One might think that normalizing it would make storage and querying more efficient, albeit at the cost of writing more complex queries. Hence, our first approach was to normalize the incoming data and store it in multiple tables. We soon realized that, while space-optimized, it made querying very inefficient for the scale of data we needed to handle. We ran into various shuffle issues in Spark as we were joining several big tables at query time.

We then decided to denormalize the data and store all facts and metadata in one Iceberg table using nested Parquet format. While storing in one Iceberg table was not as space-optimized, Parquet did provide us with significant savings in storage costs, and most importantly, it made our Spark queries succeed. However, Spark query execution remained slow. Further attempts to optimize query performance, like using bloom filters and predicate pushdown, helped but still left performance far from where we wanted it to be.

Why was querying the single Iceberg table slow?

What’s our end goal? We want to train our ML models to personalize the member experience. We have a plethora of ML models that drive personalization. Each of these models is trained with different datasets and features, along with different stratification and objectives. Given that Axion is used as the de facto fact store for assembling the training datasets for all these models, it is important for Axion to log and store enough facts to serve all of them. However, any given ML model requires only a subset of the data stored in Axion for its training. We saw queries filtering an input dataset of several hundred million rows down to fewer than a million in extreme cases. Even with bloom filters, query performance was slow because each query downloaded all of the data from S3 and then dropped most of it. Because our label dataset was also random, presorting the facts data did not help either.

We realized that our options with Iceberg were limited if we only needed data for a million rows — out of several hundred million — and we had no additional information to optimize our queries. So we decided not to further optimize joins with the Iceberg data and instead move to an alternate approach.

Low-latency Queries

To avoid downloading all of the fact data from S3 into a Spark executor only to drop most of it, we analyzed our query patterns and found a way to access only the data we are interested in. We did this by introducing EVCache, a key-value store, which stores facts and indices optimized for these particular query patterns.

Let’s see how the solution works for one of these query patterns — querying by member id. We first query the index by member id to find the keys for the facts of that member and query those facts from EVCache in parallel. So, we make multiple calls to the key-value store for each row in our training set. Even when accounting for these multiple calls, the query performance is an order of magnitude faster than scanning several hundred times more data stored in the Iceberg table. Depending on the use case, EVCache queries can be 3x-50x faster than Iceberg.

The only problem with this approach is that EVCache is more expensive than Iceberg storage, so we need to limit the amount of data stored. So, for the queries that request data not available in EVCache, our only option is to query Iceberg. In the future, we want to store all facts in EVCache by optimizing how we store data in EVCache.
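The member-id query pattern, including the fallback to Iceberg, can be sketched as follows. Plain dictionaries stand in for EVCache and a callable stands in for the Iceberg query; the names, key format, thread-pool size, and the choice to fall back when any fact is missing are illustrative assumptions.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_member_facts(member_id, index_store, fact_store, query_iceberg, max_workers=8):
    """Index lookup by member id, then parallel fact fetches; Iceberg on a miss."""
    fact_keys = index_store.get(member_id)
    if fact_keys is None:
        return query_iceberg(member_id)  # member not covered by the cache
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        facts = list(pool.map(fact_store.get, fact_keys))  # order matches fact_keys
    if any(fact is None for fact in facts):
        return query_iceberg(member_id)  # partially missing: fall back to the full scan
    return facts


index_store = {"m1": ["m1:viewing:0", "m1:mylist:0"]}
fact_store = {"m1:viewing:0": {"titles": ["t1"]}, "m1:mylist:0": {"titles": ["t3"]}}


def query_iceberg(member_id):
    return [{"source": "iceberg", "member": member_id}]


print(fetch_member_facts("m1", index_store, fact_store, query_iceberg))
print(fetch_member_facts("m2", index_store, fact_store, query_iceberg))
```

Each training row costs one index read plus a handful of parallel point reads, which touches far less data than scanning the Iceberg table and discarding most of it.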

How do we monitor the quality of data?

Over the years, we learned the importance of having comprehensive data quality checks for our datasets. Corruption in data can significantly impact production model performance and A/B test results. From the ML researchers’ perspective, it doesn’t matter whether Axion or a component outside of Axion corrupted the data. When they read the data from Axion, if it is bad, it is a loss of trust in Axion. For Axion to become the de facto fact store for all Personalization ML models, the research teams needed to trust the quality of the data stored. Hence, we designed a comprehensive system that monitors the quality of data flowing through Axion to detect corruptions, whether introduced by Axion or outside Axion.

We bucketed data corruptions observed when reading data from Axion on three dimensions:

  • The impact on a value in data: Was the value missing? Did a new value appear (unintentionally)? Was the value replaced with a different value?
  • The spread of data corruption: Did data corruption have a row or columnar impact? Did the corruption impact one pipeline or multiple ML pipelines?
  • The source of data corruption: Was data corrupted by components outside of Axion? Did Axion components corrupt data? Was data corrupted at rest?

We came up with three different approaches to detect data corruption, wherein each approach can detect corruption along multiple dimensions described above.

Aggregations

Data volume logged to the Axion datastore is predictable. Compute applications follow daily trends: some log consistently every hour, others log for a few hours every day. We aggregate counts on dimensions like total records, compute application, and fact counts. Then we use a rule-based approach to validate that the counts are within a certain threshold of past trends, and alerts are triggered when counts fall outside these thresholds. These trend-based alerts help detect missing or new data, row-level impact, and pipeline impact. They help with column-level impact only on rare occasions.
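A rule-based count check of this kind might look like the following. The baseline (mean of past counts for the same dimension), the 30% tolerance, and the dimension names are assumptions; the post does not give the actual thresholds or trend model.

```python
from statistics import mean


def check_counts(today, history, tolerance=0.3):
    """Compare today's counts per dimension with the mean of that dimension's past counts.

    today: {dimension: count}; history: {dimension: [past counts]} (non-empty lists).
    Returns (dimension, reason, count) tuples for every dimension that needs an alert.
    """
    alerts = []
    for key in sorted(set(today) | set(history)):
        if key not in history:
            alerts.append((key, "new", today[key]))
        elif key not in today:
            alerts.append((key, "missing", 0))
        else:
            baseline = mean(history[key])
            if abs(today[key] - baseline) > tolerance * baseline:
                alerts.append((key, "out_of_range", today[key]))
    return alerts


today = {"homepage-ranker": 1000, "search-ranker": 100, "new-app": 5}
history = {"homepage-ranker": [980, 1010, 1020],
           "search-ranker": [500, 520, 480],
           "billing-ranker": [50, 55]}
print(check_counts(today, history))
```

Here the search ranker's sudden drop, the billing ranker's disappearance, and the unexpected new application each raise an alert, while the homepage ranker stays within its band.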

Consistent sampling

We sample a small percentage of the data based on a predictable member id hash and store it in separate tables. Because the sampling is consistent across different data stores and pipelines, we can run canaries on this smaller subset and get output quickly. We also compare the output of these canaries against production to detect unintended changes in data during new code deployments. One downside of consistent sampling is that it may not catch rare issues, especially if the rate of data corruption is significantly lower than our sampling rate. Consistent sampling checks help detect value-level impact (new, missing, or replaced values), columnar impact, and single-pipeline issues.
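In the sketch below, a member is sampled based only on a hash of their id, with no per-pipeline salt, so every store and pipeline picks the same members, and a canary's sampled output can be diffed against production's. The 1% rate, the hash choice, the row shape, and the function names are illustrative assumptions.

```python
import hashlib


def in_sample(member_id: int, rate: float) -> bool:
    """Deterministic sampling: the same member id gets the same decision in every store."""
    digest = hashlib.sha256(str(member_id).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % 10_000 < rate * 10_000


def sampled(rows, rate):
    """Keep only the rows whose member falls in the consistent sample, keyed by member id."""
    return {row["member_id"]: row for row in rows if in_sample(row["member_id"], rate)}


def diff_canary(prod_rows, canary_rows, rate=0.01):
    """Diff a canary's output against production on the shared sample only."""
    prod, canary = sampled(prod_rows, rate), sampled(canary_rows, rate)
    return {
        "missing": sorted(prod.keys() - canary.keys()),
        "new": sorted(canary.keys() - prod.keys()),
        "changed": sorted(k for k in prod.keys() & canary.keys() if prod[k] != canary[k]),
    }


prod = [{"member_id": i, "fact_count": 3} for i in range(20_000)]
inside = next(i for i in range(20_000) if in_sample(i, 0.01))
outside = next(i for i in range(20_000) if not in_sample(i, 0.01))

# A corruption that hits a sampled member is caught...
canary = [dict(row, fact_count=0) if row["member_id"] == inside else row for row in prod]
print(diff_canary(prod, canary))
# ...but one that only hits unsampled members is not (the downside noted above).
canary = [dict(row, fact_count=0) if row["member_id"] == outside else row for row in prod]
print(diff_canary(prod, canary))
```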

Random sampling

While the above two strategies combined can detect most data corruptions, they do occasionally miss. For those rare occasions, we rely on random sampling. We randomly query a subset of the data multiple times every hour. Both hot and cold data, i.e., recently logged data and data logged a while ago, are randomly sampled. We expect these queries to pass without issues. When they fail, it is either due to bad data or issues with the underlying infrastructure. While we think of it as an “I’m feeling lucky” strategy, it does work as long as we read significantly more data than the rate of corrupted data.

Another advantage to random sampling is maintaining the quality of unused facts. Axion users do not read a significant percentage of facts logged to Axion, and we need to make sure that these unused facts are of good quality as they can be used in the future. We have pipelines that randomly read these unused facts and alert when the query does not get the expected output. In terms of impact, these random checks are like winning a lottery — you win occasionally, and you never know how big it is.
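A minimal random-probe job over hot and cold data might look like this. The tier names, sample size, and `expect` check are assumptions, and a real job would run on a schedule and alert on failures.

```python
import random


def random_probe(partitions, query, expect, samples_per_tier=5, rng=None):
    """Randomly query keys from each tier and collect failures.

    partitions: {"hot": [keys...], "cold": [keys...]}
    query(key) -> row; expect(row) -> bool
    Query exceptions are recorded too, since they may indicate infrastructure issues.
    """
    rng = rng or random.Random()
    failures = []
    for tier, keys in partitions.items():
        for key in rng.sample(keys, min(samples_per_tier, len(keys))):
            try:
                row = query(key)
            except Exception as exc:
                failures.append((tier, key, repr(exc)))
                continue
            if not expect(row):
                failures.append((tier, key, "unexpected output"))
    return failures


store = {"f1": {"value": 1}, "f2": {"value": None}, "f3": {"value": 3}}
partitions = {"hot": ["f1", "f2"], "cold": ["f3", "f4"]}  # f4 was logged but is unreadable
failures = random_probe(partitions, store.__getitem__, lambda row: row["value"] is not None,
                        samples_per_tier=10, rng=random.Random(1))
print(failures)
```

Including cold keys that no model currently reads is what keeps unused facts from silently rotting.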

Results from monitoring data quality

We deployed the above three monitoring approaches more than two years ago, and since then, we have identified more than 95% of data issues early. We have also significantly improved the stability of our customer pipelines. If you want to know more about how we monitor data quality in Axion, you can check out our Spark Summit talk and this podcast.

Learnings from Axion’s evolution

Designing this fact store taught us to start with a simple design and avoid premature optimizations that add complexity. Pay the storage, network, and compute cost. As the product becomes available to customers, new use cases will pop up that are harder to support with a complex design. Once customers have adopted the product, start looking into optimizations.

While “keep the design simple” is a frequently shared learning in software engineering, it is not always easy to achieve. For example, we learned that our fact logging client can be simple with minimal business logic, but our query client needs to be functionality-rich. Our learning is that if we need to add complexity, add it in the least number of components instead of spreading it out.

Another learning is that we should have invested early into a robust testing framework. Unit tests and integration tests only took us so far. We needed scalability testing and performance testing as well. This scalability and performance testing framework helped stabilize the system because, without it, we ran into issues that took us weeks to clean up.

Lastly, we learned that we should run data migrations and push the breaking API changes as soon as possible. As more customers adopt Axion, running data migrations and making breaking API changes are becoming harder and harder.

Conclusion and future work

Axion is our primary data source and is used extensively by all our Personalization ML models for offline feature generation. Given that it ensures there is no training/serving skew and that it has significantly reduced offline feature generation latencies, we are now starting to make it the de facto fact store for other ML use cases within Netflix.

We do have use cases that the current design does not serve well, like bandits: our current design stores a single map per row, which becomes a limitation when a compute application needs to log multiple values for the same key. Also, as described in the design section, we want to optimize how we store data in EVCache so that we can store more data there.

If you are interested in working on similar challenges, join us.


Evolution of ML Fact Store was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How Netflix Content Engineering makes a federated graph searchable

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-netflix-content-engineering-makes-a-federated-graph-searchable-5c0c1c7d7eaf

By Alex Hutter, Falguni Jhaveri and Senthil Sayeebaba

Over the past few years Content Engineering at Netflix has been transitioning many of its services to use a federated GraphQL platform. GraphQL federation enables domain teams to independently build and operate their own Domain Graph Services (DGS) and, at the same time, connect their domain with other domains in a unified GraphQL schema exposed by a federated gateway.

As an example, let’s examine three core entities of the graph, each owned by separate engineering teams:

  1. Movie: At Netflix, we make titles (shows, films, shorts etc.). For simplicity, let’s assume each title is a Movie object.
  2. Production: Each Movie is associated with a Studio Production. A Production object tracks everything needed to make a Movie including shooting location, vendors, and more.
  3. Talent: the people working on a Movie are the Talent, including actors, directors, and so on.
Sample GraphQL Schema

Once entities like the above are available in the graph, it’s very common for folks to want to query for a particular entity based on attributes of related entities, e.g. give me all movies that are currently in photography with Ryan Reynolds as an actor.

In a federated graph architecture, how can we answer such a query given that each entity is served by its own service? The Movie service would need to provide an endpoint that accepts a query and filters that may apply to data the service does not own, and use those to identify the appropriate Movie entities to return.

In fact, every entity owning service could be required to do this work.

This common problem of making a federated graph searchable led to the creation of Studio Search.

The Studio Search platform was designed to take a portion of the federated graph, a subgraph rooted at an entity of interest, and make it searchable. The entities of the subgraph can be queried with text input, filtered, ranked, and faceted. In the next section, we’ll discuss how we made this possible.

Introducing Studio Search

When hearing that we want to enable teams to search something, your mind likely goes to building an index of some kind. Ours did too! So we need to build an index of a portion of the federated graph.

How do our users tell us which portion and, even more critically, given that the portion of the graph of interest will almost definitely span data exposed by many services, how do we keep the index current with all these various services?

We chose Elasticsearch as the underlying technology for our index and determined that there were three main pieces of information required to build out an indexing pipeline:

  • A definition of their subgraph of interest rooted at the entity they primarily will be searching for
  • Events to notify the platform of changes to entities in the subgraph
  • Index-specific configuration, such as whether a field should be used for full-text queries or whether a sub-document is nested

In short, our solution was to build an index for the subgraphs of interest. This index needs to be kept up-to-date with the data exposed by the various services in the federated graph in near-real time.

GraphQL gives us a straightforward way to define the subgraph — a single templated GraphQL query that pulls all of the data the user is interested in using in their searches.

Here’s an example GraphQL query template. It’s pulling data for Movies and their related Productions and Talent.

Sample GraphQL query

To keep the index up to date, events are used to trigger a reindexing operation for individual entities when they change. Change Data Capture (CDC) events are the preferred events for triggering these operations — most teams produce them using Netflix’s CDC connectors — however, application events are also supported when necessary.

Because all data to be indexed is fetched from the federated graph, the events only need to carry an entity id; the id can be substituted into the GraphQL query template to fetch the entity and any related data.
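The id substitution can be made concrete with a minimal Python sketch. It assumes the template receives the entity id as a GraphQL variable (`$movieId`) rather than through string interpolation, and that the gateway accepts the standard GraphQL-over-HTTP JSON body of `query` plus `variables`. The field names (`movie`, `production`, `ptpId`, `talent`) and the event shape are hypothetical stand-ins for the sample query above, not the actual Studio schema.

```python
import json

# Hypothetical query template: the entity id arrives as a GraphQL variable,
# so the same template can be reused for every change event.
MOVIE_QUERY_TEMPLATE = """
query MovieDocument($movieId: ID!) {
  movie(id: $movieId) {
    movieId
    title
    production { ptpId shootingLocation }
    talent { name role }
  }
}
"""

def build_request(entity_id: str) -> str:
    """Turn an entity id taken from a change event into a GraphQL request body."""
    payload = {
        "query": MOVIE_QUERY_TEMPLATE,
        "variables": {"movieId": entity_id},
    }
    return json.dumps(payload)

# A CDC event only needs to carry the id of the changed entity (hypothetical shape).
event = {"entity": "Movie", "id": "m-123", "op": "UPDATE"}
body = build_request(event["id"])
```

Passing the id as a variable rather than splicing it into the query text keeps the template valid GraphQL and avoids any escaping problems.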

Using the type information present in the GraphQL query template and the user-specified index configuration, we were able to create an index template with a set of custom Elasticsearch text analyzers that generalized well across domains.

Given these inputs, a Data Mesh pipeline can be created that consists of the user-provided CDC event source, a processor to enrich those events using the user-provided GraphQL query, and a sink to Elasticsearch.

Architecture

Putting this all together, below you can see a simplified view of the architecture.

Studio Search Indexing Architecture
  1. Studio applications produce events to schematized Kafka streams within Data Mesh, either:
     a. By transacting with a database that is monitored by a CDC connector, which creates events, or
     b. By directly creating events using a Data Mesh client.
  2. The schematized events are consumed by Data Mesh processors implemented in the Apache Flink framework. Some entities have multiple events for their changes, so we leverage union processors to combine data from multiple Kafka streams.
     a. A GraphQL processor executes the user-provided GraphQL query to fetch documents from the federated gateway.
     b. The federated gateway, in turn, fetches data from the Studio applications.
  3. The documents fetched from the federated gateway are put onto another schematized Kafka topic before being processed by an Elasticsearch sink in Data Mesh, which indexes them into an Elasticsearch index configured with an indexing template created specifically for the fields and types present in the document.
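The flow in steps 1 to 3 can be modelled in a few lines of plain Python. This is only a sketch of the data flow, assuming in-memory lists stand in for Kafka topics, a stub function stands in for the federated gateway, and a dict stands in for the Elasticsearch index. The real pipeline runs as Data Mesh processors on Apache Flink, and none of the names below are Data Mesh APIs.

```python
from typing import Callable, Dict, Iterable, List

def union(*streams: Iterable[dict]) -> List[dict]:
    """Union processor: merge change events for one entity type from several streams."""
    merged: List[dict] = []
    for stream in streams:
        merged.extend(stream)
    return merged

def enrich(events: Iterable[dict], fetch: Callable[[str], dict]) -> List[dict]:
    """GraphQL processor: replace each event with the full document fetched for its id."""
    return [fetch(event["id"]) for event in events]

def sink(documents: Iterable[dict], index: Dict[str, dict]) -> None:
    """Elasticsearch sink stand-in: upsert each document by its primary id."""
    for doc in documents:
        index[doc["movieId"]] = doc

def fake_gateway(movie_id: str) -> dict:
    """Hypothetical stub for the federated gateway."""
    return {"movieId": movie_id, "title": f"Title for {movie_id}"}

cdc_stream = [{"id": "m-1"}, {"id": "m-2"}]
app_event_stream = [{"id": "m-1"}]  # a second event source for the same entity type

index: Dict[str, dict] = {}
sink(enrich(union(cdc_stream, app_event_stream), fake_gateway), index)
```

Because the sink upserts by id, a duplicate event only costs a redundant fetch, not a duplicate document: each reindex simply overwrites the entity with its latest state.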

Reverse lookups

You may have noticed something missing in the above explanation. If the index is being populated based on Movie id events, how does it stay up to date when a Production or Talent changes? Our solution to this is a reverse lookup — when a change to a related entity is made, we need to look up all of the primary entities that could be affected and trigger events for those. We do this by consulting the index itself and querying for all primary entities related to the entity that has changed.

For instance, suppose our index has a document that looks like this:

Sample Elasticsearch document

If the pipeline then observes a change to the Production with ptpId “abc”, we can query the index for all documents with production.ptpId == “abc” and extract the movieId. We can then pass that movieId down into the rest of the indexing pipeline.
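A minimal sketch of the reverse lookup request and the id extraction follows, written as plain Python dicts in the shape of an Elasticsearch search body and response. It assumes `production.ptpId` is mapped as a `keyword` field on a non-nested object; a nested mapping would need a `nested` query instead. The field names follow the example above, and the helper functions are hypothetical, since the actual Studio Search query is not shown in this post.

```python
from typing import List

def reverse_lookup_query(field: str, value: str, id_field: str = "movieId") -> dict:
    """Build a search body that finds every primary document referencing a related entity."""
    return {
        "query": {"term": {field: value}},  # exact match on the related entity's id
        "_source": [id_field],              # only the primary id is needed
    }

def extract_ids(response: dict, id_field: str = "movieId") -> List[str]:
    """Pull the primary entity ids out of an Elasticsearch search response."""
    return [hit["_source"][id_field] for hit in response["hits"]["hits"]]

body = reverse_lookup_query("production.ptpId", "abc")

# Abbreviated response in the standard hits.hits[]._source shape.
sample_response = {"hits": {"hits": [{"_source": {"movieId": "m-1"}},
                                     {"_source": {"movieId": "m-7"}}]}}
affected = extract_ids(sample_response)
```

Each id in `affected` would then be emitted as a Movie change event so it flows through the same enrichment step as a direct change. A production version would also need to page through large result sets, because a single search returns only 10 hits by default.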

Scaling the Solution

The solution we came up with worked quite well. Teams were easily able to share the requirements for their subgraph’s index via a GraphQL query template and could use existing tooling to generate the events that keep the index up to date in near real-time. Reusing the index itself to power reverse lookups enabled us to keep all the logic for handling related entities contained within our systems and shield our users from that complexity. In fact, it worked so well that we became inundated with requests to integrate with Studio Search; it began to power a significant portion of the user experience for many applications within Content Engineering.

Early on, we did integrations by hand, but as adoption of Studio Search took off, this did not scale. We needed to build tools to help us automate as much of the provisioning of the pipelines as possible. To get there, we identified four main problems we needed to solve:

  • How to collect all the required configuration for the pipeline from users.
  • Data Mesh streams are schematized with Avro. In step 3 of the previous architecture diagram, a stream carries the results of the GraphQL query to the Elasticsearch sink. The response from GraphQL can contain tens of fields, often nested. Writing an Avro schema for such a document by hand is time-consuming and error-prone. We needed to make this step much easier.
  • Similarly, generating the Elasticsearch template by hand was time-consuming and error-prone. We needed to determine how to generate one based on the users’ configuration.
  • Finally, creating Data Mesh pipelines manually was also time-consuming and error-prone due to the volume of configuration required.

Configuration

For collecting the indexing pipeline configuration from users we defined a single configuration file that enabled users to provide a high level description of their pipeline that we can use to programmatically create the indexing pipeline in Data Mesh. By using this high-level description we were able to greatly simplify the pipeline creation process for users by filling in common yet required configuration for the Data Mesh pipeline.

Sample .yaml configuration
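The idea of filling in common yet required settings can be sketched as a recursive merge of user-supplied values over platform defaults. Every key below is hypothetical: the post does not show the schema of the actual configuration file. In practice the YAML file would be parsed into a dict like `user_config`; a literal dict is used here so the sketch needs only the standard library.

```python
import copy

# Hypothetical platform defaults that most pipelines share.
DEFAULTS = {
    "source": {"type": "cdc"},
    "processor": {"parallelism": 2, "retries": 3},
    "sink": {"type": "elasticsearch", "replicas": 1},
}

def merge(defaults: dict, overrides: dict) -> dict:
    """Recursively overlay user settings on defaults without mutating either input."""
    result = copy.deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merge(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result

# What a user might provide: only the parts specific to their index (hypothetical keys).
user_config = {
    "name": "movie-search",
    "source": {"topic": "movie_cdc"},
    "processor": {"query_file": "movie.graphql"},
    "sink": {"index": "movies"},
}

full_config = merge(DEFAULTS, user_config)
```

Users still override any default by naming it, while the common settings never have to appear in their file.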

Avro schema & Elasticsearch index template generation

The approach for both schema and index template generation was very similar. Essentially, it required taking the user-provided GraphQL query template and generating JSON from it. This was done using graphql-java. The steps required are enumerated below:

  • Introspect the federated graph’s schema and use the response to build a GraphQLSchema object
  • Parse and validate the user provided GraphQL query template against the schema
  • Visit the nodes of the query using utilities provided by graphql-java and collect the results into a JSON object — this generated object is the schema/template
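In the real system, graphql-java handles the introspection, parsing, validation and node visiting. The Python sketch below skips those steps and starts from a tree of selected fields that has already been resolved against the schema. The `Field` class is a hypothetical stand-in for what such a visitor would collect. The sketch shows why the two generators are so similar: the same recursive walk emits an Avro record schema in one case and an Elasticsearch mapping in the other. The scalar type mappings and the `nested` set, which stands in for the user's index configuration, are assumptions.

```python
from dataclasses import dataclass, field
from typing import List, Set

# Hypothetical scalar mappings; a real generator would also honour per-field index config.
AVRO_SCALARS = {"ID": "string", "String": "string", "Int": "int",
                "Float": "double", "Boolean": "boolean"}
ES_SCALARS = {"ID": "keyword", "String": "text", "Int": "integer",
              "Float": "double", "Boolean": "boolean"}

@dataclass
class Field:
    """One field selected by the query template, already resolved against the schema."""
    name: str
    type_name: str                 # GraphQL scalar or object type name
    non_null: bool = False
    is_list: bool = False
    children: List["Field"] = field(default_factory=list)

def avro_field(f: Field) -> dict:
    if f.children:  # object type -> nested Avro record named after the GraphQL type
        avro_type = {"type": "record", "name": f.type_name,
                     "fields": [avro_field(c) for c in f.children]}
    else:
        avro_type = AVRO_SCALARS[f.type_name]
    if f.is_list:
        avro_type = {"type": "array", "items": avro_type}
    if f.non_null:
        return {"name": f.name, "type": avro_type}
    # Nullable GraphQL fields become a union with null, defaulting to null.
    return {"name": f.name, "type": ["null", avro_type], "default": None}

def es_property(f: Field, nested: Set[str]) -> dict:
    if f.children:
        prop = {"properties": {c.name: es_property(c, nested) for c in f.children}}
        if f.name in nested:  # index config decides which sub-documents are nested
            prop["type"] = "nested"
        return prop
    # Elasticsearch fields can hold arrays without a special mapping.
    return {"type": ES_SCALARS[f.type_name]}

def generate(root: str, fields: List[Field], nested: Set[str]):
    schema = {"type": "record", "name": root, "fields": [avro_field(f) for f in fields]}
    mappings = {"mappings": {"properties": {f.name: es_property(f, nested) for f in fields}}}
    return schema, mappings

movie_fields = [
    Field("movieId", "ID", non_null=True),
    Field("title", "String"),
    Field("production", "Production", children=[Field("ptpId", "ID")]),
    Field("talent", "Talent", is_list=True, children=[Field("name", "String")]),
]
avro_schema, es_template = generate("Movie", movie_fields, nested={"talent"})
```

This simplified version assumes each GraphQL object type appears once in the query; Avro requires record names to be unique, so a real generator would need to namespace or reuse repeated types.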

Deployment

The previous steps centralized all the configuration in a single file and provided tools to generate additional configuration for the pipeline’s dependencies. Now all that was required was an entry point for users to provide their configuration file for orchestrating the provisioning of the indexing pipeline. Given that our user base was other engineers, we decided to provide a command line interface (CLI) written in Python. Using Python, we were able to get the first version of the CLI to our users quickly. Netflix provides tooling that makes the CLI auto-update, which makes it easy to iterate on. The CLI performs the following tasks:

  • Validates the provided configuration file
  • Calls a service to generate the Avro schema & Elasticsearch index template
  • Assembles the logical plan for the Data Mesh pipeline and creates it using Data Mesh APIs
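The three CLI tasks could be outlined as in the sketch below. Several parts are assumptions: the required keys are hypothetical, `generate_artifacts` is a hypothetical stub for the call to the generation service, and the logical plan is just a list of dicts rather than anything from the Data Mesh APIs. JSON stands in for the YAML file so the sketch needs only the standard library.

```python
import argparse
import json
from typing import List, Optional

REQUIRED_KEYS = ("name", "source", "query_file", "index")  # hypothetical

def validate(config: dict) -> List[str]:
    """Task 1: return a list of problems; an empty list means the file is valid."""
    return [f"missing required key: {key}" for key in REQUIRED_KEYS if key not in config]

def generate_artifacts(config: dict) -> dict:
    """Task 2 (stub): a real CLI would call the schema/template generation service."""
    return {"avro_schema": {"type": "record", "name": config["name"], "fields": []},
            "index_template": {"mappings": {"properties": {}}}}

def assemble_plan(config: dict, artifacts: dict) -> List[dict]:
    """Task 3: describe source -> GraphQL processor -> Elasticsearch sink."""
    return [
        {"kind": "source", "topic": config["source"]},
        {"kind": "graphql_processor", "query_file": config["query_file"],
         "output_schema": artifacts["avro_schema"]},
        {"kind": "elasticsearch_sink", "index": config["index"],
         "template": artifacts["index_template"]},
    ]

def main(argv: Optional[List[str]] = None) -> int:
    parser = argparse.ArgumentParser(description="Provision an indexing pipeline (sketch)")
    parser.add_argument("config", help="path to a JSON pipeline configuration file")
    args = parser.parse_args(argv)
    with open(args.config) as fh:
        config = json.load(fh)
    problems = validate(config)
    if problems:
        print("\n".join(problems))
        return 1
    plan = assemble_plan(config, generate_artifacts(config))
    print(json.dumps(plan, indent=2))  # a real CLI would submit the plan instead of printing it
    return 0
```

Calling `main(["pipeline.json"])` runs all three tasks against a file. Validating before any remote call means a malformed file fails fast, before anything is provisioned.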

A CLI is just a step towards a better self-service deployment process. We’re currently exploring options for treating these indices and their pipelines as declarative infrastructure managed within the application that consumes them.

Current Challenges

Using the federated graph to provide the documents for indexing simplifies much of the indexing process, but it also creates its own set of challenges. If the challenges below sound exciting to you, come join us!

Backfill

Bootstrapping a new index (for example, to add or remove attributes) and refreshing an established index both add considerable, spiky load to the federated gateway and the component DGSes. Depending on the cardinality of the index and the complexity of its query, we may need to coordinate with service owners and/or run backfills off-peak. We continue to manage tradeoffs between reindexing speed and load.

Reverse Lookups

Reverse lookups, while convenient, are not particularly user-friendly. They introduce a circular dependency in the pipeline (you can’t create the indexing pipeline without reverse lookups, and reverse lookups need the index to function), which we’ve mitigated, although it still creates some confusion. They also require the definer of the index to have detailed knowledge of the eventing for the related entities they want to include, which may cover many different domains depending on the index; we have one index covering eight domains.

Index consistency

As an index becomes more complex, it is likely to depend on more DGSes, and the likelihood of errors increases when fetching the required documents from the federated graph. These errors can lead to documents in the index being out of date or even missing altogether. The owner of the index is often required to follow up with other domain teams regarding errors in related entities and is in the unenviable position of not being able to do much to resolve the issues independently. When the errors are resolved, the process of replaying the failed events is manual, and there can be a lag during which the service is again returning data successfully but the index does not yet match it.

Stay Tuned

In this post, we described how our indexing infrastructure moves data for any given subgraph of the Netflix Content federated graph to Elasticsearch and keeps that data in sync with the source of truth. In an upcoming post, we’ll describe how this data can be queried without actually needing to know anything about Elasticsearch.

Credits

Thanks to Anoop Panicker, Bo Lei, Charles Zhao, Chris Dhanaraj, Hemamalini Kannan, Jim Isaacs, Johnny Chang, Kasturi Chatterjee, Kishore Banala, Kevin Zhu, Tom Lee, Tongliang Liu, Utkarsh Shrivastava, Vince Bello, Vinod Viswanathan, Yucheng Zeng


How Netflix Content Engineering makes a federated graph searchable was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Rapid Event Notification System at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/rapid-event-notification-system-at-netflix-6deb1d2b57d1

By: Ankush Gulati, David Gevorkyan
Additional credits: Michael Clark, Gokhan Ozer

Intro

Netflix has more than 220 million active members who perform a variety of actions throughout each session, ranging from renaming a profile to watching a title. Reacting to these actions in near real-time to keep the experience consistent across devices is critical for ensuring an optimal member experience. This is not an easy task, considering the wide variety of supported devices and the sheer volume of actions our members perform. To this end, we developed a Rapid Event Notification System (RENO) to support use cases that require server-initiated communication with devices in a scalable and extensible manner.

In this blog post, we will give an overview of the Rapid Event Notification System at Netflix and share some of the learnings we gained along the way.

Motivation

With the rapid growth in the Netflix member base and the increasing complexity of our systems, our architecture has evolved into an asynchronous one that enables both online and offline computation. Providing a seamless and consistent Netflix experience across various platforms (iOS, Android, smart TVs, Roku, Amazon FireStick, web browser) and various device types (mobile phones, tablets, televisions, computers, set top boxes) requires more than the traditional request-response model. Over time, we’ve seen an increase in use cases where backend systems need to initiate communication with devices to notify them of member-driven changes or experience updates quickly and consistently.

Use cases

  • Viewing Activity
    When a member begins to watch a show, their “Continue Watching” list should be updated across all of their devices to reflect that viewing.
  • Personalized Experience Refresh
    The Netflix recommendation engine continuously refreshes recommendations for every member. The updated recommendations need to be delivered to devices in a timely manner for an optimal member experience.
  • Membership Plan Changes
    Members often change their plan types, leading to a change in their experience that must be immediately reflected across all of their devices.
  • Member “My List” Updates
    When members update their “My List” by adding or removing titles, the changes should be reflected across all of their devices.
  • Member Profile Changes
    When members update their account settings, such as adding, deleting, or renaming profiles or changing their preferred maturity level for content, these updates must be reflected across all of their devices.
  • System Diagnostic Signals
    In special scenarios, we need to send diagnostic signals to the Netflix app on devices to help troubleshoot problems and enable tracing capabilities.

Design Decisions

In designing the system, we made a few key decisions that helped shape the architecture of RENO:

  1. Single Events Source
  2. Event Prioritization
  3. Hybrid Communication Model
  4. Targeted Delivery
  5. Managing High RPS

Single Events Source

The use cases we wanted to support originate from various internal systems and member actions, so we needed to listen for events from several different microservices. At Netflix, our near-real-time event flow is managed by an internal distributed computation framework called Manhattan (you can learn more about it here). We leveraged Manhattan’s event management framework to create a level of indirection serving as the single source of events for RENO.

Event Prioritization

Considering the use cases were wide-ranging in terms of both their sources and their importance, we built segmentation into the event processing. For example, a member-triggered event such as “change in a profile’s maturity level” should have a much higher priority than a “system diagnostic signal”. We thus assigned a priority to each use case and sharded event traffic by routing to priority-specific queues and the corresponding event processing clusters. This separation allows us to tune system configuration and scaling policies independently for different event priorities and traffic patterns.
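The routing idea can be shown in a minimal sketch, assuming a static use-case-to-priority table and one in-memory queue per priority. In RENO the queues are Amazon SQS queues fed by Manhattan forwarding rules; the use-case names and priority levels below are illustrative, not Netflix's actual configuration.

```python
from collections import deque
from enum import Enum
from typing import Deque, Dict

class Priority(Enum):
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

# Illustrative assignment of a priority to each use case.
USE_CASE_PRIORITY: Dict[str, Priority] = {
    "maturity_level_change": Priority.HIGH,
    "plan_change": Priority.HIGH,
    "my_list_update": Priority.MEDIUM,
    "recommendation_refresh": Priority.MEDIUM,
    "diagnostic_signal": Priority.LOW,
}

# One queue per priority; each would be drained by its own, independently scaled cluster.
queues: Dict[Priority, Deque[dict]] = {p: deque() for p in Priority}

def route(event: dict) -> Priority:
    """Send an event to the queue for its use case's priority.

    Unknown use cases fall back to LOW so they cannot crowd out member-facing traffic.
    """
    priority = USE_CASE_PRIORITY.get(event["use_case"], Priority.LOW)
    queues[priority].append(event)
    return priority

route({"use_case": "maturity_level_change", "member": "m1"})
route({"use_case": "diagnostic_signal", "member": "m2"})
```

Because each priority has its own queue, a flood of low-priority diagnostics can only back up the low-priority queue. High-priority events never wait behind it.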

Hybrid Communication Model

As mentioned earlier in this post, one key challenge for a service like RENO is supporting multiple platforms. While a mobile device is almost always connected to the internet and reachable, a smart TV is only online while in use. This heterogeneity in network connectivity made choosing a single delivery model difficult. For example, relying entirely on a Pull model, wherein the device frequently calls home for updates, would result in chatty mobile apps. That, in turn, would trigger the per-app communication limits that iOS and Android platforms enforce (we also need to be considerate of low-bandwidth connections). On the other hand, using only a Push mechanism would lead smart TVs to miss notifications while they are powered off during most of the day. We therefore chose a hybrid Push AND Pull communication model wherein the server tries to deliver notifications to all devices immediately using Push notifications, and devices call home at various stages of the application lifecycle.

Using a Push-and-Pull delivery model combination also supports devices limited to a single communication model. This includes older, legacy devices that do not support Push Notifications.
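The hybrid model can be sketched as follows. Every notification is written to a per-device store, which is the role RENO's Cassandra-backed persistent store plays. A push is attempted only for devices that are currently online, and any device can later pull whatever it has not yet seen. The in-memory store, the sequence-number cursor and the `push` callback are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Set

class HybridNotifier:
    """Push when possible; always persist so devices can pull at their own cadence."""

    def __init__(self, push: Callable[[str, dict], None], online: Set[str]):
        self.push = push
        self.online = online
        self.store: Dict[str, List[dict]] = defaultdict(list)  # device -> notifications
        self.seq = 0

    def notify(self, device_id: str, payload: dict) -> bool:
        """Persist the notification, then push it if the device is online."""
        self.seq += 1
        message = {"seq": self.seq, **payload}
        self.store[device_id].append(message)  # persisted for the pull path
        if device_id in self.online:
            self.push(device_id, message)      # best-effort immediate delivery
            return True
        return False

    def poll(self, device_id: str, after_seq: int = 0) -> List[dict]:
        """Pull path: a device calls home and fetches everything newer than its cursor."""
        return [m for m in self.store[device_id] if m["seq"] > after_seq]

pushed = []
notifier = HybridNotifier(push=lambda d, m: pushed.append((d, m["seq"])), online={"phone"})
notifier.notify("phone", {"type": "my_list_update"})
notifier.notify("tv", {"type": "my_list_update"})  # TV is off: stored, not pushed
```

A device limited to one model still works. For example, a legacy device that never receives pushes simply relies on `poll` whenever it calls home.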

Targeted Delivery

Considering the use cases were wide-ranging in terms of both sources and target device types, we built support for device-specific notification delivery. This capability allows notifying specific device categories as each use case requires. When an actionable event arrives, RENO applies the use-case-specific business logic, gathers the list of devices eligible to receive this notification, and attempts delivery. This considerably limits the outgoing traffic footprint.
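Targeted delivery amounts to filtering a member's devices before any delivery attempt. The sketch assumes each use case declares the device categories it targets, and it combines that rule with the online-device check mentioned under Managing High RPS. The categories, use-case names and `Device` fields are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List

@dataclass(frozen=True)
class Device:
    device_id: str
    category: str  # e.g. "mobile", "tv", "web"
    online: bool

# Hypothetical per-use-case targeting rules.
TARGETS: Dict[str, FrozenSet[str]] = {
    "my_list_update": frozenset({"mobile", "tv", "web"}),
    "diagnostic_signal": frozenset({"tv"}),
}

def eligible_devices(use_case: str, devices: List[Device]) -> List[Device]:
    """Keep only online devices whose category the use case targets."""
    categories = TARGETS.get(use_case, frozenset())
    return [d for d in devices if d.category in categories and d.online]

member_devices = [
    Device("d1", "mobile", online=True),
    Device("d2", "tv", online=False),
    Device("d3", "tv", online=True),
]
```

This filter only decides where to attempt an immediate push. Under the hybrid model an offline device can still pick the notification up when it next calls home.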

Managing High RPS

With over 220 million members, we were conscious of the fact that a service like RENO needs to process many events per member during a viewing session. At peak times, RENO serves about 150k events per second. Such a high RPS during specific times of the day can create a thundering herd problem and put strain on internal and external downstream services. We therefore implemented a few optimizations:

  • Event Age
    Many events that devices need to be notified about are time-sensitive and have little or no value unless sent almost immediately. To avoid processing old events, a staleness filter is applied as a gating check: if an event is older than a configurable threshold, it is not processed. This filter weeds out events that have no value to the devices early in the processing phase and protects the queues from being flooded by stale upstream events that may have been backed up.
  • Online Devices
    To reduce the outgoing traffic footprint, notifications are sent only to devices that are currently online by leveraging an existing registry that is kept up-to-date by Zuul (learn more about it here).
  • Scaling Policies
    To address the thundering herd problem and to keep latencies under acceptable thresholds, the cluster scale-up policies are configured to be more aggressive than the scale-down policies. This approach enables the computing power to catch up quickly when the queues grow.
  • Event Deduplication
    Both iOS and Android platforms aggressively restrict the level of activity generated by backgrounded apps, which is why RENO deduplicates incoming events. Duplicate events can occur at high RPS, and they are merged when doing so does not cause any loss of context for the device.
  • Bulkheaded Delivery
    Multiple downstream services are used to send push notifications to different device platforms including external ones like Apple Push Notification Service (APNS) for Apple devices and Google’s Firebase Cloud Messaging (FCM) for Android. To safeguard against a downstream service bringing down the entire notification service, the event delivery is parallelized across different platforms, making it best-effort per platform. If a downstream service or platform fails to deliver the notification, the other devices are not blocked from receiving push notifications.
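Two of these optimizations, the event-age gate and deduplication, are easy to sketch. The sketch assumes each event carries a creation timestamp, and that events sharing a member, device and use case can be merged by keeping the newest. The 30-second threshold and the merge key are illustrative choices, not RENO's actual settings.

```python
import time
from typing import Dict, Iterable, List, Optional, Tuple

MAX_EVENT_AGE_SECONDS = 30.0  # illustrative, configurable threshold

def is_fresh(event: dict, now: float, max_age: float = MAX_EVENT_AGE_SECONDS) -> bool:
    """Staleness gate: reject events older than the threshold before any other work."""
    return now - event["created_at"] <= max_age

def deduplicate(events: Iterable[dict]) -> List[dict]:
    """Merge events for the same member, device and use case, keeping the newest one."""
    latest: Dict[Tuple[str, str, str], dict] = {}
    for event in events:
        key = (event["member"], event["device"], event["use_case"])
        if key not in latest or event["created_at"] > latest[key]["created_at"]:
            latest[key] = event
    return list(latest.values())

def process_batch(events: Iterable[dict], now: Optional[float] = None) -> List[dict]:
    """Apply the staleness gate first, then deduplicate what survives."""
    now = time.time() if now is None else now
    return deduplicate(e for e in events if is_fresh(e, now))

batch = [
    {"member": "m1", "device": "d1", "use_case": "my_list_update", "created_at": 100.0},
    {"member": "m1", "device": "d1", "use_case": "my_list_update", "created_at": 105.0},
    {"member": "m2", "device": "d9", "use_case": "plan_change", "created_at": 10.0},
]
result = process_batch(batch, now=110.0)
```

Keeping only the newest event is safe only when a later event fully supersedes an earlier one, such as a signal to refresh My List. That matches the rule above that events are merged only when no context is lost for the device.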

Architecture

As shown in the diagram above, the RENO service can be broken down into the following components.

Event Triggers

Member actions and system-driven updates that require refreshing the experience on members’ devices.

Event Management Engine

Manhattan, the near-real-time event flow management framework at Netflix, can be configured to listen to specific events and forward them to different queues.

Event Priority Based Queues

Amazon SQS queues, populated by priority-based event forwarding rules set up in Manhattan, allow priority-based sharding of traffic.

Event Priority Based Clusters

AWS Instance Clusters that subscribe to the corresponding queues with the same priority. They process all the events arriving on those queues and generate actionable notifications for devices.

Outbound Messaging System

The Netflix messaging system that sends in-app push notifications to members is used to send RENO-produced notifications on the last mile to mobile devices. This messaging system is described in this blog post.

For notifications to web, TV & other streaming devices, we use a homegrown push notification solution called Zuul Push that provides “always-on” persistent connections with online devices. To learn more about the Zuul Push solution, listen to this talk from a Netflix colleague.
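The bulkheaded delivery described under Managing High RPS can be sketched with one thread pool per platform, so a slow or failing channel only ties up its own workers. The platform names and the `send` callables below are hypothetical stand-ins for the messaging system and Zuul Push; they are not real client APIs.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

class BulkheadedDelivery:
    """One executor per platform, so failures stay within their own compartment."""

    def __init__(self, senders: Dict[str, Callable[[str, dict], None]], workers: int = 4):
        self.senders = senders
        self.pools = {p: ThreadPoolExecutor(max_workers=workers) for p in senders}

    def deliver(self, batch: Dict[str, List[str]], message: dict) -> Dict[str, int]:
        """Send to every platform in parallel and count successful sends per platform."""
        futures = {
            platform: [self.pools[platform].submit(self.senders[platform], device, message)
                       for device in devices]
            for platform, devices in batch.items()
        }
        delivered = {}
        for platform, fs in futures.items():
            # Best effort: failures are counted out (and would be logged), never re-raised.
            # Waiting here is for the demo; a service would use callbacks or timeouts
            # rather than block on a slow platform's futures.
            delivered[platform] = sum(1 for f in fs if f.exception() is None)
        return delivered

    def shutdown(self) -> None:
        for pool in self.pools.values():
            pool.shutdown(wait=True)

def failing_ios(device: str, message: dict) -> None:
    raise ConnectionError("downstream unavailable")  # simulated outage

sent = []
delivery = BulkheadedDelivery({
    "ios": failing_ios,
    "android": lambda device, message: sent.append(device),
    "tv": lambda device, message: sent.append(device),
})
counts = delivery.deliver({"ios": ["i1"], "android": ["a1", "a2"], "tv": ["t1"]},
                          {"type": "refresh"})
delivery.shutdown()
```

Here the simulated iOS outage yields zero deliveries for that platform, while Android and TV devices still receive their notifications.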

Persistent Store

A Cassandra database that stores all the notifications emitted by RENO for each device to allow those devices to poll for their messages at their own cadence.

Observability

At Netflix, we put a strong emphasis on building robust monitoring into our systems to provide a clear view of system health. For a high RPS service like RENO that relies on several upstream systems as its traffic source and simultaneously produces heavy traffic for different internal and external downstream systems, it is important to have a strong combination of metrics, alerting and logging in place. For alerting, in addition to the standard system health metrics such as CPU, memory, and performance, we added a number of “edge-of-the-service” metrics and logging to capture any aberrations from upstream or downstream systems. Furthermore, in addition to real-time alerting, we added trend analysis for important metrics to help catch longer-term degradations. We instrumented RENO with a real-time stream processing application called Mantis (you can learn more about it here). It allowed us to track events in real time over the wire at device-specific granularity, making debugging easier. Finally, we found platform-specific alerting (for iOS, Android, etc.) useful for finding the root causes of issues faster.

Wins

  • Can easily support new use cases
  • Scales horizontally with higher throughput

When we set out to build RENO, the goal was limited to the “Personalized Experience Refresh” use case of the product. As the design of RENO evolved, support for new use cases became possible, and RENO was quickly positioned as the centralized rapid notification service for all product areas at Netflix.

The design decisions we made early on paid off, such as making the addition of new use cases a “plug-and-play” solution and providing a hybrid delivery model across all platforms. We were able to onboard additional product use cases at a fast pace, thus unblocking a lot of innovation.

An important learning in building this platform was ensuring that RENO could scale horizontally as more types of events and higher throughput were needed over time. This ability was primarily achieved by allowing sharding based on either event type or priority, along with using an asynchronous, event-driven processing model that can be scaled by simply adding more machines for event processing.

Looking Ahead

As Netflix’s member base continues to grow at a rapid pace, it is increasingly beneficial to have a service like RENO that helps give our members the best and most up to date Netflix experience. From membership related updates to contextual personalization, and more — we are continually evolving our notifications portfolio as we continue to innovate on our member experience. Architecturally, we are evaluating opportunities to build in more features such as guaranteed message delivery and message batching that can open up more use cases and help reduce the communication footprint of RENO.

Building Great Things Together

We are just getting started on this journey to build impactful systems that help propel our business forward. The core to bringing these engineering solutions to life is our direct collaboration with our colleagues and using the most impactful tools and technologies available. If this is something that excites you, we’d love for you to join us.


Rapid Event Notification System at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data pipeline asset management with Dataflow

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-pipeline-asset-management-with-dataflow-86525b3e21ca

by Sam Setegne, Jai Balani, Olek Gorajek

Glossary

  • asset — any business logic code in a raw (e.g. SQL) or compiled (e.g. JAR) form to be executed as part of the user defined data pipeline.
  • data pipeline — a set of tasks (or jobs) to be executed in a predefined order (a.k.a. DAG) for the purpose of transforming data using some business logic.
  • Dataflow — Netflix homegrown CLI tool for data pipeline management.
  • job — a.k.a task, an atomic unit of data transformation logic, a non-separable execution block in the workflow chain.
  • namespace — unique label, usually representing a business subject area, assigned to a workflow asset to identify it across all other assets managed by Dataflow (e.g. security).
  • workflow — see “data pipeline”

Intro

The problem of managing scheduled workflows and their assets is as old as the use of the cron daemon in early Unix operating systems. The design of a cron job is simple: you take some system command, pick the schedule to run it on, and you are done. Example:

0 0 * * MON /home/alice/backup.sh

In the above example, the system would wake up at midnight every Monday and execute the backup.sh script. Simple, right? But what if the script does not exist in the given path, or what if it existed initially but then Alice let Bob access her home directory and he accidentally deleted it? Or what if Alice wanted to add new backup functionality and accidentally broke existing code while updating it?

In this article, we would like to address these questions and propose a clean solution to this problem.

Let’s define some requirements that we are interested in delivering to the Netflix data engineers or anyone who would like to schedule a workflow with some external assets in it. By external assets we simply mean some executable carrying the actual business logic of the job. It could be a JAR compiled from Scala, a Python script or module, or a simple SQL file. The important thing is that this business logic can be built in a separate repository and maintained independently from the workflow definition. Keeping all that in mind we would like to achieve the following properties for the whole workflow deployment:

  1. Versioning: we want both the workflow definition and its assets to be versioned and we want the versions to be tied together in a clear way.
  2. Transparency: we want to know which version of an asset is running along with every workflow instance, so if there are any issues we can easily identify which version caused the problem and to which one we could revert, if necessary.
  3. ACID deployment: for every scheduler workflow definition change, we would like to have all the workflow assets bundled in an atomic, durable, isolated and consistent manner. This way, if necessary, all we need to know is which version of the workflow to roll back to, and the rest would be taken care of for us.

While all the above goals are our North Star, we also don’t want to negatively affect fast deployment, high availability and arbitrary life span of any deployed asset.

Previous solutions

The basic approach to pulling down arbitrary workflow resources during workflow execution has been known to mankind since the invention of cron, and with the advent of “infinite” cloud storage systems like S3, this approach has served us for many years. Its apparent flexibility and convenience can often fool us into thinking that by simply replacing the asset in the S3 location we can, without any hassle, introduce changes to our business logic. This method often proves very troublesome especially if there is more than one engineer working on the same pipeline and they are not all aware of the other folks’ “deployment process”.

A slightly improved approach is shown in the diagram below.

Figure 1. Manually constructed continuous delivery system.

In Figure 1, you can see an illustration of a typical deployment pipeline manually constructed by a user for an individual project. The continuous deployment tool submits a workflow definition with pointers to assets in fixed S3 locations. These assets are then separately deployed to these fixed locations. At runtime, the assets are retrieved from the defined locations in S3 and executed in the runtime container. Despite requiring users to construct the deployment pipeline manually, often by writing their own scripts from scratch, this design works and has been successfully used by many teams for years. That being said, it does have some drawbacks that are revealed as you try to add any amount of complexity to your deployment logic. Let’s discuss a few of them.

Does not consider branch/PR deployments

In any production pipeline, you want the flexibility of having a “safe” alternative deployment logic. For example, you may want to build your Scala code and deploy it to an alternative location in S3 while pushing a sandbox version of your workflow that points to this alternative location. Something this simple gets very complicated very quickly and requires the user to consider a number of things. Where should this alternative location be in S3? Is a single location enough? How do you set up your deployment logic to know when to deploy the workflow to a test or dev environment? Answers to these questions often end up being more custom logic inside of the user’s deployment scripts.

Cannot roll back to previous workflow versions

When you deploy a workflow, you really want it to encapsulate an atomic and idempotent unit of work. Part of the reason is the desire to be able to roll back to a previous workflow version, knowing that it will always behave as it did in previous runs. There can be many reasons to roll back, but the typical one is when you’ve recognized a regression in a recent deployment that was not caught during testing. In the current design, reverting to a previous workflow definition in your scheduling system is not enough! You have to rebuild your assets from source and move them to the fixed S3 location that your workflow points to. To enable atomic rollbacks, you can add more custom logic to your deployment scripts to always deploy your assets to a new location and generate new pointers for your workflows to use, but that comes with higher complexity that often just doesn’t feel worth it. More commonly, teams will opt to do more testing to try to catch regressions before deploying to production and will accept the extra burden of rebuilding all of their workflow dependencies in the event of a regression.

Runtime dependency on user-managed cloud storage locations

At runtime, the container must reach out to a user-defined storage location to retrieve the assets required. This causes the user-managed storage system to be a critical runtime dependency. If we zoom out to look at an entire workflow management system, the runtime dependencies can become unwieldy if it relies on various storage systems that are arbitrarily defined by the workflow developers!

Dataflow deployment with asset management

In an attempt to deliver a simple and robust solution for managed workflow deployments, we created a command line utility called Dataflow. It is a Python-based CLI + library that can be installed anywhere inside the Netflix environment. This utility can build and configure workflow definitions and their assets during testing and deployment. See the diagram below:

Figure 2. Dataflow asset management system.

In Figure 2, we show a variation of the typical manually constructed deployment pipeline. Every asset deployment is released to some newly calculated UUID. The workflow definition can then identify a specific asset by its UUID. Deploying the workflow to the scheduling system produces a “Deployment Bundle”. The bundle includes all of the assets that have been referenced by the workflow definition and the entire bundle is deployed to the scheduling system. At every scheduled runtime, the scheduling system can create an instance of your workflow without having to gather runtime dependencies from external systems.
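The mechanics in Figure 2 can be sketched as follows. Each asset release is stored under a fresh UUID and never overwritten, and building a deployment bundle pins each of the workflow's asset references to a UUID and copies the asset bytes into the bundle. The in-memory registry, the `asset:` reference format and the function names are hypothetical; they are not Dataflow's actual referencing syntax or storage layout.

```python
import uuid
from typing import Dict

# Hypothetical asset registry: UUID -> immutable asset bytes.
registry: Dict[str, bytes] = {}
# Latest released UUID per asset name, used when a workflow is built.
latest: Dict[str, str] = {}

def release_asset(name: str, content: bytes) -> str:
    """Store a new, never-overwritten version of an asset under a fresh UUID."""
    asset_uuid = str(uuid.uuid4())
    registry[asset_uuid] = content
    latest[name] = asset_uuid
    return asset_uuid

def build_bundle(workflow: Dict[str, str]) -> dict:
    """Pin each 'asset:<name>' reference to a UUID and copy the asset into the bundle."""
    pinned, assets = {}, {}
    for job, ref in workflow.items():
        name = ref.removeprefix("asset:")  # Python 3.9+
        asset_uuid = latest[name]
        pinned[job] = asset_uuid
        assets[asset_uuid] = registry[asset_uuid]
    return {"jobs": pinned, "assets": assets}

v1 = release_asset("hello_world", b"print('v1')")
bundle_v1 = build_bundle({"say_hello": "asset:hello_world"})
v2 = release_asset("hello_world", b"print('v2')")
bundle_v2 = build_bundle({"say_hello": "asset:hello_world"})
```

Rolling back then means redeploying `bundle_v1`. It still carries the exact bytes of the first release, so nothing has to be rebuilt from source and no user-managed storage location is consulted at runtime.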

The asset management system that we’ve created for Dataflow provides a strong abstraction over this deployment design. Deploying the asset, generating the UUID, and building the deployment bundle is all handled automatically by the Dataflow build logic. The user does not need to be aware of anything that’s happening on S3, nor that S3 is being used at all! Instead, the user is given a flexible UUID referencing system that’s layered on top of our scheduling system’s workflow DSL. Later in the article we’ll cover this referencing system in some detail. But first, let’s look at an example of deploying an asset and a workflow.

Deployment of an asset

Let’s walk through an example of a workflow asset build and deployment. Let’s assume we have a repository called stranger-data with the following structure:

.
├── dataflow.yaml
├── pyspark-workflow
│   ├── main.sch.yaml
│   └── hello_world
│       ├── ...
│       └── setup.py
└── scala-workflow
    ├── build.gradle
    ├── main.sch.yaml
    └── src
        ├── main
        │   └── ...
        └── test
            └── ...

Let’s now use the Dataflow command to see which project components are visible:

stranger-data$ dataflow project list
Python Assets:
-> ./pyspark-workflow/hello_world/setup.py
Summary: 1 found.
Gradle Assets:
-> ./scala-workflow/build.gradle
Summary: 1 found.
Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml
-> ./pyspark-workflow/main.sch.yaml
Summary: 2 found.
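Conceptually, the listing above groups the repository's files into three kinds of components. The sketch below is a hypothetical approximation, not Dataflow's actual implementation: it assumes components are recognized purely by file name (`setup.py`, `build.gradle`, and files ending in `.sch.yaml`) and that anything under a hidden directory such as `.git` is skipped. The names `classify` and `discover` are illustrative.

```python
from pathlib import Path, PurePosixPath


def classify(rel_paths):
    """Group repo-relative POSIX paths into the component kinds shown above.

    Hypothetical rules: recognition is by file name only, and anything under a
    hidden directory (for example .git or the .workflows output dir) is ignored.
    """
    found = {"python": [], "gradle": [], "workflows": []}
    for rel in sorted(rel_paths):
        path = PurePosixPath(rel)
        if any(part.startswith(".") for part in path.parts):
            continue
        shown = f"./{path}"
        if path.name == "setup.py":
            found["python"].append(shown)
        elif path.name == "build.gradle":
            found["gradle"].append(shown)
        elif path.name.endswith(".sch.yaml"):
            found["workflows"].append(shown)
    return found


def discover(root):
    """Walk a checkout on disk and classify every regular file in it."""
    root = Path(root)
    return classify(p.relative_to(root).as_posix() for p in root.rglob("*") if p.is_file())


# Usage with the file paths from the stranger-data tree above.
example = classify([
    "dataflow.yaml",
    "pyspark-workflow/main.sch.yaml",
    "pyspark-workflow/hello_world/setup.py",
    "scala-workflow/build.gradle",
    "scala-workflow/main.sch.yaml",
])
```

Matching on file names keeps discovery cheap, but a real tool would likely also honor configuration such as `dataflow.yaml`; that part is not modeled here.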

Before deploying the assets, and especially if we made any changes to them, we can run unit tests to make sure that we didn’t break anything. In a typical Dataflow configuration, this manual testing is optional because Dataflow’s continuous integration tests run it for us on every pull request.

stranger-data$ dataflow project test
Testing Python Assets:
-> ./pyspark-workflow/hello_world/setup.py... PASSED
Summary: 1 successful, 0 failed.
Testing Gradle Assets:
-> ./scala-workflow/build.gradle... PASSED
Summary: 1 successful, 0 failed.
Building Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml... CREATED ./.workflows/scala-workflow.main.sch.rendered.yaml
-> ./pyspark-workflow/main.sch.yaml... CREATED ./.workflows/pyspark-workflow.main.sch.rendered.yaml
Summary: 2 successful, 0 failed.
Testing Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml... PASSED
-> ./pyspark-workflow/main.sch.yaml... PASSED
Summary: 2 successful, 0 failed.

Notice that the test command above not only executes the unit test suites defined in our Scala and Python sub-projects, but also renders and statically validates all the workflow definitions in our repo. More on that later…
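Dataflow's own validation rules are not described in this post, but one plausible static check is easy to sketch: parse the `dag:` edges (written as `a -> b`, as in the workflow excerpt later in this post), confirm that each edge names a defined job, and reject cycles using Kahn's topological sort. The function name `validate_dag` and its error messages are hypothetical.

```python
from collections import deque


def validate_dag(edges, jobs):
    """Return the jobs in a valid execution order, or raise ValueError.

    Hypothetical checks, for illustration: every edge has the form "a -> b",
    both ends name a defined job, and the edges contain no cycle.
    """
    downstream = {job: [] for job in jobs}
    indegree = {job: 0 for job in jobs}
    for edge in edges:
        src, arrow, dst = (part.strip() for part in edge.partition("->"))
        if not arrow or not src or not dst:
            raise ValueError(f"malformed edge: {edge!r}")
        if src not in downstream or dst not in downstream:
            raise ValueError(f"edge {edge!r} references an undefined job")
        downstream[src].append(dst)
        indegree[dst] += 1

    # Kahn's algorithm: repeatedly emit jobs whose upstream jobs are all emitted.
    ready = deque(job for job in downstream if indegree[job] == 0)
    order = []
    while ready:
        job = ready.popleft()
        order.append(job)
        for nxt in downstream[job]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(downstream):
        raise ValueError("the dag contains a cycle")
    return order


# Usage with the DAG from the scala-workflow example.
order = validate_dag(
    ["ddl -> write", "write -> audit", "audit -> publish"],
    ["ddl", "write", "audit", "publish"],
)
```

Checks like these need no cluster or scheduler, which is what makes them suitable for a pre-deployment test step.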

Assuming all tests passed, let’s now use the Dataflow command to build and deploy a new version of the Scala and Python assets into the Dataflow asset registry.

stranger-data$ dataflow project deploy
Building Python Assets:
-> ./pyspark-workflow/hello_world/setup.py... CREATED ./pyspark-workflow/hello_world/dist/hello_world-0.0.1-py3.7.egg
Summary: 1 successful, 0 failed.
Deploying Python Assets:
-> ./pyspark-workflow/hello_world/setup.py... DEPLOYED AS dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3
Summary: 1 successful, 0 failed.
Building Gradle Assets:
-> ./scala-workflow/build.gradle... CREATED ./scala-workflow/build/libs/scala-workflow-all.jar
Summary: 1 successful, 0 failed.
Deploying Gradle Assets:
-> ./scala-workflow/build.gradle... DEPLOYED AS dataflow.jar.scala-workflow.user.stranger-data.master.39206ee8.11
Summary: 1 successful, 0 failed.
...

Notice that the above command:

  • created a new version of the workflow assets
  • assigned each asset a “UUID” (consisting of the “dataflow” string, asset type, asset namespace, git repo owner, git repo name, git branch name, commit hash and consecutive build number)
  • and deployed them to a Dataflow-managed S3 location.
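Because the UUID is just the listed attributes joined with dots, it can be split back into its fields. A minimal sketch, assuming no attribute itself contains a dot (a branch named `release.1` would break it); `AssetId` and `parse_asset_id` are hypothetical names, not part of the Dataflow library.

```python
from typing import NamedTuple


class AssetId(NamedTuple):
    """Fields of a Dataflow asset UUID, in the order listed above."""
    asset_type: str
    namespace: str
    owner: str
    repo: str
    branch: str
    commit: str
    build: int


def parse_asset_id(uuid: str) -> AssetId:
    """Split 'dataflow.<type>.<namespace>.<owner>.<repo>.<branch>.<commit>.<build>'.

    Assumes no field contains a '.', since the dotted format cannot express it.
    """
    parts = uuid.split(".")
    if len(parts) != 8 or parts[0] != "dataflow":
        raise ValueError(f"not a Dataflow asset UUID: {uuid!r}")
    *fields, build = parts[1:]
    return AssetId(*fields, build=int(build))


asset = parse_asset_id("dataflow.jar.scala-workflow.user.stranger-data.master.39206ee8.11")
print(asset.namespace, asset.commit, asset.build)  # scala-workflow 39206ee8 11
```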

We can check the existing assets of any given type deployed to any given namespace using the following Dataflow command:

stranger-data$ dataflow project list eggs --namespace hello_world --deployed
Project namespaces with deployed EGGS:
hello_world
-> dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3
-> dataflow.egg.hello_world.user.stranger-data.master.39206ee8.2
-> dataflow.egg.hello_world.user.stranger-data.master.39206ee8.1

The above list could come in handy, for example if we needed to find and access an older version of an asset deployed from a given branch and commit hash.

Deployment of a workflow

Now let’s have a look at the build and deployment of the workflow definition which references the above assets as part of its pipeline DAG.

Let’s list the workflow definitions in our repo again:

stranger-data$ dataflow project list workflows
Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml
-> ./pyspark-workflow/main.sch.yaml
Summary: 2 found.

And let’s look at part of the content of one of these workflows:

stranger-data$ cat ./scala-workflow/main.sch.yaml
...
dag:
  - ddl -> write
  - write -> audit
  - audit -> publish
jobs:
  - ddl: ...
  - write:
      spark:
        script: ${dataflow.jar.scala-workflow}
        class: com.netflix.spark.ExampleApp
        conf: ...
        params: ...
  - audit: ...
  - publish: ...
...

You can see from the above snippet that the write job wants to access some version of the JAR from the scala-workflow namespace. A typical workflow definition, written in YAML, does not need any compilation before it is shipped to the Scheduler API, but Dataflow designates a special step called “rendering” to substitute all of the Dataflow variables and build the final version.

The above expression ${dataflow.jar.scala-workflow} means that the workflow will be rendered and deployed with the latest version of the scala-workflow JAR available at the time of the workflow deployment. The JAR may be built in the same repository, in which case the new build of the JAR and the new version of the workflow can come from the same deployment. But the JAR may also be built in a completely different project, in which case the testing and deployment of the new workflow version can be completely decoupled.
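The rendering step amounts to template substitution. The sketch below is hypothetical: it assumes an in-memory registry listing asset UUIDs in deployment order, treats "latest" as the most recently deployed matching entry, and mirrors the S3 layout visible in the rendered example later in this post. Dataflow's real resolution logic and storage layout may differ, and only the latest-version form of a reference is handled here.

```python
import re

# Hypothetical registry contents, oldest deployment first.
REGISTRY = [
    "dataflow.jar.scala-workflow.user.stranger-data.master.39206ee8.10",
    "dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3",
    "dataflow.jar.scala-workflow.user.stranger-data.master.39206ee8.11",
]

# Matches ${dataflow.<type>.<namespace>}, the "latest version" form of a reference.
LATEST_REF = re.compile(r"\$\{dataflow\.(\w+)\.([\w-]+)\}")


def s3_path(asset_id: str) -> str:
    """Map an asset UUID to an S3 key modeled on the rendered example (assumed layout)."""
    _, kind, namespace, owner, repo, branch, commit, build = asset_id.split(".")
    return f"s3://dataflow/{kind}s/{namespace}/{owner}/{repo}/{branch}/{commit}/{build}.{kind}"


def render(template: str, registry: list) -> str:
    """Replace every latest-version reference with the newest matching asset's path."""
    def latest(match):
        kind, namespace = match.groups()
        prefix = f"dataflow.{kind}.{namespace}."
        candidates = [a for a in registry if a.startswith(prefix)]
        if not candidates:
            raise LookupError(f"no deployed {kind} in namespace {namespace!r}")
        return s3_path(candidates[-1])

    return LATEST_REF.sub(latest, template)


print(render("script: ${dataflow.jar.scala-workflow}", REGISTRY))
# script: s3://dataflow/jars/scala-workflow/user/stranger-data/master/39206ee8/11.jar
```

Failing loudly when no asset matches keeps a typo in a namespace from producing a workflow that only breaks at runtime.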

We showed above how to request the latest asset version available at deployment time, but Dataflow asset management supports two more access patterns. The obvious next one is to specify an asset by all of its attributes: asset type, asset namespace, git repo owner, git repo name, git branch name, commit hash and consecutive build number. A third, middle-ground method picks a specific build for a given namespace and git branch, which can help during testing and development. All of this is part of the user interface for determining how the deployment bundle will be created. See the diagram below for a visual illustration.

Figure 3. A closer look at the Deployment Bundle

In short, using the above variables gives the user full flexibility and allows them to pick any version of any asset in any workflow.
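All three access patterns can be viewed as filters over the same registry: filtering only by type and namespace yields the latest build, filtering by every attribute pins one exact build, and adding a branch filter gives the middle ground. The sketch below is hypothetical; it does not reproduce Dataflow's variable syntax, the `resolve` function and the `my-feature` branch are illustrative, and it assumes the registry is listed in deployment order.

```python
FIELDS = ("type", "namespace", "owner", "repo", "branch", "commit", "build")

# Hypothetical registry contents, oldest deployment first.
REGISTRY = [
    "dataflow.egg.hello_world.user.stranger-data.master.39206ee8.1",
    "dataflow.egg.hello_world.user.stranger-data.master.39206ee8.2",
    "dataflow.egg.hello_world.user.stranger-data.my-feature.5f0c2a1b.1",
    "dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3",
]


def resolve(registry, **filters):
    """Return the most recently deployed UUID whose attributes match every filter."""
    unknown = set(filters) - set(FIELDS)
    if unknown:
        raise TypeError(f"unknown attributes: {sorted(unknown)}")
    matches = []
    for uuid in registry:
        attrs = dict(zip(FIELDS, uuid.split(".")[1:]))
        if all(attrs[key] == str(value) for key, value in filters.items()):
            matches.append(uuid)
    if not matches:
        raise LookupError(f"no deployed asset matches {filters}")
    return matches[-1]


# Latest build in the namespace, regardless of branch.
latest = resolve(REGISTRY, type="egg", namespace="hello_world")
# Middle ground: newest build from one branch.
on_branch = resolve(REGISTRY, type="egg", namespace="hello_world", branch="my-feature")
# Fully specified: exactly one build.
pinned = resolve(REGISTRY, type="egg", namespace="hello_world", owner="user",
                 repo="stranger-data", branch="master", commit="39206ee8", build=2)
```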

An example of the workflow deployment with the rendering step is shown below:

stranger-data$ dataflow project deploy
...
Building Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml... CREATED ./.workflows/scala-workflow.main.sch.rendered.yaml
-> ./pyspark-workflow/main.sch.yaml... CREATED ./.workflows/pyspark-workflow.main.sch.rendered.yaml
Summary: 2 successful, 0 failed.
Deploying Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml... DEPLOYED AS https://hawkins.com/scheduler/sandbox:user.stranger-data.scala-workflow
-> ./pyspark-workflow/main.sch.yaml... DEPLOYED AS https://hawkins.com/scheduler/sandbox:user.stranger-data.pyspark-workflow
Summary: 2 successful, 0 failed.

And here you can see what the workflow definition looks like before it is sent to the Scheduler API and registered as the latest version. Notice the value of the script variable of the write job: the original code says ${dataflow.jar.scala-workflow}, and in the rendered version it is translated to a specific file pointer:

stranger-data$ cat ./.workflows/scala-workflow.main.sch.rendered.yaml
...
dag:
  - ddl -> write
  - write -> audit
  - audit -> publish
jobs:
  - ddl: ...
  - write:
      spark:
        script: s3://dataflow/jars/scala-workflow/user/stranger-data/master/39206ee8/11.jar
        class: com.netflix.spark.ExampleApp
        conf: ...
        params: ...
  - audit: ...
  - publish: ...
...

User perspective

The Infrastructure DSE team at Netflix is responsible for providing insights into data that can help the Netflix platform and service scale in a secure and effective way. Our team members partner with business units like Platform, OpenConnect, and InfoSec, and engage in enterprise-level initiatives on a regular basis.

One side effect of such wide engagement is that, over the years, our repository evolved into a mono-repo in which each module required a customized build, testing, and deployment strategy, all packaged into a single Jenkins job. This setup required constant upkeep, and every build failure meant several people had to spend a lot of time coordinating to make sure they did not step on each other’s toes.

Last quarter we decided to split the mono-repo into separate modules and adopt Dataflow as our asset orchestration tool. Since the migration, the team has relied on Dataflow for automated execution of unit tests and for the management and deployment of workflow-related assets.

By the end of the migration process our Jenkins configuration went from:

Figure 4. Real example of a deployment script.

to:

cd /dataflow_workspace
dataflow project deploy

The simplicity of deployment enabled the team to focus on the problems they set out to solve, while the branch-based customization gave us the flexibility to be most effective at solving them.

Conclusions

This new method available for Netflix data engineers makes workflow management easier, more transparent and more reliable. And while it remains fairly easy and safe to build your business logic code (in Scala, Python, etc) in the same repository as the workflow definition that invokes it, the new Dataflow versioned asset registry makes it easier yet to build that code completely independently and then reference it safely inside data pipelines in any other Netflix repository, thus enabling easy code sharing and reuse.

One more aspect of data workflow development that gets enabled by this functionality is what we call branch-driven deployment. This approach enables multiple versions of your business logic and workflows to be running at the same time in the scheduler ecosystem, and makes it easy, not only for individual users to run isolated versions of the code during development, but also to define isolated staging environments through which the code can pass before it reaches the production stage. Obviously, in order for the workflows to be safely used in that configuration they must comply with a few simple rules with regards to the parametrization of their inputs and outputs, but let’s leave this subject for another blog post.

Credits

Special thanks to Peter Volpe, Harrington Joseph and Daniel Watson for the initial design review.


Data pipeline asset management with Dataflow was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Demystifying Interviewing for Backend Engineers @ Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/demystifying-interviewing-for-backend-engineers-netflix-aceb26a83495

By Karen Casella, Director of Engineering, Access & Identity Management

Have you ever experienced one of the following scenarios while looking for your next role?

  • You study and practice coding interview problems for hours/days/weeks/months, only to be asked to merge two sorted lists.
  • You apply for multiple roles at the same company and proceed through the interview process with each hiring team separately, despite the fact that there is tremendous overlap in the roles.
  • You go through the interview process, do really well, get really excited about the company and the people you meet, and in the end, you are “matched” to a role that does not excite you, working with a manager and team you have not even met during the interview process.

Interviewing can be a daunting endeavor and how companies, and teams, approach the process varies greatly. We hope that by demystifying the process, you will feel more informed and confident about your interview experience.

Backend Engineering Interview Loop

When you apply for a backend engineering role at Netflix, or if one of our recruiters or hiring managers finds your LinkedIn profile interesting, a recruiter or hiring manager reviews your technical background and experience to see if it aligns with our requirements. If so, we invite you to begin the interview process.

Most backend engineering teams follow a process very similar to what is shown below. While this is a relatively streamlined process, it is less efficient when a candidate is interested in, or qualified for, multiple roles within the organization.

Following is a brief description of each of these stages.

Recruiter Phone Screen: A member of our talent team contacts you to explain the process and to assess high-level qualifications. The recruiter also reviews the relevant open roles to see if you have a strong affinity for one or another. If your interests and experience align well with one or more of the roles, they schedule a phone screen with one of the hiring managers.

Manager Phone Screen: The purpose of this discussion is to get a sense for your technical background, your approach to problem solving, and how you work. It’s also a great opportunity for you to learn more about the available roles, the technical challenges the teams are facing and what it’s like to work on a backend engineering team at Netflix.

Technical Screen: The final screen before on-site interviews is used to assess your technical skills and match for the team. For many roles, you will be given a choice between a take-home coding exercise or a one-hour discussion with one of the engineers from the team. The problems you are asked to solve are related to the work of the team.

Round 1 Interviews: If you are invited on-site, the first round interview is with four or five people for 45 minutes each. The interview panel consists of two or three engineers, a hiring manager and a recruiter. The engineers assess your technical skills by asking you to solve various design and coding problems. These questions reflect actual challenges that our teams face.

Round 2 Interviews: You meet with two or three additional people, for 45 minutes each. The interview panel comprises an engineering director, a partner engineer or manager, and another engineering leader. The focus of this round is to assess how well you partner with other teams and your non-technical skills.

Decision & Offer: After round 2, we review the feedback and decide whether or not we will be offering you a role. If so, you will work with the recruiter to discuss compensation expectations, answer any questions that remain for you, and discuss a start date with your new team.

Enter Centralized Hiring

Some Netflix backend engineering teams, seeking stunning colleagues with similar backgrounds and talents, are joining forces and adopting a centralized hiring model. Centralized hiring is an approach of making multiple hiring decisions through one unified hiring process across multiple teams with shared needs in skill, function and experience level.

The interview approach does not vary much from what is shown above, with one big exception: there are several potential “pivot points” where you and/or Netflix may decide to focus on a particular role based on your experience and preference. At each stage of the process, we consider your preference and skills and may focus your remaining interviews with a specific team if we both consider it a strong match. It’s important to note that, even though your experience may not be an exact match for one team, you might be more closely aligned with another team. In that case, we would pivot you to another team rather than disqualify you from the process.

Interview Tips

Interviewing can be intimidating and stressful! Being prepared can help you minimize stress and anxiety. Following are a few quick tips to help you prepare:

  • Review your profile and make connections between your experience and the job description.
  • Think about your past work experiences and prepare some examples of when you achieved something amazing, or had some tough challenges.
  • We recommend against practicing puzzle-type coding exercises, as we don’t ask those types of questions. If you want to practice, focus on medium-difficulty, real-world problems you might encounter in a software engineering role.
  • Be sure to have questions prepared to ask the interviewers. This is a conversation, not an inquisition!

We are here to accommodate any accessibility needs you may have, to ensure that you’re set up for success during your interview. Let us know if you need any assistive technology or other accommodations ahead of time, and we’ll be sure to work with you to get it set up.

We want to see you at your best — we are not trying to trick you or trip you up! Try to relax, remember to breathe, and be honest and curious. Remember, this is not just about whether Netflix thinks you are a fit for the role, it’s about you deciding that Netflix and the role are right for you!

Yes, We Are Hiring!

Several of our backend engineering teams are searching for our next stunning colleagues. Some of the areas for which we are actively seeking backend engineers include Streaming & Gaming Technologies, Product Innovation, Infrastructure, and Studio Technologies. If any of the high-level descriptions below are of interest to you and seem like a good match for your experience and career goals, we’d like to hear from you! Simply click on the job description link and submit your application through our jobs site.

Streaming & Gaming Technologies

(https://jobs.netflix.com/jobs/175726412)

  • You are a distributed systems engineer working on product backend systems that support streaming video and/or mobile & cloud games.
  • You’re passionate about resilience, scalability, availability, and observability. Passion for large data sets, APIs, access & identity management, or delivering backend systems that enable mobile and cloud gaming is a big plus.
  • Your work centers around architecting, building and operating fault-tolerant distributed systems at massive scale.

Product Innovation

(https://jobs.netflix.com/jobs/175728345)

  • You are a distributed systems engineer working on core backend services that support our user journeys in signup, subscription, search, personalization and messaging.
  • You’re passionate about working at the intersection of business, product and technology at large scale.
  • Your work centers around building fault-tolerant backend systems and services that make a direct impact on users and the business.

Infrastructure

(https://jobs.netflix.com/jobs/122163878)

  • You are a distributed systems engineer working on infrastructure and platforms that enable or amplify the work of other engineering teams or systems.
  • You’re passionate about scalable and highly available complex distributed systems and have a deep understanding of how they operate and fail.
  • Your work centers around raising levels of abstraction to improve development at scale and creating engineering efficiencies.

Studio Technologies

(https://jobs.netflix.com/jobs/175745345)

  • You are a software engineer who builds products and services used by creative partners across the studio and external productions to produce and manage all of Netflix’s global content. Our products enable the entire workflow of content acquisition, production, promotion and financing from script to screen. We create innovative solutions that develop and manage entertainment at scale while helping entertain the world as members find joy in the shows and movies they love.
  • You’re passionate about innovation, scalability, functionality, shipping high-value features quickly and are committed to delivering exceptional backend systems for our consumers. You’re humble, curious, and looking to deliver results with other stunning colleagues.
  • Your work centers around building products and services targeting creative partners producing/managing global content.

Conclusion

Netflix has a Freedom & Responsibility culture in which every Netflix employee has the freedom to do their best work and the responsibility to achieve excellence. We value strong judgment, communication, impact, curiosity, innovation, courage, passion, integrity, selflessness, inclusion, and diversity. For more information on the culture, see http://jobs.netflix.com/culture.

Karen Casella is the Director of Engineering for Access & Identity Management technologies for Netflix streaming and gaming products. Connect with Karen on LinkedIn or Twitter.


Demystifying Interviewing for Backend Engineers @ Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Netflix: A Culture of Learning

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/netflix-a-culture-of-learning-394bc7d0f94c

Martin Tingley with Wenjing Zheng, Simon Ejdemyr, Stephanie Lane, Colin McFarland, Mihir Tendulkar, and Travis Brooks

This is the last post in an overview series on experimentation at Netflix. Need to catch up? Earlier posts covered the basics of A/B tests (Part 1 and Part 2), core statistical concepts (Part 3 and Part 4), how to build confidence in a decision (Part 5), and the role of experimentation and A/B testing within the larger Data Science and Engineering organization at Netflix (Part 6).

Earlier posts in this series covered the why, what and how of A/B testing, all of which are necessary to reap the benefits of experimentation for product development. But without a little magic, these basics are still not enough.

The secret sauce that turns the raw ingredients of experimentation into supercharged product innovation is culture. There are never any shortcuts when developing and growing culture, and fostering a culture of experimentation is no exception. Building leadership buy-in for an approach to learning that emphasizes A/B testing, building trust in the results of tests, and building the technical capabilities to execute experiments at scale all take time — particularly within an organization that’s new to these ideas. But the pay-offs of using experimentation and the virtuous cycle of product development via the scientific method are well worth the effort. Our colleagues at Microsoft have shared thoughtful publications on how to Kickstart the Experimentation Flywheel and build a culture of experimentation, while their “Crawl, Walk, Run, Fly” model is a great tool for assessing the maturity of an experimentation practice.

At Netflix, we’ve been leveraging experimentation and the scientific method for decades, and are fortunate to have a mature experimentation culture. There is broad buy-in across the company, including from the C-Suite, that, whenever possible, results from A/B tests or other causal inference approaches are near-requirements for decision making. We’ve also invested in education programs to up-level company-wide understanding of how we use A/B tests as a framework for product development. In fact, most of the material from this blog series has been adapted from our internal Experimentation 101 and 201 classes, which are open to anyone at Netflix.

Netflix is organized to learn

As a company, Netflix is organized to emphasize the importance of learning from data, including from A/B tests. Our Data and Insights organization has teams that partner with all corners of the company to deliver a better experience to our members, from understanding content preferences around the globe to delivering a seamless customer support experience. We use qualitative and quantitative consumer research, analytics, experimentation, predictive modeling, and other tools to develop a deep understanding of our members. And we own the data pipelines that power everything from executive-oriented dashboards to the personalization systems that help connect each Netflix member with content that will spark joy for them. This data-driven mindset is ubiquitous at all levels of the company, and the Data and Insights organization is represented at the highest echelon of Netflix Leadership.

As discussed in Part 6, there are data scientists focused on experimentation and causal inference who collaborate with product innovation teams across Netflix. These data scientists design and execute tests to support learning agendas and contribute to decision making. By diving deep into the details of single test results, looking for patterns across tests, and exploring other data sources, these Netflix data scientists build up domain expertise about aspects of the Netflix experience and become valued partners to product managers and engineering leaders. Data scientists help shape the evolution of the Netflix product through opportunity sizing and identifying areas ripe for innovation, and frequently propose hypotheses that are subsequently tested.

We’ve also invested in a broad and flexible experimentation platform that allows our experimentation program to scale with the ambitions of the company to learn more and better serve Netflix members. Just as the Netflix product itself has evolved over the years, our approach to developing technologies to support experimentation at scale continues to evolve. In fact, we’ve been working to improve experimentation platform solutions at Netflix for more than 20 years — our first investments in tooling to support A/B tests came way back in 2001.

Early experimentation tooling at Netflix, from 2001.

Learning and experimentation are ubiquitous across Netflix

Netflix has a unique internal culture that reinforces the use of experimentation and the scientific method as a means to deliver more joy to all of our current and future members. As a company, we aim to be curious, and to truly and honestly understand our members around the world, and how we can better entertain them. We are also open minded, knowing that great ideas can come from unlikely sources. There’s no better way to learn and make great decisions than to confirm or falsify ideas and hypotheses using the power of rigorous testing. Openly and candidly sharing test results allows everyone at Netflix to develop intuition about our members and ideas for how we can deliver an ever better experience to them — and then the virtuous cycle starts again.

In fact, Netflix has so many tests running on the product at any given time that a member may be simultaneously allocated to several tests. There is not one Netflix product: at any given time, we are testing out a large number of product variants, always seeking to learn more about how we can deliver more joy to our current members and attract new members. Some tests, such as the Top 10 list, are easy for users to notice, while others, such as changes to the personalization and search systems or how Netflix encodes and delivers streaming video, are less obvious.
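One common way to let a member sit in several tests at once is deterministic hashing: each test hashes the member ID together with its own test ID, so a member's assignment is stable across sessions while assignments in different tests are roughly independent. This is a generic technique sketched for illustration, not a description of Netflix's allocation service; `assign` and the ID formats are hypothetical.

```python
import hashlib


def assign(member_id: str, test_id: str, cells: int) -> int:
    """Deterministically place a member in one of `cells` cells of one test.

    Salting the hash with the test ID makes assignments in different tests
    roughly independent, so one member can be allocated to many tests at once.
    """
    digest = hashlib.sha256(f"{test_id}:{member_id}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % cells


# One member, three concurrent (hypothetical) tests, each with a control and a treatment cell.
member = "member-42"
cells_by_test = {test: assign(member, test, cells=2) for test in ("test-a", "test-b", "test-c")}
```

Because the assignment is a pure function of the IDs, no per-member state is needed to keep a member in the same cell for the life of a test.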

At Netflix, we are not afraid to test boldly, and to challenge fundamental or long-held assumptions. The Top 10 list is a great example of both: it’s a large and noticeable change that surfaces a new type of evidence on the Netflix product. Large tests like this can open up whole new areas for innovation, and are actively socialized and debated within the company (see below). On the other end of the spectrum, we also run tests on much smaller scales in order to optimize every aspect of the product. A great example is the testing we do to find just the right text copy for every aspect of the product. By the numbers, we run far more of these smaller and less noticeable tests, and we invest in end-to-end infrastructure that simplifies their execution, allowing product teams to rapidly go from hypothesis to test to rollout of the winning experience. As an example, the Shakespeare project provides an end-to-end solution for rapid text copy testing that integrates with the centralized Netflix experimentation platform. More generally, we are always on the lookout for new areas that can benefit from experimentation, or areas where additional methodology or tooling can produce new or faster learnings.

Debating tests and the importance of humility

Netflix has mature operating mechanisms to debate, make, and socialize product decisions. Netflix does not make decisions by committee or by seeking consensus. Instead, for every significant decision there is a single “Informed Captain” who is ultimately responsible for making a judgment call after digesting relevant data and input from colleagues (including dissenting perspectives). Wherever possible, A/B test results or causal inference studies are an expected input to this decision making process.

In fact, not only are test results expected for product decisions — it’s expected that decisions on investment areas for innovation and testing, test plans for major innovations, and results of major tests are all summarized in memos, socialized broadly, and actively debated. The forums where these debates take place are broadly accessible, ensuring a diverse set of viewpoints provide feedback on test designs and results, and weigh in on decisions. Invites for these forums are open to anyone who is interested, and the price of admission is reading the memo. Despite strong executive attendance, there’s a notable lack of hierarchy in these forums, as we all seek to be led by the data.

Netflix data scientists are active and valued participants in these forums. Data scientists are expected to speak for the data, both what can and what cannot be concluded from experimental results, the pros and cons of different experimental designs, and so forth. Although they are not informed captains on product decisions, data scientists, as interpreters of the data, are active contributors to key product decisions.

Product evolution via experimentation can be a humbling experience. At Netflix, we have experts in every discipline required to develop and evolve the Netflix service (product managers, UI/UX designers, data scientists, engineers of all types, experts in recommendation systems and streaming video optimization — the list goes on), who are constantly coming up with novel hypotheses for how to improve Netflix. But only a small percentage of our ideas turn out to be winners in A/B tests. That’s right: despite our broad expertise, our members let us know, through their actions in A/B tests, that most of our ideas do not improve the service. We build and test hundreds of product variants each year, but only a small percentage end up in production and rolled out to the more than 200 million Netflix members around the world.

The low win rate in our experimentation program is both humbling and empowering. It’s hard to maintain a big ego when anyone at the company can look at the data and see all the big ideas and investments that have ultimately not panned out. But nothing proves the value of decision making through experimentation like seeing ideas that all the experts were bullish on voted down by member actions in A/B tests, and seeing a minor tweak to a sign-up flow turn out to be a massive revenue generator.

At Netflix, we do not view tests that do not produce a winning experience as “failures.” When our members vote down new product experiences with their actions, we still learn a lot about their preferences, what works (and does not work!) for different member cohorts, and where there may, or may not be, opportunities for innovation. Combining learnings from tests in a given innovation area, such as the Mobile UI experience, helps us paint a more complete picture of the types of experiences that do and do not resonate with our members, leading to new hypotheses, new tests, and, ultimately, a more joyful experience for our members. And as our member base continues to grow globally, and as consumer preferences and expectations continue to evolve, we also revisit ideas that were unsuccessful when originally tested. Sometimes there are signals from the original analysis that suggest now is a better time for that idea, or that it will provide value to some of our newer member cohorts.

Because Netflix tests all ideas, and because most ideas are not winners, our culture of experimentation democratizes ideation. Product managers are always hungry for ideas, and are open to innovative suggestions coming from anyone in the company, regardless of seniority or expertise. After all, we’ll test anything before rolling it out to the member base, and even the experts have low success rates! We’ve seen time and time again at Netflix that product suggestions large and small that arise from engineers, data scientists, even our executives, can result in unexpected wins.

(Left) Very few of our ideas are winners. (Right) Experimentation democratizes ideation. Because we test all ideas, and because most do not win, there’s an openness to product ideas coming from all corners of the business: anyone can raise their hand and make a suggestion.

A culture of experimentation allows more voices to contribute to ideation, and far, far more voices to help inform decision making. It’s a way to get the best ideas from everyone working on the product, and to ensure that the innovations that are rolled out are vetted and approved by members.

A better product for our members and an internal culture that is humble and values ideas and evidence: experimentation is a win-win proposition for Netflix.

Emerging research areas

Although Netflix has been running experiments for decades, we’ve only scratched the surface relative to what we want to learn and the capabilities we need to build to support those learning ambitions. There are open challenges and opportunities across experimentation and causal inference at Netflix: exploring and implementing new methodologies that allow us to learn faster and better; developing software solutions that support research; evolving our internal experimentation platform to better serve a growing user community and the ever-increasing size and throughput of experiments. And there’s a continuous focus on evolving and growing our experimentation culture through internal events and education programs, as well as external contributions. Here are a few themes that are on our radar:

Increasing velocity: beyond fixed time horizon experimentation.

This series has focused on fixed time horizon tests: sample sizes, the proportion of traffic allocated to each treatment experience, and the test duration are all fixed in advance. In principle, the data are examined only once, at the conclusion of the test. This ensures that the false positive rate (see Part 3) is not increased by peeking at the data numerous times. In practice, we’d like to be able to call tests early, or to adapt how incoming traffic is allocated as we learn incrementally about which treatments are successful and which are not, in a way that preserves the statistical properties described earlier in this series. To enable these benefits, Netflix is investing in sequential experimentation that permits valid decision making at any time, versus waiting until a fixed time has passed. These methods are already being used to ensure safe deployment of Netflix client applications. We are also investing in support for experimental designs that adaptively allocate traffic throughout the test towards promising treatments. The goal of both these efforts is the same: more rapid identification of experiences that benefit members.
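Why peeking inflates the false positive rate can be seen in a small simulation. The sketch below is only an illustration of naive repeated significance testing, not Netflix's sequential methodology: it runs hypothetical A/A tests (no true difference between arms) and compares a single look at the end against a z-test repeated at ten evenly spaced interim looks. The function name, sample sizes, number of looks, and the 1.96 critical value (two-sided, 5% level) are all assumptions chosen for illustration.

```python
import math
import random


def simulate_false_positive_rates(n_tests=1000, n_per_arm=200, looks=10,
                                  z_crit=1.96, seed=1):
    """Simulate A/A tests and return (fixed_horizon_rate, peeking_rate)."""
    rng = random.Random(seed)
    # Per-arm sample sizes at which a naive "peeker" checks significance.
    # The last checkpoint equals the fixed-horizon sample size.
    checkpoints = {n_per_arm * (k + 1) // looks for k in range(looks)}
    fixed_hits = peeking_hits = 0
    for _ in range(n_tests):
        diff_sum = 0.0  # running sum of (treatment - control) observations
        rejected_at_any_look = False
        for n in range(1, n_per_arm + 1):
            # No true effect: both arms are drawn from N(0, 1).
            diff_sum += rng.gauss(0.0, 1.0) - rng.gauss(0.0, 1.0)
            if n in checkpoints:
                # z-statistic for a difference in means with known unit variance.
                z = diff_sum / math.sqrt(2.0 * n)
                if abs(z) > z_crit:
                    rejected_at_any_look = True
        # The fixed-horizon analysis looks only once, at the end.
        if abs(diff_sum / math.sqrt(2.0 * n_per_arm)) > z_crit:
            fixed_hits += 1
        if rejected_at_any_look:
            peeking_hits += 1
    return fixed_hits / n_tests, peeking_hits / n_tests


fixed, peeking = simulate_false_positive_rates()
print(f"fixed horizon: {fixed:.3f}  peeking at every look: {peeking:.3f}")
```

With these settings the fixed-horizon rate should land near the nominal 5%, while the rate for "stop as soon as any look is significant" comes out noticeably higher. Sequential methods are designed to allow interim looks without that inflation.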

Scaling support for quasi experimentation and causal inference.

Netflix has learned an enormous amount, and dramatically improved almost every aspect of the product, using the classic online A/B tests, or randomized controlled trials, that have been the focus of this series. But not every business question is amenable to A/B testing, whether due to an inability to randomize at the individual level, or due to factors, such as spillover effects, that may violate key assumptions for valid causal inference. In these instances, we often rely on the rigorous evaluation of quasi-experiments, where units are not assigned to a treatment or control condition by a random process. But the term “quasi-experimentation” itself covers a broad category of experimental design and methodological approaches that differ between the myriad academic backgrounds represented by the Netflix data science community. How can we synthesize best practices across domains and scale our approach to enable more colleagues to leverage quasi-experimentation?

Our early successes in this space have been driven by investments in knowledge sharing across business verticals, education, and enablement via tooling. Because quasi-experiment use cases span many domains at Netflix, identifying common patterns has been a powerful driver in developing shared libraries that scientists can use to evaluate individual quasi-experiments. And to support our continued scale, we’ve built internal tooling that coalesces data retrieval, design evaluation, analysis, and reproducible reporting, all with the goal of enabling our scientists.

We expect our investments in research, tooling, and education for quasi-experiments to grow over time. In success, we will enable both scientists and their cross-functional partners to learn more about how to deliver more joy to current and future Netflix members.

Experimentation Platform as a Product.

We treat the Netflix Experimentation Platform as an internal product, complete with its own product manager and innovation roadmap. We aim to provide an end-to-end paved path for configuring, allocating, monitoring, reporting, storing and analyzing A/B tests, focusing on experimentation use cases that are optimized for simplicity and testing velocity. Our goal is to make experimentation a simple and integrated part of the product lifecycle, with little effort required on the part of engineers, data scientists, or PMs to create, analyze, and act on tests, with automation available wherever the test owner wants it.

However, if the platform’s default paths don’t work for a specific use case, experimenters can leverage our democratized contribution model, or reuse pieces of the platform, to build out their own solutions. As experimenters innovate on the boundaries of what’s possible in measurement methodology, experimental design, and automation, the Experimentation Platform team partners to commoditize these innovations and make them available to the broader organization.

Three core principles guide product development for our experimentation platform:

  • Complexities and nuances of testing such as allocations and methodologies should, typically, be abstracted away from the process of running a single test, with emphasis instead placed on opinionated defaults that are sensible for a set of use cases or testing areas.
  • Manual intervention at specific steps in the test execution should, typically, be optional, with emphasis instead on test owners being able to invest their attention where they feel it adds value and leave other areas to automation.
  • Designing, executing, reporting, deciding, and learning are all different phases of the experiment lifecycle that have differing needs and users, and each stage benefits from purpose-built tooling.

Conclusion

Netflix has a strong culture of experimentation, and results from A/B tests, or other applications of the scientific method, are generally expected to inform decisions about how to improve our product and deliver more joy to members. To support the current and future scale of experimentation required by the growing Netflix member base and the increasing complexity of our business, Netflix has invested in culture, people, infrastructure, and internal education to make A/B testing broadly accessible across the company.

And we are continuing to evolve our culture of learning and experimentation to deliver more joy to Netflix members around the world. As our member base and business grow, smaller differences between treatment and control experiences become materially important. That’s also true for subsets of the population: with a growing member base, we can become more targeted and look to deliver positive experiences to cohorts of users, defined by geographical region, device type, etc. As our business grows and expands, we are looking for new places that could benefit from experimentation, ways to run more experiments and learn more with each, and ways to accelerate our experimentation program while making experimentation accessible to more of our colleagues.

But the biggest opportunity is to deliver more joy to our members through the virtuous cycle of experimentation.

Interested in learning more? Explore our research site.

Interested in joining us? Explore our open roles.


Netflix: A Culture of Learning was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Fixing Performance Regressions Before they Happen

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/fixing-performance-regressions-before-they-happen-eab2602b86fe

Angus Croll

Netflix is used by 222 million members and runs on over 1700 device types ranging from state-of-the-art smart TVs to low-cost mobile devices.

At Netflix we’re proud of our reliability and we want to keep it that way. To that end, it’s important that we prevent significant performance regressions from reaching the production app. Sluggish scrolling or late rendering is frustrating and triggers accidental navigations. Choppy playback makes watching a show less enjoyable. Any performance regression that makes it into a product release will degrade user experience, so the challenge is to detect and fix such regressions before they ship.

This post describes how the Netflix TVUI team implemented a robust strategy to quickly and easily detect performance anomalies before they are released — and often before they are even committed to the codebase.

What do we mean by Performance?

Technically, “performance” metrics are those relating to the responsiveness or latency of the app, including startup time.

But TV devices also tend to be more memory constrained than other devices, and as such are more liable to crash during a memory spike — so for Netflix TV we actually care about memory at least as much as performance, maybe more so.

At Netflix the term “performance” usually encompasses both performance metrics (in the strict meaning) and memory metrics, and that’s how we’re using the term here.

Why do we run Performance Tests on commits?

It’s harder to reason about the performance profile of pre-production code since we can’t gather real-time metrics for code that hasn’t yet shipped. We do cut a canary release in advance of shipment which is dogfooded by Netflix employees and subject to the same metrics collection as the production release. While the canary release is a useful dry-run for pending shipments, it sometimes misses regressions because the canary user base is only a fraction of the production user base. And when regressions are detected in the canary, they still necessitate an often messy and time-consuming revert or patch.

By running performance tests against every commit (pre- and post-merge), we can detect potentially regressive commits earlier. The sooner we detect such commits the fewer subsequent builds are affected and the easier it is to revert. Ideally we catch regressions before they even reach the main branch.

What are the Performance Tests?

The goal of our TVUI Performance Tests is to gather memory and responsiveness metrics while simulating the full range of member interactions with Netflix TV.

There are roughly 50 performance tests, each one designed to reproduce an aspect of member engagement. The goal is to keep each test brief and focused on a specific, isolated piece of functionality (startup, profile switching, scrolling through titles, selecting an episode, playback etc.), while the test suite as a whole should cover the entire member experience with minimal duplication. In this way we can run multiple tests in parallel, and the absence of long-pole tests keeps the overall test time manageable and allows for repeat test runs. Every test runs on a combination of devices (physical and virtual) and platform versions (SDKs). We’ll refer to each unique test/device/SDK combination as a test variation.
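To make the notion of a test variation concrete, here is a minimal sketch that enumerates test/device/SDK combinations. It assumes, for simplicity, that every test runs on every device/SDK pairing; the class name, function name, and all test, device, and SDK names are hypothetical.

```python
from itertools import product
from typing import List, NamedTuple, Sequence


class TestVariation(NamedTuple):
    """One unique test/device/SDK combination."""
    test: str
    device: str
    sdk: str


def enumerate_variations(tests: Sequence[str], devices: Sequence[str],
                         sdks: Sequence[str]) -> List[TestVariation]:
    # Assumes a full cross product; a real matrix might skip
    # combinations that don't apply to a given device or SDK.
    return [TestVariation(t, d, s) for t, d, s in product(tests, devices, sdks)]


variations = enumerate_variations(
    tests=["startup", "profile-switch", "scroll-titles"],  # hypothetical tests
    devices=["physical-tv-a", "virtual-tv-b"],              # hypothetical devices
    sdks=["sdk-1", "sdk-2"],                                # hypothetical SDKs
)
print(len(variations))  # 12
```

Because the count multiplies with each added device or SDK, anything configured per variation (such as a hand-tuned threshold) grows quickly in cost.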

We run the full performance suite twice per Pull Request (PR):

  • when the PR is first submitted
  • when the PR is merged to the destination branch

Measurement

Each performance test tracks either memory or responsiveness. Both of these metrics will fluctuate over the course of a test, so we post metric values at regular intervals throughout the test. To compare test runs we need a method to consolidate this range of observed values into a single value.

We made the following decisions:

Memory Tests: use the maximum memory value observed during the test run (because that’s the value that determines whether a device could crash).

Responsiveness Tests: use the median value observed during the test run (based on the assumption that perceived slowness is influenced by all responses, not just the worst response).
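The two consolidation rules above can be sketched as follows. The function name and the sample values (think MB for memory, milliseconds for responsiveness) are hypothetical; only the choice of maximum versus median comes from the rules themselves.

```python
import statistics
from typing import Literal, Sequence


def summarize_run(samples: Sequence[float],
                  kind: Literal["memory", "responsiveness"]) -> float:
    """Collapse the metric values posted during one test run into one number."""
    if not samples:
        raise ValueError("a test run must post at least one metric value")
    if kind == "memory":
        # The peak is what determines whether a device could crash.
        return max(samples)
    if kind == "responsiveness":
        # Perceived slowness reflects all responses, not just the worst one.
        return statistics.median(samples)
    raise ValueError(f"unknown metric kind: {kind!r}")


print(summarize_run([120.0, 180.0, 150.0], "memory"))              # 180.0
print(summarize_run([40.0, 45.0, 300.0, 42.0], "responsiveness"))  # 43.5
```

Note how the single slow 300 ms response dominates nothing under the median rule, whereas a single memory spike is exactly what the maximum rule is meant to catch.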

What are the Challenges?

When Netflix is running in production, we capture real-time performance data which makes it relatively easy to make assertions about the app’s performance. It’s much harder to assess the performance of pre-production code (changes merged to the main branch but not yet released) and harder still to get a performance signal for unmerged code in a PR. Performance test metrics are inferior to real-time usage metrics for several reasons:

  • Data volume: In the Netflix app, the same steps are repeated billions of times, but developer velocity and resource constraints dictate that performance tests can only run a handful of times per build.
  • Simulation: No matter how rigorous or creative our testing process is, we can only ever approximate the experience of real life users, never replicate it. Real users regularly use Netflix for hours at a time, and every user has different preferences and habits.
  • Noise: Ideally a given codebase running any given test variation would always return identical results. In reality that just never happens: no two device CPUs are identical, garbage collection is not entirely predictable, and API request volume and backend activity vary, as do power levels and network bandwidth. For every test there will be background noise that we need to somehow filter from our analysis.

Initial Approach: Static Thresholds

For our first attempt at performance validation we assigned maximum acceptable threshold values for memory metrics. There was a sound rationale behind this approach — when a TV runs Netflix there is a hard limit for memory footprint beyond which Netflix has the potential to crash.

There were several issues with the static thresholds approach:

  • Custom preparation work per test: Since each test variation has a unique memory profile, the appropriate static threshold had to be researched and assigned on a case-by-case basis. This was difficult and time consuming, so we only assigned thresholds to about 30% of test variations.
  • Lack of context: As a validation technique, static thresholds proved to be somewhat arbitrary. Imagine a commit that increases memory usage by 10% but to a level which is just below the threshold. The next commit might be a README change (zero memory impact) but due to normal variations in device background noise, the metric could increase by just enough to breach the threshold.
  • Background variance is not filtered: Once the codebase is bumping against the memory threshold, background device noise becomes the principal factor determining which side of the threshold line the test result falls on.
  • Post-alert adjustments: We found ourselves repeatedly increasing the thresholds to move them clear of background noise.
Unreliable regression signals with static threshold technique
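The “lack of context” problem can be reproduced with a few lines of arithmetic. In this sketch the 200 MB threshold, 180 MB baseline, and 1.5% noise bump are hypothetical numbers; the point is that a static check only sees the absolute value, so it passes the commit that caused the increase and fails the harmless one that follows.

```python
def passes_static_threshold(peak_memory_mb: float, threshold_mb: float) -> bool:
    """Static validation: pass if the peak is at or below a fixed, hand-set limit."""
    return peak_memory_mb <= threshold_mb


THRESHOLD_MB = 200.0  # hypothetical hand-researched limit for one test variation

baseline = 180.0
after_feature = baseline * 1.10       # commit that adds 10% memory: ~198 MB
after_readme = after_feature * 1.015  # README-only commit plus ~1.5% device noise

print(passes_static_threshold(after_feature, THRESHOLD_MB))  # True: regression slips through
print(passes_static_threshold(after_readme, THRESHOLD_MB))   # False: the wrong commit is blamed
```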

The Pivot: Anomaly and Changepoint Detection

It became apparent we needed a technique for performance validation that:

  • Removes failure bias by giving equal weight to all test runs, regardless of results
  • Doesn’t treat performance data points in isolation, but instead assesses the performance impact of a build in relation to previous builds.
  • Can be automatically applied to every test without the need for pre-hoc research, data entry or ongoing manual intervention
  • Can be applied equally to test data of any type: memory, responsiveness, or any other non-boolean test data
  • Minimizes the impact of background noise by prioritizing variance over absolute values
  • Improves insight by examining data points both at the time of creation and retroactively

We settled on a two-pronged approach:

  • Anomaly Detection immediately calls out potential performance regressions by comparing with recent past data
  • Changepoint Detection identifies more subtle performance inflections by examining past and future data clusters

Anomaly Detection

We define an anomaly as any metric data point that is more than n standard deviations above the recent mean, where recent mean and standard deviation are derived from the previous m test runs. For Netflix TV performance tests we currently set n to 4 and m to 40 but these values can be tweaked to maximize the signal-to-noise ratio. When an anomaly is detected the test status is set to failed and an alert is generated.

Anomaly detection works because thresholds are dynamic and derived from existing data. If the data exhibits a lot of background variance then the anomaly threshold will increase to account for the extra noise.
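A minimal sketch of this rule, using n = 4 and m = 40 as stated above. The post does not say whether the population or sample standard deviation is used; this sketch assumes the population form (`statistics.pstdev`). The function name and the choice to skip detection when fewer than two prior runs exist are also assumptions.

```python
import statistics
from typing import Sequence


def is_anomaly(current: float, history: Sequence[float],
               n: float = 4.0, m: int = 40) -> bool:
    """True if `current` is more than n standard deviations above the mean
    of the previous m runs (only upward deviations count as regressions)."""
    recent = list(history)[-m:]
    if len(recent) < 2:
        return False  # assumption: too little history to estimate variance
    mean = statistics.mean(recent)
    stdev = statistics.pstdev(recent)  # assumption: population standard deviation
    return current > mean + n * stdev


quiet = [99.0, 101.0] * 20   # hypothetical history: mean 100, std dev 1
noisy = [95.0, 105.0] * 20   # hypothetical history: mean 100, std dev 5
print(is_anomaly(110.0, quiet))  # True: threshold is 104
print(is_anomaly(110.0, noisy))  # False: threshold is 120
```

The same value of 110 is flagged against a quiet history but not against a noisy one, which is the dynamic-threshold behavior described above. One design caveat: if every recent run were identical, the standard deviation would be zero and any increase at all would be flagged.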

Changepoints

Changepoints are data points at the boundary of two distinct data distribution patterns. We use a technique called e-divisive to analyze the 100 most recent test runs, using a Python implementation based on this implementation.

Since we’re only interested in performance regressions, we ignore changepoints that trend lower. When a changepoint is detected for a test, we don’t fail the test or generate an alert (we consider changepoints to be warnings of unusual patterns, not full blown error assertions).
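The sketch below is a simplified, pure-Python illustration of the energy-statistic search that e-divisive is built on, not the implementation used by the team. It finds the single split that maximizes the energy divergence between the runs before and after it (assuming exponent α = 1 and letting the second segment always extend to the end of the series), estimates significance with a permutation test, and reports whether the shift is upward, since only upward changepoints are of interest here. Full e-divisive repeats this search recursively to find multiple changepoints. The function names, the minimum segment size of 2, and the 99 permutations are assumptions.

```python
import random
import statistics
from typing import List, Sequence, Tuple


def _prefix_sums(z: Sequence[float], alpha: float) -> List[List[float]]:
    """2D prefix sums of the pairwise distances D[i][j] = |z[i] - z[j]| ** alpha."""
    n = len(z)
    P = [[0.0] * (n + 1) for _ in range(n + 1)]
    for i in range(n):
        row_total = 0.0
        for j in range(n):
            row_total += abs(z[i] - z[j]) ** alpha
            P[i + 1][j + 1] = P[i][j + 1] + row_total
    return P


def _block(P: List[List[float]], r0: int, r1: int, c0: int, c1: int) -> float:
    """Sum of D over rows r0..r1-1 and columns c0..c1-1."""
    return P[r1][c1] - P[r0][c1] - P[r1][c0] + P[r0][c0]


def best_split(z: Sequence[float], alpha: float = 1.0,
               min_size: int = 2) -> Tuple[int, float]:
    """Return (tau, q) where z[:tau] | z[tau:] maximizes the energy divergence q."""
    n = len(z)
    if n < 2 * min_size:
        raise ValueError("series too short to split")
    P = _prefix_sums(z, alpha)
    best_tau, best_q = -1, float("-inf")
    for tau in range(min_size, n - min_size + 1):
        m = n - tau
        between = _block(P, 0, tau, tau, n) / (tau * m)              # mean cross-segment distance
        within_left = _block(P, 0, tau, 0, tau) / (tau * (tau - 1))  # mean pairwise distance, left
        within_right = _block(P, tau, n, tau, n) / (m * (m - 1))     # mean pairwise distance, right
        q = (tau * m / n) * (2.0 * between - within_left - within_right)
        if q > best_q:
            best_tau, best_q = tau, q
    return best_tau, best_q


def detect_changepoint(z: Sequence[float], permutations: int = 99,
                       seed: int = 0) -> Tuple[int, float, bool]:
    """Return (tau, p_value, is_upward) for the most likely changepoint in z."""
    z = list(z)
    tau, q = best_split(z)
    rng = random.Random(seed)
    shuffled = list(z)
    exceed = 0
    for _ in range(permutations):
        # Shuffling destroys temporal order; a real changepoint should beat most shuffles.
        rng.shuffle(shuffled)
        if best_split(shuffled)[1] >= q:
            exceed += 1
    p_value = (exceed + 1) / (permutations + 1)
    # Higher values mean worse performance, so only upward shifts are regressions.
    is_upward = statistics.mean(z[tau:]) > statistics.mean(z[:tau])
    return tau, p_value, is_upward
```

For a series of 100 runs whose level jumps partway through, `detect_changepoint` should report a split near the jump with a small p-value; a caller following the policy above would record an upward result as a warning rather than failing the test.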

As you can see, changepoints are a more subtle signal. They don’t necessarily indicate a regression but they suggest builds that had an impact on subsequent data distribution.

Builds that generate changepoints across multiple tests warrant further investigation before they can be included in the release candidate.

Changepoints give us more confidence in regression detection because they disregard false positives such as one-time data spikes. Because changepoint detection requires after-the-fact data, it is best suited to identifying potentially regressive code that is already in the main branch but has not yet been shipped.

Additional Adjustments

Runs per Test

To address failure bias, we decided to run all tests 3 times, regardless of the result. We chose 3 iterations to provide enough data to eliminate most device noise (tests are allocated to devices randomly) without creating a productivity bottleneck.

Summarizing across Test Runs

Next we needed to decide on a methodology to compress the results of each batch of 3 runs into a single value. The goal was to ignore outlier results caused by erratic device behavior.

Initially we took the average of those three runs, but that led to an excess of false positives because the most irregular test runs exerted too much influence on the result. Switching to the median eliminated some of these false positives but we were still getting an unacceptable number of excess alerts (because during periods of high device noise we would occasionally see outlier results two times out of three). Finally, since we noticed that outlier results tended to be higher than normal — rarely lower — we settled on using the minimum value across the 3 runs and this proved to be the most effective at eliminating external noise.
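The three summarization strategies can be compared on a single hypothetical build from a noisy period, where two of the three runs are inflated by device noise. The function name and the run values are illustrative assumptions.

```python
import statistics
from typing import Sequence


def summarize_build(runs: Sequence[float], method: str = "min") -> float:
    """Compress the per-run values for one build (three runs here) into one value."""
    if method == "mean":
        return statistics.mean(runs)
    if method == "median":
        return statistics.median(runs)
    if method == "min":
        # Noise-driven outliers tend to be high, so the minimum is the run
        # least affected by external noise.
        return min(runs)
    raise ValueError(f"unknown method: {method!r}")


runs = [100.0, 130.0, 136.0]  # hypothetical: two of three runs hit by device noise
for method in ("mean", "median", "min"):
    print(method, summarize_build(runs, method))
# mean 122.0
# median 130.0
# min 100.0
```

Only the minimum is unaffected when two of three runs are outliers; the mean is pulled up by both, and the median simply lands on one of them.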

All data points (3 runs per build)
Selecting median value per build
Selecting minimum value per build

What were the Results?

After switching our performance validation to use anomaly and changepoint detection we noticed several improvements.

a) We are alerted for potential performance regressions far less often, and when we do get alerted it’s much more likely to indicate a genuine regression. Our workload is further reduced by no longer having to manually increment static performance thresholds after each false positive.

The following table represents the alert summary for two distinct months last year. In March 2021 we still used static thresholds for regression alerts. By October 2021 we had switched to using anomaly detection for regression alerts. “Alerts which were true regressions” is the number of alerted commits for which the suspected regression turned out to be both significant and persistent.

Note that since the March tests only validated when a threshold was manually set, the total number of validating test runs in October was much greater, and yet October produced only 10% as many alerts as March.

b) We are not alerted for subsequent innocuous builds that inherit regressive commits from preceding builds. (Using the static threshold technique, all subsequent builds were alerted until the regressive build was reverted.) This is because regressive builds increase both mean and standard deviation and thus put subsequent non-regressing builds comfortably below the alert threshold.

Regressive build is above alert threshold
Subsequent build is easily below alert threshold

c) Performance tests against PRs, which had been almost constantly red (because the probability of at least one static threshold being breached was always high), are now mostly green. When the performance tests are red we have a much higher confidence that there is a genuine performance regression.

d) Displaying the anomaly and changepoint count per build provides a visual snapshot that quickly highlights potentially problematic builds.
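A per-build summary like this can be sketched as a simple tally. The event format, build IDs, and variation names below are hypothetical. The second function applies the earlier guideline that builds generating changepoints across multiple tests warrant further investigation, using an assumed cutoff of two distinct test variations.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Set, Tuple

# Hypothetical event format: (build_id, test_variation, kind),
# where kind is "anomaly" or "changepoint".
Event = Tuple[str, str, str]


def counts_per_build(events: Iterable[Event]) -> Dict[str, Counter]:
    """Tally anomalies and changepoints per build for an at-a-glance summary."""
    summary: Dict[str, Counter] = defaultdict(Counter)
    for build_id, _variation, kind in events:
        summary[build_id][kind] += 1
    return dict(summary)


def builds_to_investigate(events: Iterable[Event], min_tests: int = 2) -> List[str]:
    """Builds whose changepoints span at least `min_tests` distinct test variations."""
    tests_by_build: Dict[str, Set[str]] = defaultdict(set)
    for build_id, variation, kind in events:
        if kind == "changepoint":
            tests_by_build[build_id].add(variation)
    return sorted(b for b, tests in tests_by_build.items() if len(tests) >= min_tests)


events = [
    ("build-101", "startup/tv-a/sdk-1", "anomaly"),
    ("build-102", "startup/tv-a/sdk-1", "changepoint"),
    ("build-102", "scroll/tv-b/sdk-2", "changepoint"),
    ("build-102", "playback/tv-a/sdk-2", "anomaly"),
]
print(counts_per_build(events))
print(builds_to_investigate(events))  # ['build-102']
```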

What’s Next?

Further Work

There are still several things we’d like to improve:

  • Make it easier to determine if regressions were due to external agents: Often it turns out the detected regression, though real, was not a result of the committed code but due to an external factor such as an upgrade to one of our platform dependencies, or a feature flag that got switched on. It would be helpful to summarize external changes in our alert summaries.
  • Factor out resolved regressions when determining baselines for validation: When generating recent mean and standard deviation values, we could improve regression detection by filtering out data from erstwhile regressions that have since been fixed.
  • Improve Developer Velocity: We can further reduce total test time by removing unnecessary iterations within tests, adding more devices to ensure availability, and de-emphasizing testing for those parts of the app where performance is less likely to be critical. We can also pre-build app bundles (at least partially) so that the test suite is not delayed by waiting for fresh builds.
  • More closely mirror metrics gathered by the production app: In the deployed Netflix TV app we collect additional metrics such as TTR (time to render) and empty box rate (how frequently titles in the viewport are missing images). While test metrics and metrics collected during real use do not lend themselves to direct comparison, measuring the relative change in metrics in pre-production builds can help us to anticipate regressions in production.

Wider Adoption and New Use Cases

At this point, anomaly and changepoint detection is applied to every commit in the TVUI repo, and is in the process of being deployed for commits to the TV Player repo (the layer that manages playback operations). Other Netflix teams (outside of the TV platform) have also expressed interest in these techniques, and the ultimate goal is to standardize regression detection across Netflix.

Anomaly and changepoint detection are entirely framework independent — the only required inputs are a current value and an array of recent values to compare it to. As such, their utility extends far beyond performance tests. For example, we are considering using these techniques to monitor the reliability of non-performance-based test suites — in this case the metric of interest is the percent of tests that ran to completion.

In the future we plan to decouple anomaly and changepoint logic from our test infrastructure and offer it as a standalone open-source library.

Wrap Up

By using techniques that assess the performance impact of a build in relation to the performance characteristics (magnitude, variance, trend) of adjacent builds, we can more confidently distinguish genuine regressions from metrics that are elevated for other reasons (e.g. inherited code, regressions in previous builds or one-off data spikes due to test irregularities). We also spend less time chasing false positives and no longer need to manually assign a threshold to each result; the data itself now sets the thresholds dynamically.

This improved efficiency and higher confidence level helps us to quickly identify and fix regressions before they reach our members.

The anomaly and changepoint techniques discussed here can be used to identify regressions (or progressions), unexpected values or inflection points in any chronologically sequenced, quantitative data. Their utility extends well beyond performance analysis. For example they could be used to identify inflection points in system reliability, customer satisfaction, product usage, download volume or revenue.

We encourage you to try these techniques on your own data. We’d love to learn more about their success (or otherwise) in other contexts!


Fixing Performance Regressions Before they Happen was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.