All posts by Netflix Technology Blog

Sequential Testing Keeps the World Streaming Netflix Part 2: Counting Processes

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/sequential-testing-keeps-the-world-streaming-netflix-part-2-counting-processes-da6805341642

Michael Lindon, Chris Sanden, Vache Shirikian, Yanjun Liu, Minal Mishra, Martin Tingley

Have you ever encountered a bug while streaming Netflix? Did your title stop unexpectedly, or not start at all? In the first installment of this blog series on sequential testing, we described our canary testing methodology for continuous metrics such as play-delay. One of our readers commented

What if the new release is not related to a new play/streaming feature? For example, what if the new release includes modified login functionality? Will you still monitor the “play-delay” metric?

Netflix monitors a large suite of metrics, many of which can be classified as counts. These include metrics such as the number of logins, errors, successful play starts, and even the number of customer call center contacts. In this second installment, we describe our sequential methodology for testing count metrics, outlined in the NeurIPS paper Anytime Valid Inference for Multinomial Count Data.

Spot the Difference

Suppose we are about to deploy new code that changes the login behavior. To de-risk the software rollout we A/B test the new code, known also as a canary test. Whenever an event such as a login occurs, a log flows through our real-time backend and the corresponding timestamp is recorded. Figure 1 illustrates the sequences of timestamps generated by devices assigned to the new (treatment) and existing (control) software versions. A question that naturally concerns us is whether there are fewer login events in the treatment. Can you tell?

Figure 1: Timestamps of events occurring in control and treatment

It is not obvious from simple inspection of the point processes in Figure 1. The difference becomes immediately apparent when we visualize the observed counting processes, shown in Figure 2.

Figure 2: Visualizing the counting processes — the number of events observed by time t

The counting processes are functions that increment by 1 whenever a new event arrives. Clearly, there are fewer events occurring in the treatment than in the control. If these were login events, this would suggest that the new code contains a bug that prevents some users from being able to log in successfully.
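To make this concrete, here is a minimal sketch (not Netflix code) of evaluating a counting process from a list of event timestamps; the event streams below are simulated placeholders.

import numpy as np

def counting_process(timestamps, t):
    """N(t): number of events with timestamp <= t, given event timestamps."""
    timestamps = np.sort(np.asarray(timestamps))
    return np.searchsorted(timestamps, t, side="right")

# Hypothetical event streams for control (A) and treatment (B)
control_events = np.random.uniform(0, 100, size=500)
treatment_events = np.random.uniform(0, 100, size=350)

grid = np.linspace(0, 100, 1001)
n_control = counting_process(control_events, grid)      # step function, increments by 1 per event
n_treatment = counting_process(treatment_events, grid)  # fewer events -> lower curve, as in Figure 2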

This is a common situation when dealing with event timestamps. To give another example, if events corresponded to errors or crashes, we would like to know if these are accruing faster in the treatment than in the control. Moreover, we want to answer that question as quickly as possible to prevent any further disruption to the service. This necessitates sequential testing techniques which were introduced in part 1.

Time-Inhomogeneous Poisson Process

Our data for each treatment group is a realization of a one-dimensional point process, that is, a sequence of timestamps. As the rate at which the events arrive is time-varying (in both treatment and control), we model the point process as a time-inhomogeneous Poisson point process. This point process is defined by an intensity function λ: ℝ → [0, ∞). The number of events in the interval [0,t), denoted N(t), has the following Poisson distribution

N(t) ~ Poisson(Λ(t)), where Λ(t) = ∫₀ᵗ λ(s) ds.

We seek to test the null hypothesis H₀: λᴬ(t) = λᴮ(t) for all t, i.e., that the intensity functions for control (A) and treatment (B) are the same. This can be done semiparametrically, without making any assumptions about the intensity functions λᴬ and λᴮ. Moreover, the novelty of the research is that this can be done sequentially, as described in Section 4 of our paper. Conveniently, the only data required to test this hypothesis at time t are Nᴬ(t) and Nᴮ(t), the total numbers of events observed so far in control and treatment. In other words, all you need to test the null hypothesis is two integers, which can easily be updated as new events arrive. Here is an example from a simulated A/A test, in which we know by design that the intensity function is the same for the control (A) and the treatment (B), albeit nonstationary.
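As an illustration of the data-generating model (not of the test itself), here is a minimal sketch of simulating a time-inhomogeneous Poisson process by thinning (Lewis-Shedler); the intensity function below is an arbitrary placeholder.

import numpy as np

def simulate_inhomogeneous_poisson(intensity, t_max, lambda_max, rng=None):
    """Thinning: simulate a homogeneous process at rate lambda_max, then accept
    each candidate time s with probability intensity(s) / lambda_max."""
    rng = np.random.default_rng() if rng is None else rng
    n_candidates = rng.poisson(lambda_max * t_max)
    candidates = np.sort(rng.uniform(0, t_max, size=n_candidates))
    keep = rng.uniform(size=n_candidates) < intensity(candidates) / lambda_max
    return candidates[keep]

# Placeholder nonstationary intensity, shared by control and treatment under the null
intensity = lambda t: 5.0 + 4.0 * np.sin(2 * np.pi * t / 24.0)
arrivals_A = simulate_inhomogeneous_poisson(intensity, t_max=48.0, lambda_max=9.0)
arrivals_B = simulate_inhomogeneous_poisson(intensity, t_max=48.0, lambda_max=9.0)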

Figure 3: (Left) An A/A simulation of two inhomogeneous Poisson point processes. (Right) Confidence sequence on the log-difference of intensity functions, and sequential p-value.

Figure 3 provides an illustration of an A/A setting. The left figure presents the raw data and the intensity functions, and the right figure presents the sequential statistical analysis. The blue and red rug plots indicate the observed arrival timestamps of events from the treatment and control streams, respectively. The dashed lines are the observed counting processes. As this data is simulated under the null, the intensity functions are identical and overlay each other. The left axis of the right figure shows the evolution of the confidence sequence on the log-difference of intensity functions, and the right axis shows the evolution of the sequential p-value. We can make the following two observations:

  • Under the null, the difference of log intensities is zero, which is correctly covered by the 0.95 confidence sequence at all times.
  • The sequential p-value is greater than 0.05 at all times.

Now let’s consider an illustration of an A/B setting. Figure 4 shows observed arrival times for treatment and control when the intensity functions differ. As this is a simulation, the true difference between log intensities is known.

Figure 4: (Left) An A/B simulation of two inhomogeneous Poisson point processes. (Right) Confidence sequence on the difference of log of intensity functions, and sequential p-value.

We can make the following observations:

  • The 0.95 confidence sequence covers the true log-difference at all times.
  • The sequential p-value falls below 0.05 at the same time the 0.95 confidence sequence excludes the null value of zero.

We now present several case studies in which this methodology rapidly detected serious problems across a range of count metrics.

Case Study 1: Drop in Successful Title Starts

Figure 2 actually presents counts of title start events from a real canary test. Whenever a title starts successfully, an event is sent from the device to Netflix. We have a stream of title start events from treatment devices and a stream of title start events from control devices. Whenever fewer title starts are observed among treatment devices, there is usually a bug in the new client preventing playback.

In this case, the canary test detected a bug that was later determined to have prevented approximately 60% of treatment devices from being able to start their streams. The confidence sequence is shown in Figure 5, in addition to the (sequential) p-value. While the exact units of time have been omitted, this bug was detected at the sub-second level.

Figure 5: 0.99 Confidence sequence on the difference of log-intensities with sequential p-value.

Case Study 2: Increase in Abnormal Shutdowns

In addition to title start events, we also monitor whenever the Netflix client shuts down unexpectedly. As before, we have two streams of abnormal shutdown events, one from treatment devices, and one from control devices. The following screenshots are taken directly from our Lumen dashboards.

Figure 6: Counts of Abnormal Shutdowns over time, cumulative and non-cumulative. Treatment (Black) and Control (Blue)

Figure 6 illustrates two important points. There is clearly nonstationarity in the arrival of abnormal shutdown events. It is also hard to see any difference between treatment and control in the non-cumulative view. The difference is, however, much easier to see in the cumulative view by observing the counting process. There is a small but visible increase in the number of abnormal shutdowns in the treatment. Figure 7 shows how our sequential statistical methodology is able to identify even such small differences.

Figure 7: Abnormal Shutdowns. (Top Panel) Confidence sequences on λᴮ(t)/λᴬ(t) (shaded blue) with observed counting processes for treatment (black dashed) and control (blue dashed). (Bottom Panel) sequential p-values.

Case Study 3: Increase in Errors

Netflix also monitors the number of errors produced by treatment and control. This is a high-cardinality metric, as every error is annotated with a code indicating the type of error. Monitoring errors segmented by code helps developers diagnose issues quickly. Figure 8 shows the sequential p-values, on the log scale, for a set of error codes that Netflix monitors during client rollouts. In this example, we have detected a higher volume of 3.1.18 errors being produced by treatment devices. Devices experiencing this error are presented with the following message:

“We’re having trouble playing this title right now”

Figure 8: Sequential p-values for start play errors by error code
Figure 9: Observed error-3.1.18 timestamps and counting processes for treatment (blue) and control (red)

Knowing which errors increased can streamline the process of identifying the bug for our developers. We immediately send developers alerts through Slack integrations, such as the following

Figure 10: Notifications via Slack Integrations

The next time you are watching Netflix and encounter an error, know that we’re on it!

Try it Out!

The statistical approach outlined in our paper is remarkably easy to implement in practice. All you need are two integers, the number of events observed so far in the treatment and control. The code is available in this short GitHub gist. Here are two usage examples:

>>> counts = [100, 101]
>>> assignment_probabilities = [0.5, 0.5]
>>> sequential_p_value(counts, assignment_probabilities)
1

>>> counts = [100, 201]
>>> assignment_probabilities = [0.5, 0.5]
>>> sequential_p_value(counts, assignment_probabilities)
5.06061172163498e-06

The code generalizes to more than just two treatment groups. For full details, including hyperparameter tuning, see section 4 of the paper.
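For intuition only, here is a rough sketch of one way such an anytime-valid p-value can be constructed: treat each arriving event as a multinomial draw over treatment groups and form a Dirichlet-multinomial mixture test martingale against the null assignment probabilities. This is an illustration of the general construction, not the implementation in the gist; the prior and other details differ, so its outputs will not exactly match the examples above.

import numpy as np
from scipy.special import gammaln

def sketch_sequential_p_value(counts, assignment_probabilities, prior=None):
    """Anytime-valid p-value sketch for H0: events are assigned to groups with
    the given probabilities. Uses a Dirichlet(prior) mixture martingale."""
    n = np.asarray(counts, dtype=float)
    rho = np.asarray(assignment_probabilities, dtype=float)
    alpha = np.ones_like(n) if prior is None else np.asarray(prior, dtype=float)
    # Log marginal likelihood under the Dirichlet mixture (multinomial coefficient cancels in the ratio)
    log_mixture = (gammaln(alpha.sum()) - gammaln(alpha).sum()
                   + gammaln(alpha + n).sum() - gammaln(alpha.sum() + n.sum()))
    # Log likelihood under the null assignment probabilities
    log_null = (n * np.log(rho)).sum()
    log_e = log_mixture - log_null          # e-value: expectation 1 under H0 as counts accumulate
    return float(min(1.0, np.exp(-log_e)))  # Ville's inequality: P(e-value ever >= 1/p) <= p under H0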

Further Reading


Sequential Testing Keeps the World Streaming Netflix Part 2: Counting Processes was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Supporting Diverse ML Systems at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/supporting-diverse-ml-systems-at-netflix-2d2e6b6d205d

David J. Berg, Romain Cledat, Kayla Seeley, Shashank Srikanth, Chaoying Wang, Darin Yu

Netflix uses data science and machine learning across all facets of the company, powering a wide range of business applications from our internal infrastructure and content demand modeling to media understanding. The Machine Learning Platform (MLP) team at Netflix provides an entire ecosystem of tools around Metaflow, an open source machine learning infrastructure framework we started, to empower data scientists and machine learning practitioners to build and manage a variety of ML systems.

Since its inception, Metaflow has been designed to provide a human-friendly API for building data and ML (and today AI) applications and deploying them in our production infrastructure frictionlessly. While human-friendly APIs are delightful, it is really the integrations to our production systems that give Metaflow its superpowers. Without these integrations, projects would be stuck at the prototyping stage, or they would have to be maintained as outliers outside the systems maintained by our engineering teams, incurring unsustainable operational overhead.

Given the very diverse set of ML and AI use cases we support — today we have hundreds of Metaflow projects deployed internally — we don’t expect all projects to follow the same path from prototype to production. Instead, we provide a robust foundational layer with integrations to our company-wide data, compute, and orchestration platform, as well as various paths to deploy applications to production smoothly. On top of this, teams have built their own domain-specific libraries to support their specific use cases and needs.

In this article, we cover a few key integrations that we provide for various layers of the Metaflow stack at Netflix, as illustrated above. We will also showcase real-life ML projects that rely on them, to give an idea of the breadth of projects we support. Note that all projects leverage multiple integrations, but we highlight them in the context of the integration that they use most prominently. Importantly, all the use cases were engineered by practitioners themselves.

These integrations are implemented through Metaflow’s extension mechanism, which is publicly available but subject to change, and hence not yet part of Metaflow’s stable API. If you are curious about implementing your own extensions, get in touch with us on the Metaflow community Slack.

Let’s go over the stack layer by layer, starting with the most foundational integrations.

Data: Fast Data

Our main data lake is hosted on S3, organized as Apache Iceberg tables. For ETL and other heavy lifting of data, we mainly rely on Apache Spark. In addition to Spark, we want to support last-mile data processing in Python, addressing use cases such as feature transformations, batch inference, and training. Occasionally, these use cases involve terabytes of data, so we have to pay attention to performance.

To enable fast, scalable, and robust access to the Netflix data warehouse, we have developed a Fast Data library for Metaflow, which leverages high-performance components from the Python data ecosystem:

As depicted in the diagram, the Fast Data library consists of two main interfaces:

  • The Table object is responsible for interacting with the Netflix data warehouse, which includes parsing Iceberg (or legacy Hive) table metadata and resolving the partitions and Parquet files to read. Recently, we added support for the write path, so tables can be updated as well using the library.
  • Once we have discovered the Parquet files to be processed, MetaflowDataFrame takes over: it downloads data using Metaflow’s high-throughput S3 client directly into the process’ memory, which often outperforms reading local files.

We use Apache Arrow to decode Parquet and to host an in-memory representation of data. The user can choose the most suitable tool for manipulating data, such as Pandas or Polars to use a dataframe API, or one of our internal C++ libraries for various high-performance operations. Thanks to Arrow, data can be accessed through these libraries in a zero-copy fashion.
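For readers unfamiliar with this handoff, here is a minimal open-source sketch (independent of the internal Fast Data library) of decoding Parquet with Arrow and exposing the same in-memory data to Polars and Pandas; the file path is a placeholder.

import pyarrow.parquet as pq
import polars as pl

arrow_table = pq.read_table("some-partition/part-00000.parquet")  # placeholder path; decoded into Arrow memory
df_polars = pl.from_arrow(arrow_table)   # zero-copy for Arrow-native dtypes
df_pandas = arrow_table.to_pandas()      # pandas view; copies only where dtypes require it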

We also pay attention to dependency issues: (Py)Arrow is a dependency of many ML and data libraries, so we don’t want our custom C++ extensions to depend on a specific version of Arrow, which could easily lead to unresolvable dependency graphs. Instead, in the style of nanoarrow, our Fast Data library only relies on the stable Arrow C data interface, producing a hermetically sealed library with no external dependencies.

Example use case: Content Knowledge Graph

Our knowledge graph of the entertainment world encodes relationships between titles, actors and other attributes of a film or series, supporting all aspects of business at Netflix.

A key challenge in creating a knowledge graph is entity resolution. There may be many different representations of slightly different or conflicting information about a title which must be resolved. This is typically done through a pairwise matching procedure for each entity which becomes non-trivial to do at scale.

This project leverages Fast Data and horizontal scaling with Metaflow’s foreach construct to load large amounts of title information — approximately a billion pairs — stored in the Netflix Data Warehouse, so the pairs can be matched in parallel across many Metaflow tasks.

We use metaflow.Table to resolve all input shards, which are distributed to Metaflow tasks that collectively process terabytes of data. Each task loads the data using metaflow.MetaflowDataFrame, performs matching using Pandas, and populates a corresponding shard in an output Table. Finally, when all matching is done and the data is written, the new table is committed so it can be read by other jobs.
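As a rough illustration of this shape of workload, here is a hedged sketch of a Metaflow flow that fans out over shards with foreach. metaflow.Table and metaflow.MetaflowDataFrame are internal Fast Data interfaces, so the commented calls on them are hypothetical placeholders rather than their real API.

from metaflow import FlowSpec, step

class EntityMatchingFlow(FlowSpec):

    @step
    def start(self):
        # Hypothetical: resolve input shards of candidate title pairs from the warehouse
        # self.shards = Table("title_pairs").shards()
        self.shards = ["shard-0", "shard-1", "shard-2"]  # placeholder shard handles
        self.next(self.match, foreach="shards")

    @step
    def match(self):
        shard = self.input
        # Hypothetical: load the shard into memory and run pairwise matching with Pandas
        # df = MetaflowDataFrame.from_shard(shard).to_pandas()
        # OutputTable.write(shard, run_pairwise_matching(df))
        self.next(self.join)

    @step
    def join(self, inputs):
        # Once all shards are matched and written, the output table would be committed here
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    EntityMatchingFlow()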

Compute: Titus

Whereas open-source users of Metaflow rely on AWS Batch or Kubernetes as the compute backend, we rely on our centralized compute platform, Titus. Under the hood, Titus is powered by Kubernetes, but it provides a thick layer of enhancements over off-the-shelf Kubernetes to make it more observable, secure, scalable, and cost-efficient.

By targeting @titus, Metaflow tasks benefit from these battle-hardened features out of the box, with no in-depth technical knowledge or engineering required from ML engineers or data scientists. However, in order to benefit from scalable compute, we need to help the developer package and rehydrate the whole execution environment of a project in a remote pod in a reproducible manner (preferably quickly). Specifically, we don’t want to ask developers to manage their own Docker images manually, which quickly results in more problems than it solves.

This is why Metaflow provides support for dependency management out of the box. Originally, we supported only @conda, but based on our work on Portable Execution Environments, open-source Metaflow gained support for @pypi a few months ago as well.
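To show what this looks like at the step level, here is a hedged sketch. @titus is the internal Netflix decorator mentioned above (shown only as a comment; open-source users would reach for @batch or @kubernetes instead), and the @pypi package pins and resource sizes are illustrative.

from metaflow import FlowSpec, step, resources, pypi

class TrainingFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.train)

    # @titus                           # internal: run this step on the Titus compute platform
    @resources(cpu=8, memory=32000)
    @pypi(python="3.10", packages={"scikit-learn": "1.4.0"})  # illustrative pins
    @step
    def train(self):
        from sklearn.linear_model import LogisticRegression  # imported inside the resolved environment
        self.model_type = LogisticRegression.__name__
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    TrainingFlow()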

Example use case: Building model explainers

Here’s a fascinating example of the usefulness of portable execution environments. For many of our applications, model explainability matters. Stakeholders like to understand why models produce a certain output and why their behavior changes over time.

There are several ways to provide explainability to models but one way is to train an explainer model based on each trained model. Without going into the details of how this is done exactly, suffice to say that Netflix trains a lot of models, so we need to train a lot of explainers too.

Thanks to Metaflow, we can allow each application to choose the best modeling approach for their use cases. Correspondingly, each application brings its own bespoke set of dependencies. Training an explainer model therefore requires:

  1. Access to the original model and its training environment, and
  2. Dependencies specific to building the explainer model.

This poses an interesting challenge in dependency management: we need a higher-order training system, “Explainer flow” in the figure below, which is able to take a full execution environment of another training system as an input and produce a model based on it.

Explainer flow is event-triggered by an upstream flow, such as the Model A, B, and C flows in the illustration. The build_environment step uses the metaflow environment command provided by our portable environments to build an environment that includes both the requirements of the input model and those needed to build the explainer model itself.

The built environment is given a unique name that depends on the run identifier (to provide uniqueness) as well as the model type. Given this environment, the train_explainer step is then able to refer to this uniquely named environment and operate in an environment that can both access the input model and train the explainer model. Note that, unlike typical flows using vanilla @conda or @pypi, the portable environments extension allows users to fetch these environments at execution time rather than at deploy time, so users can, as in this case, resolve the environment right before using it in the next step.

Orchestration: Maestro

If data is the fuel of ML and the compute layer is the muscle, then the nerves must be the orchestration layer. We have talked about the importance of a production-grade workflow orchestrator in the context of Metaflow when we released support for AWS Step Functions years ago. Since then, open-source Metaflow has gained support for Argo Workflows, a Kubernetes-native orchestrator, as well as support for Airflow, which is still widely used by data engineering teams.

Internally, we use a production workflow orchestrator called Maestro. The Maestro post shares details about how the system supports scalability, high-availability, and usability, which provide the backbone for all of our Metaflow projects in production.

A hugely important detail that often goes overlooked is event-triggering: it allows a team to integrate their Metaflow flows to surrounding systems upstream (e.g. ETL workflows), as well as downstream (e.g. flows managed by other teams), using a protocol shared by the whole organization, as exemplified by the example use case below.
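In open-source Metaflow the same pattern is expressed with flow-level triggers; here is a brief sketch in which the event name is hypothetical. A downstream team could likewise chain on this flow with @trigger_on_finish.

from metaflow import FlowSpec, step, trigger

@trigger(event="etl.title_metadata.updated")   # hypothetical event published by an upstream ETL workflow
class DownstreamScoringFlow(FlowSpec):
    """Deployed to the production orchestrator, this flow starts whenever the event fires.
    Another team could chain on it with @trigger_on_finish(flow="DownstreamScoringFlow")."""

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    DownstreamScoringFlow()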

Example use case: Content decision making

One of the most business-critical systems running on Metaflow supports our content decision making, that is, the question of what content Netflix should bring to the service. We support a massive scale of over 260M subscribers spanning over 190 countries representing hugely diverse cultures and tastes, all of whom we want to delight with our content slate. Reflecting the breadth and depth of the challenge, the systems and models focusing on the question have grown to be very sophisticated.

We approach the question from multiple angles but we have a core set of data pipelines and models that provide a foundation for decision making. To illustrate the complexity of just the core components, consider this high-level diagram:

In this diagram, gray boxes represent integrations to partner teams downstream and upstream, green boxes are various ETL pipelines, and blue boxes are Metaflow flows. These boxes encapsulate hundreds of advanced models and intricate business logic, handling massive amounts of data daily.

Despite its complexity, the system is managed by a relatively small team of engineers and data scientists autonomously. This is made possible by a few key features of Metaflow:

The team has also developed their own domain-specific libraries and configuration management tools, which help them improve and operate the system.

Deployment: Cache

To produce business value, all our Metaflow projects are deployed to work with other production systems. In many cases, the integration might be via shared tables in our data warehouse. In other cases, it is more convenient to share the results via a low-latency API.

Notably, not all API-based deployments require real-time evaluation, which we cover in the section below. We have a number of business-critical applications where some or all predictions can be precomputed, guaranteeing the lowest possible latency and operationally simple high availability at the global scale.

We have developed an officially supported pattern to cover such use cases. While the system relies on our internal caching infrastructure, you could follow the same pattern using services like Amazon ElastiCache or DynamoDB.
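As a hedged open-source approximation of this pattern (metaflow.Cache itself is internal), a daily batch job could write precomputed results to a key-value store such as DynamoDB, and a thin API could read them back. The table name, key names, and region below are hypothetical.

import json
import boto3

dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
results_table = dynamodb.Table("precomputed-results")   # hypothetical table name

def publish_predictions(predictions):
    """Batch-job side: write each precomputed result under its entity key."""
    with results_table.batch_writer() as batch:
        for entity_id, payload in predictions.items():
            batch.put_item(Item={"entity_id": entity_id, "payload": json.dumps(payload)})

def lookup_prediction(entity_id):
    """API side: low-latency point read of a precomputed result."""
    response = results_table.get_item(Key={"entity_id": entity_id})
    item = response.get("Item")
    return json.loads(item["payload"]) if item else None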

Example use case: Content performance visualization

The historical performance of titles is used by decision makers to understand and improve the film and series catalog. Performance metrics can be complex and are often best understood by humans with visualizations that break down the metrics across parameters of interest interactively. Content decision makers are equipped with self-serve visualizations through a real-time web application built with metaflow.Cache, which is accessed through an API provided with metaflow.Hosting.

A daily scheduled Metaflow job computes aggregate quantities of interest in parallel. The job writes a large volume of results to an online key-value store using metaflow.Cache. A Streamlit app houses the visualization software and data aggregation logic. Users can dynamically change parameters of the visualization application, and in real time a message is sent to a simple Metaflow Hosting service, which looks up values in the cache, performs computation, and returns the results as a JSON blob to the Streamlit application.

Deployment: Metaflow Hosting

For deployments that require an API and real-time evaluation, we provide an integrated model hosting service, Metaflow Hosting. Although details have evolved a lot, this old talk still gives a good overview of the service.

Metaflow Hosting is specifically geared towards hosting artifacts or models produced in Metaflow. It provides an easy-to-use interface on top of Netflix’s existing microservice infrastructure, allowing data scientists to quickly move their work from experimentation to a production-grade web service that can be consumed over an HTTP REST API with minimal overhead.

Its key benefits include:

  • Simple decorator syntax to create RESTful endpoints.
  • The back-end auto-scales the number of instances used to back your service based on traffic.
  • The back-end scales to zero if no requests are made to it after a specified amount of time, thereby saving cost, particularly if your service requires GPUs to produce a response effectively.
  • Request logging, alerts, monitoring, and tracing hooks to Netflix infrastructure.

Consider the service similar to managed model hosting services like AWS SageMaker Model Hosting, but tightly integrated with our microservice infrastructure.

Example use case: Media

We have a long history of using machine learning to process media assets, for instance, to personalize artwork and to help our creatives create promotional content efficiently. Processing large amounts of media assets is technically non-trivial and computationally expensive, so over the years, we have developed plenty of specialized infrastructure dedicated for this purpose in general, and infrastructure supporting media ML use cases in particular.

To demonstrate the benefits of Metaflow Hosting that provides a general-purpose API layer supporting both synchronous and asynchronous queries, consider this use case involving Amber, our feature store for media.

While Amber is a feature store, precomputing and storing all media features in advance would be infeasible. Instead, we compute and cache features on an on-demand basis, as depicted below:

When a service requests a feature from Amber, it computes the feature dependency graph and then sends one or more asynchronous requests to Metaflow Hosting, which places the requests in a queue, eventually triggering feature computations when compute resources become available. Metaflow Hosting caches the response, so Amber can fetch it after a while. We could have built a dedicated microservice just for this use case, but thanks to the flexibility of Metaflow Hosting, we were able to ship the feature faster with no additional operational burden.

Future Work

Our appetite to apply ML in diverse use cases is only increasing, so our Metaflow platform will keep expanding its footprint correspondingly and continue to provide delightful integrations with systems built by other teams at Netflix. For instance, we have plans to improve the versioning layer, which wasn’t covered in this article, by giving more options for artifact and model management.

We also plan on building more integrations with other systems that are being developed by sister teams at Netflix. As an example, Metaflow Hosting models are currently not well integrated into model logging facilities — we plan on working on improving this to make models developed with Metaflow more integrated with the feedback loop critical in training new models. We hope to do this in a pluggable manner that would allow other users to integrate with their own logging systems.

Additionally, we want to provide more ways for Metaflow artifacts and models to be integrated into non-Metaflow environments and applications, e.g. JVM-based edge services, so that Python-based data scientists can contribute to non-Python engineering systems easily. This would allow us to better bridge the gap between the quick iteration that Metaflow provides (in Python) and the requirements and constraints imposed by the infrastructure serving Netflix member-facing requests.

If you are building business-critical ML or AI systems in your organization, join the Metaflow Slack community! We are happy to share experiences, answer any questions, and welcome you to contribute to Metaflow.

Acknowledgements:

Thanks to Wenbing Bai, Jan Florjanczyk, Michael Li, Aliki Mavromoustaki, and Sejal Rai for help with use cases and figures. Thanks to our OSS contributors for making Metaflow a better product.


Supporting Diverse ML Systems at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Bending pause times to your will with Generational ZGC

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/bending-pause-times-to-your-will-with-generational-zgc-256629c9386b

The surprising and not so surprising benefits of generations in the Z Garbage Collector.

By Danny Thomas, JVM Ecosystem Team

The latest long term support release of the JDK delivers generational support for the Z Garbage Collector.

More than half of our critical streaming video services are now running on JDK 21 with Generational ZGC, so it’s a good time to talk about our experience and the benefits we’ve seen. If you’re interested in how we use Java at Netflix, Paul Bakker’s talk How Netflix Really Uses Java, is a great place to start.

Reduced tail latencies

In both our GRPC and DGS Framework services, GC pauses are a significant source of tail latencies. That’s particularly true of our GRPC clients and servers, where request cancellations due to timeouts interact with reliability features such as retries, hedging and fallbacks. Each of these errors is a canceled request resulting in a retry, so reducing cancellations also reduces overall service traffic at the same rate:

Errors rates per second. Previous week in white vs current cancellation rate in purple, as ZGC was enabled on a service cluster on November 15

Removing the noise of pauses also allows us to identify actual sources of latency end-to-end, which would otherwise be hidden in the noise, as maximum pause time outliers can be significant:

Maximum GC pause times by cause, for the same service cluster as above. Yes, those ZGC pauses really are usually under one millisecond

Efficiency

Even after we saw very promising results in our evaluation, we expected the adoption of ZGC to be a trade-off: a little less application throughput, due to store and load barriers, work performed in thread local handshakes, and the GC competing with the application for resources. We considered that an acceptable trade-off, as avoiding pauses provided benefits that would outweigh that overhead.

In fact, we’ve found for our services and architecture that there is no such trade-off. For a given CPU utilization target, ZGC improves both average and P99 latencies with equal or better CPU utilization when compared to G1.

The consistency in request rates, request patterns, response time and allocation rates we see in many of our services certainly help ZGC, but we’ve found it’s equally capable of handling less consistent workloads (with exceptions of course; more on that below).

Operational simplicity

Service owners often reach out to us with questions about excessive pause times and for help with tuning. We have several frameworks that periodically refresh large amounts of on-heap data to avoid external service calls for efficiency. These periodic refreshes of on-heap data are great at taking G1 by surprise, resulting in pause time outliers well beyond the default pause time goal.

This long lived on-heap data was the major contributor to us not adopting non-generational ZGC previously. In the worst case we evaluated, non-generational ZGC caused 36% more CPU utilization than G1 for the same workload. That became a nearly 10% improvement with generational ZGC.

Half of all services required for streaming video use our Hollow library for on-heap metadata. Removing pauses as a concern allowed us to remove array pooling mitigations, freeing hundreds of megabytes of memory for allocations.

Operational simplicity also stems from ZGC’s heuristics and defaults. No explicit tuning has been required to achieve these results. Allocation stalls are rare, typically coinciding with abnormal spikes in allocation rates, and are shorter than the average pause times we saw with G1.

Memory overhead

We expected that losing compressed references on heaps < 32G, due to colored pointers requiring 64-bit object pointers, would be a major factor in the choice of a garbage collector.

We’ve found that while that’s an important consideration for stop-the-world GCs, that’s not the case for ZGC, where even on small heaps, the increase in allocation rate is amortized by the efficiency and operational improvements. Our thanks to Erik Österlund at Oracle for explaining the less intuitive benefits of colored pointers when it comes to concurrent garbage collectors, which led us to evaluate ZGC more broadly than initially planned.

In the majority of cases ZGC is also able to consistently make more memory available to the application:

Used vs available heap capacity following each GC cycle, for the same service cluster as above

ZGC has a fixed overhead of 3% of the heap size, requiring more native memory than G1. Except in a couple of cases, there’s been no need to lower the maximum heap size to allow for more headroom, and those were services with greater than average native memory needs.

Reference processing is also only performed in major collections with ZGC. We paid particular attention to deallocation of direct byte buffers, but we haven’t seen any impact thus far. This difference in reference processing did cause a performance problem with JSON thread dump support, but that’s an unusual situation caused by a framework accidentally creating an unused ExecutorService instance for every request.

Transparent huge pages

Even if you’re not using ZGC, you probably should be using huge pages, and transparent huge pages is the most convenient way to use them.

ZGC uses shared memory for the heap and many Linux distributions configure shmem_enabled to never, which silently prevents ZGC from using huge pages with -XX:+UseTransparentHugePages.

Here we have a service deployed with no other change but shmem_enabled going from never to advise, reducing CPU utilization significantly:

Deployment moving from 4k to 2m pages. Ignore the gap, that’s our immutable deployment process temporarily doubling the cluster capacity

Our default configuration:

  • Sets heap minimum and maximums to equal size
  • Configures -XX:+UseTransparentHugePages -XX:+AlwaysPreTouch
  • Uses the following transparent_hugepage configuration:
echo madvise | sudo tee /sys/kernel/mm/transparent_hugepage/enabled
echo advise | sudo tee /sys/kernel/mm/transparent_hugepage/shmem_enabled
echo defer | sudo tee /sys/kernel/mm/transparent_hugepage/defrag
echo 1 | sudo tee /sys/kernel/mm/transparent_hugepage/khugepaged/defrag

What workloads weren’t a good fit?

There is no best garbage collector. Each trades off collection throughput, application latency and resource utilization depending on the goal of the garbage collector.

For the workloads that have performed better with G1 vs ZGC, we’ve found that they tend to be more throughput oriented, with very spiky allocation rates and long running tasks holding objects for unpredictable periods.

A notable example was a service with very spiky allocation rates and large numbers of long-lived objects, which happened to be a particularly good fit for G1’s pause time goal and old region collection heuristics. This allowed G1 to avoid unproductive work in GC cycles that ZGC couldn’t.

The switch to ZGC by default has provided the perfect opportunity for application owners to think about their choice of garbage collector. Several batch/precompute cases had been using G1 by default, where they would have seen better throughput from the parallel collector. In one large precompute workload we saw a 6–8% improvement in application throughput, shaving an hour off the batch time, versus G1.

Try it for yourself!

Left unquestioned, assumptions and expectations could have caused us to miss one of the most impactful changes we’ve made to our operational defaults in a decade. We’d encourage you to try generational ZGC for yourself. It might surprise you as much as it surprised us.


Bending pause times to your will with Generational ZGC was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Evolving from Rule-based Classifier: Machine Learning Powered Auto Remediation in Netflix Data…

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/evolving-from-rule-based-classifier-machine-learning-powered-auto-remediation-in-netflix-data-039d5efd115b

Evolving from Rule-based Classifier: Machine Learning Powered Auto Remediation in Netflix Data Platform

by Binbing Hou, Stephanie Vezich Tamayo, Xiao Chen, Liang Tian, Troy Ristow, Haoyuan Wang, Snehal Chennuru, Pawan Dixit

This is the first post in a series on our work at Netflix leveraging data insights and Machine Learning (ML) to improve the operational automation around the performance and cost efficiency of big data jobs. Operational automation–including, but not limited to, auto diagnosis, auto remediation, auto configuration, auto tuning, auto scaling, auto debugging, and auto testing–is key to the success of modern data platforms. In this blog post, we present our project on Auto Remediation, which integrates the currently used rule-based classifier with an ML service and aims to automatically remediate failed jobs without human intervention. We have deployed Auto Remediation in production for handling memory configuration errors and unclassified errors of Spark jobs, observed its efficiency and effectiveness (e.g., automatically remediating 56% of memory configuration errors and saving 50% of the monetary costs caused by all errors), and seen great potential for further improvements.

Introduction

At Netflix, hundreds of thousands of workflows and millions of jobs are running per day across multiple layers of the big data platform. Given the extensive scope and intricate complexity inherent to such a distributed, large-scale system, even if the failed jobs account for a tiny portion of the total workload, diagnosing and remediating job failures can cause considerable operational burdens.

For efficient error handling, Netflix developed an error classification service, called Pensive, which leverages a rule-based classifier for error classification. The rule-based classifier classifies job errors based on a set of predefined rules and provides insights for schedulers to decide whether to retry the job and for engineers to diagnose and remediate the job failure.

However, as the system has increased in scale and complexity, the rule-based classifier has been facing challenges due to its limited support for operational automation, especially for handling memory configuration errors and unclassified errors. Therefore, the operational cost increases linearly with the number of failed jobs. In some cases–for example, diagnosing and remediating job failures caused by Out-Of-Memory (OOM) errors–joint effort across teams is required, involving not only the users themselves, but also the support engineers and domain experts.

To address these challenges, we have developed a new feature, called Auto Remediation, which integrates the rule-based classifier with an ML service. Based on the classification from the rule-based classifier, it uses the ML service to predict retry success probability and retry cost and to select the best candidate configuration as the recommendation; and a configuration service to automatically apply the recommendations. Its major advantages are below:

  • Integrated intelligence. Instead of completely deprecating the current rule-based classifier, Auto Remediation integrates the classifier with an ML service so that it can leverage the merits of both: the rule-based classifier provides static, deterministic classification results per error class, which is based on the context of domain experts; the ML service provides performance- and cost-aware recommendations per job, which leverages the power of ML. With the integrated intelligence, we can properly meet the requirements of remediating different errors.
  • Fully automated. The pipeline of classifying errors, getting recommendations, and applying recommendations is fully automated. It provides the recommendations together with the retry decision to the scheduler, and particularly uses an online configuration service to store and apply recommended configurations. In this way, no human intervention is required in the remediation process.
  • Multi-objective optimizations. Auto Remediation generates recommendations by considering both performance (i.e., the retry success probability) and compute cost efficiency (i.e., the monetary costs of running the job) to avoid blindly recommending configurations with excessive resource consumption. For example, for memory configuration errors, it searches multiple parameters related to the memory usage of job execution and recommends the combination that minimizes a linear combination of failure probability and compute cost.

These advantages have been verified by the production deployment for remediating Spark job failures. Our observations indicate that Auto Remediation can successfully remediate about 56% of all memory configuration errors by applying the recommended memory configurations online without human intervention, and meanwhile reduce costs by about 50%, thanks to its ability to recommend new configurations that make retries of memory configuration errors successful and to disable unnecessary retries for unclassified errors. We have also noted a great potential for further improvement by model tuning (see the section on Rollout in Production).

Rule-based Classifier: Basics and Challenges

Basics

Figure 1 illustrates the error classification service, i.e., Pensive, in the data platform. It leverages the rule-based classifier and is composed of three components:

  • Log Collector is responsible for pulling logs from different platform layers for error classification (e.g., the scheduler, job orchestrator, and compute clusters).
  • Rule Execution Engine is responsible for matching the collected logs against a set of predefined rules. A rule includes (1) the name, source, log, and summary of the error, and whether the error is restartable; and (2) the regex to identify the error from the log. For example, the rule with the name SparkDriverOOM includes the information indicating that if the stdout log of a Spark job matches the regex SparkOutOfMemoryError:, then this error is classified as a user error and is not restartable.
  • Result Finalizer is responsible for finalizing the error classification result based on the matched rules. If one or more rules are matched, then the classification of the first matched rule determines the final classification result (rule priority is determined by rule ordering, and the first rule has the highest priority). On the other hand, if no rules are matched, then the error is considered unclassified. (A minimal sketch of this first-match-wins logic follows.)
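A minimal sketch of this rule matching, with an illustrative rule modeled on the SparkDriverOOM example above, might look as follows; the field names are simplified and not the actual schema.

import re
from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class Rule:
    name: str            # e.g. "SparkDriverOOM"
    source: str          # which log the regex is applied to, e.g. "stdout"
    regex: str           # pattern identifying the error in that log
    restartable: bool
    summary: str = ""

RULES = [
    Rule(name="SparkDriverOOM", source="stdout", regex=r"SparkOutOfMemoryError:",
         restartable=False, summary="User error: Spark driver ran out of memory"),
    # ... more rules, ordered by priority (first match wins)
]

def classify(logs: Dict[str, str]) -> Optional[Rule]:
    """Return the first matched rule; None means the error is unclassified."""
    for rule in RULES:
        if re.search(rule.regex, logs.get(rule.source, "")):
            return rule
    return None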

Challenges

While the rule-based classifier is simple and has been effective, it is facing challenges due to its limited ability to handle the errors caused by misconfigurations and classify new errors:

  • Memory configuration errors. The rule-based classifier provides error classification results indicating whether to restart the job; however, for non-transient errors, it still relies on engineers to manually remediate the job. The most notable example is memory configuration errors. Such errors are generally caused by the misconfiguration of job memory. Setting an excessively small memory can result in Out-Of-Memory (OOM) errors, while setting an excessively large memory can waste cluster memory resources. What’s more challenging is that some memory configuration errors require changing the configurations of multiple parameters. Thus, setting a proper memory configuration requires not only manual operation but also expertise in Spark job execution. In addition, even if a job’s memory configuration is initially well tuned, changes such as data size and job definition can cause performance to degrade. Given that about 600 memory configuration errors per month are observed in the data platform, timely remediation of memory configuration errors alone requires non-trivial engineering efforts.
  • Unclassified errors. The rule-based classifier relies on data platform engineers to manually add rules for recognizing errors based on the known context; otherwise, the errors will be unclassified. Due to the migrations of different layers of the data platform and the diversity of applications, existing rules can be invalid, and adding new rules requires engineering efforts and also depends on the deployment cycle. More than 300 rules have been added to the classifier, yet about 50% of all failures remain unclassified. For unclassified errors, the job may be retried multiple times with the default retry policy. If the error is non-transient, these failed retries incur unnecessary job running costs.

Evolving to Auto Remediation: Service Architecture

Methodology

To address the above-mentioned challenges, our basic methodology is to integrate the rule-based classifier with an ML service to generate recommendations, and use a configuration service to apply the recommendations automatically:

  • Generating recommendations. We use the rule-based classifier as the first pass to classify all errors based on predefined rules, and the ML service as the second pass to provide recommendations for memory configuration errors and unclassified errors.
  • Applying recommendations. We use an online configuration service to store and apply the recommended configurations. The pipeline is fully automated, and the services used to generate and apply recommendations are decoupled.

Service Integrations

Figure 2 illustrates the integration of the services generating and applying the recommendations in the data platform. The major services are as follows:

  • Nightingale is a service running the ML model trained using Metaflow and is responsible for generating a retry recommendation. The recommendation includes (1) whether the error is restartable; and (2) if so, the recommended configurations to restart the job.
  • ConfigService is an online configuration service. The recommended configurations are saved in ConfigService as a JSON patch, with a scope defined to specify the jobs that can use the recommended configurations. When Scheduler calls ConfigService to get recommended configurations, it passes the original configurations to ConfigService, and ConfigService returns the mutated configurations by applying the JSON patch to the original configurations (a minimal illustration of this mutation follows the list). Scheduler can then restart the job with the mutated configurations (including the recommended configurations).
  • Pensive is an error classification service that leverages the rule-based classifier. It calls Nightingale to get recommendations and stores the recommendations to ConfigService so that it can be picked up by Scheduler to restart the job.
  • Scheduler is the service scheduling jobs (our current implementation is with Netflix Maestro). Each time a job fails, it calls Pensive to get the error classification to decide whether to restart the job and calls ConfigService to get the recommended configurations for restarting the job.
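To illustrate the mutation step only, here is a small sketch using the open-source jsonpatch library; the parameter names and values are placeholders, and the internal ConfigService scoping logic is omitted.

import jsonpatch

# Original job configuration passed by Scheduler (placeholder values)
original_config = {"spark.executor.memory": "4g", "spark.executor.cores": 4}

# Recommended configurations stored as a JSON patch (scope omitted here)
recommendation = jsonpatch.JsonPatch([
    {"op": "replace", "path": "/spark.executor.memory", "value": "8g"},
    {"op": "replace", "path": "/spark.executor.cores", "value": 8},
])

mutated_config = recommendation.apply(original_config)
# {'spark.executor.memory': '8g', 'spark.executor.cores': 8} -> used to restart the job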

Figure 3 illustrates the sequence of service calls with Auto Remediation:

  1. Upon a job failure, Scheduler calls Pensive to get the error classification.
  2. Pensive classifies the error based on the rule-based classifier. If the error is identified to be a memory configuration error or an unclassified error, it calls Nightingale to get recommendations.
  3. With the obtained recommendations, Pensive updates the error classification result and saves the recommended configurations to ConfigService; and then returns the error classification result to Scheduler.
  4. Based on the error classification result received from Pensive, Scheduler determines whether to restart the job.
  5. Before restarting the job, Scheduler calls ConfigService to get the recommended configuration and retries the job with the new configuration.

Evolving to Auto Remediation: ML Service

Overview

The ML service, i.e., Nightingale, aims to generate a retry policy for a failed job that trades off between retry success probability and job running costs. It consists of two major components:

  • A prediction model that jointly estimates a) probability of retry success, and b) retry cost in dollars, conditional on properties of the retry.
  • An optimizer which explores the Spark configuration parameter space to recommend a configuration which minimizes a linear combination of retry failure probability and cost.

The prediction model is retrained offline daily, and is called by the optimizer to evaluate each candidate set of configuration parameter values. The optimizer runs in a RESTful service which is called upon job failure. If there is a feasible configuration solution from the optimization, the response includes this recommendation, which ConfigService uses to mutate the configuration for the retry. If there is no feasible solution–in other words, it is unlikely the retry will succeed by changing Spark configuration parameters alone–the response includes a flag to disable retries and thus eliminate wasted compute cost.

Prediction Model

Given that we want to explore how retry success and retry cost might change under different configuration scenarios, we need some way to predict these two values using the information we have about the job. Data Platform logs both retry success outcome and execution cost, giving us reliable labels to work with. Since we use a shared feature set to predict both targets, have good labels, and need to run inference quickly online to meet SLOs, we decided to formulate the problem as a multi-output supervised learning task. In particular, we use a simple Feedforward Multilayer Perceptron (MLP) with two heads, one to predict each outcome.

Training: Each record in the training set represents a potential retry which previously failed due to memory configuration errors or unclassified errors. The labels are: a) whether the retry failed, and b) the retry cost. The raw feature inputs are largely unstructured metadata about the job, such as the Spark execution plan, the user who ran it, and the Spark configuration parameters and other job properties. We split these features into those that can be parsed into numeric values (e.g., the Spark executor memory parameter) and those that cannot (e.g., user name). We used feature hashing to process the non-numeric values because they come from a high-cardinality and dynamic set of values. We then create a lower-dimensionality embedding which is concatenated with the normalized numeric values and passed through several more layers.
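As a hedged sketch of what such a two-headed network could look like (PyTorch, with placeholder dimensions; the actual architecture and framework may differ), consider:

import torch
import torch.nn as nn

class RetryOutcomeModel(nn.Module):
    """Shared trunk with two heads: probability that the retry fails, and retry cost."""

    def __init__(self, hash_buckets: int = 2**16, num_numeric: int = 8, embed_dim: int = 32):
        super().__init__()
        # Hashed categorical features (user, execution-plan tokens, ...) -> mean-pooled embedding
        self.embedding = nn.EmbeddingBag(hash_buckets, embed_dim, mode="mean")
        self.trunk = nn.Sequential(
            nn.Linear(embed_dim + num_numeric, 128), nn.ReLU(),
            nn.Linear(128, 64), nn.ReLU(),
        )
        self.failure_head = nn.Linear(64, 1)  # P(retry fails), via sigmoid
        self.cost_head = nn.Linear(64, 1)     # predicted retry cost (e.g., dollars)

    def forward(self, hashed_ids: torch.Tensor, numeric: torch.Tensor):
        # hashed_ids: (batch, n_tokens) int64 bucket indices; numeric: (batch, num_numeric) floats
        x = torch.cat([self.embedding(hashed_ids), numeric], dim=1)
        h = self.trunk(x)
        return torch.sigmoid(self.failure_head(h)), self.cost_head(h)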

Inference: Upon passing validation audits, each new model version is stored in Metaflow Hosting, a service provided by our internal ML Platform. The optimizer makes several calls to the model prediction function for each incoming configuration recommendation request, described in more detail below.

Optimizer

When a job attempt fails, it sends a request to Nightingale with a job identifier. From this identifier, the service constructs the feature vector to be used in inference calls. As described previously, some of these features are Spark configuration parameters which are candidates to be mutated (e.g., spark.executor.memory, spark.executor.cores). The set of Spark configuration parameters was based on distilled knowledge of domain experts who work on Spark performance tuning extensively. We use Bayesian Optimization (implemented via Meta’s Ax library) to explore the configuration space and generate a recommendation. At each iteration, the optimizer generates a candidate parameter value combination (e.g., spark.executor.memory=7192 mb, spark.executor.cores=8), then evaluates that candidate by calling the prediction model to estimate retry failure probability and cost using the candidate configuration (i.e., mutating their values in the feature vector). After a fixed number of iterations is exhausted, the optimizer returns the “best” configuration solution (i.e., that which minimized the combined retry failure and cost objective) for ConfigService to use if it is feasible. If no feasible solution is found, we disable retries.
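A hedged sketch of such an optimization loop using Ax’s Service API might look like the following. The parameter names and ranges, iteration budget, predict() stub (standing in for the prediction model), and cost weight LAMBDA are all illustrative assumptions, and Ax signatures vary somewhat across versions.

from ax.service.ax_client import AxClient, ObjectiveProperties

LAMBDA = 0.1   # illustrative weight trading off cost against failure probability

def predict(params):
    """Assumed helper: calls the two-headed prediction model; stubbed here for illustration."""
    return 0.3, 1.5   # (retry failure probability, retry cost in dollars)

ax_client = AxClient()
ax_client.create_experiment(
    name="retry_config_search",
    parameters=[
        {"name": "spark.executor.memory_mb", "type": "range", "bounds": [2048, 16384]},
        {"name": "spark.executor.cores", "type": "range", "bounds": [1, 8]},
    ],
    objectives={"weighted_objective": ObjectiveProperties(minimize=True)},
)

for _ in range(20):                                   # fixed iteration budget
    candidate, trial_index = ax_client.get_next_trial()
    p_fail, cost = predict(candidate)                 # evaluate the candidate with the model
    ax_client.complete_trial(trial_index=trial_index, raw_data=p_fail + LAMBDA * cost)

best_parameters, _ = ax_client.get_best_parameters()  # recommended configuration, if feasible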

One downside of the iterative design of the optimizer is that any bottleneck can block completion and cause a timeout, which we initially observed in a non-trivial number of cases. Upon further profiling, we found that most of the latency came from the candidate generation step (i.e., figuring out which directions to step in the configuration space after the previous iteration’s evaluation results). We found that this issue had been raised to the Ax library owners, who added GPU acceleration options in their API. Leveraging this option decreased our timeout rate substantially.

Rollout in Production

We have deployed Auto Remediation in production to handle memory configuration errors and unclassified errors for Spark jobs. Besides the retry success probability and cost efficiency, the impact on user experience is the major concern:

  • For memory configuration errors: Auto Remediation improves the user experience because, for memory configuration errors, a retry is rarely successful without a new configuration. This means that a successful retry with the recommended configurations can reduce the operational load and save job running costs, while a failed retry does not make the user experience worse.
  • For unclassified errors: Auto remediation recommends whether to restart the job if the error cannot be classified by existing rules in the rule-based classifier. In particular, if the ML model predicts that the retry is very likely to fail, it will recommend disabling the retry, which can save the job running costs for unnecessary retries. For cases in which the job is business-critical and the user prefers always retrying the job even if the retry success probability is low, we can add a new rule to the rule-based classifier so that the same error will be classified by the rule-based classifier next time, skipping the recommendations of the ML service. This presents the advantages of the integrated intelligence of the rule-based classifier and the ML service.

The deployment in production has demonstrated that Auto Remediation can provide effective configurations for memory configuration errors, successfully remediating about 56% of all memory configuration errors without human intervention. It also decreases the compute cost of these jobs by about 50%, because it can either recommend new configurations to make the retry successful or disable unnecessary retries. As tradeoffs between performance and cost efficiency are tunable, we can decide to achieve a higher success rate or more cost savings by tuning the ML service.

It is worth noting that the ML service currently adopts a conservative policy for disabling retries. As discussed above, this is to avoid impacting cases in which users prefer to always retry the job upon failure. Although these cases are expected and can be addressed by adding new rules to the rule-based classifier, we believe that incrementally tuning the objective function to gradually disable more retries will help provide a desirable user experience. Given that the current policy for disabling retries is conservative, Auto Remediation has great potential to eventually bring much more cost savings without affecting the user experience.

Beyond Error Handling: Towards Right Sizing

Auto Remediation is our first step in leveraging data insights and Machine Learning (ML) for improving user experience, reducing the operational burden, and improving cost efficiency of the data platform. It focuses on automating the remediation of failed jobs, but also paves the path to automate operations other than error handling.

One of the initiatives we are taking, called Right Sizing, is to reconfigure scheduled big data jobs to request the proper resources for job execution. For example, we have noted that the average requested executor memory of Spark jobs is about four times their max used memory, indicating a significant overprovision. In addition to the configurations of the job itself, the resource overprovision of the container that is requested to execute the job can also be reduced for cost savings. With heuristic- and ML-based methods, we can infer the proper configurations of job execution to minimize resource overprovisions and save millions of dollars per year without affecting the performance. Similar to Auto Remediation, these configurations can be automatically applied via ConfigService without human intervention. Right Sizing is in progress and will be covered with more details in a dedicated technical blog post later. Stay tuned.

Acknowledgements

Auto Remediation is a joint work of engineers from different teams and organizations. This work would not have been possible without solid, in-depth collaboration. We would like to thank everyone involved, including Spark experts, data scientists, ML engineers, the scheduler and job orchestrator engineers, data engineers, and support engineers, for sharing context and providing constructive suggestions and valuable feedback (e.g., John Zhuge, Jun He, Holden Karau, Samarth Jain, Julian Jaffe, Batul Shajapurwala, Michael Sachs, Faisal Siddiqi).


Evolving from Rule-based Classifier: Machine Learning Powered Auto Remediation in Netflix Data… was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Announcing bpftop: Streamlining eBPF performance optimization

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/announcing-bpftop-streamlining-ebpf-performance-optimization-6a727c1ae2e5

By Jose Fernandez

Today, we are thrilled to announce the release of bpftop, a command-line tool designed to streamline the performance optimization and monitoring of eBPF applications. As Netflix increasingly adopts eBPF [1, 2], applying the same rigor to these applications as we do to other managed services is imperative. Striking a balance between eBPF’s benefits and system load is crucial, ensuring it enhances rather than hinders our operational efficiency. This tool enables Netflix to embrace eBPF’s potential.

Introducing bpftop

bpftop provides a dynamic real-time view of running eBPF programs. It displays the average execution runtime, events per second, and estimated total CPU % for each program. This tool minimizes overhead by enabling performance statistics only while it is active.

bpftop simplifies the performance optimization process for eBPF programs by enabling an efficient cycle of benchmarking, code refinement, and immediate feedback. Without bpftop, optimization efforts would require manual calculations, adding unnecessary complexity to the process. With bpftop, users can quickly establish a baseline, implement improvements, and verify enhancements, streamlining the process.

A standout feature of this tool is its ability to display the statistics in time series graphs. This approach can uncover patterns and trends that could be missed otherwise.

How it works

bpftop uses the BPF_ENABLE_STATS syscall command to enable global eBPF runtime statistics gathering, which is disabled by default to reduce performance overhead. It collects these statistics every second, calculating the average runtime, events per second, and estimated CPU utilization for each eBPF program within that sample period. This information is displayed in a top-like tabular format or a time series graph over a 10s moving window. Once bpftop terminates, it turns off the statistics-gathering function. The tool is written in Rust, leveraging the libbpf-rs and ratatui crates.
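
The per-program arithmetic is straightforward. The sketch below shows roughly how the displayed numbers can be derived from two successive samples of a program’s cumulative run time and run count; the type and field names are ours for illustration, and the real tool reads these counters via libbpf-rs rather than through this simplified form.

// Cumulative counters the kernel exposes per eBPF program once
// BPF_ENABLE_STATS is active (names here are illustrative).
interface ProgStatsSample {
  runTimeNs: number; // total time spent running the program
  runCount: number;  // total number of times it has run
}

interface ProgDisplayRow {
  avgRuntimeNs: number;
  eventsPerSecond: number;
  estimatedCpuPercent: number;
}

// Compare two samples taken `periodSeconds` apart (bpftop samples every second).
function summarize(prev: ProgStatsSample, curr: ProgStatsSample, periodSeconds: number): ProgDisplayRow {
  const dTimeNs = curr.runTimeNs - prev.runTimeNs;
  const dCount = curr.runCount - prev.runCount;
  return {
    avgRuntimeNs: dCount > 0 ? dTimeNs / dCount : 0,
    eventsPerSecond: dCount / periodSeconds,
    // Fraction of one CPU-second spent in this program during the period
    // (a simplification: total runtime summed across all CPUs).
    estimatedCpuPercent: (dTimeNs / (periodSeconds * 1e9)) * 100,
  };
}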

Getting started

Visit the project’s GitHub page to learn more about using the tool. We’ve open-sourced bpftop under the Apache 2 license and look forward to contributions from the community.


Announcing bpftop: Streamlining eBPF performance optimization was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Sequential A/B Testing Keeps the World Streaming Netflix Part 1: Continuous Data

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/sequential-a-b-testing-keeps-the-world-streaming-netflix-part-1-continuous-data-cba6c7ed49df

Michael Lindon, Chris Sanden, Vache Shirikian, Yanjun Liu, Minal Mishra, Martin Tingley

Using sequential anytime-valid hypothesis testing procedures to safely release software

1. Spot the Difference

Can you spot any difference between the two data streams below? Each observation is the time interval between a Netflix member hitting the play button and playback commencing, i.e., play-delay. These observations are from a particular type of A/B test that Netflix runs called a software canary or regression-driven experiment. More on that below — for now, what’s important is that we want to quickly and confidently identify any difference in the distribution of play-delay — or conclude that, within some tolerance, there is no difference.

In this blog post, we will develop a statistical procedure to do just that, and describe the impact of these developments at Netflix. The key idea is to switch from a “fixed time horizon” to an “any-time valid” framing of the problem.

Figure 1. An example data stream for an A/B test where each observation represents play-delay for the control (left) and treatment (right). Can you spot any differences in the statistical distributions between the two data streams?

2. Safe software deployment, canary testing, and play-delay

Software engineering readers of this blog are likely familiar with unit, integration and load testing, as well as other testing practices that aim to prevent bugs from reaching production systems. Netflix also performs canary tests — software A/B tests between current and newer software versions. To learn more, see our previous blog post on Safe Updates of Client Applications.

The purpose of a canary test is twofold: to act as a quality-control gate that catches bugs prior to full release, and to measure performance of the new software in the wild. This is carried out by performing a randomized controlled experiment on a small subset of users, where the treatment group receives the new software update and the control group continues to run the existing software. If any bugs or performance regressions are observed in the treatment group, then the full-scale release can be prevented, limiting the “impact radius” among the user base.

One of the metrics Netflix monitors in canary tests is how long it takes for the video stream to start when a title is requested by a user. Monitoring this “play-delay” metric throughout releases ensures that the streaming performance of Netflix only ever improves as we release newer versions of the Netflix client. In Figure 1, the left side shows a real-time stream of play-delay measurements from users running the existing version of the Netflix client, while the right side shows play-delay measurements from users running the updated version. We ask ourselves: Are users of the updated client experiencing longer play-delays?

We consider any increase in play-delay to be a serious performance regression and would prevent the release if we detect an increase. Critically, testing for differences in means or medians is not sufficient and does not provide a complete picture. For example, one situation we might face is that the median or mean play-delay is the same in treatment and control, but the treatment group experiences an increase in the upper quantiles of play-delay. This corresponds to the Netflix experience being degraded for those who already experience high play delays — likely our members on slow or unstable internet connections. Such changes should not be ignored by our testing procedure.

For a complete picture, we need to be able to reliably and quickly detect an upward shift in any part of the play-delay distribution. That is, we must do inference on and test for any differences between the distributions of play-delay in treatment and control.

To summarize, here are the design requirements of our canary testing system:

  1. Identify bugs and performance regressions, as measured by play-delay, as quickly as possible. Rationale: To minimize member harm, if there is any problem with the streaming quality experienced by users in the treatment group we need to abort the canary and roll back the software change as quickly as possible.
  2. Strictly control false positive (false alarm) probabilities. Rationale: This system is part of a semi-automated process for all client deployments. A false positive test unnecessarily interrupts the software release process, reducing the velocity of software delivery and sending developers looking for bugs that do not exist.
  3. This system should be able to detect any change in the distribution. Rationale: We care not only about changes in the mean or median, but also about changes in tail behaviour and other quantiles.

We now build out a sequential testing procedure that meets these design requirements.

3. Sequential Testing: The Basics

Standard statistical tests are fixed-n or fixed-time horizon: the analyst waits until some pre-set amount of data is collected, and then performs the analysis a single time. The classic t-test, the Kolmogorov-Smirnov test, and the Mann-Whitney test are all examples of fixed-n tests. A limitation of fixed-n tests is that they can only be performed once — yet in situations like the above, we want to be testing frequently to detect differences as soon as possible. If you apply a fixed-n test more than once, then you forfeit the Type-I error or false positive guarantee.

Here’s a quick illustration of how fixed-n tests fail under repeated analysis. In the following figure, each red line traces out the p-value when the Mann-Whitney test is repeatedly applied to a data set as 10,000 observations accrue in both treatment and control. Each red line shows an independent simulation, and in each case, there is no difference between treatment and control: these are simulated A/A tests.

The black dots mark where the p-value falls below the standard 0.05 rejection threshold. An alarming two-thirds of the simulations (66 of 100, per Figure 2) declare a significant difference at some point in time, even though, by construction, there is no difference: the actual false positive rate is far higher than the nominal 0.05. Exactly the same behaviour would be observed for the Kolmogorov-Smirnov test.

Figure 2. 100 Sample paths of the p-value process simulated under the null hypothesis shown in red. The dotted black line indicates the nominal alpha=0.05 level. Black dots indicate where the p-value process dips below the alpha=0.05 threshold, indicating a false rejection of the null hypothesis. A total of 66 out of 100 A/A simulations falsely rejected the null hypothesis.
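
For readers who want to reproduce the effect, here is a minimal simulation sketch of the peeking problem. For brevity it uses a repeated two-sample z-test on means rather than the Mann-Whitney test used in Figure 2; the inflation of the false positive rate under repeated testing is the same phenomenon, and the exact rejection count will vary from run to run.

// Simulate A/A tests and count how often a repeatedly-applied fixed-n test
// "rejects" at the 0.05 level at least once while the data accrue.
function randNormal(): number {
  // Box-Muller transform
  const u = Math.random() || 1e-12;
  const v = Math.random();
  return Math.sqrt(-2 * Math.log(u)) * Math.cos(2 * Math.PI * v);
}

// Standard normal CDF via the Abramowitz-Stegun approximation.
function normCdf(z: number): number {
  const t = 1 / (1 + 0.2316419 * Math.abs(z));
  const d = 0.3989423 * Math.exp(-z * z / 2);
  const p = d * t * (0.3193815 + t * (-0.3565638 + t * (1.781478 + t * (-1.821256 + t * 1.330274))));
  return z > 0 ? 1 - p : p;
}

function falselyRejects(maxN: number, alpha = 0.05): boolean {
  let sumA = 0, sumB = 0, sumA2 = 0, sumB2 = 0;
  for (let n = 1; n <= maxN; n++) {
    const a = randNormal(), b = randNormal(); // same distribution: an A/A test
    sumA += a; sumB += b; sumA2 += a * a; sumB2 += b * b;
    if (n < 30) continue; // collect a few observations before testing
    const meanA = sumA / n, meanB = sumB / n;
    const varA = sumA2 / n - meanA * meanA, varB = sumB2 / n - meanB * meanB;
    const z = (meanA - meanB) / Math.sqrt((varA + varB) / n);
    const pValue = 2 * (1 - normCdf(Math.abs(z)));
    if (pValue < alpha) return true; // peeking declares a difference
  }
  return false;
}

const sims = 100;
let rejections = 0;
for (let i = 0; i < sims; i++) if (falselyRejects(10_000)) rejections++;
console.log(`${rejections}/${sims} A/A simulations falsely rejected`); // far above the nominal 5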

This is a manifestation of “peeking”, and much has been written about the downside risks of this practice (see, for example, Johari et al. 2017). If we restrict ourselves to correctly applied fixed-n statistical tests, where we analyze the data exactly once, we face a difficult tradeoff:

  • Perform the test early on, after a small amount of data has been collected. In this case, we will only be powered to detect larger regressions. Smaller performance regressions will not be detected, and we run the risk of steadily eroding the member experience as small regressions accrue.
  • Perform the test later, after a large amount of data has been collected. In this case, we are powered to detect small regressions — but in the case of large regressions, we expose members to a bad experience for an unnecessarily long period of time.

Sequential, or “any-time valid”, statistical tests overcome these limitations. They permit peeking (in fact, they can be applied after every new data point arrives) while providing false positive, or Type-I error, guarantees that hold throughout time. As a result, we can continuously monitor data streams like the ones in the figures above, using confidence sequences or sequential p-values, rapidly detecting large regressions while eventually detecting small ones.

Despite relatively recent adoption in the context of digital experimentation, these methods have a long academic history, with initial ideas dating back to Abraham Wald’s Sequential Tests of Statistical Hypotheses from 1945. Research in this area remains active, and Netflix has made a number of contributions in the last few years (see the references in our papers for a more complete literature review).

In this and following blog posts, we will describe both the methods we’ve developed and their applications at Netflix. The remainder of this post discusses the first of these contributions, which was published at KDD ’22 (and is available on arXiv). We will keep it high level — readers interested in the technical details can consult the paper.

4. A sequential testing solution

Differences in Distributions

At any point in time, we can estimate the empirical quantile functions for both treatment and control, based on the data observed so far.

Figure 3: Empirical quantile function for control (left) and treatment (right) at a snapshot in time after starting the canary experiment. This is from actual Netflix data, so we’ve suppressed numerical values on the y-axis.

These two plots look pretty close, but we can do better than an eyeball comparison — and we want the computer to be able to continuously evaluate if there is any significant difference between the distributions. Per the design requirements, we also wish to detect large effects early, while preserving the ability to detect small effects eventually — and we want to maintain the false positive probability at a nominal level while permitting continuous analysis (aka peeking).

That is, we need a sequential test on the difference in distributions.

Obtaining “fixed-horizon” confidence bands for the quantile function can be achieved using the DKWM inequality. To obtain time-uniform confidence bands, however, we use the anytime-valid confidence sequences from Howard and Ramdas (2022) [arxiv version]. As the coverage guarantee from these confidence bands holds uniformly across time, we can watch them become tighter without being concerned about peeking. As more data points stream in, these sequential confidence bands continue to shrink in width, which means any difference in the distribution functions — if it exists — will eventually become apparent.
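
As a concrete reference point, here is a small sketch of an empirical quantile function with the fixed-horizon DKW-style band mentioned above; the names are ours and the data are simulated. The time-uniform bands we actually use replace this fixed-n width with the anytime-valid boundaries of Howard and Ramdas, which are wider at any single time but remain valid under continuous monitoring.

// Empirical quantile function with a fixed-horizon DKW confidence band.
// The DKW inequality bounds the empirical CDF within +/- eps of the true CDF;
// inverting that band yields lower/upper quantile estimates at each level p.
function empiricalQuantileBand(samples: number[], p: number, alpha = 0.05) {
  const sorted = [...samples].sort((a, b) => a - b);
  const n = sorted.length;
  const eps = Math.sqrt(Math.log(2 / alpha) / (2 * n)); // DKW half-width
  const at = (q: number) => {
    const idx = Math.min(n - 1, Math.max(0, Math.ceil(q * n) - 1));
    return sorted[idx];
  };
  return {
    estimate: at(p),
    lower: at(Math.max(0, p - eps)), // from the upper envelope on the CDF
    upper: at(Math.min(1, p + eps)), // from the lower envelope on the CDF
  };
}

// Example: 95% band around the median of a batch of simulated play-delay measurements.
const playDelayMs = Array.from({ length: 5000 }, () => 200 + 800 * Math.random());
console.log(empiricalQuantileBand(playDelayMs, 0.5));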

Figure 4: 97.5% Time-Uniform Confidence bands on the quantile function for control (left) and treatment (right)

Note that each frame corresponds to a point in time after the experiment began, not to a sample size. In fact, there is no requirement that the treatment groups have the same sample size.

Differences are easier to see by visualizing the difference between the treatment and control quantile functions.

Figure 5: 95% Time-Uniform confidence band on the quantile difference function Q_b(p) — Q_a(p) (left). The sequential p-value (right).

As the sequential confidence band on the treatment effect quantile function is anytime-valid, the inference procedure becomes rather intuitive. We can continue to watch these confidence bands tighten, and if at any point the band no longer covers zero at any quantile, we can conclude that the distributions are different and stop the test. In addition to the sequential confidence bands, we can also construct a sequential p-value for testing that the distributions differ. Note from the animation that the moment the 95% confidence band over quantile treatment effects excludes zero is the same moment that the sequential p-value falls below 0.05: as with fixed-n tests, there is consistency between confidence intervals and p-values.
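
A minimal sketch of the resulting stopping rule, assuming we already have the current anytime-valid band on the quantile treatment-effect function at each monitored quantile (the interface below is illustrative, not our production API):

// One entry per monitored quantile: the confidence band on Q_b(p) - Q_a(p).
interface QuantileEffectBand {
  p: number;      // quantile level, e.g. 0.5, 0.75, 0.95
  lower: number;  // lower band on the treatment-control quantile difference
  upper: number;  // upper band
}

// Because the band is anytime-valid, we can run this check after every update
// and stop the canary the first time zero is excluded at any quantile.
function shouldStopCanary(bands: QuantileEffectBand[]): boolean {
  return bands.some(b => b.lower > 0 || b.upper < 0);
}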

There are many multiple-testing concerns in this application. Our solution controls Type-I error across all quantiles, all treatment groups, and all joint sample sizes simultaneously (see our paper, or Howard and Ramdas, for details). Results hold for all quantiles and for all times.

5. Impact at Netflix

Releasing new software always carries risk, and we always want to reduce the risk of service interruptions or degradation to the member experience. Our canary testing approach is another layer of protection for preventing bugs and performance regressions from slipping into production. It’s fully automated and has become an integral part of the software delivery process at Netflix. Developers can push to production with peace of mind, knowing that bugs and performance regressions will be rapidly caught. The additional confidence empowers developers to push to production more frequently, reducing the time to market for upgrades to the Netflix client and increasing our rate of software delivery.

So far this system has successfully prevented a number of serious bugs from reaching our end users. We detail one example.

Case study: Safe Rollout of Netflix Client Application

Figures 3–5 are taken from a canary test in which the behaviour of the client application was modified (actual numerical values of play-delay have been suppressed). As we can see, the canary test revealed that the new version of the client increases a number of quantiles of play-delay, with the median and 75th percentile experiencing relative increases of at least 0.5% and 1% respectively. The time series of the sequential p-value shows that, in this case, we were able to reject the null hypothesis of no change in distribution at the 0.05 level after about 60 seconds. This provides rapid feedback in the software delivery process, allowing developers to test the performance of new software and quickly iterate.

6. What’s next?

If you are curious about the technical details of the sequential tests for quantiles developed here, you can learn all about the math in our KDD paper (also available on arxiv).

You might also be wondering what happens if the data are not continuous measurements. Errors and exceptions are critical metrics to log when deploying software, as are many other metrics which are best defined in terms of counts. Stay tuned — our next post will develop sequential testing procedures for count data.


Sequential A/B Testing Keeps the World Streaming Netflix Part 1: Continuous Data was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Introducing SafeTest: A Novel Approach to Front End Testing

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/introducing-safetest-a-novel-approach-to-front-end-testing-37f9f88c152d

by Moshe Kolodny

In this post, we’re excited to introduce SafeTest, a revolutionary library that offers a fresh perspective on End-To-End (E2E) tests for web-based User Interface (UI) applications.

The Challenges of Traditional UI Testing

Traditionally, UI tests have been conducted through either unit testing or integration testing (also referred to as End-To-End (E2E) testing). However, each of these methods presents a unique trade-off: you have to choose between controlling the test fixture and setup, or controlling the test driver.

For instance, when using react-testing-library, a unit testing solution, you maintain complete control over what to render and how the underlying services and imports should behave. However, you lose the ability to interact with an actual page, which can lead to a myriad of pain points:

  • Difficulty in interacting with complex UI elements like <Dropdown /> components.
  • Inability to test CORS setup or GraphQL calls.
  • Lack of visibility into z-index issues affecting click-ability of buttons.
  • Complex and unintuitive authoring and debugging of tests.

Conversely, using integration testing tools like Cypress or Playwright provides control over the page, but sacrifices the ability to instrument the bootstrapping code for the app. These tools operate by remotely controlling a browser to visit a URL and interact with the page. This approach has its own set of challenges:

  • Difficulty in making calls to an alternative API endpoint without implementing custom network layer API rewrite rules.
  • Inability to make assertions on spies/mocks or execute code within the app.
  • Testing something like dark mode entails clicking the theme switcher or knowing the localStorage mechanism to override.
  • Inability to test segments of the app. For example, if a component is only visible after clicking a button and waiting for a 60-second countdown, the test will need to run those actions and will take at least a minute.

Recognizing these challenges, solutions like E2E Component Testing have emerged, with offerings from Cypress and Playwright. While these tools attempt to rectify the shortcomings of traditional integration testing methods, they have other limitations due to their architecture. They start a dev server with bootstrapping code to load the component and/or setup code you want, which limits their ability to handle complex enterprise applications that might have OAuth or a complex build pipeline. Moreover, updating TypeScript usage could break your tests until the Cypress/Playwright team updates their runner.

Welcome to SafeTest

SafeTest aims to address these issues with a novel approach to UI testing. The main idea is to have a snippet of code in our application bootstrapping stage that injects hooks to run our tests (see the How SafeTest Works section for more info on what this is doing). Note that this has no measurable impact on the regular usage of your app, since SafeTest leverages lazy loading to load the tests dynamically only when they are run (in the README example, the tests aren’t in the production bundle at all). Once that’s in place, we can use Playwright to run regular tests, thereby achieving the ideal browser control we want for our tests.

This approach also unlocks some exciting features:

  • Deep linking to a specific test without needing to run a node test server.
  • Two-way communication between the browser and test (node) context.
  • Access to all the DX features that come with Playwright (excluding the ones that come with @playwright/test).
  • Video recording of tests, trace viewing, and pause page functionality for trying out different page selectors/actions.
  • Ability to make assertions in node on spies in the browser, matching snapshots of the calls made within the browser.

Test Examples with SafeTest

SafeTest is designed to feel familiar to anyone who has conducted UI tests before, as it leverages the best parts of existing solutions. Here’s an example of how to test an entire application:

import { describe, it, expect } from 'safetest/jest';
import { render } from 'safetest/react';

describe('my app', () => {
  it('loads the main page', async () => {
    const { page } = await render();

    await expect(page.getByText('Welcome to the app')).toBeVisible();
    expect(await page.screenshot()).toMatchImageSnapshot();
  });
});

We can just as easily test a specific component:

import { describe, it, expect, browserMock } from 'safetest/jest';
import { render } from 'safetest/react';

describe('Header component', () => {
  it('has a normal mode', async () => {
    const { page } = await render(<Header />);

    await expect(page.getByText('Admin')).not.toBeVisible();
  });

  it('has an admin mode', async () => {
    const { page } = await render(<Header admin={true} />);

    await expect(page.getByText('Admin')).toBeVisible();
  });

  it('calls the logout handler when signing out', async () => {
    const spy = browserMock.fn();
    const { page } = await render(<Header handleLogout={spy} />);

    await page.getByText('logout').click();
    expect(await spy).toHaveBeenCalledWith();
  });
});

Leveraging Overrides

SafeTest utilizes React Context to allow for value overrides during tests. For an example of how this works, let’s assume we have a fetchPeople function used in a component:

import { useAsync } from 'react-use';
import { fetchPeople } from './api/people';

export const People: React.FC = () => {
  const { data: people, loading, error } = useAsync(fetchPeople);

  if (loading) return <Loader />;
  if (error) return <ErrorPage error={error} />;
  return <Table data={people} rows={[...]} />;
}

We can modify the People component to use an Override:

 import { fetchPeople } from './api/people';
+import { createOverride } from 'safetest/react';

+const FetchPeople = createOverride(fetchPeople);

 export const People: React.FC = () => {
+  const fetchPeople = FetchPeople.useValue();
   const { data: people, loading, error } = useAsync(fetchPeople);

   if (loading) return <Loader />;
   if (error) return <ErrorPage error={error} />;
   return <Table data={people} rows={[...]} />;
 }

Now, in our test, we can override the response for this call:

const pending = new Promise(r => { /* Do nothing */ });
const resolved = [{ name: 'Foo', age: 23 }, { name: 'Bar', age: 32 }];
const error = new Error('Whoops');

describe('People', () => {
  it('has a loading state', async () => {
    const { page } = await render(
      <FetchPeople.Override with={() => () => pending}>
        <People />
      </FetchPeople.Override>
    );

    await expect(page.getByText('Loading')).toBeVisible();
  });

  it('has a loaded state', async () => {
    const { page } = await render(
      <FetchPeople.Override with={() => async () => resolved}>
        <People />
      </FetchPeople.Override>
    );

    await expect(page.getByText('User: Foo, age: 23')).toBeVisible();
  });

  it('has an error state', async () => {
    const { page } = await render(
      <FetchPeople.Override with={() => async () => { throw error }}>
        <People />
      </FetchPeople.Override>
    );

    await expect(page.getByText('Error getting users: "Whoops"')).toBeVisible();
  });
});

The render function also accepts a function that will be passed the initial app component, allowing for the injection of any desired elements anywhere in the app:

it('has a people loaded state', async () => {
  const { page } = await render(app =>
    <FetchPeople.Override with={() => async () => resolved}>
      {app}
    </FetchPeople.Override>
  );
  await expect(page.getByText('User: Foo, age: 23')).toBeVisible();
});

With overrides, we can write complex test cases, such as ensuring that a service method which combines API requests from /foo, /bar, and /baz has the correct retry mechanism for just the failed API requests and still maps the return value correctly. So if /bar takes 3 attempts to resolve, the method will make a total of 5 API calls, as sketched below.
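
For context, here is a rough sketch of the kind of service method that test would exercise; it is plain TypeScript, unrelated to the SafeTest API, and the names and retry limit are assumptions for illustration.

// Fetch /foo, /bar and /baz together, retry only the requests that failed,
// and merge the results once everything has resolved.
async function fetchCombined(fetcher: (path: string) => Promise<unknown>, maxAttempts = 3) {
  const paths = ['/foo', '/bar', '/baz'];
  const results = new Map<string, unknown>();
  let pending = [...paths];
  for (let attempt = 1; attempt <= maxAttempts && pending.length > 0; attempt++) {
    const settled = await Promise.allSettled(pending.map(p => fetcher(p)));
    const stillFailing: string[] = [];
    settled.forEach((outcome, i) => {
      if (outcome.status === 'fulfilled') results.set(pending[i], outcome.value);
      else stillFailing.push(pending[i]); // only failed paths are retried
    });
    pending = stillFailing;
  }
  if (pending.length > 0) throw new Error(`Failed after retries: ${pending.join(', ')}`);
  // If /bar fails twice before succeeding, fetcher is called 5 times in total:
  // 3 on the first attempt, then /bar alone on attempts 2 and 3.
  return { foo: results.get('/foo'), bar: results.get('/bar'), baz: results.get('/baz') };
}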

Overrides aren’t limited to just API calls (since we can also use page.route); we can also override specific app-level values like feature flags, or change some static value:

+const UseFlags = createOverride(useFlags);
 export const Admin = () => {
+  const useFlags = UseFlags.useValue();
   const { isAdmin } = useFlags();
   if (!isAdmin) return <div>Permission error</div>;
   // ...
 }

+const Language = createOverride(navigator.language);
 export const LanguageChanger = () => {
-  const language = navigator.language;
+  const language = Language.useValue();
   return <div>Current language is { language } </div>;
 }

describe('Admin', () => {
  it('works with admin flag', async () => {
    const { page } = await render(
      <UseFlags.Override with={oldHook => () => {
        const oldFlags = oldHook();
        return { ...oldFlags, isAdmin: true };
      }}>
        <Admin />
      </UseFlags.Override>
    );

    await expect(page.getByText('Permission error')).not.toBeVisible();
  });
});

describe('Language', () => {
  it('displays', async () => {
    const { page } = await render(
      <Language.Override with={old => 'abc'}>
        <LanguageChanger />
      </Language.Override>
    );

    await expect(page.getByText('Current language is abc')).toBeVisible();
  });
});

Overrides are a powerful feature of SafeTest and the examples here only scratch the surface. For more information and examples, refer to the Overrides section on the README.

Reporting

SafeTest comes out of the box with powerful reporting capabilities, such as automatic linking of video replays, the Playwright trace viewer, and even deep links directly to the mounted tested component. The SafeTest repo README links to all the example apps as well as their reports.

Image of SafeTest report showing a video of a test run

SafeTest in Corporate Environments

Many large corporations need a form of authentication to use the app. Typically, navigating to localhost:3000 just results in a perpetually loading page. You need to go to a different port, like localhost:8000, which has a proxy server to check and/or inject auth credentials into underlying service calls. This limitation is one of the main reasons that Cypress/Playwright Component Tests aren’t suitable for use at Netflix.

However, there’s usually a service that can generate test users whose credentials we can use to log in and interact with the application. This facilitates creating a light wrapper around SafeTest to automatically generate and assume that test user. For instance, here’s basically how we do it at Netflix:

import { setup } from 'safetest/setup';
import { createTestUser, addCookies } from 'netflix-test-helper';

type Setup = Parameters<typeof setup>[0] & {
  extraUserOptions?: UserOptions;
};

export const setupNetflix = (options: Setup) => {
  setup({
    ...options,
    hooks: { beforeNavigate: [async page => addCookies(page)] },
  });

  beforeAll(async () => {
    createTestUser(options.extraUserOptions);
  });
};

After setting this up, we simply import the above package in place of where we would have used safetest/setup.

Beyond React

While this post focused on how SafeTest works with React, it’s not limited to just React. SafeTest also works with Vue, Svelte, Angular, and can even run on NextJS or Gatsby. It also runs using either Jest or Vitest, based on which test runner your scaffolding started you off with. The examples folder demonstrates how to use SafeTest with different tooling combinations, and we encourage contributions to add more cases.

At its core, SafeTest is an intelligent glue for a test runner, a UI library, and a browser runner. Though the most common usage at Netflix employs Jest/React/Playwright, it’s easy to add more adapters for other options.

Conclusion

SafeTest is a powerful testing framework that’s being adopted within Netflix. It allows for easy authoring of tests and provides comprehensive reports when and how any failures occurred, complete with links to view a playback video or manually run the test steps to see what broke. We’re excited to see how it will revolutionize UI testing and look forward to your feedback and contributions.


Introducing SafeTest: A Novel Approach to Front End Testing was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Rebuilding Netflix Video Processing Pipeline with Microservices

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/rebuilding-netflix-video-processing-pipeline-with-microservices-4e5e6310e359

Liwei Guo, Anush Moorthy, Li-Heng Chen, Vinicius Carvalho, Aditya Mavlankar, Agata Opalach, Adithya Prakash, Kyle Swanson, Jessica Tweneboah, Subbu Venkatrav, Lishan Zhu

This is the first blog in a multi-part series on how Netflix rebuilt its video processing pipeline with microservices, so we can maintain our rapid pace of innovation and continuously improve the system for member streaming and studio operations. This introductory blog focuses on an overview of our journey. Future blogs will provide deeper dives into each service, sharing insights and lessons learned from this process.

The Netflix video processing pipeline went live with the launch of our streaming service in 2007. Since then, the video pipeline has undergone substantial improvements and broad expansions:

  • Starting with Standard Dynamic Range (SDR) at Standard-Definitions, we expanded the encoding pipeline to 4K and High Dynamic Range (HDR) which enabled support for our premium offering.
  • We moved from centralized linear encoding to distributed chunk-based encoding. This architecture shift greatly reduced the processing latency and increased system resiliency.
  • Moving away from the use of dedicated instances that were constrained in quantity, we tapped into Netflix’s internal trough created due to autoscaling microservices, leading to significant improvements in computation elasticity as well as resource utilization efficiency.
  • We rolled out encoding innovations such as per-title and per-shot optimizations, which provided significant quality-of-experience (QoE) improvement to Netflix members.
  • By integrating with studio content systems, we enabled the pipeline to leverage rich metadata from the creative side and create more engaging member experiences like interactive storytelling.
  • We expanded pipeline support to serve our studio/content-development use cases, which had different latency and resiliency requirements as compared to the traditional streaming use case.

Our experience of the last decade-and-a-half has reinforced our conviction that an efficient, flexible video processing pipeline that allows us to innovate and support our streaming service, as well as our studio partners, is critical to the continued success of Netflix. To that end, the Video and Image Encoding team in Encoding Technologies (ET) has spent the last few years rebuilding the video processing pipeline on our next-generation microservice-based computing platform Cosmos.

From Reloaded to Cosmos

Reloaded

Starting in 2014, we developed and operated the video processing pipeline on our third-generation platform Reloaded. Reloaded was well-architected, providing good stability, scalability, and a reasonable level of flexibility. It served as the foundation for numerous encoding innovations developed by our team.

When Reloaded was designed, we focused on a single use case: converting high-quality media files (also known as mezzanines) received from studios into compressed assets for Netflix streaming. Reloaded was created as a single monolithic system, where developers from various media teams in ET and our platform partner team Content Infrastructure and Solutions (CIS)¹ worked on the same codebase, building a single system that handled all media assets. Over the years, the system expanded to support various new use cases. This led to a significant increase in system complexity, and the limitations of Reloaded began to show:

  • Coupled functionality: Reloaded was composed of a number of worker modules and an orchestration module. The setup of a new Reloaded module and its integration with the orchestration required a non-trivial amount of effort, which led to a bias towards augmentation rather than creation when developing new functionalities. For example, in Reloaded the video quality calculation was implemented inside the video encoder module. With this implementation, it was extremely difficult to recalculate video quality without re-encoding.
  • Monolithic structure: Since Reloaded modules were often co-located in the same repository, it was easy to overlook code-isolation rules and there was quite a bit of unintended reuse of code across what should have been strong boundaries. Such reuse created tight coupling and reduced development velocity. The tight coupling among modules further forced us to deploy all modules together.
  • Long release cycles: The joint deployment meant that there was increased fear of unintended production outages as debugging and rollback can be difficult for a deployment of this size. This drove the approach of the “release train”. Every two weeks, a “snapshot” of all modules was taken, and promoted to be a “release candidate”. This release candidate then went through exhaustive testing which attempted to cover as large a surface area as possible. This testing stage took about two weeks. Thus, depending on when the code change was merged, it could take anywhere between two and four weeks to reach production.

As time progressed and functionalities grew, the rate of new feature contributions in Reloaded dropped. Several promising ideas were abandoned owing to the outsized work needed to overcome architectural limitations. The platform that had once served us well was now becoming a drag on development.

Cosmos

As a response, in 2018 the CIS and ET teams started developing the next-generation platform, Cosmos. In addition to the scalability and the stability that the developers already enjoyed in Reloaded, Cosmos aimed to significantly increase system flexibility and feature development velocity. To achieve this, Cosmos was developed as a computing platform for workflow-driven, media-centric microservices.

The microservice architecture provides strong decoupling between services. Per-microservice workflow support eases the burden of implementing complex media workflow logic. Finally, relevant abstractions allow media algorithm developers to focus on the manipulation of video and audio signals rather than on infrastructural concerns. A comprehensive list of benefits offered by Cosmos can be found in the linked blog.

Building the Video Processing Pipeline in Cosmos

Service Boundaries

In the microservice architecture, a system is composed of a number of fine-grained services, with each service focusing on a single functionality. So the first (and arguably the most important) thing is to identify boundaries and define services.

In our pipeline, as media assets travel through creation to ingest to delivery, they go through a number of processing steps such as analyses and transformations. We analyzed these processing steps to identify “boundaries” and grouped them into different domains, which in turn became the building blocks of the microservices we engineered.

As an example, in Reloaded, the video encoding module bundles 5 steps:

  1. divide the input video into small chunks
  2. encode each chunk independently
  3. calculate the quality score (VMAF) of each chunk
  4. assemble all the encoded chunks into a single encoded video
  5. aggregate quality scores from all chunks

From a system perspective, the assembled encoded video is of primary concern while the internal chunking and separate chunk encodings exist in order to fulfill certain latency and resiliency requirements. Further, as alluded to above, the video quality calculation provides a totally separate functionality as compared to the encoding service.

Thus, in Cosmos, we created two independent microservices: Video Encoding Service (VES) and Video Quality Service (VQS), each of which serves a clear, decoupled function. As implementation details, the chunked encoding and the assembling were abstracted away into the VES.
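
As an illustration of the resulting boundaries, the sketch below shows roughly how the two service contracts can be thought of; the type and field names are ours, not the actual Cosmos APIs, and details such as chunking, assembly, and workflow orchestration stay hidden behind the services.

// Illustrative service contracts; the real Cosmos services expose richer,
// workflow-driven APIs, and chunked encoding/assembly stays inside VES.
interface EncodingRecipe {
  codec: 'H.264' | 'AV1' | 'HEVC';
  resolution: { width: number; height: number };
  targetBitrateKbps: number;
}

interface VideoEncodingService {
  // Mezzanine in, encoded video out; chunking and assembly are implementation details.
  encode(mezzanineUrl: string, recipe: EncodingRecipe): Promise<{ encodeUrl: string }>;
}

interface VideoQualityService {
  // Quality is computed against the source, independently of how the encode was produced.
  score(mezzanineUrl: string, encodeUrl: string): Promise<{ vmaf: number }>;
}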

Video Services

The approach outlined above was applied to the rest of the video processing pipeline to identify functionalities and hence service boundaries, leading to the creation of the following video services².

  1. Video Inspection Service (VIS): This service takes a mezzanine as the input and performs various inspections. It extracts metadata from different layers of the mezzanine for downstream services. In addition, the inspection service flags issues if invalid or unexpected metadata is observed and provides actionable feedback to the upstream team.
  2. Complexity Analysis Service (CAS): The optimal encoding recipe is highly content-dependent. This service takes a mezzanine as the input and performs analysis to understand the content complexity. It calls Video Encoding Service for pre-encoding and Video Quality Service for quality evaluation. The results are saved to a database so they can be reused.
  3. Ladder Generation Service (LGS): This service creates an entire bitrate ladder for a given encoding family (H.264, AV1, etc.). It fetches the complexity data from CAS and runs the optimization algorithm to create encoding recipes. The CAS and LGS cover much of the innovations that we have previously presented in our tech blogs (per-title, mobile encodes, per-shot, optimized 4K encoding, etc.). By wrapping ladder generation into a separate microservice (LGS), we decouple the ladder optimization algorithms from the creation and management of complexity analysis data (which resides in CAS). We expect this to give us greater freedom for experimentation and a faster rate of innovation.
  4. Video Encoding Service (VES): This service takes a mezzanine and an encoding recipe and creates an encoded video. The recipe includes the desired encoding format and properties of the output, such as resolution, bitrate, etc. The service also provides options that allow fine-tuning latency, throughput, etc., depending on the use case.
  5. Video Validation Service (VVS): This service takes an encoded video and a list of expectations about the encode. These expectations include attributes specified in the encoding recipe as well as conformance requirements from the codec specification. VVS analyzes the encoded video and compares the results against the indicated expectations. Any discrepancy is flagged in the response to alert the caller.
  6. Video Quality Service (VQS): This service takes the mezzanine and the encoded video as input, and calculates the quality score (VMAF) of the encoded video.

Service Orchestration

Each video service provides a dedicated functionality and they work together to generate the needed video assets. Currently, the two main use cases of the Netflix video pipeline are producing assets for member streaming and for studio operations. For each use case, we created a dedicated workflow orchestrator so the service orchestration can be customized to best meet the corresponding business needs.

For the streaming use case, the generated videos are deployed to our content delivery network (CDN) for Netflix members to consume. These videos can easily be watched millions of times. The Streaming Workflow Orchestrator utilizes almost all video services to create streams for an impeccable member experience. It leverages VIS to detect and reject non-conformant or low-quality mezzanines, invokes LGS for encoding recipe optimization, encodes video using VES, and calls VQS for quality measurement where the quality data is further fed to Netflix’s data pipeline for analytics and monitoring purposes. In addition to video services, the Streaming Workflow Orchestrator uses audio and timed text services to generate audio and text assets, and packaging services to “containerize” assets for streaming.

For the studio use case, some example video assets are marketing clips and daily production editorial proxies. The requests from the studio side are generally latency-sensitive. For example, someone from the production team may be waiting for the video to review so they can decide the shooting plan for the next day. Because of this, the Studio Workflow Orchestrator optimizes for fast turnaround and focuses on core media processing services. At this time, the Studio Workflow Orchestrator calls VIS to extract metadata of the ingested assets and calls VES with predefined recipes. Compared to member streaming, studio operations have different and unique requirements for video processing. Therefore, the Studio Workflow Orchestrator is the exclusive user of some encoding features like forensic watermarking and timecode/text burn-in.

Where we are now

We have had the new video pipeline running alongside Reloaded in production for a few years now. During this time, we completed the migration of all necessary functionalities from Reloaded, began gradually shifting over traffic one use case at a time, and completed the switchover in September of 2023.

While it is still early days, we have already seen the benefits of the new platform, specifically the ease of feature delivery. Notably, Netflix launched the Advertising-supported plan in November 2022. Processing Ad creatives posed some new challenges: media formats of Ads are quite different from movie and TV mezzanines that the team was familiar with, and there was a new set of media processing requirements related to the business needs of Ads. With the modularity and developer productivity benefits of Cosmos, we were able to quickly iterate the pipeline to keep up with the changing requirements and support a successful product launch.

Summary

Rebuilding the video pipeline was a huge undertaking for the team. We are very proud of what we have achieved, and also eager to share our journey with the technical community. This blog has focused on providing an overview: a brief history of our pipeline and the platforms, why the rebuilding was necessary, what these new services look like, and how they are being used for Netflix businesses. In the next blog, we are going to delve into the details of the Video Encoding Service (VES), explaining step-by-step the service creation, and sharing lessons learned (we have A LOT!). We also plan to cover other video services in future tech blogs. Follow the Netflix Tech Blog to stay up to date.

Acknowledgments

A big shout out to the CIS team for their outstanding work in building the Cosmos platform and their receptiveness to feedback from service developers.

We want to express our appreciation to our users, the Streaming Encoding Pipeline team, and the Video Engineering team. Just like our feedback helps iron out the platform, the feedback from our users has been instrumental in building high-quality services.

We also want to thank Christos Bampis and Zhi Li for their significant contributions to video services, and our two former team members, Chao Chen and Megha Manohara for contributing to the early development of this project.

Footnotes

  1. Formerly known as Media Cloud Engineering/MCE team.
  2. The actual number of video services is more than listed here. Some of them are Netflix-specific and thus omitted from this blog.


Rebuilding Netflix Video Processing Pipeline with Microservices was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Our First Netflix Data Engineering Summit

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/our-first-netflix-data-engineering-summit-f326b0589102

Holden Karau, Elizabeth Stone, Pedro Duarte, Chris Stephens, Pallavi Phadnis, Lee Woodridge, Mark Cho, Guil Pires, Sujay Jain, Tristan Reid, Senthilnathan Athinarayanan, Bharath Mummadisetty, Abhinaya Shetty, Judit Lantos, Amanuel Kahsay, Dao Mi, Mick Dreeling, Chris Colburn and Agata Gryzbek

Introduction

Earlier this summer Netflix held our first-ever Data Engineering Forum. Engineers from across the company came together to share best practices on everything from Data Processing Patterns to Building Reliable Data Pipelines. The result was a series of talks which we are now sharing with the rest of the Data Engineering community!

You can find each of the talks below with a short description of each, or you can go straight to the playlist on YouTube here.

The Talks

The Netflix Data Engineering Stack

Chris Stephens, Data Engineer, Content & Studio and Pedro Duarte, Software Engineer, Consolidated Logging walk engineers new to Netflix through the building blocks of the Netflix Data Engineering stack. Learn more about how batch and streaming data pipelines are built at Netflix.

Data Processing Patterns

Lee Woodridge and Pallavi Phadnis, Data Engineers at Netflix, talk about how you can apply different processing strategies for your batch pipelines by implementing generic abstractions to help scale, be more efficient, handle late-arriving data, and be more fault tolerant.

Streaming SQL on Data Mesh using Apache Flink

Mark Cho, Guil Pires and Sujay Jain, Engineers from the Netflix Data Platform, talk about how managed Streaming SQL using Apache Flink can help unlock new Stream Processing use cases at Netflix. You can read more about Data Mesh, Netflix’s next-generation stream processing platform, here.

Building Reliable Data Pipelines

Holden Karau, OSS Engineer, Data Platform Engineering, talks about the importance of reliable data pipelines and how to build them covering tools from testing to validation and auditing. The talk uses Apache Spark as an example, but the concepts generalize regardless of your specific tools.

Knowledge Management — Leveraging Institutional Data

Tristan Reid, software engineer, shares experiences about the Knowledge Management project at Netflix, which seeks to leverage language modeling techniques and metadata from internal systems to improve the impact of the >100K memos that circulate within the company.

Psyberg, An Incremental ETL Framework Using Iceberg

Abhinaya Shetty and Bharath Mummadisetty, Data Engineers from Netflix’s Membership Data Engineering team, introduce Psyberg, an incremental ETL framework. Learn about how Psyberg leverages Iceberg metadata to handle late-arriving data, and improves data pipelines while simplifying on-call life!

Start/Stop/Continue for optimizing complex ETL jobs

Judit Lantos, Data Engineer, Member Experience Data Engineering, shares a case study to demonstrate an effective approach for optimizing complex ETL jobs.

Media Data for ML Studio Creative Production

In the last two decades, Netflix has revolutionized the way video content is consumed; however, there is significant work to be done in revolutionizing how movies and TV shows are made. In this video, Sr. Data Engineers Amanuel Kahsay and Dao Mi showcase how data and insights are being utilized to accomplish such a vision.

We hope that our fellow members of the Data Engineering Community find these videos useful and engaging. Please follow our Netflix Data Twitter account for updates and notifications of future Data Engineering Summits!

Mick Dreeling, Chris Colburn


Our First Netflix Data Engineering Summit was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

All of Netflix’s HDR video streaming is now dynamically optimized

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/all-of-netflixs-hdr-video-streaming-is-now-dynamically-optimized-e9e0cb15f2ba

by Aditya Mavlankar, Zhi Li, Lukáš Krasula and Christos Bampis

High dynamic range (HDR) video brings a wider range of luminance and a wider gamut of colors, paving the way for a stunning viewing experience. Separately, our invention of Dynamically Optimized (DO) encoding helps achieve optimized bitrate-quality tradeoffs depending on the complexity of the content.

HDR was launched at Netflix in 2016 and the number of titles available in HDR has been growing ever since. We were, however, missing the systematic ability to measure perceptual quality (VMAF) of HDR streams since VMAF was limited to standard dynamic range (SDR) video signals.

As noted in an earlier blog post, we began developing an HDR variant of VMAF; let’s call it HDR-VMAF. A vital aspect of such development is subjective testing with HDR encodes in order to generate training data. The pandemic, however, posed unique challenges in conducting a conventional in-lab subjective test with HDR encodes. We improvised as part of a collaborative effort with Dolby Laboratories and conducted subjective tests with 4K-HDR content using high-end OLED panels in calibrated conditions created in participants’ homes [1],[2]. Details pertaining to HDR-VMAF exceed the scope of this article and will be covered in a future blog post; for now, suffice it to say that the first version of HDR-VMAF landed internally in 2021 and we have been improving the metric ever since.

The arrival of HDR-VMAF allowed us to create HDR streams with DO applied, i.e., HDR-DO encodes. Prior to that, we were using a fixed ladder with predetermined bitrates — regardless of content characteristics — for HDR video streaming. We A/B tested HDR-DO encodes in production in Q3-Q4 2021, followed by improving the ladder generation algorithm further in early 2022. We started backfilling HDR-DO encodes for existing titles from Q2 2022. By June 2023 the entire HDR catalog was optimized. The graphic below (Fig. 1) depicts the migration of traffic from fixed bitrates to DO encodes.

Fig. 1: Migration of traffic from fixed-ladder encodes to DO encodes.

Bitrate versus quality comparison

HDR-VMAF is designed to be format-agnostic — it measures the perceptual quality of HDR video signal regardless of its container format, for example, Dolby Vision or HDR10. HDR-VMAF focuses on the signal characteristics (as a result of lossy encoding) instead of display characteristics, and thus it does not include display mapping in its pipeline. Display mapping is the specific tone mapping applied by the display based on its own characteristics — peak luminance, black level, color gamut, etc. — and based on content characteristics and/or metadata signaled in the bitstream.

Two ways that HDR10 and Dolby Vision differ are: 1) the preprocessing applied to the signal before encoding, and 2) the metadata informing the display mapping on different displays. So, HDR-VMAF will capture the effect of 1) but ignore the effect of 2). Display capabilities vary a lot among the heterogeneous population of devices that stream HDR content — this aspect is similar to other factors that vary from session to session, such as ambient lighting, viewing distance, the upscaling algorithm on the device, etc. “VMAF not incorporating display mapping” implies the scores are computed for an “ideal display” that’s capable of representing the entire luminance range and the entire color gamut spanned by the video signal — thus not requiring display mapping. This background is useful to have before looking at rate vs quality curves pertaining to these two formats.

Shown below are rate versus quality examples for a couple of titles from our HDR catalog. We present two sets. Within each set we show curves for both Dolby Vision and HDR10. The first set (Fig. 2) corresponds to an episode from a gourmet cooking show incorporating fast-paced scenes from around the world. The second set (Fig. 3) corresponds to an episode from a relatively slower drama series; slower in terms of camera action. The optimized encodes are chosen from the convex hull formed by various rate-quality points corresponding to different bitrates, spatial resolutions and encoding recipes.
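
For intuition on how the convex hull comes into play, the sketch below keeps only the candidate encodes that lie on the upper convex hull of the (bitrate, quality) plane. The types are illustrative; generating the candidates, choosing the quality metric, and spacing the final ladder rungs involve considerably more than this.

interface CandidateEncode {
  bitrateKbps: number;
  quality: number; // e.g. an HDR-VMAF score
  resolution: string;
}

// Keep only candidates on the upper convex hull of (bitrate, quality):
// no retained point is dominated by a mix of cheaper and pricier encodes.
function upperConvexHull(candidates: CandidateEncode[]): CandidateEncode[] {
  const pts = [...candidates].sort((a, b) => a.bitrateKbps - b.bitrateKbps);
  const hull: CandidateEncode[] = [];
  for (const p of pts) {
    while (hull.length >= 2) {
      const [a, b] = hull.slice(-2);
      // Remove b if it lies on or below the segment from a to p (not on the upper hull).
      const cross =
        (b.bitrateKbps - a.bitrateKbps) * (p.quality - a.quality) -
        (b.quality - a.quality) * (p.bitrateKbps - a.bitrateKbps);
      if (cross >= 0) hull.pop();
      else break;
    }
    hull.push(p);
  }
  return hull;
}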

For brevity we skipped annotating ladder points with their spatial resolutions but the overall observations from our previous article on SDR-4K encode optimization apply here as well. The fixed ladder is slow in ramping up spatial resolution, so the quality stays almost flat among two successive 1080p points or two successive 4K points. On the other hand, the optimized ladder presents a sharper increase in quality with increasing bitrate.

The fixed ladder has predetermined 4K bitrates — 8, 10, 12 and 16 Mbps — so it deterministically maxes out at 16 Mbps. On the other hand, the optimized ladder targets very high levels of quality on the top rung of the bitrate ladder, even at the cost of higher bitrates if the content is complex, thereby satisfying the most discerning viewers. In spite of reaching higher qualities than the fixed ladder, the HDR-DO ladder, on average, occupies only 58% of the storage space compared to the fixed-bitrate ladder. This is achieved by more efficiently spacing the ladder points, especially in the high-bitrate region. After all, there is little to no benefit in packing multiple high-bitrate points so close to each other — for example, 3 QHD (2560×1440) points placed in the 6 to 7.5 Mbps range followed by the four 4K points at 8, 10, 12 and 16 Mbps, as was done on the fixed ladder.

Fig. 2: Rate-quality curves comparing fixed and optimized ladders corresponding to an episode from a gourmet cooking show incorporating fast-paced scenes from around the world.
Fig. 3: Rate-quality curves comparing fixed and optimized ladders corresponding to an episode from a drama series, which is slower in terms of camera action.

It is important to note that the fixed-ladder encodes had constant-duration groups of pictures (GoPs) and suffered from some inefficiency because shot boundaries did not align with Instantaneous Decoder Refresh (IDR) frames. The DO encodes are shot-based, so the IDR frames align with shot boundaries. For a given rate-quality operating point, the DO process helps allocate bits among the various shots while maximizing an overall objective function. Also, thanks to the DO framework, within a given rate-quality operating point, challenging shots can and do burst in bitrate up to the codec-level limit associated with that point.

Member benefits

We A/B tested the fixed and optimized ladders; first and foremost to make sure that devices in the field can handle the new streams and serving new streams doesn’t cause unintended playback issues. A/B testing also allows us to get a read on the improvement in quality of experience (QoE). Overall, the improvements can be summarized as:

  • 40% fewer rebuffers
  • Higher video quality for both bandwidth-constrained as well as unconstrained sessions
  • Lower initial bitrate
  • Higher initial quality
  • Lower play delay
  • Less variation in delivered video quality
  • Lower Internet data usage, especially on mobiles and tablets

Will HDR-VMAF be open-source?

Yes, we are committed to supporting the open-source community. The current implementation, however, is largely tailored to our internal pipelines. We are working to ensure it is versatile, stable, and easy to use for the community. Additionally, the current version has some algorithmic limitations that we are in the process of improving before the official release. When we do release it, HDR-VMAF will have higher accuracy in perceptual quality prediction and be easier to use “out of the box”.

Summary

Thanks to the arrival of HDR-VMAF, we were able to optimize our HDR encodes. Fixed-ladder HDR encodes have been fully replaced by optimized ones, reducing storage footprint and Internet data usage — and most importantly, improving the video quality for our members. Improvements have been seen across all device categories ranging from TVs to mobiles and tablets.

Acknowledgments

We thank all the volunteers who participated in the subjective experiments. We also want to acknowledge the contributions of our colleagues from Dolby, namely Anustup Kumar Choudhury, Scott Daly, Robin Atkins, Ludovic Malfait, and Suzanne Farrell, who helped with preparing and conducting the subjective tests.

We thank Matthew Donato, Adithya Prakash, Rich Gerber, Joe Drago, Benbuck Nason and Joseph McCormick for all the interesting discussions on HDR video.

We thank various internal teams at Netflix for the crucial roles they play:

  • The various client device and UI engineering teams at Netflix that manage the Netflix experience on various device platforms
  • The data science and engineering teams at Netflix that help us run and analyze A/B tests; we thank Chris Pham in particular for generating various data insights for the encoding team
  • The Playback Systems team that steers the Netflix experience for every client device including the experience served in various encoding A/B tests
  • The Open Connect team that manages Netflix’s own content delivery network
  • The Content Infrastructure and Solutions team that manages the compute platform that enables us to execute video encoding at scale
  • The Streaming Encoding Pipeline team that helps us orchestrate the generation of various streaming assets

Find our work interesting? Join us and be a part of the amazing team that brought you this tech blog; check out our open positions.

References

[1] L. Krasula, A. Choudhury, S. Daly, Z. Li, R. Atkins, L. Malfait, A. Mavlankar, “Subjective video quality for 4K HDR-WCG content using a browser-based approach for “at-home” testing,” Electronic Imaging, vol. 35, pp. 263–1–8 (2023) [online]
[2] A. Choudhury, L. Krasula, S. Daly, Z. Li, R. Atkins, L. Malfait, “Testing 4K HDR-WCG professional video content for subjective quality using a remote testing approach,” SMPTE Media Technology Summit 2023


Netflix Original Research: MIT CODE 2023

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/netflix-original-research-mit-code-2023-9340b879176a

Netflix was thrilled to be the premier sponsor for the 2nd year in a row at the 2023 Conference on Digital Experimentation (CODE@MIT) in Cambridge, MA. The conference features a balanced blend of academic and industry research from some wicked smart folks, and we’re proud to have contributed a number of talks and posters along with a plenary session.

Our contributions kicked off with a concept that is crucial to our understanding of A/B tests: surrogates!

Our first talk was given by Aurelien Bibaut (with co-authors Nathan Kallus, Simon Ejdemyr and Michael Zhao), in which we discussed how to confidently measure long-term outcomes using short-term surrogates in the presence of bias. For example, how do we estimate the effect of an innovation on retention a year later without running all our experiments for a year? We proposed an estimation method using cross-fold procedures and showed how to construct valid confidence intervals for long-term effects before those effects are fully observed.

Later on, Michael Zhao (with Vickie Zhang, Anh Le and Nathan Kallus) spoke about the evaluation of surrogate index models for product decision making. Using 200 real A/B tests performed at Netflix, we showed that surrogate-index models, constructed using only 2 weeks of data, lead to the same product ship decisions ~95% of the time when compared to making a call based on 2 months of data. This means we can reliably run shorter tests with confidence without needing to wait months for results!

Our next topic focused on how to understand and balance competing engagement metrics; for example, should 1 hour of gaming equal 1 hour of streaming? Michael Zhao and Jordan Schafer shared a poster on how they built an Overall Evaluation Criterion (OEC) metric that provides holistic evaluation for A/B tests, appropriately weighting different engagement metrics to serve a single overall objective. This new framework has enabled fast and confident decision making in tests, and is being actively adapted as our business continues to expand into new areas.

In the second plenary session of the day, Martin Tingley took us on a compelling and fun journey of complexity, exploring key challenges in digital experimentation and how they differ from the challenges faced by agricultural researchers a century ago. He highlighted different areas of complexity and provided perspectives on how to tackle the right challenges based on business objectives.

Our final talk was given by Apoorva Lal (with co-authors Samir Khan and Johan Ugander), in which we showed how partial identification of the dose-response function (DRF) under non-parametric assumptions can provide more insightful analyses of experimental data than the standard ATE analysis does. We revisited a study that algorithmically reduced like-minded content, and showed how the binary ATE learning can be extended to answer how the amount of like-minded content a user sees affects their political attitudes.

We had a blast connecting with the CODE@MIT community and bonding over our shared enthusiasm for not only rigorous measurement in experimentation, but also stats-themed stickers and swag!

One of our stickers this year, can you guess what this is showing?!

We look forward to next year’s iteration of the conference and hope to see you there!

Psst! We’re hiring Data Scientists across a variety of domains at Netflix — check out our open roles.


Causal Machine Learning for Creative Insights

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/causal-machine-learning-for-creative-insights-4b0ce22a8a96

A framework to identify the causal impact of successful visual components.

By Billur Engin, Yinghong Lan, Grace Tang, Cristina Segalin, Kelli Griggs, Vi Iyengar

Introduction

At Netflix, we want our viewers to easily find TV shows and movies that resonate and engage. Our creative team helps make this happen by designing promotional artwork that best represents each title featured on our platform. What if we could use machine learning and computer vision to support our creative team in this process? Through identifying the components that contribute to a successful artwork — one that leads a member to choose and watch it — we can give our creative team data-driven insights to incorporate into their creative strategy, and help in their selection of which artwork to feature.

We make the assumption that the presence of a specific component leads to an artwork’s success. We will discuss a causal framework that helps us find and summarize the successful components as creative insights, and hypothesize and estimate their impact.

The Challenge

Given Netflix’s vast and increasingly diverse catalog, it is a challenge to design experiments that both work within an A/B test framework and are representative of all genres, plots, artists, and more. In the past, we have attempted to design A/B tests where we investigate one aspect of artwork at a time, often within one particular genre. However, this approach has a major drawback: it is not scalable because we either have to label images manually or create new asset variants differing only in the feature under investigation. The manual nature of these tasks means that we cannot test many titles at a time. Furthermore, given the multidimensional nature of artwork, we might be missing many other possible factors that might explain an artwork’s success, such as figure orientation, the color of the background, facial expressions, etc. Since we want to ensure that our testing framework allows for maximum creative freedom, and avoid any interruption to the design process, we decided to try an alternative approach.

Figure. Given the multidimensional nature of artwork, it is challenging to design an A/B test to investigate one aspect of artwork at a given time. We could be missing many other possible factors that might explain an artwork’s success, such as figure orientation, the color of the background, facial expressions, etc.

The Causal Framework

Thanks to our Artwork Personalization System and vision algorithms (some of which are exemplified here), we have a rich dataset of promotional artwork components and user engagement data on which to build a causal framework. Utilizing this dataset, generated through our recommendation system, we have developed a framework to test creative insights and estimate their causal impact on an artwork’s performance. In other words, we can learn which attributes led to a title’s successful selection based on its artwork.

Let’s first explore the workflow of the causal framework, as well as the data and success metrics that power it.

We represent the success of an artwork with the take rate: the probability that an average user watches the promoted title after seeing its promotional artwork, adjusted for the popularity of the title. Every show on our platform has multiple promotional artwork assets. Using Netflix’s Artwork Personalization, we serve these assets to hundreds of millions of members every day. To power this recommendation system, we look at user engagement patterns and whether these engagements with artwork resulted in a successful title selection.

With the capability to annotate a given image (some of which are mentioned in an earlier post), an artwork asset in this case, we use a series of computer vision algorithms to gather objective image metadata, latent representation of the image, as well as some of the contextual metadata that a given image contains. This process allows our dataset to consist of both the image features and user data, all in an effort to understand which image components lead to successful user engagement. We also utilize machine learning algorithms, consumer insights¹, and correlational analysis for discovering high-level associations between image features and an artwork’s success. These statistically significant associations become our hypotheses for the next phase.

Once we have a specific hypothesis, we can test it by deploying causal machine learning algorithms. This framework reduces our experimental effort to uncover causal relationships, while taking into account confounding among the high-level variables (i.e. the variables that may influence both the treatment / intervention and outcome).

The Hypothesis and Assumptions

We will use the following hypothesis in the remainder of this post: the presence of a face in an artwork causally improves the asset’s performance. (We know that faces work well in artwork, especially images with an expressive facial emotion that is in line with the tone of the title.)

Here are two promotional artwork assets from Unbreakable Kimmy Schmidt. We know that the image on the left performed better than the image on the right. However, the difference between them is not only the presence of a face. There are many other variances, like the difference in background, text placement, font size, face size, etc. Causal Machine Learning makes it possible for us to understand an artwork’s performance based on the causal impact of its treatment.

To make sure our hypothesis is fit for the causal framework, it’s important we go over the identification assumptions.

  • Consistency: The treatment component is sufficiently well-defined.

We use machine learning algorithms to predict whether or not the artwork contains a face. That’s why the first assumption we make is that our face detection algorithm is mostly accurate (~92% average precision).

  • Positivity / Probabilistic Assignment: Every unit (an artwork) has some chance of getting treated.

We calculate the propensity score (the probability of receiving the treatment based on certain baseline characteristics) of having a face for samples with different covariates. If a certain subset of artwork (such as artwork from a certain genre) has close to a 0 or 1 propensity score for having a face, then we discard these samples from our analysis.

  • Individualistic Assignment / SUTVA (stable unit treatment value assumption): The potential outcomes of a unit do not depend on the treatments assigned to others.

Creatives make the decision to create artwork with or without faces based on considerations limited to the title of interest itself. This decision is not dependent on whether other assets have a face in them or not.

  • Conditional exchangeability (Unconfoundedness): There are no unmeasured confounders.

This assumption is by definition not testable. Given a dataset, we can’t know whether there has been an unobserved confounder. However, we can test the sensitivity of our conclusions to violations of this assumption in several different ways.

The Models

Now that we have established our hypothesis as a causal inference problem, we can focus on the Causal Machine Learning application. Predictive machine learning (ML) models are great at finding patterns and associations in order to predict outcomes; however, they are not great at explaining cause-effect relationships, as their model structure does not reflect causality (the relationship between cause and effect). As an example, consider the price of Broadway theater tickets and the number of tickets sold. An ML algorithm may find a correlation between price increases and ticket sales. If we used this algorithm for decision making, we could falsely conclude that increasing the ticket price leads to higher ticket sales, if we do not account for the confounder of show popularity, which clearly impacts both ticket prices and sales. It is understandable that a Broadway musical ticket may be more expensive if the show is a hit, but simply increasing ticket prices to gain more customers is counter-intuitive.

Causal ML helps us estimate treatment effects from observational data, where it is challenging to conduct clean randomizations. Back-to-back publications on Causal ML, such as Double ML, Causal Forests, Causal Neural Networks, and many more, showcased a toolset for investigating treatment effects, via combining domain knowledge with ML in the learning system. Unlike predictive ML models, Causal ML explicitly controls for confounders, by modeling both treatment of interest as a function of confounders (i.e., propensity scores) as well as the impact of confounders on the outcome of interest. In doing so, Causal ML isolates out the causal impact of treatment on outcome. Moreover, the estimation steps of Causal ML are carefully set up to achieve better error bounds for the estimated treatment effects, another consideration often overlooked in predictive ML. Compared to more traditional Causal Inference methods anchored on linear models, Causal ML leverages the latest ML techniques to not only better control for confounders (when propensity or outcome models are hard to capture by linear models) but also more flexibly estimate treatment effects (when treatment effect heterogeneity is nonlinear). In short, by utilizing machine learning algorithms, Causal ML provides researchers with a framework for understanding causal relationships with flexible ML methods.

Y : outcome variable (take rate)
T : binary treatment variable (presence of a face or not)
W: a vector of covariates (features of the title and artwork)
X ⊆ W: a vector of covariates (a subset of W) along which treatment effect heterogeneity is evaluated

Let’s dive more into the causal ML (Double ML to be specific) application steps for creative insights.

  1. Build a propensity model to predict treatment probability (T) given the W covariates.
  2. Build a potential outcome model to predict Y given the W covariates.
  3. Residualization of:
    • The treatment (observed T — predicted T via propensity model)
    • The outcome (observed Y — predicted Y via potential outcome model)
  4. Fit a third model on the residuals to predict the average treatment effect (ATE) or conditional average treatment effect (CATE).

The underlying partially linear model can be written as Y = θ(X)·T + g(W) + 𝜖 and T = m(W) + η, where 𝜖 and η are stochastic errors and we assume that E[𝜖|T,W] = 0 and E[η|W] = 0.

For the estimation of the nuisance functions (i.e., the propensity score model and the outcome model), we implemented the propensity model as a classifier (as we have a binary treatment variable — the presence of a face) and the potential outcome model as a regressor (as we have a continuous outcome variable — the adjusted take rate). We used grid search to tune the XGBoost classifier and regressor hyperparameters, and k-fold cross-validation to avoid overfitting. Finally, we used a causal forest on the residuals of the treatment and outcome variables to capture the ATE, as well as the CATE across different genres and countries.
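
The sketch below walks through steps 1–4 on simulated data, using scikit-learn gradient boosting as a stand-in for the tuned XGBoost models and a simple residual-on-residual regression for the final step; all names, data, and hyperparameters are illustrative rather than our production setup.

```python
# Minimal Double ML sketch following steps 1-4 above, on simulated data. Gradient
# boosting stands in for the tuned XGBoost models; all names and values are illustrative.
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor
from sklearn.model_selection import cross_val_predict

rng = np.random.default_rng(0)
n = 5000
W = rng.normal(size=(n, 10))                     # covariates (title / artwork features)
T = rng.binomial(1, 1 / (1 + np.exp(-W[:, 0])))  # treatment: face present or not
Y = 0.05 * T + 0.10 * W[:, 0] + rng.normal(scale=0.1, size=n)  # outcome: adjusted take rate

# 1. Propensity model: P(T = 1 | W), using out-of-fold predictions to avoid overfitting.
t_hat = cross_val_predict(GradientBoostingClassifier(), W, T, cv=5,
                          method="predict_proba")[:, 1]

# 2. Potential outcome model: E[Y | W].
y_hat = cross_val_predict(GradientBoostingRegressor(), W, Y, cv=5)

# 3. Residualize the treatment and the outcome.
t_res, y_res = T - t_hat, Y - y_hat

# 4. Regress the outcome residuals on the treatment residuals to estimate the ATE
#    (a causal forest fit on the same residuals would additionally give the CATE).
ate = np.sum(t_res * y_res) / np.sum(t_res ** 2)
print(f"Estimated ATE of having a face: {ate:.4f}")
```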

Mediation and Moderation

ATE will reveal the impact of the treatment — in this case, having a face in the artwork — across the board. The result will answer the question of whether it is worth applying this approach for all of our titles across our catalog, regardless of potential conditioning variables e.g. genre, country, etc. Another advantage of our multi-feature dataset is that we get to deep dive into the relationships between attributes. To do this, we can employ two methods: mediation and moderation.

In their classic paper, Baron & Kenny define a moderator as “a qualitative (e.g., sex, race, class) or quantitative (e.g., level of reward) variable that affects the direction and/or strength of the relation between an independent or predictor variable and a dependent or criterion variable.” We can investigate suspected moderators to uncover Conditional Average Treatment Effects (CATE). For example, we might suspect that the effect of the presence of a face in artwork varies across genres (e.g. certain genres, like nature documentaries, probably benefit less from the presence of a human face since titles in those genres tend to focus more on non-human subject matter). We can investigate these relationships by including an interaction term between the suspected moderator and the independent variable, as sketched below. If the interaction term is significant, we can conclude that the third variable is a moderator of the relationship between the independent and dependent variables.
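
As a small illustration of the interaction-term check, here is a hedged sketch on simulated data; the column names, genres, and effect sizes are hypothetical.

```python
# Hypothetical moderation check: does the effect of a face vary by genre?
# The data below is simulated purely for illustration.
import numpy as np
import pandas as pd
import statsmodels.formula.api as smf

rng = np.random.default_rng(1)
n = 600
genre = rng.choice(["drama", "documentary", "comedy"], size=n)
has_face = rng.integers(0, 2, size=n)
effect = np.where(genre == "documentary", 0.01, 0.04)   # smaller face effect for documentaries
engagement = 0.05 + effect * has_face + rng.normal(scale=0.02, size=n)
df = pd.DataFrame({"engagement": engagement, "has_face": has_face, "genre": genre})

# Significant has_face:C(genre) interaction terms indicate genre moderates the face effect.
fit = smf.ols("engagement ~ has_face * C(genre)", data=df).fit()
print(fit.summary())
```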

Mediation, on the other hand, occurs when a third variable explains the relationship between an independent and dependent variable. To quote Baron & Kenny once more, “whereas moderator variables specify when certain effects will hold, mediators speak to how or why such effects occur.”

For example, we observed that the presence of more than 3 people tends to negatively impact performance. It could be that higher numbers of faces make it harder for a user to focus on any one face in the asset. However, since face count and face size tend to be negatively correlated (since we fit more information in an image of fixed size, each individual piece of information tends to be smaller), one could also hypothesize that the negative correlation with face count is not driven so much from the number of people featured in the artwork, but rather the size of each individual person’s face, which may affect how visible each person is. To test this, we can run a mediation analysis to see if face size is mediating the effect of face count on the asset’s performance.

The steps of the mediation analysis are as follows: we have already detected a correlation between the independent variable (number of faces) and the outcome variable (user engagement) — in other words, we observed that a higher number of faces is associated with lower user engagement. But we also observe that the number of faces is negatively correlated with average face size — faces tend to be smaller when more faces are fit into the same fixed-size canvas. To find out the degree to which face size mediates the effect of face count, we regress user engagement on both average face size and the number of faces, as in the sketch below. If 1) face size is a significant predictor of engagement, and 2) the significance of the predictive contribution of the number of people drops, we can conclude that face size mediates the effect of the number of people in the artwork on user engagement. If the coefficient for the number of people is no longer significant, face size fully mediates the effect of the number of faces on engagement.
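
A minimal, hedged sketch of this regression-based mediation check follows; the column names and simulated relationships are illustrative only.

```python
# Baron & Kenny-style mediation sketch: does average face size mediate the effect of
# face count on engagement? The data is simulated purely for illustration.
import numpy as np
import pandas as pd
import statsmodels.formula.api as smf

rng = np.random.default_rng(2)
n = 600
face_count = rng.integers(1, 6, size=n)
avg_face_size = 0.5 / face_count + rng.normal(scale=0.03, size=n)   # more faces -> smaller faces
engagement = 0.05 + 0.1 * avg_face_size - 0.004 * face_count + rng.normal(scale=0.02, size=n)
df = pd.DataFrame({"engagement": engagement,
                   "face_count": face_count,
                   "avg_face_size": avg_face_size})

total    = smf.ols("engagement ~ face_count", data=df).fit()                  # total effect
combined = smf.ols("engagement ~ face_count + avg_face_size", data=df).fit()  # direct effect + mediator

# If avg_face_size is significant in `combined` and the face_count coefficient shrinks
# (partial mediation) or loses significance (full mediation), face size mediates the effect.
print(total.params["face_count"], combined.params["face_count"], combined.pvalues["avg_face_size"])
```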

In this dataset, we found that face size only partially mediates the effect of face count on asset effectiveness. This implies that both factors have an impact on asset effectiveness — fewer faces tend to be more effective even if we control for the effect of face size.

Sensitivity Analysis

As alluded to above, the conditional exchangeability assumption (unconfoundedness) is not testable by definition. It is thus crucial to evaluate how sensitive our findings and insights are to the violation of this assumption. Inspired by prior work, we conducted a suite of sensitivity analyses that stress-tested this assumption from multiple different angles. In addition, we leveraged ideas from academic research (most notably the E-value) and concluded that our estimates are robust even when the unconfoundedness assumption is violated. We are actively working on designing and implementing a standardized framework for sensitivity analysis and will share the various applications in an upcoming blog post — stay tuned for a more detailed discussion!

Finally, we also compared our estimated treatment effects with known effects for specific genres that were derived with other methods, and found our estimates to be consistent across the different approaches.

Conclusion

Using the causal machine learning framework, we can potentially test and identify the various components of promotional artwork and gain invaluable creative insights. With this post, we just started to scratch the surface of this interesting challenge. In the upcoming posts in this series, we will share alternative machine learning and computer vision approaches that can provide insights from a causal perspective. These insights will guide and assist our team of talented strategists and creatives to select and generate the most attractive artwork, leveraging the attributes that these models selected, down to a specific genre. Ultimately this will give Netflix members a better and more personalized experience.

If these types of challenges interest you, please let us know! We are always looking for great people who are inspired by causal inference, machine learning, and computer vision to join our team.

Contributions

The authors contributed to the post as follows.

Billur Engin was the main driver of this blog post; she worked on the causal machine learning theory and its application in the artwork space. Yinghong Lan contributed equally to the causal machine learning theory. Grace Tang worked on the mediation analysis. Cristina Segalin engineered and extracted the visual features at scale from the artwork used in the analysis. Grace Tang and Cristina Segalin initiated and conceptualized the problem space used as the illustrative example in this post (studying factors affecting user engagement with a broad multivariate analysis of artwork features), curated the data, and performed the initial statistical analysis and construction of predictive models supporting this work.

Acknowledgments

We would like to thank Shiva Chaitanya for reviewing this work, and a special thanks to Shaun Wright, Luca Aldag, Sarah Soquel Morhaim, and Anna Pulido who helped make this possible.

Footnotes

¹The Consumer Insights team at Netflix seeks to understand members and non-members through a wide range of quantitative and qualitative research methods.


Incremental Processing using Netflix Maestro and Apache Iceberg

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/incremental-processing-using-netflix-maestro-and-apache-iceberg-b8ba072ddeeb

by Jun He, Yingyi Zhang, and Pawan Dixit

Incremental processing is an approach to processing new or changed data in workflows. The key advantage is that it only processes data that has been newly added or updated in a dataset, instead of re-processing the complete dataset. This not only reduces the cost of compute resources but also significantly reduces execution time. When a workflow execution has a shorter duration, the chances of failure and the need for manual intervention decrease. Incremental processing also improves engineering productivity by simplifying existing pipelines and unlocking new patterns.

In this blog post, we talk about the landscape and the challenges of workflows at Netflix. We will show how we are building a clean and efficient incremental processing solution (IPS) using Netflix Maestro and Apache Iceberg. IPS provides incremental processing support with data accuracy, data freshness, and backfill for users, and addresses many of the challenges in workflows. IPS enables users to continue to use their data processing patterns with minimal changes.

Introduction

Netflix relies on data to power its business in all phases. Whether in analyzing A/B tests, optimizing studio production, training algorithms, investing in content acquisition, detecting security breaches, or optimizing payments, well structured and accurate data is foundational. As our business scales globally, the demand for data is growing and the need for scalable, low-latency incremental processing has begun to emerge. There are three common issues that dataset owners usually face.

  • Data Freshness: Large datasets from Iceberg tables need to be processed quickly and accurately to generate insights that enable faster product decisions. The hourly processing semantics, along with the valid-through-timestamp watermark or data signals provided by the Data Platform toolset today, satisfy many use cases but are not the best fit for low-latency batch processing. Before IPS, the Data Platform did not have a solution for tracking the state and progression of data sets as a single easy-to-use offering. This led to a few internal solutions such as Psyberg. These internal libraries process data by capturing the changed partitions, which works only for specific use cases. Additionally, the libraries are tightly coupled to the user business logic, which often incurs higher migration and maintenance costs and requires heavy coordination with the Data Platform team.
  • Data Accuracy: Late arriving data causes datasets processed in the past to become incomplete and as a result inaccurate. To compensate for that, ETL workflows often use a lookback window, based on which they reprocess the data in that certain time window. For example, a job would reprocess aggregates for the past 3 days because it assumes that there would be late arriving data, but data prior to 3 days isn’t worth the cost of reprocessing.
  • Backfill: Backfilling datasets is a common operation in big data processing. It requires repopulating data for a historical time period before the scheduled processing. The need for backfilling could be due to a variety of factors, e.g. (1) an upstream data set was repopulated due to changes in the business logic of its data pipeline, (2) the business logic was changed in a data pipeline, (3) a new metric was created that needs to be populated for historical time ranges, (4) historical data was found missing, etc.

These challenges are currently addressed in suboptimal and less cost efficient ways by individual local teams to fulfill the needs, such as

  • Lookback: This is a generic and simple approach that data engineers use to solve the data accuracy problem. Users configure the workflow to read the data in a window (e.g. the past 3 hours or 10 days). The window is set based on users’ domain knowledge so that users have high confidence that the late arriving data will be included or will not matter (i.e. the data arrives too late to be useful). It ensures correctness, but at a high cost in terms of time and compute resources.
  • Foreach pattern: Users build backfill workflows using Maestro foreach support. It works well to backfill data produced by a single workflow. If the pipeline has multiple stages or many downstream workflows, users have to manually create backfill workflows for each of them and that requires significant manual work.

The incremental processing solution (IPS) described here has been designed to address the above problems. The design goal is to provide a clean and easy to adopt solution for the Incremental processing to ensure data freshness, data accuracy, and to provide easy backfill support.

  • Data Freshness: provide support for scheduling workflows in a micro-batch fashion (e.g. at a 15-minute interval) with state-tracking functionality
  • Data Accuracy: provide support for processing all late arriving data to achieve the data accuracy needed by the business, with significantly improved (multifold) time and cost efficiency
  • Backfill: provide managed backfill support to build, monitor, and validate the backfill, including automatically propagating changes from upstream to downstream workflows, to greatly improve engineering productivity (i.e. a few days or weeks of engineering work to build backfill workflows vs. one click for managed backfill)

Approach Overview

General Concept

Incremental processing is an approach to processing data in batch, but only on new or changed data. To support incremental processing, we need an approach for not only capturing incremental data changes but also tracking their states (i.e. whether a change has been processed by a workflow or not). The system must be aware of the changes, capture them from the source table(s), and then keep tracking them. Here, changes mean more than just the new data itself. For example, a row in an aggregation target table needs all the rows from the source table associated with the aggregation row. Also, if there are multiple source tables, usually the union of the changed data ranges from all input tables gives the full change data set. Thus, the change information captured must include all related data, including those unchanged rows in the source table. Due to these complexities, change tracking cannot be achieved simply by using a single watermark. IPS has to track those captured changes at a finer granularity.

The changes from the source tables might affect the transformed result in the target table in various ways.

  • If one row in the target table is derived from one row in the source table, newly captured data change will be the complete input dataset for the workflow pipeline.
  • If one row in the target table is derived from multiple rows in the source table, capturing new data will only tell us the rows have to be re-processed. But the dataset needed for ETL is beyond the change data itself. For example, an aggregation based on account id requires all rows from the source table about an account id. The change dataset will tell us which account ids are changed and then the user business logic needs to load all data associated with those account ids found in the change data.
  • If one row in the target table is derived based on the data beyond the changed data set, e.g. joining source table with other tables, newly captured data is still useful and can indicate a range of data to be affected. Then the workflow will re-process the data based on the range. For example, assuming we have a table that keeps the accumulated view time for a given account partitioned by the day. If the view time 3-days ago is updated right now due to late arriving data, then the view time for the following two days has to be re-calculated for this account. In this case, the captured late arriving data will tell us the start of the re-calculation, which is much more accurate than recomputing everything for the past X days by guesstimate, where X is a cutoff lookback window decided by business domain knowledge.

Once the change information (data or range) is captured, a workflow has to write the data to the target table in a slightly more complicated way because the simple INSERT OVERWRITE mechanism won’t work well. There are two alternatives:

  • Merge pattern: In some compute frameworks, e.g. Spark 3, MERGE INTO is supported, allowing new data to be merged into the existing data set (see the sketch after this list). That solves the write problem for incremental processing. Note that the workflow/step can be safely restarted without worrying about duplicate data being inserted when using MERGE INTO.
  • Append pattern: Users can also use append only write (e.g. INSERT INTO) to add the new data to the existing data set. Once the processing is completed, the append data is committed to the table. If users want to re-run or re-build the data set, they will run a backfill workflow to completely overwrite the target data set (e.g. INSERT OVERWRITE).
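
For concreteness, here is a minimal PySpark sketch of the merge pattern on an Iceberg table; the table and column names are hypothetical and the exact query would depend on the workflow’s business logic.

```python
# Minimal sketch of the merge pattern (Spark 3 + Iceberg MERGE INTO).
# Table and column names are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.sql("""
    MERGE INTO prod.ds.playback_daily_table AS target
    USING prod.ds.playback_icdc_table AS changes     -- only the newly captured change data
    ON  target.account_id = changes.account_id
    AND target.event_date = changes.event_date
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```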

Additionally, IPS naturally supports backfill in many cases. Downstream workflows (if there is no business logic change) will be triggered by the data change due to backfill. This enables auto propagation of backfill data in multi-stage pipelines. Note that backfill support is skipped in this blog post; we will cover IPS backfill support in a follow-up post.

Netflix Maestro

Maestro is the Netflix data workflow orchestration platform built to meet the current and future needs of Netflix. It is a general-purpose workflow orchestrator that provides a fully managed workflow-as-a-service (WAAS) to the data platform users at Netflix. It serves thousands of users, including data scientists, data engineers, machine learning engineers, software engineers, content producers, and business analysts, in various use cases. Maestro is highly scalable and extensible to support existing and new use cases and offers enhanced usability to end users.

Since the last blog on Maestro, we have migrated all the workflows to it on behalf of users with minimal interruption. Maestro has been fully deployed in production with 100% workload running on it.

IPS is built upon Maestro as an extension by adding two building blocks, i.e. a new trigger mechanism and step job type, to enable incremental processing for all workflows. It is seamlessly integrated into the whole Maestro ecosystem with minimal onboarding cost.

Apache Iceberg

Iceberg is a high-performance format for huge analytic tables. Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, Hive and Impala to safely work with the same tables, at the same time. It supports expressive SQL, full schema evolution, hidden partitioning, data compaction, and time travel & rollback. In the IPS, we leverage the rich features provided by Apache Iceberg to develop a lightweight approach to capture the table changes.

Incremental Change Capture Design

Using Netflix Maestro and Apache Iceberg, we created a novel solution for incremental processing, which provides the incremental change (data and range) capture in a super lightweight way without copying any data. During our exploration, we see a huge opportunity to improve cost efficiency and engineering productivity using incremental processing.

Here is our solution to achieve incremental change capture, built upon Apache Iceberg features. As we know, an Iceberg table contains a list of snapshots with a set of metadata. Snapshots include references to the actual immutable data files. A snapshot can contain data files from different partitions.

Design to achieve incremental change capture built upon Apache Iceberg features

The graph above shows that snapshot s0 contains data for partitions P0 and P1 at time T1. Then at T2, a new snapshot s1 is committed to the table with a list of new data files, which includes late arriving data for partitions P0 and P1 and data for P2.

We implemented a lightweight approach to create an Iceberg table (called an ICDC table), which has its own snapshot but only includes the new data file references from the original table, without copying the data files. It is highly efficient and low cost. Workflow pipelines can then just load the ICDC table to process only the change data from partitions P0, P1, and P2, without reprocessing the unchanged data in P0 and P1. Meanwhile, the change range is also captured for the specified data field, as the Iceberg table metadata contains the upper and lower bound information of each data field for each data file. Moreover, IPS tracks the changes at data-file granularity for each workflow.
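
While the ICDC table mechanics are internal to IPS, Iceberg’s incremental read between snapshots gives a flavor of how change data can be loaded without copying files; the snapshot IDs and table name below are purely illustrative.

```python
# Flavor of incremental change capture with Iceberg + Spark: read only the data files
# appended between two snapshots, without copying anything. Snapshot IDs and the table
# name are illustrative; the ICDC tables built by IPS are an internal refinement of this idea.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

changes = (
    spark.read.format("iceberg")
    .option("start-snapshot-id", "5131524212983918463")  # exclusive: last processed snapshot (s0)
    .option("end-snapshot-id",   "5131524212983918999")  # inclusive: latest committed snapshot (s1)
    .load("prod.ds.playback_table")
)
changes.createOrReplaceTempView("playback_changes")      # downstream SQL sees only the change data
```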

This lightweight approach is seamlessly integrated with Maestro to allow all (thousands of) scheduler users to use this new building block (i.e. incremental processing) in their tens of thousands of workflows. Each workflow using IPS is injected with a table parameter, which is the table name of the lightweight ICDC table. The ICDC table contains only the change data. Additionally, if the workflow needs the change range, a list of parameters is injected into the user workflow to include the change range information. Incremental processing can be enabled by a new step job type (ICDC) and/or a new incremental trigger mechanism. Users can use these together with all existing Maestro features, e.g. foreach patterns, step dependencies based on valid-through-timestamp watermarks, the write-audit-publish templatized pattern, etc.

Main Advantages

With this design, user workflows can adopt incremental processing with very little effort. The user business logic is also decoupled from the IPS implementation. Multi-stage pipelines can mix incremental processing workflows with existing normal workflows. We also found that user workflows can be simplified after adopting IPS by removing the extra steps that handled the complexity of the lookback window or called internal libraries.

Adding incremental processing to Netflix Maestro as a new building block enables users to build their workflows much more efficiently and bridges the gaps to solve many challenging problems (e.g. dealing with late arriving data) in a much simpler way.

Emerging Incremental Processing Patterns

While onboarding user pipelines to IPS, we have discovered a few incremental processing patterns:

Incrementally process the captured incremental change data and directly append them to the target table

This is the straightforward incremental processing use case, where the change data carries all the information needed for the data processing. Upstream changes (usually from a single source table) are propagated to the downstream (usually another target table), and the workflow pipeline only needs to process the change data (possibly joined with other dimension tables) and then merge it into (usually append to) the target table. This pattern replaces lookback window patterns for taking care of late arriving data. Instead of overwriting the past X days of data completely by using a lookback window pattern, user workflows just need to MERGE the change data (including late arriving data) into the target table by processing the ICDC table.

Use captured incremental change data as the row level filter list to remove unnecessary transformation

ETL jobs usually need to aggregate data based on certain group-by keys. The change data discloses all the group-by keys that require re-aggregation due to new data landing in the source table(s). ETL jobs can then join the original source table with the ICDC table on those group-by keys, using the ICDC table as a filter to speed up processing and enable calculations over a much smaller set of data (see the sketch below). There is no change to the business transformation logic and no re-design of the ETL workflow. ETL pipelines keep all the benefits of batch workflows.
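
A hypothetical Spark SQL sketch of this filter pattern is shown below; the table names, keys, and aggregation are illustrative.

```python
# Sketch of the row-level filter pattern: re-aggregate only the group-by keys that
# appear in the change data. Table names, keys, and the aggregation are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.sql("""
    MERGE INTO prod.ds.playback_daily_agg_table AS target
    USING (
        SELECT s.account_id, SUM(s.view_seconds) AS total_view_seconds
        FROM prod.ds.playback_daily_table s
        JOIN (SELECT DISTINCT account_id
              FROM prod.ds.playback_daily_icdc_table) c   -- ICDC keys act as the filter
          ON s.account_id = c.account_id
        GROUP BY s.account_id
    ) AS refreshed
    ON target.account_id = refreshed.account_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```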

Use the captured range parameters in the business logic

This pattern is usually used in complicated use cases, such as joining multiple tables and doing complex processing. In this case, the change data does not give the full picture of the input needed by the ETL workflow. Instead, the change data indicates a range of changed data sets for a specific set of fields (possibly partition keys) in a given input table, or usually multiple input tables. The union of the change ranges from all input tables then gives the full change data set needed by the workflow (see the sketch below). Additionally, the whole range of data usually has to be overwritten because the transformation is not stateless and depends on the outcome of the previous ranges. Another example is that an aggregated record in the target table, or a window function in the query, has to be updated based on the whole data set in the partition (e.g. calculating a median across the whole partition). Basically, the range derived from the change data indicates the dataset to be re-processed.
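
A small illustrative sketch of the range derivation follows; the table names and date-hour values are made up.

```python
# Illustrative sketch of the range pattern: take the union of the change ranges captured
# from each input table, then re-process (overwrite) that whole range. Values are made up.
change_ranges = {
    "playback_table":    ("2023-11-01T02", "2023-11-01T06"),  # (first, last) changed date-hour
    "account_dim_table": ("2023-11-01T04", "2023-11-01T07"),
}

start = min(lo for lo, _ in change_ranges.values())  # earliest changed partition across inputs
end   = max(hi for _, hi in change_ranges.values())  # latest changed partition across inputs

# The workflow would then recompute and overwrite the partitions in [start, end], since the
# transformation is not stateless within that range.
print(f"Re-process partitions from {start} to {end}")
```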

Use cases

Data workflows at Netflix usually have to deal with late arriving data, which is commonly solved by using the lookback window pattern due to its simplicity and ease of implementation. In the lookback pattern, the ETL pipeline always consumes the past X partitions of data from the source table and then overwrites the target table in every run. Here, X is a number decided by the pipeline owners based on their domain expertise. The drawback is the cost of computation and execution time: it usually costs almost X times more than a pipeline that does not consider late arriving data. Given that late arriving data is sparse, the majority of the processing is done on data that has already been processed, which is unnecessary. Also note that this approach is based on domain knowledge and is sometimes subject to changes in the business environment or the domain expertise of data engineers. In certain cases, it is challenging to come up with a good constant number.

Below, we will use a two-stage data pipeline to illustrate how to rebuild it using IPS to improve cost efficiency. We will observe a significant cost reduction (> 80%) with little change to the business logic. In this use case, we will set the lookback window size X to be 14 days, which varies across real pipelines.

Original Data Pipeline with Lookback Window

  • playback_table: an Iceberg table holding playback events from user devices, ingested by streaming pipelines, with late arriving data that is sparse (only a few percent of the data arrives late)
  • playback_daily_workflow: a daily scheduled workflow to process the past X days playback_table data and write the transformed data to the target table for the past X days
  • playback_daily_table: the target table of playback_daily_workflow; it gets overwritten every day for the past X days
  • playback_daily_agg_workflow: a daily scheduled workflow to process the past X days’ playback_daily_table data and write the aggregated data to the target table for the past X days
  • playback_daily_agg_table: the target table of playback_daily_agg_workflow; it gets overwritten every day for the past 14 days

We ran this pipeline on a sample dataset using the real business logic; here are the average execution results of the sample runs:

  • The first stage workflow takes about 7 hours to process playback_table data
  • The second stage workflow takes about 3.5 hours to process playback_daily_table data

New Data Pipeline with Incremental Processing

Using IPS, we rewrite the pipeline to avoid re-processing data as much as possible. The new pipeline is shown below.

New data pipeline with incremental processing

Stage 1:

  • ips_playback_daily_workflow: the updated version of playback_daily_workflow.
  • The workflow’s Spark SQL job reads an incremental change data capture (ICDC) Iceberg table (i.e. playback_icdc_table), which includes only the new data added to playback_table. It includes the late arriving data but does not include any unchanged data from playback_table.
  • The business logic replaces INSERT OVERWRITE with a MERGE INTO SQL query, and the new data is merged into playback_daily_table.

Stage 2:

  • IPS captures the changed data of playback_daily_table and also keeps the change data in an ICDC source table (playback_daily_icdc_table), so we don’t need to hard-code the lookback window in the business logic. If only Y days have changed data in playback_daily_table, then only those Y days of data need to be loaded.
  • In ips_playback_daily_agg_workflow, the business logic is the same for the current day’s partition. We then update the business logic to take care of late arriving data by
  • JOINing playback_daily_table with playback_daily_icdc_table on the aggregation group-by keys for the past 2 to X days, excluding the current day (i.e. day 1)
  • Because late arriving data is sparse, the JOIN narrows down the playback_daily_table data set so that only a very small portion of it is processed
  • The business logic uses a MERGE INTO SQL query, and the change is then propagated to the downstream target table
  • For the current day, the business logic remains the same: it consumes the data from playback_daily_table and writes the outcome to the target table playback_daily_agg_table using INSERT OVERWRITE, because there is no need to join with the ICDC table.

With these small changes, the data pipeline efficiency is greatly improved. In our sample run,

  • The first stage workflow takes only about 30 minutes to process the change data (spanning up to X days) from playback_table.
  • The second stage workflow takes about 15 minutes to process the change data between day 2 and day X from playback_daily_table by joining with playback_daily_icdc_table, and another 15 minutes to process the current day’s (i.e. day 1) playback_daily_table change data.

The Spark job settings are the same in the original and new pipelines, so in total the new IPS-based pipeline needs only around 10% of the resources (measured by execution time) to finish.

Looking Forward

We will improve IPS to support more complicated cases beyond append-only cases. IPS will be able to keep track of the progress of the table changes and support multiple Iceberg table change types (e.g. append, overwrite, etc.). We will also add managed backfill support into IPS to help users build, monitor, and validate the backfill.

We are taking Big Data Orchestration to the next level and constantly solving new problems and challenges, so please stay tuned. If you are motivated to solve large-scale orchestration problems, please join us.

Acknowledgements

Thanks to our Product Manager Ashim Pokharel for driving the strategy and requirements. We’d also like to thank Andy Chu, Kyoko Shimada, Abhinaya Shetty, Bharath Mummadisetty, John Zhuge, Rakesh Veeramacheneni, and other stunning colleagues at Netflix for their suggestions and feedback while developing IPS. We’d also like to thank Prashanth Ramdas, Eva Tse, Charles Smith, and other leaders of Netflix engineering organizations for their constructive feedback and suggestions on the IPS architecture and design.


3. Psyberg: Automated end to end catch up

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/3-psyberg-automated-end-to-end-catch-up-260fbe366fe2

By Abhinaya Shetty, Bharath Mummadisetty

This blog post will cover how Psyberg helps automate the end-to-end catchup of different pipelines, including dimension tables.

In the previous installments of this series, we introduced Psyberg and delved into its core operational modes: Stateless and Stateful Data Processing. Now, let’s explore the state of our pipelines after incorporating Psyberg.

Pipelines After Psyberg

Let’s explore how different modes of Psyberg could help with a multistep data pipeline. We’ll return to the sample customer lifecycle:

Processing Requirement:
Keep track of the end-of-hour state of accounts, e.g., Active/Upgraded/Downgraded/Canceled.

Solution:
One potential approach here would be as follows

  1. Create two stateless fact tables :
    a. Signups
    b. Account Plans
  2. Create one stateful fact table:
    a. Cancels
  3. Create a stateful dimension that reads the above fact tables every hour and derives the latest account state.

Let’s look at how this can be integrated with Psyberg to auto-handle late-arriving data and corresponding end-to-end data catchup.

Navigating the Workflow: How Psyberg Handles Late-Arriving Data

We follow a generic workflow structure for both stateful and stateless processing with Psyberg; this helps maintain consistency and makes debugging and understanding these pipelines easier. The following is a concise overview of the various stages involved; for a more detailed exploration of the workflow specifics, please turn to the second installment of this series.

1. Psyberg Initialization

The workflow starts with the Psyberg initialization (init) step.

  • Input: List of source tables and required processing mode
  • Output: Psyberg identifies new events that have occurred since the last high watermark (HWM) and records them in the session metadata table.

The session metadata table can then be read to determine the pipeline input.

2. Write-Audit-Publish (WAP) Process

This is the general pattern we use in our ETL pipelines.

a. Write
Apply the ETL business logic to the input data identified in Step 1 and write to an unpublished Iceberg snapshot, based on the Psyberg mode

b. Audit
Run various quality checks on the staged data. Psyberg’s metadata session table is used to identify the partitions included in a batch run. Several audits, such as verifying source and target counts, are performed on this batch of data.

c. Publish
If the audits are successful, cherry-pick the staging snapshot to publish the data to production.
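
Iceberg’s write-audit-publish (WAP) support gives a flavor of the write-audit-publish steps above. The sketch below follows Iceberg’s documented WAP flow; the table name, run id, and audit query are illustrative and not Psyberg’s actual implementation.

```python
# Flavor of a write-audit-publish run on an Iceberg table. The wap.id staging flow and
# cherrypick_snapshot procedure are Iceberg features; names and the audit are illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
run_id = "catchup-2023-11-01-06"

# Write: stage the ETL output under a WAP id so it is not yet visible to readers.
spark.sql("ALTER TABLE prod.ds.playback_daily_table "
          "SET TBLPROPERTIES ('write.wap.enabled'='true')")
spark.conf.set("spark.wap.id", run_id)
spark.sql("INSERT INTO prod.ds.playback_daily_table "
          "SELECT * FROM prod.ds.playback_icdc_table")     # ETL business logic goes here

# Audit: locate the staged snapshot and run quality checks (e.g. source vs target counts).
staged_id = spark.sql(
    f"SELECT snapshot_id FROM prod.ds.playback_daily_table.snapshots "
    f"WHERE summary['wap.id'] = '{run_id}' ORDER BY committed_at DESC LIMIT 1"
).first()[0]

# Publish: cherry-pick the audited snapshot so it becomes the table's current state.
spark.sql(f"CALL prod.system.cherrypick_snapshot('ds.playback_daily_table', {staged_id})")
```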

3. Psyberg Commit

Now that the data pipeline has been executed successfully, the new high watermark identified in the initialization step is committed to Psyberg’s high watermark metadata table. This ensures that the next instance of the workflow will pick up newer updates.

Callouts

  • Having the Psyberg step isolated from the core data pipeline allows us to maintain a consistent pattern that can be applied across stateless and stateful processing pipelines with varying requirements.
  • This also enables us to update the Psyberg layer without touching the workflows.
  • This is compatible with both Python and Scala Spark.
  • Debugging/figuring out what was loaded in every run is made easy with the help of workflow parameters and Psyberg Metadata.

The Setup: Automated end-to-end catchup

Let’s go back to our customer lifecycle example. Once we integrate all four components with Psyberg, here’s how we would set it up for automated catchup.

The three fact tables, comprising the signup and plan facts encapsulated in Psyberg’s stateless mode, along with the cancel fact in stateful mode, serve as inputs for the stateful sequential load ETL pipeline. This data pipeline monitors the various stages in the customer lifecycle.

In the sequential load ETL, we have the following features:

  • Catchup Threshold: This defines the lookback period for the data being read. For instance, only consider the last 12 hours of data.
  • Data Load Type: The ETL can either load the missed/new data specifically or reload the entire specified range.
  • Metadata Recording: Metadata is persisted for traceability.

Here is a walkthrough on how this system would automatically catch up in the event of late-arriving data:

Premise: All the tables were last loaded up to hour 5, meaning that any data from hour 6 onwards is considered new, and anything before that is classified as late data (as indicated in red above).

Fact level catchup:

  1. During the Psyberg initialization phase, the signup and plan facts identify the late data from hours 2 and 3, as well as the most recent data from hour 6. The ETL then appends this data to the corresponding partitions within the fact tables.
  2. The Psyberg initialization for the cancel fact identifies late data from hour 5 and additional data from hours 6 and 7. Since this ETL operates in stateful mode, the data in the target table from hours 5 to 7 will be overwritten with the new data.
  3. By focusing solely on updates and avoiding reprocessing of data based on a fixed lookback window, both Stateless and Stateful Data Processing maintain a minimal change footprint. This approach ensures data processing is both efficient and accurate.

Dimension level catchup:

  1. The Psyberg wrapper for this stateful ETL looks at the updates to the upstream Psyberg-powered fact tables to determine the date-hour range to reprocess. Here’s how it would calculate the above range (a small sketch follows this list):
    MinHr = least(min processing hour from each source table)
    This ensures that we don’t miss out on any data, including late-arriving data. In this case, the minimum hour to process the data is hour 2.
    MaxHr = least(max processing hour from each source table)
    This ensures we do not process partial data, i.e., hours for which data has not been loaded into all source tables. In this case, the maximum hour to process the data is hour 6.
  2. The ETL process uses this time range to compute the state in the changed partitions and overwrite them in the target table. This helps overwrite data only when required and minimizes unnecessary reprocessing.
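
As a small illustration of step 1, here is a minimal sketch of the range derivation, assuming hypothetical per-source processed-hour values read from Psyberg’s metadata.

```python
# Minimal sketch of the range derivation in step 1 above. The per-source processed-hour
# values are hypothetical stand-ins for what would be read from Psyberg's session metadata.
processed_hours = {
    "signup_fact": {"min_hr": 2, "max_hr": 6},   # late hours 2-3 plus new hour 6
    "plan_fact":   {"min_hr": 2, "max_hr": 6},
    "cancel_fact": {"min_hr": 5, "max_hr": 7},   # late hour 5 plus new hours 6-7
}

min_hr = min(v["min_hr"] for v in processed_hours.values())  # don't miss late data -> hour 2
max_hr = min(v["max_hr"] for v in processed_hours.values())  # don't process partial hours -> hour 6
print(f"Reprocess dimension partitions for hours {min_hr} through {max_hr}")
```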

As seen above, by chaining these Psyberg workflows, we can automate the catchup for late-arriving data from hours 2 through 6. The Data Engineer does not need to perform any manual intervention in this case and can thus focus on more important things!

The Impact: How Psyberg Transformed Our Workflows

The introduction of Psyberg into our workflows has served as a valuable tool in enhancing accuracy and performance. The following are key areas that have seen improvements from using Psyberg:

  • Computational Resources Used:
    In certain instances, we’ve noticed a significant reduction in resource utilization, with the number of Spark cores used dropping by 90% following the implementation of Psyberg, compared to using fixed lookback windows
  • Workflow and Table Onboarding:
    We have onboarded 30 tables and 13 workflows into incremental processing since implementing Psyberg
  • Reliability and Accuracy:
    Since onboarding workflows to Psyberg, we have experienced zero manual catchups or missing data incidents
  • Bootstrap template:
    The process of integrating new tables into incremental processing has been made more accessible and now requires minimal effort using Psyberg

These performance metrics suggest that adopting Psyberg has been beneficial to the efficiency of our data processing workflows.

Next Steps and Conclusion

Integrating Psyberg into our operations has improved our data workflows and opened up exciting possibilities for the future. As we continue to innovate, Netflix’s data platform team is focused on creating a comprehensive solution for incremental processing use cases. This platform-level solution is intended to enhance our data processing capabilities across the organization. Stay tuned for a new post on this!

In conclusion, Psyberg has proven to be a reliable and effective solution for our data processing needs. As we look to the future, we’re excited about the potential for further advancements in our data platform capabilities.


3. Psyberg: Automated end to end catch up was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

2. Diving Deeper into Psyberg: Stateless vs Stateful Data Processing

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/2-diving-deeper-into-psyberg-stateless-vs-stateful-data-processing-1d273b3aaefb

By Abhinaya Shetty, Bharath Mummadisetty

In the inaugural blog post of this series, we introduced you to the state of our pipelines before Psyberg and the challenges with incremental processing that led us to create the Psyberg framework within Netflix’s Membership and Finance data engineering team. In this post, we will delve into a more detailed exploration of Psyberg’s two primary operational modes: stateless and stateful.

Modes of Operation of Psyberg

Psyberg has two main modes of operation or patterns, as we call them. Understanding the nature of the late-arriving data and processing requirements will help decide which pattern is most appropriate for a use case.

  1. Stateless Data Processing: As the name suggests, one should use this pattern in scenarios where the columns in the target table solely depend on the content of the incoming events, irrespective of their order of occurrence. For instance, consider a scenario where we need to keep track of all the customer signups over time. In this case, the order of signups wouldn’t matter, and individual signup records are independent of each other. This information has only one source, and we can append new/late records to the fact table as and when the events are received.
  2. Stateful Data Processing: This pattern is useful when the output depends on a sequence of events across one or more input streams. For example, the customer account lifecycle in a business might involve multiple stages, such as account creation, plan upgrades, downgrades, and cancellation. To derive attributes like the lifetime of an account or the latest plan the account is on, we need to track the sequence of these events across different input streams. A missed event in such a scenario would result in incorrect analysis due to a wrong derived state. Late-arriving data in such cases requires overwriting data that was previously processed to ensure all events are accounted for.

Let’s visualize how these two modes work within our data processing pipeline using a general workflow for loading a fact table. If you would like to learn more about how the workflows are orchestrated in Netflix Maestro scheduler, please check out this blog post from our data platform team.

With this illustration as our guide, let’s explore each mode in more detail.

The Psyberg Initialization Phase

This step invokes Psyberg with the required parameters. Based on these parameters, Psyberg then computes the correct data range for the pipeline processing needs.

The input parameters in this step include the process name, the source tables, a Psyberg session ID, the high watermark table, the session metadata table, and the ETL pattern ID, as illustrated in the YAML snippets below.

Initialization for Stateless Data Processing

Let’s use the signup fact table as an example here. This table’s workflow runs hourly, with the main input source being an Iceberg table storing all raw signup events partitioned by landing date, hour, and batch id.

Here’s a YAML snippet outlining the configuration for this during the Psyberg initialization step:

- job:
    id: psyberg_session_init
    type: Spark
    spark:
      app_args:
        - --process_name=signup_fact_load
        - --src_tables=raw_signups
        - --psyberg_session_id=20230914061001
        - --psyberg_hwm_table=high_water_mark_table
        - --psyberg_session_table=psyberg_session_metadata
        - --etl_pattern_id=1

Behind the scenes, Psyberg identifies that this pipeline is configured for a stateless pattern since etl_pattern_id=1.

Psyberg also uses the provided inputs to detect the Iceberg snapshots that persisted after the latest high watermark available in the watermark table. Using the summary column in snapshot metadata [see the Iceberg Metadata section in post 1 for more details], we parse out the partition information for each Iceberg snapshot of the source table.

Psyberg then retains these processing URIs (an array of JSON strings containing combinations of landing date, hour, and batch IDs) as determined by the snapshot changes. This information and other calculated metadata are stored in the psyberg_session_f table. This stored data is then available for the subsequent LOAD.FACT_TABLE job in the workflow to utilize and for analysis and debugging purposes.
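
As a rough illustration of this snapshot discovery step, the PySpark sketch below reads the Iceberg snapshots metadata table for commits newer than the stored high watermark. The table and column names are placeholders (and a running SparkSession named spark is assumed); Netflix's internal implementation differs.

from pyspark.sql import functions as F

# Last committed high watermark for this process (placeholder table/columns).
last_hwm_ts = (
    spark.table("high_water_mark_table")
    .where(F.col("process_name") == "signup_fact_load")
    .agg(F.max("high_watermark_ts"))
    .first()[0]
)

# Iceberg exposes a `snapshots` metadata table with the commit time, the
# operation type, and a summary map describing what each snapshot changed.
new_snapshots = (
    spark.table("raw_signups.snapshots")
    .where(F.col("committed_at") > F.lit(last_hwm_ts))
    .select("snapshot_id", "committed_at", "operation", "summary")
)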

Initialization for Stateful Data Processing

Stateful Data Processing is used when the output depends on a sequence of events across one or more input streams.

Let’s consider the example of creating a cancel fact table, which takes the following as input:

  1. Raw cancellation events indicating when the customer account was canceled
  2. A fact table that stores incoming customer requests to cancel their subscription at the end of the billing period

These inputs help derive additional stateful analytical attributes, such as the type of churn (voluntary or involuntary).

The initialization step for Stateful Data Processing differs slightly from Stateless. Psyberg offers additional configurations according to the pipeline needs. Here’s a YAML snippet outlining the configuration for the cancel fact table during the Psyberg initialization step:

- job:
    id: psyberg_session_init
    type: Spark
    spark:
      app_args:
        - --process_name=cancel_fact_load
        - --src_tables=raw_cancels|processing_ts,cancel_request_fact
        - --psyberg_session_id=20230914061501
        - --psyberg_hwm_table=high_water_mark_table
        - --psyberg_session_table=psyberg_session_metadata
        - --etl_pattern_id=2

Behind the scenes, Psyberg identifies that this pipeline is configured for a stateful pattern since etl_pattern_id is 2.

Notice the additional detail in the src_tables list corresponding to raw_cancels above. The processing_ts here represents the event processing timestamp, which is different from the regular Iceberg snapshot commit timestamp (i.e., event_landing_ts) described in part 1 of this series.

It is important to capture the range of a consolidated batch of events from all the sources (i.e., both raw_cancels and cancel_request_fact) while factoring in late-arriving events. Changes to the source table snapshots can be tracked using different timestamp fields, so knowing which timestamp field to use (event_landing_ts or something like processing_ts) helps avoid missing events.

Similar to the approach in stateless data processing, Psyberg uses the provided inputs to parse out the partition information for each Iceberg snapshot of the source table.

Sample parsed input for target snapshot_date 20230914 and snapshot_hour 9

This is then used to query the partitions metadata table which has the min and max range for each column in the source table. In this case, we look at the min and max range of the processing_ts column to determine actual partitions for any late-arriving events. The minimum value here helps determine the lower limit of the data to be processed i.e. the derived minimum date and hour based on the input epoch timestamp.

Lower Limit to be processed = least(min event_processing_ts across all source tables)

It also tracks the VTTS (Valid To TimeStamp) of all the input streams and determines the minimum VTTS of all the streams together. This helps determine the upper limit of data to be processed, thus restricting the data load based on data completeness of all the streams combined.

Upper Limit to be processed = least(VTTS date-hour across all input streams)

Using this metadata from different streams, Psyberg calculates several parameters like the minimum/maximum processing date and hour and the event landing date-hour. These parameters, along with other metadata discussed in the previous post, are persisted in the psyberg_session_f table for analysis and debugging purposes.

Write Audit Publish (WAP) process

The Write Audit Publish (WAP) process is a general pattern we use in our ETLs to validate writes to the uncommitted Iceberg snapshot before publishing to the target table. The LOAD.FACT_TABLE step takes psyberg_session_id and process_name as input arguments.

For stateless pattern, the processing URIs to be processed as part of the load step are identified by reading the psyberg_session_f table. This information is then used to filter the source table and apply the business logic to create the signup fact table. Any late-arriving signup events data is appended to the target table partitions as part of this. All these writes go into the uncommitted Iceberg snapshot managed by the WAP pattern.

Similarly, in the stateful pattern, the ETL step reads the psyberg_session_f table to identify the derived minimum and maximum date hour range to be processed, which acts as a filter for different input tables involved in the ETL. After applying the corresponding business logic for cancellation events, we create the cancel fact table along with columns like cancellation type (i.e., voluntary vs involuntary churn) representing the state of the canceled account. If there are any late-arriving events, Psyberg handles them automatically by providing the correct range to the data process to derive the state changes correctly.

Audits

We run different audits on the uncommitted Iceberg snapshot created as part of the job run. Leveraging Psyberg metadata, we can identify the cohort of data involved as part of the job run. This helps in pinpointing changes and applying blocking audits efficiently. Audits like source-to-target count comparison and checking for no missing events in the target Iceberg snapshot ensure data integrity and completeness. Once the audits pass successfully, the data is published to the target table.
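
For readers unfamiliar with WAP, here is one way the write-audit-publish flow can be realized with open-source Iceberg and Spark. This is a hedged sketch of the general pattern (using Iceberg's wap.id staging and the cherrypick_snapshot procedure), not Netflix's internal tooling; it assumes the target table has write.wap.enabled set and a SparkSession named spark.

wap_id = "psyberg_session_20230914061001"
spark.conf.set("spark.wap.id", wap_id)

# WRITE: with spark.wap.id set, this append is staged as an uncommitted
# snapshot instead of becoming the table's current state.
signup_fact_df = spark.table("staging.signup_fact_updates")  # placeholder for the business logic output
signup_fact_df.writeTo("membership.signup_fact").append()

# AUDIT: locate the staged snapshot and run checks against it.
staged = (
    spark.table("membership.signup_fact.snapshots")
    .where(f"summary['wap.id'] = '{wap_id}'")
    .orderBy("committed_at", ascending=False)
    .first()
)
# ... run source-to-target count comparison, missing-event checks, etc. ...

# PUBLISH: once the audits pass, cherry-pick the audited snapshot into the
# table's current state.
spark.sql(
    f"CALL catalog.system.cherrypick_snapshot('membership.signup_fact', {staged.snapshot_id})"
)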

HWM Commit

Leveraging Psyberg metadata tables, we determine the latest timestamp associated with the Iceberg snapshot seen as part of the job run. This timestamp is used to update the high watermark table with the new high watermark so that the subsequent pipeline instance can pick up the next set of changes.

Conclusion

This exploration shows how Psyberg brings efficiency, accuracy, and timeliness to Stateless and Stateful Data Processing within the Membership and Finance data engineering team. Join us in the next part of our blog series, where we’ll discuss how it also helps automate the end-to-end catchup of different pipelines.


2. Diving Deeper into Psyberg: Stateless vs Stateful Data Processing was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

1. Streamlining Membership Data Engineering at Netflix with Psyberg

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/1-streamlining-membership-data-engineering-at-netflix-with-psyberg-f68830617dd1

By Abhinaya Shetty, Bharath Mummadisetty

At Netflix, our Membership and Finance Data Engineering team harnesses diverse data related to plans, pricing, membership life cycle, and revenue to fuel analytics, power various dashboards, and make data-informed decisions. Many metrics in Netflix’s financial reports are powered and reconciled with efforts from our team! Given our role on this critical path, accuracy is paramount. In this context, managing the data, especially when it arrives late, can present a substantial challenge!

In this three-part blog post series, we introduce you to Psyberg, our incremental data processing framework designed to tackle such challenges! We’ll discuss batch data processing, the limitations we faced, and how Psyberg emerged as a solution. Furthermore, we’ll delve into the inner workings of Psyberg, its unique features, and how it integrates into our data pipelining workflows. By the end of this series, we hope you will gain an understanding of how Psyberg transformed our data processing, making our pipelines more efficient, accurate, and timely. Let’s dive in!

The Challenge: Incremental Data Processing with Late Arriving Data

Our team's data processing model mainly comprises batch pipelines, which run at different intervals ranging from hourly to multiple times a day (also known as intraday) and even daily. We expect complete and accurate data at the end of each run. To meet such expectations, we generally run our pipelines with a lag of a few hours to leave room for late-arriving data.

What is late-arriving data?

Late-arriving data is essentially delayed data due to system retries, network delays, batch processing schedules, system outages, delayed upstream workflows, or reconciliation in source systems.

How does late-arriving data impact us?

You could think of our data as a puzzle. With each new piece of data, we must fit it into the larger picture and ensure it’s accurate and complete. Thus, we must reprocess the missed data to ensure data completeness and accuracy.

Types of late-arriving data

Based on the structure of our upstream systems, we've classified late-arriving data into two categories, each named after the timestamp of the updated partition: late-arriving data by event time and late-arriving data by processing time.

Ways to process such data

Our team previously employed some strategies to manage these scenarios, which often led to unnecessarily reprocessing unchanged data. Some techniques we used were:

1. Using fixed lookback windows to always reprocess data, assuming that most late-arriving events will occur within that window. However, this approach usually leads to redundant data reprocessing, thereby increasing ETL processing time and compute costs. It also becomes inefficient as the data scale increases. Imagine reprocessing the past 6 hours of data every hour!

2. Adding alerts to flag late-arriving data, blocking the pipelines, and performing a manual intervention in which we triggered backfill pipelines to handle the missed events. This approach was simple, involved minimal extra processing for the most part, and hence was our preferred option. However, when the late events occurred, the pain of reprocessing data and catching up on all the dependent pipelines was not worth it! We will talk about this shortly.

At a high level, both these approaches were inefficient for intraday pipelines and impacted cost, performance, accuracy, and time. We developed Psyberg, an incremental processing framework using Iceberg to handle these challenges more effectively.

The state of our pipelines before Psyberg

Before diving into the world of Psyberg, it’s crucial to take a step back and reflect on the state of the data pipelines in our team before its implementation. The complexities involved in these processes and the difficulties they posed led to the development of Psyberg.

At Netflix, our backend microservices continuously generate real-time event data that gets streamed into Kafka. These raw events are the source of various data processing workflows within our team. We ingest this diverse event data and transform it into standardized fact tables. The fact tables then feed downstream intraday pipelines that process the data hourly. The sequential load ETL shown in the diagram below depicts one such pipeline that calculates an account's state every hour.

Raw data for hours 3 and 6 arrive. Hour 6 data flows through the various workflows, while hour 3 triggers a late data audit alert.

Let’s walk through an example to understand the complexity of this pre-Psyberg world.

Consider a simplified version of our pipelines where we process three events: signups, plan changes, and cancels. Now imagine that some signup events from hour 3 were delayed and sent in at hour 6 instead. Our audits would detect this and alert the on-call data engineer (DE). The on-call DE would then face the daunting task of making things right!

Step 1: Dive into the audit logs to identify the late-arriving data and the impacted workflows. In this case, they would discover that the late-arriving data for hour 3 must be included in the signup facts.

Step 2: Stop all impacted workflows and downstream jobs (such as the sequential load ETL) and patch the missed data in the fact tables. Now, the data in the signup fact is patched.

Step 3: Identify the number of partitions to be rerun for the sequential stateful load jobs to account for the delayed data and rerun them from the impacted date-hour. The DE would note that the data for hours 3–6 needs to be reprocessed and will retrigger four instances to be run sequentially. This step is crucial because missing signup events from hour 3 would result in us missing subsequent events for those affected accounts (e.g., a cancel event for a missed signup would have had no effect). As we capture the state of an account based on the sequence of different types of events, rerunning the sequential load ETL from hours 3 to 6 ensures the accurate representation of account states.

Step 4: Now that we’ve spent significant time triaging and resolving the alert, the sequential ETL workflow likely experienced a delay. As a result, we need to catch up to schedule. To compensate for the lost time, the DE must trigger a few additional instances until the latest hour that would have run if the data hadn’t arrived late.

This entire process was challenging and required significant manual intervention from the on-call DE perspective. Note that these are hourly jobs, so the alert could be triggered at any time of the day (or night!). Yes, they were infrequent, but a big pain point when they occurred! Also, the on-call DE was usually not the SME for these pipelines, as the late data could have arrived in any of our upstream pipelines. To solve these problems, we came up with Psyberg!

Psyberg: The Game Changer!

Psyberg automates our data loads, making it suitable for various data processing needs, including intraday pipeline use cases. It leverages Iceberg metadata to facilitate processing incremental and batch-based data pipelines.

One of the critical features of Psyberg is its ability to detect and manage late-arriving data, no matter the partition it lands in. This feature allows data pipelines to handle late-arriving data effectively without manual intervention, ensuring higher data accuracy in our systems. Iceberg metadata and Psyberg’s own metadata form the backbone of its efficient data processing capabilities.

ETL Process High Watermark

This is the last recorded update timestamp for any data pipeline process. This is mainly used to identify new changes since the last update.

Iceberg Metadata

Psyberg primarily harnesses two key Iceberg metadata tables — snapshots and partitions — to manage the workload. All Iceberg tables have associated metadata that provides insight into changes or updates within the data tables.

The snapshots metadata table records essential metadata such as:

  • The creation time of a snapshot
  • The type of operation performed (append, overwrite, etc.)
  • A summary of partitions created/updated during the generation of the Iceberg snapshot

These details enable Psyberg to track different operations and identify the changes made to a source table since the previous high watermark.

The partitions metadata table is particularly interesting as it stores:

  • Information about partition keys used in the data table
  • Column names and the range of values for each column within a specific partition

One unique aspect of Netflix’s internal implementation is that it provides the range of values for each column within a partition in a deserialized format. This information helps Psyberg comprehend the timestamp ranges for both types of late-arriving data (event and processing time) without querying the actual data.

Psyberg Metadata

In addition to Iceberg metadata, Psyberg maintains its own metadata tables — the session table and the high watermark table. Both these tables are partitioned by the pipeline process name to maintain information related to each data pipeline independently.

The session table captures metadata specific to each pipeline run, including:

  • Process name partition to track all the runs associated with the data pipeline process
  • Session ID to track unique runs within the process
  • Processing URIs to identify the input partitions involved in the load
  • “from date”, “from hour”, “to date” and “to hour” for both event and processing times

The high watermark table stores relevant values from the session table at the end of each pipeline run:

  • Latest and previous high water mark timestamp
  • Metadata related to the latest run

This information is vital for each pipeline run instance as it helps determine the data to be loaded, updates the high water mark after processing, and finally generates output signals to inform downstream workflows about the date-hour up to which data is complete and available. It also serves as an essential resource for debugging and creating audits on the pipeline jobs.
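
To make this bookkeeping concrete, here is a hypothetical example (in Python, with illustrative field names) of what one run's rows in these two metadata tables might look like; the actual schemas are internal to Netflix.

session_row = {
    "process_name": "signup_fact_load",            # partition key: one pipeline process
    "psyberg_session_id": "20230914061001",        # unique run identifier
    "processing_uris": [                           # input partitions involved in this load
        {"landing_date": "20230914", "hour": 5, "batch_id": "b-01"},
        {"landing_date": "20230914", "hour": 6, "batch_id": "b-02"},
    ],
    "event_from_date_hour": "2023-09-14 05",       # "from"/"to" ranges for event time
    "event_to_date_hour": "2023-09-14 06",
    "processing_from_date_hour": "2023-09-14 05",  # and for processing time
    "processing_to_date_hour": "2023-09-14 06",
}

high_watermark_row = {
    "process_name": "signup_fact_load",
    "previous_high_watermark_ts": 1694665200,      # watermark before this run
    "latest_high_watermark_ts": 1694671261,        # updated once the run commits
}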

Conclusion

In this post, we described our data architecture at a high level, along with the pain points that led to the development of Psyberg. We also went into details related to the metadata that powers Psyberg. If you understand the challenges faced by the on-call DE and would like to learn more about our solution, please check out the next iteration of this three-part series, where we delve deeper into different modes of Psyberg.


1. Streamlining Membership Data Engineering at Netflix with Psyberg was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Detecting Speech and Music in Audio Content

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/detecting-speech-and-music-in-audio-content-afd64e6a5bf8

Iroro Orife, Chih-Wei Wu and Yun-Ning (Amy) Hung

Introduction

When you enjoy the latest season of Stranger Things or Casa de Papel (Money Heist), have you ever wondered about the secrets to fantastic story-telling, besides the stunning visual presentation? From the violin melody accompanying a pivotal scene to the soaring orchestral arrangement and thunderous sound-effects propelling an edge-of-your-seat action sequence, the various components of the audio soundtrack combine to evoke the very essence of story-telling. To uncover the magic of audio soundtracks and further improve the sonic experience, we need a way to systematically examine the interaction of these components, typically categorized as dialogue, music and effects.

In this blog post, we will introduce speech and music detection as an enabling technology for a variety of audio applications in Film & TV, as well as introduce our speech and music activity detection (SMAD) system which we recently published as a journal article in EURASIP Journal on Audio, Speech, and Music Processing.

Like semantic segmentation for audio, SMAD separately tracks the amount of speech and music in each frame in an audio file and is useful in content understanding tasks during the audio production and delivery lifecycle. The detailed temporal metadata SMAD provides about speech and music regions in a polyphonic audio mixture is a first step toward structural audio segmentation, indexing, and pre-processing audio for downstream tasks. Let's have a look at a few applications.

Practical use cases for speech & music activity

Audio dataset preparation

Speech & music activity detection is an important preprocessing step for preparing training corpora: SMAD classifies and segments long-form audio for use in building such large corpora.

From “Audio Signal Classification” by David Gerhard

Dialogue analysis & processing

  • During encoding at Netflix, speech-gated loudness is computed for every audio master track and used for loudness normalization. Speech-activity metadata is thus a central part of accurate catalog-wide loudness management and improved audio volume experience for Netflix members.
  • Similarly, algorithms for dialogue intelligibility, spoken-language-identification and speech-transcription are only applied to audio regions where there is measured speech.

Music information retrieval

  • There are a few studio use cases where music activity metadata is important, including quality-control (QC) and at-scale multimedia content analysis and tagging.
  • There are also inter-domain tasks like singer-identification and song lyrics transcription, which do not fit neatly into either speech or classical MIR tasks, but are useful for annotating musical passages with lyrics in closed captions and subtitles.
  • Conversely, where neither speech nor music activity is present, such audio regions are estimated to have content classified as noisy, environmental or sound-effects.

Localization & Dubbing

Finally, there are post-production tasks which take advantage of accurate speech segmentation at the spoken utterance or sentence level, ahead of translation and dub-script generation. Likewise, authoring accessibility features like Audio Description (AD) involves music and speech segmentation. The AD narration is typically mixed in so as not to overlap with the primary dialogue, while music lyrics strongly tied to the plot of the story are sometimes referenced by AD creators, especially for translated AD.

A voice actor in the studio

Our Approach to Speech and Music Activity Detection

Although the application of deep learning methods has improved audio classification systems in recent years, this data-driven approach to SMAD requires large amounts of audio source material with audio-frame-level speech and music activity labels. Collecting such fine-resolution labels is costly and labor-intensive, and audio content often cannot be publicly shared due to copyright limitations. We address the challenge from a different angle.

Content, genre and languages

Instead of augmenting or synthesizing training data, we sample the large-scale data available in the Netflix catalog with noisy labels. In contrast to clean labels, which indicate precise start and end times for each speech/music region, noisy labels only provide approximate timing, which may impact SMAD classification performance. Nevertheless, noisy labels allow us to increase the scale of the dataset with minimal manual effort and potentially generalize better across different types of content.

Our dataset, which we introduced as TVSM (TV Speech and Music) in our publication, contains a total of 1,608 hours of professionally recorded and produced audio. TVSM is significantly larger than other SMAD datasets and contains both speech and music labels at the frame level. TVSM also contains overlapping music and speech labels, and both classes have a similar total duration.

Training examples were produced between 2016 and 2019, in 13 countries, with 60% of the titles originating in the USA. Content duration ranged from 10 minutes to over 1 hour, across the various genres listed below.

The dataset contains audio tracks in three different languages, namely English, Spanish, and Japanese. The language distribution is shown in the figure below. The name of the episode/TV show for each sample remains unpublished. However, each sample has both a show-ID and a season-ID to help identify the connection between the samples. For instance, two samples from different seasons of the same show would share the same show ID and have different season IDs.

What constitutes music or speech?

To evaluate and benchmark our dataset, we manually labeled 20 audio tracks from various TV shows which do not overlap with our training data. One of the fundamental issues encountered during the annotation of our manually labeled TVSM-test set was the definition of music and speech. The heavy usage of ambient sounds and sound effects blurs the boundaries between active music regions and non-music. Similarly, switches between conversational speech and singing voices in certain TV genres obscure where speech starts and music stops. Furthermore, must these two classes be mutually exclusive? To ensure label quality and consistency, and to avoid ambiguity, we converged on the following guidelines for differentiating music and speech:

  • Any music that is perceivable by the annotator at a comfortable playback volume should be annotated.
  • Since sung lyrics are often included in closed-captions or subtitles, human singing voices should all be annotated as both speech and music.
  • Ambient sound or sound effects without apparent melodic contours should not be annotated as music. Traditional phone bell, ringing, or buzzing without apparent melodic contours should not be annotated as music.
  • Filled pauses (uh, um, ah, er), backchannels (mhm, uh-huh), sighing, and screaming should not be annotated as speech.

Audio format and preprocessing

All audio files were originally delivered from the post-production studios in the standard 5.1 surround format at 48 kHz sampling rate. We first normalize all files to an average loudness of −27 LKFS ± 2 LU dialog-gated, then downsample to 16 kHz before creating an ITU downmix.

Model Architecture

Our modeling choices take advantage of both convolutional and recurrent architectures, which are known to work well on audio sequence classification tasks, and are well supported by previous investigations. We adapted the SOTA convolutional recurrent neural network (CRNN) architecture to accommodate our requirements for input/output dimensionality and model complexity. The best model was a CRNN with three convolutional layers, followed by two bi-directional recurrent layers and one fully connected layer. The model has 832k trainable parameters and emits frame-level predictions for both speech and music with a temporal resolution of 5 frames per second.
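
As a mental model for this architecture, here is a minimal PyTorch sketch of a CRNN with the same general shape: three convolutional blocks, two bidirectional recurrent layers, and one fully connected layer emitting frame-level speech and music activations. The hyperparameters are illustrative and do not reproduce the published 832k-parameter model.

import torch
import torch.nn as nn

class CRNN(nn.Module):
    def __init__(self, n_mels=64, hidden=128):
        super().__init__()
        # Three conv blocks that pool along the frequency axis only.
        self.conv = nn.Sequential(
            nn.Conv2d(1, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d((2, 1)),
            nn.Conv2d(32, 64, 3, padding=1), nn.BatchNorm2d(64), nn.ReLU(), nn.MaxPool2d((2, 1)),
            nn.Conv2d(64, 64, 3, padding=1), nn.BatchNorm2d(64), nn.ReLU(), nn.MaxPool2d((2, 1)),
        )
        # Two bidirectional recurrent layers over the time axis.
        self.rnn = nn.GRU(64 * (n_mels // 8), hidden, num_layers=2,
                          batch_first=True, bidirectional=True)
        self.fc = nn.Linear(2 * hidden, 2)  # per-frame speech and music activations

    def forward(self, mel):                 # mel: (batch, 1, n_mels, time)
        x = self.conv(mel)                  # (batch, channels, n_mels // 8, time)
        b, c, f, t = x.shape
        x = x.permute(0, 3, 1, 2).reshape(b, t, c * f)  # (batch, time, features)
        x, _ = self.rnn(x)
        return torch.sigmoid(self.fc(x))    # frame-level probabilities in [0, 1]

Training such a model then amounts to minimizing a binary cross-entropy loss between these per-frame probabilities and the (noisy) labels, as described next.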

For training, we leveraged our large and diverse catalog dataset with noisy labels, introduced above. Applying a random sampling strategy, each training sample is a 20 second segment obtained by randomly selecting an audio file and corresponding starting timecode offset on the fly. All models in our experiments were trained by minimizing binary cross-entropy (BCE) loss.

Evaluation

In order to understand the influence of different variables in our experimental setup, e.g. model architecture, training data or input representation variants like log-Mel Spectrogram versus per-channel energy normalization (PCEN), we setup a detailed ablation study, which we encourage the reader to explore fully in our EURASIP journal article.

For each experiment, we reported the class-wise F-score and error rate with a segment size of 10ms. The error rate is the summation of deletion rate (false negative) and insertion rate (false positive). Since a binary decision must be attained for music and speech to calculate the F-score, a threshold of 0.5 was used to quantize the continuous output of speech and music activity functions.
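
As a heavily simplified illustration of this evaluation step, the sketch below quantizes a continuous activity function at the 0.5 threshold and computes a class-wise F-score and error rate over 10 ms segments. The synthetic data and the exact error-rate bookkeeping are our own reading of the description above, not the published evaluation code.

import numpy as np
from sklearn.metrics import f1_score

rng = np.random.default_rng(0)
speech_ref = rng.integers(0, 2, size=6000)      # reference labels, one per 10 ms segment
speech_act = rng.random(6000)                   # continuous speech activity function
speech_pred = (speech_act > 0.5).astype(int)    # quantize with a 0.5 threshold

print("speech F-score:", f1_score(speech_ref, speech_pred))

# Error rate = deletion rate (false negatives) + insertion rate (false positives),
# here expressed relative to the number of active reference segments.
deletions = np.sum((speech_ref == 1) & (speech_pred == 0))
insertions = np.sum((speech_ref == 0) & (speech_pred == 1))
print("speech error rate:", (deletions + insertions) / max(speech_ref.sum(), 1))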

Results

We evaluated our models on four open datasets comprising audio data from TV programs, YouTube clips and various content such as concert, radio broadcasts, and low-fidelity folk music. The excellent performance of our models demonstrates the importance of building a robust system that detects overlapping speech and music and supports our assumption that a large but noisy-labeled real-world dataset can serve as a viable solution for SMAD.

Conclusion

At Netflix, tasks throughout the content production and delivery lifecycle are most often interested in just one part of the soundtrack. Tasks that operate on just dialogue, music or effects are performed hundreds of times a day, by teams around the globe, in dozens of different audio languages. So investments in algorithmically-assisted tools for automatic audio content understanding like SMAD can yield substantial productivity returns at scale while minimizing tedium.

Additional Resources

We have made audio features and labels available via Zenodo. There is also a GitHub repository with the following audio tools:

  • Python code for data pre-processing, including scripts for 5.1 downmixing, Mel spectrogram generation, MFCCs generation, VGGish features generation, and the PCEN implementation.
  • Python code for reproducing all experiments, including scripts of data loaders, model implementations, training and evaluation pipelines.
  • Pre-trained models for each conducted experiment.
  • Prediction outputs for all audio in the evaluation datasets.

Special thanks to the entire Audio Algorithms team, as well as Amir Ziai, Anna Pulido, and Angie Pollema.


Detecting Speech and Music in Audio Content was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

The Next Step in Personalization: Dynamic Sizzles

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/the-next-step-in-personalization-dynamic-sizzles-4dc4ce2011ef

Authors: Bruce Wobbe, Leticia Kwok

Additional Credits: Sanford Holsapple, Eugene Lok, Jeremy Kelly

Introduction

At Netflix, we strive to give our members an excellent personalized experience, helping them make the most successful and satisfying selections from our thousands of titles. We already personalize artwork and trailers, but we hadn’t yet personalized sizzle reels — until now.

A sizzle reel is a montage of video clips from different titles strung together into a seamless A/V asset that gets members excited about upcoming launches (for example, our Emmys nominations or holiday collections). Now Netflix can create a personalized sizzle reel dynamically in real time and on demand. The order of the clips and included titles are personalized per member, giving each a unique and effective experience. These new personalized reels are called Dynamic Sizzles.

In this post, we will dive into the exciting details of how we create Dynamic Sizzles with minimal human intervention, including the challenges we faced and the solutions we developed.

An example of a Dynamic Sizzle created for Chuseok, the Korean mid-autumn harvest festival collection.

Overview

In the past, each sizzle reel was created manually. The time and cost of doing this prevents scaling and misses the invaluable benefit of personalization, which is a bedrock principle at Netflix. We wanted to figure out how to efficiently scale sizzle reel production, while also incorporating personalization — all in an effort to yield greater engagement and enjoyment for our members.

Enter the creation of Dynamic Sizzles. We developed a systems-based approach that uses our interactive and creative technology to programmatically stitch together multiple video clips alongside a synced audio track. The process involves compiling personalized multi-title/multi-talent promotional A/V assets on the fly into a Mega Asset. A Mega Asset is a large A/V asset made up of video clips from various titles, acting as a library from which the Dynamic Sizzle pulls media. These clips are then used to construct a personalized Dynamic Sizzle according to a predefined cadence.

With Dynamic Sizzles, we can utilize more focused creative work from editors and generate a multitude of personalized sizzle reels efficiently and effectively — yielding up to 70% time and cost savings compared to a manually created reel. This gives us the ability to create thousands, if not millions, of combinations of video clips and assets that result in optimized and personalized sizzle reel experiences for Netflix members.

Creating the Mega Asset

Where To Begin

Our first challenge was figuring out how to create the Mega Asset, as each video clip needs to be precise in its selection and positioning. A Mega Asset can contain any number of clips, and millions of unique Dynamic Sizzles can be produced from a single Mega Asset.

We accomplished this by using human editors to select the clips — ensuring that they are well-defined from both a creative and technical standpoint — then laying them out in a specific known order in a timeline. We also need each clip marked with an index to its location — an extremely tedious and time consuming process for an editor. To solve this, we created an Adobe Premiere plug-in to automate the process. Further verifications can also be done programmatically via ingestion of the timecode data, as we can validate the structure of the Mega Asset by looking at the timecodes.

An example of a title’s video clips layout.

The above layout shows how a single title's clips are ordered in a Mega Asset, in 3 different lengths: 160, 80 and 40 frames. Each clip should be unique per title; however, when using multiple titles, clips from different titles may share the same length. This gives us more variety to choose from while maintaining a structured order in the layout.

Cadence

The cadence is a predetermined collection of clip lengths that indicates when, where, and for how long a title shows within a Dynamic Sizzle. The cadence ensures that when a Dynamic Sizzle is played, it will show a balanced view of any titles chosen, while still giving more time to a member’s higher ranked titles. Cadence is something we can personalize or randomize, and will continue to evolve as needed.

Sample Cadence

In the above sample cadence, Title A refers to the highest ranked title in a member’s personalized sort, Title B the second highest, and so on. The cadence is made up of 3 distinct segments with 5 chosen titles (A-E) played in sequence using various clip lengths. Each clip in the cadence refers to a different clip in the Mega Asset. For example, the 80 frame clip for title A in the first (red) segment is different from the 80 frame clip for title A in the third (purple) segment.

Composing the Dynamic Sizzle

Personalization

When a request comes in for a sizzle reel, our system determines what titles are in the Mega Asset and based on the request, a personalized list of titles is created and sorted. The top titles for a member are then used to construct the Dynamic Sizzle by leveraging the clips in the Mega Asset. Higher ranked titles get more weight in placement and allotted time.

Finding Timecodes

For the Dynamic Sizzle process, we have to quickly and dynamically determine the timecodes for each clip in the Mega Asset and make sure they are easily accessed at runtime. We accomplish this by utilizing Netflix’s Hollow technology. Hollow allows us to store timecodes for quick searches and use timecodes as a map — a key can be used to find the timecodes needed as defined by the cadence. The key can be as simple as titleId-clip-1.

Building The Reel

The ordering of the clips is set by the predefined cadence, which dictates the final layout and helps easily build the Dynamic Sizzle. For example, if the system knows to use title 17 within the Mega Asset, we can easily calculate the time offset for all the clips because of the known ordering of the titles and clips within the Mega Asset. This all comes together as sketched below.
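
Below is a hypothetical Python sketch of that assembly: a cadence of (title slot, clip number) entries is resolved against the member's ranking and the Mega Asset's timecode map. The key format and data shapes are illustrative only, not the production schema.

# Hypothetical timecode map keyed by titleId-clip-N (see the Hollow lookup above).
clip_index = {
    "17-clip-1": (120.0, 126.7),   # e.g. the 160-frame clip for title 17
    "17-clip-2": (126.7, 130.0),   # the 80-frame clip
    "42-clip-2": (310.0, 313.3),
}
cadence = [("A", 1), ("B", 2), ("A", 2)]   # (title slot, clip number) in play order
ranking = {"A": "17", "B": "42"}           # member's ranked titles -> title IDs

# Resolve each cadence entry to (start, stop) timecodes within the Mega Asset.
reel = [clip_index[f"{ranking[slot]}-clip-{n}"] for slot, n in cadence]
print(reel)  # ordered timecodes the player stitches into one seamless reel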

The result is a series of timecodes indicating the start and stop times for each clip. These codes appear in the order they should be played and the player uses them to construct a seamless video experience as seen in the examples below:

The Beautiful Game Sizzle
The Beautiful Game Dynamic Sizzle

With Dynamic Sizzles, each member experiences a personalized sizzle reel.

Example of what 2 different profiles might see for the same sizzle

Playing the Dynamic Sizzle

Delivering To The Player

The player leverages the Mega Asset by using timecodes to know where to start and stop each clip, and then seamlessly plays each one right after the other. This required a change in the API that devices normally use to get trailers. The API change was twofold. First, on the request we need the device to indicate that it can support Dynamic Sizzles. Second, on the response the timecode list needs to be sent. (Changing the API and rolling it out took time, so this all had to be implemented before Dynamic Sizzles could actually be used, tested, and productized.)

Challenges With The Player

There were two main challenges with the player. First, in order to support features like background music across multiple unique video segments, we needed to support asymmetrical segment streaming from discontiguous locations in the Mega Asset. This involved modifying existing schemas and adding corresponding support to the player to allow for the stitching of the video and audio together separately while still keeping the timecodes in sync. Second, we needed to optimize our streaming algorithms to account for these much shorter segments, as some of our previous assumptions were incorrect when dealing with dozens of discontiguous tiny segments in the asset.

Building Great Things Together

We are just getting started on this journey to build truly great experiences. While the challenges may seem endless, the work is incredibly fulfilling. The core to bringing these great engineering solutions to life is the direct collaboration we have with our colleagues and innovating together to solve these challenges.

If you are interested in working on great technology like Dynamic Sizzles, we’d love to talk to you! We are hiring: jobs.netflix.com


The Next Step in Personalization: Dynamic Sizzles was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Building In-Video Search

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/building-in-video-search-936766f0017c

Boris Chen, Ben Klein, Jason Ge, Avneesh Saluja, Guru Tahasildar, Abhishek Soni, Juan Vimberg, Elliot Chow, Amir Ziai, Varun Sekhri, Santiago Castro, Keila Fong, Kelli Griggs, Mallia Sherzai, Robert Mayer, Andy Yao, Vi Iyengar, Jonathan Solorzano-Hamilton, Hossein Taghavi, Ritwik Kumar

Introduction

Today we're going to take a look behind the scenes at the technology Netflix uses to create great trailers, Instagram reels, video shorts and other promotional videos.

Suppose you're trying to create the trailer for the action thriller The Gray Man, and you know you want to use a shot of a car exploding. You don't know if that shot exists or where it is in the film, and you have to look for it by scrubbing through the whole film.

Exploding cars — The Gray Man (2022)

Or suppose it's Christmas, and you want to create a great Instagram piece out of all the best scenes across Netflix films of people shouting "Merry Christmas"! Or suppose it's Anya Taylor-Joy's birthday, and you want to create a highlight reel of all her most iconic and dramatic shots.

Making these comes down to finding the right video clips amongst hundreds of thousands of movies and TV shows, whether that means the right line of dialogue or the right visual elements (objects, scenes, emotions, actions, etc.). We have built an internal system that allows someone to perform in-video search across the entire Netflix video catalog, and we'd like to share our experience in building this system.

Building in-video search

To build such a visual search engine, we needed a machine learning system that can understand visual elements. Our early attempts included object detection, but we found that general labels were too limiting, yet not specific enough. Every show has special objects that are important (e.g. Demogorgon in Stranger Things) that don't translate to other shows. The same was true for action recognition, and other common image and video tasks.

The Approach

We found that contrastive learning between images and text pairs work well for our goals because these models are able to learn joint embedding spaces between the two modalities. This approach is also able to learn about objects, scenes, emotions, actions, and more in a single model. We also found that extending contrastive learning to videos and text provided a substantial improvement over frame-level models.

In order to train the model on internal training data (video clips with aligned text descriptions), we implemented a scalable version on Ray Train and switched to a more performant video decoding library. Lastly, the embeddings from the video encoder exhibit strong zero or few-shot performance on multiple video and content understanding tasks at Netflix and are used as a starting point in those applications.

The recent success of large-scale models that jointly train image and text embeddings has enabled new use cases around multimodal retrieval. These models are trained on large amounts of image-caption pairs via in-batch contrastive learning. For a (large) batch of N examples, we wish to maximize the embedding (cosine) similarity of the N correct image-text pairs, while minimizing the similarity of the other N²-N paired embeddings. This is done by treating the similarities as logits and minimizing the symmetric cross-entropy loss, which gives equal weighting to the two settings (treating the captions as labels to the images and vice versa).
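
A minimal PyTorch sketch of this in-batch symmetric objective is shown below; the encoders and training data are stand-ins for the internal models, and the temperature value is illustrative.

import torch
import torch.nn.functional as F

def symmetric_contrastive_loss(image_emb, text_emb, temperature=0.07):
    # image_emb, text_emb: (N, d) embeddings for N aligned image-text pairs.
    image_emb = F.normalize(image_emb, dim=-1)
    text_emb = F.normalize(text_emb, dim=-1)
    logits = image_emb @ text_emb.t() / temperature                # (N, N) similarities as logits
    targets = torch.arange(logits.size(0), device=logits.device)  # correct pairs on the diagonal
    loss_images = F.cross_entropy(logits, targets)                 # captions as labels for images
    loss_texts = F.cross_entropy(logits.t(), targets)              # images as labels for captions
    return (loss_images + loss_texts) / 2                          # equal weighting of both directions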

Consider the following two images and captions:

Images are from Glass Onion: A Knives Out Mystery (2022)

Once properly trained, the embeddings for the corresponding images and text (i.e. captions) will be close to each other and farther away from unrelated pairs.

Typically embedding spaces are hundred/thousand dimensional.

At query time, the input text query can be mapped into this embedding space, and we can return the closest matching images.

The query may not have existed in the training set. Cosine similarity can be used as a similarity measure.

While these models are trained on image-text pairs, we have found that they are an excellent starting point to learning representations of video units like shots and scenes. As videos are a sequence of images (frames), additional parameters may need to be introduced to compute embeddings for these video units, although we have found that for shorter units like shots, an unparameterized aggregation like averaging (mean-pooling) can be more effective. To train these parameters as well as fine-tune the pretrained image-text model weights, we leverage in-house datasets that pair shots of varying durations with rich textual descriptions of their content. This additional adaptation step improves performance by 15–25% on video retrieval tasks (given a text prompt), depending on the starting model used and metric evaluated.

On top of video retrieval, there are a wide variety of video clip classifiers within Netflix that are trained specifically to find a particular attribute (e.g. closeup shots, caution elements). Instead of training from scratch, we have found that using the shot-level embeddings can give us a significant head start, even beyond the baseline image-text models that they were built on top of.

Lastly, shot embeddings can also be used for video-to-video search, a particularly useful application in the context of trailer and promotional asset creation.

Engineering and Infrastructure

Our trained model gives us a text encoder and a video encoder. Video embeddings are precomputed on the shot level, stored in our media feature store, and replicated to an elastic search cluster for real-time nearest neighbor queries. Our media feature management system automatically triggers the video embedding computation whenever new video assets are added, ensuring that we can search through the latest video assets.

The embedding computation is based on a large neural network model and has to be run on GPUs for optimal throughput. However, shot segmentation from a full-length movie is CPU-intensive. To fully utilize the GPUs in the cloud environment, we first run shot segmentation in parallel on multi-core CPU machines, store the result shots in S3 object storage encoded in video formats such as mp4. During GPU computation, we stream mp4 video shots from S3 directly to the GPUs using a data loader that performs prefetching and preprocessing. This approach ensures that the GPUs are efficiently utilized during inference, thereby increasing the overall throughput and cost-efficiency of our system.

At query time, a user submits a text string representing what they want to search for. For visual search queries, we use the text encoder from the trained model to extract a text embedding, which is then used to perform the appropriate nearest neighbor search. Users can also select a subset of shows to search over, or perform a catalog-wide search, which we also support.
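
The serving path can be pictured with the short sketch below: shot embeddings are mean-pooled from frame embeddings offline, and a text query is answered by cosine nearest neighbors over those embeddings. The encoder and index here are placeholders; the production system uses the media feature store and an Elasticsearch cluster as described above.

import torch
import torch.nn.functional as F

def shot_embedding(frame_embeddings):
    # frame_embeddings: (num_frames, d) -> a single unit-norm shot embedding.
    return F.normalize(frame_embeddings.mean(dim=0), dim=-1)

def search(text_query, shot_embeddings, text_encoder, top_k=5):
    # shot_embeddings: (num_shots, d), precomputed and unit-normalized.
    query = F.normalize(text_encoder(text_query), dim=-1)   # (d,)
    scores = shot_embeddings @ query                         # cosine similarities
    return torch.topk(scores, k=top_k).indices               # indices of best-matching shots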

If you’re interested in more details, see our other post covering the Media Understanding Platform.

Conclusion

Finding a needle in a haystack is hard. We learned from talking to video creatives who make trailers and social media videos that being able to find needles was key, and a big pain point. The solution we described has been fruitful, works well in practice, and is relatively simple to maintain. Our search system allows our creatives to iterate faster, try more ideas, and make more engaging videos for our viewers to enjoy.

We hope this post has been interesting to you. If you are interested in working on problems like this, Netflix is always hiring great researchers, engineers and creators.


Building In-Video Search was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Streaming SQL in Data Mesh

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/streaming-sql-in-data-mesh-0d83f5a00d08

Democratizing Stream Processing @ Netflix

By Guil Pires, Mark Cho, Mingliang Liu, Sujay Jain

Data powers much of what we do at Netflix. On the Data Platform team, we build the infrastructure used across the company to process data at scale.

In our last blog post, we introduced “Data Mesh” — A Data Movement and Processing Platform. When a user wants to leverage Data Mesh to move and transform data, they start by creating a new Data Mesh pipeline. The pipeline is composed of individual “Processors” that are connected by Kafka topics. The Processors themselves are implemented as Flink jobs that use the DataStream API.

Since then, we have seen many use cases (including Netflix Graph Search) adopt Data Mesh for stream processing. We were able to onboard many of these use cases by offering some commonly used Processors out of the box, such as Projection, Filtering, Unioning, and Field Renaming.

An example of a Data Mesh pipeline which moves and transforms data using Union, GraphQL Enrichment, and Column Rename Processor before writing to an Iceberg table.

By keeping the logic of individual Processors simple, it allowed them to be reusable so we could centrally manage and operate them at scale. It also allowed them to be composable, so users could combine the different Processors to express the logic they needed.

However, this design decision led to a different set of challenges.

Some teams found the provided building blocks were not expressive enough. For use cases which were not solvable using existing Processors, users had to express their business logic by building a custom Processor. To do this, they had to use the low-level DataStream API from Flink and the Data Mesh SDK, which came with a steep learning curve. After it was built, they also had to operate the custom Processors themselves.

Furthermore, many pipelines needed to be composed of multiple Processors. Since each Processor was implemented as a Flink Job connected by Kafka topics, it meant there was a relatively high runtime overhead cost for many pipelines.

We explored various options to solve these challenges, and eventually landed on building the Data Mesh SQL Processor that would provide additional flexibility for expressing users’ business logic.

The existing Data Mesh Processors have a lot of overlap with SQL. For example, filtering and projection can be expressed in SQL through SELECT and WHERE clauses. Additionally, instead of implementing business logic by composing multiple individual Processors together, users could express their logic in a single SQL query, avoiding the additional resource and latency overhead that came from multiple Flink jobs and Kafka topics. Furthermore, SQL can support User Defined Functions (UDFs) and custom connectors for lookup joins, which can be used to extend expressiveness.

Data Mesh SQL Processor

Since Data Mesh Processors are built on top of Flink, it made sense to consider using Flink SQL instead of continuing to build additional Processors for every transform operation we needed to support.

The Data Mesh SQL Processor is a platform-managed, parameterized Flink Job that takes schematized sources and a Flink SQL query that will be executed against those sources. By leveraging Flink SQL within a Data Mesh Processor, we were able to support the streaming SQL functionality without changing the architecture of Data Mesh.

Underneath the hood, the Data Mesh SQL Processor is implemented using Flink's Table API, which provides a powerful abstraction to convert between DataStreams and Dynamic Tables. Based on the sources that the processor is connected to, the SQL Processor will automatically convert the upstream sources into tables within Flink's SQL engine. The user's query is then registered with the SQL engine and translated into a Flink job graph consisting of physical operators that can be executed on a Flink cluster. Unlike the low-level DataStream API, users do not have to manually build a job graph using low-level operators, as this is all managed by Flink's SQL engine.
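
To illustrate the idea with open-source Flink (PyFlink here for brevity), the sketch below registers a source table and runs a projection-and-filter query through the Table API. This is only an analogy for how the SQL Processor behaves; the Data Mesh platform manages sources, schemas, and deployment itself, and its internal jobs are not written this way.

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# A datagen source stands in for a schematized, Kafka-backed Data Mesh source.
t_env.execute_sql("""
    CREATE TABLE plays (
        title_id BIGINT,
        country  STRING,
        ts       TIMESTAMP(3)
    ) WITH ('connector' = 'datagen', 'rows-per-second' = '5')
""")

# The user's business logic is a single SQL query (projection + filtering)
# instead of a chain of Processors connected by Kafka topics.
result = t_env.execute_sql("""
    SELECT title_id, country, ts
    FROM plays
    WHERE MOD(title_id, 2) = 0
""")
result.print()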

SQL Experience on Data Mesh

The SQL Processor enables users to fully leverage the capabilities of the Data Mesh platform. This includes features such as autoscaling, the ability to manage pipelines declaratively via Infrastructure as Code, and a rich connector ecosystem.

In order to ensure a seamless user experience, we’ve enhanced the Data Mesh platform with SQL-centric features. These enhancements include an Interactive Query Mode, real-time query validation, and automated schema inference.

To understand how these features help the users be more productive, let’s take a look at a typical user workflow when using the Data Mesh SQL Processor.

  • Users start their journey by live sampling their upstream data sources using the Interactive Query Mode.
  • As the user iterates on their SQL query, the query validation service provides real-time feedback about the query.
  • With a valid query, users can leverage the Interactive Query Mode again to execute the query and get the live results streamed back to the UI within seconds.
  • For more efficient schema management and evolution, the platform will automatically infer the output schema based on the fields selected by the SQL query.
  • Once the user is done editing their query, it is saved to the Data Mesh Pipeline, which will then be deployed as a long running, streaming SQL job.
Overview of the SQL Processor workflow.

Users typically iterate on their SQL query multiple times before deploying it. Validating and analyzing queries at runtime after deployment will not only slow down their iteration, but also make it difficult to automate schema evolution in Data Mesh.

To address this challenge, we have implemented a query validation service that can verify a Flink SQL query and provide a meaningful error message for violations in real time. This enables users to have prompt validation feedback while they are editing the query. We leverage Apache Flink’s internal Planner classes to parse and transform SQL queries without creating a fully-fledged streaming table environment. This makes the query service lightweight, scalable, and execution agnostic.

To effectively operate thousands of use cases at the platform layer, we built opinionated guardrails to limit some functionalities of Flink SQL. We plan on gradually expanding the supported capabilities over time. We implemented the guardrails by recursively inspecting the Calcite tree constructed from user’s query. If the tree contains nodes that we currently don’t support, the query will be rejected from being deployed. Additionally, we translate Flink’s internal exceptions containing cryptic error messages into more meaningful error messages for our users. We plan on continuing our investments into improving the guardrails, as having proper guardrails help to improve the user experience. Some ideas for the future include rules to reject expensive and suboptimal queries.

To help Data Mesh users iterate quickly on their business logic, we have built the Interactive Query Mode as part of the platform. Users can start live sampling their streaming data by executing a simple `SELECT * FROM <table>` query. Using the Interactive Query Mode, Data Mesh platform will execute the Flink SQL query and display the results in the UI in seconds. Since this is a Flink SQL query on streaming data, new results will continue to be delivered to the user in real-time.

Users can continue to iterate and modify their Flink SQL query and once they’re satisfied with their query output, they can save the query as part of their stream processing pipeline.

To provide this interactive experience, we maintain an always-running Flink Session Cluster that can run concurrent parameterized queries. These queries will output their data to a Mantis sink in order to stream the results back to the user’s browser.

An animated gif showing the interactive query mode in action
Interactive Query mode in action

Learnings from our journey

In hindsight, we wish we had invested in enabling Flink SQL on the Data Mesh platform much earlier. If we had the Data Mesh SQL Processor earlier, we would've been able to avoid spending engineering resources to build smaller building blocks such as the Union Processor, the Column Rename Processor, and the Projection and Filtering Processors.

Since we’ve productionized Data Mesh SQL Processor, we’ve seen excitement and quick adoption from our Data Mesh users. Thanks to the flexibility of Flink SQL, users have a new way to express their streaming transformation logic other than writing a custom processor using the low-level DataStream API.

While Flink SQL is a powerful tool, we view the Data Mesh SQL Processor as a complementary addition to our platform. It is not meant to be a replacement for custom processors and Flink jobs using the low-level DataStream API. Since SQL is a higher-level abstraction, users no longer have control over low-level Flink operators and state. This means that if state evolution is critical to the user's business logic, then having complete control over the state can only be done through low-level abstractions like the DataStream API. Even with this limitation, we have seen that there are many new use cases that are unlocked through the Data Mesh SQL Processor.

Our early investment in guardrails has helped set clear expectations with our users and keep the operational burden manageable. It has allowed us to productionize queries and patterns that we are confident about supporting, while providing a framework to introduce new capabilities gradually.

Future of SQL on Data Mesh

While introducing the SQL Processor to the Data Mesh platform was a great step forward, we still have much more work to do in order to unlock the power of stream processing at Netflix. We’ve been working with our partner teams to prioritize and build the next set of features to extend the SQL Processor. These include stream enrichment using Slowly-Changing-Dimension (SCD) tables, temporal joins, and windowed aggregations.

Stay tuned for more updates!


Streaming SQL in Data Mesh was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.