All posts by Netflix Technology Blog

Data ingestion pipeline with Operation Management

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-ingestion-pipeline-with-operation-management-3c5c638740a8

by Varun Sekhri, Meenakshi Jindal, Burak Bacioglu

Introduction

At Netflix, many Media Algorithm teams work hand in hand with content creators and editors to promote and recommend content to users in the best possible way. Several of these algorithms aim to improve different manual workflows so that we can show each user a personalized promotional image, trailer, or show.

These media-focused machine learning algorithms, as well as other teams, generate a lot of data from media files. As we described in our previous blog, this data is stored as annotations in Marken. We designed a unique concept called Annotation Operations which allows teams to create data pipelines and easily write annotations without worrying about how that data is accessed by different applications.

Goals

Annotation Operations

Let’s pick an example use case: identifying objects (like trees, cars, etc.) in a video file. As described in the picture above:

  • During the first run of the algorithm it identified 500 objects in a particular Video file. These 500 objects were stored as annotations of a specific schema type, let’s say Objects, in Marken.
  • The Algorithm team improved their algorithm. Now when we re-ran the algorithm on the same video file it created 600 annotations of schema type Objects and stored them in our service.

Notice that we cannot update the annotations from previous runs because we don’t know how many annotations a new algorithm run will result in. It is also very expensive for us to keep track of which annotation needs to be updated.

The goal is that when a consumer searches for annotations of type Objects for the given video file, the following should happen.

  • Before Algo run 1, a search should not return anything.
  • After the completion of Algo run 1, the query should find the first set of 500 annotations.
  • While Algo run 2 is creating the set of 600 annotations, client searches should still return the older 500 annotations.
  • When all 600 annotations are successfully created, they should replace the older set of 500.
  • So when clients now search for Objects annotations, they should get the 600 annotations.

Does this remind you of something? This seems very similar (though not exactly the same) to a distributed transaction.

Typically, an algorithm run can produce 2k–5k annotations. There are many naive solutions possible for this problem, for example:

  • Write different runs to different databases. This is obviously very expensive.
  • Write algo runs into files. But then we cannot search or serve low-latency retrievals from files.
  • Etc.

Instead, our challenge was to implement this feature on top of Cassandra and Elasticsearch, because that’s what Marken uses. The solution we present in this blog is not limited to annotations and can be used by any other domain that uses ES and Cassandra as well.

Marken Architecture

Marken’s architecture diagram is shown below; we refer the reader to our previous blog article for details. We use Cassandra as the source of truth, where we store the annotations, while we index annotations in Elasticsearch to provide rich search functionality.

Marken Architecture

Our goal was to help teams at Netflix create data pipelines without thinking about how that data is made available to the readers or the client teams. Similarly, client teams don’t have to worry about when or how the data is written. This is what we call decoupling producer flows from clients of the data.

The lifecycle of a movie goes through a lot of creative stages. Many temporary files are delivered before we get to the final file of the movie. Similarly, a movie is available in many different languages, and each of those languages can have different files delivered. Teams generally want to run algorithms and create annotations using all those media files.

Since algorithms can be run on different permutations of how the media files are created and delivered, we can characterize an algorithm run as follows:

  • Annotation Schema Type — identifies the schema for the annotation generated by the Algorithm.
  • Annotation Schema Version — identifies the schema version of the annotation generated by the Algorithm.
  • PivotId — a unique string identifier which identifies the file or method which is used to generate the annotations. This could be the SHA hash of the file or simply the movie Identifier number.

Given the above, we can describe the data model for an annotation operation as follows.

{
  "annotationOperationKeys": [
    {
      "annotationType": "string",        ❶
      "annotationTypeVersion": "integer",
      "pivotId": "string",
      "operationNumber": "integer"       ❷
    }
  ],
  "id": "UUID",
  "operationStatus": "STARTED",          ❸
  "isActive": true                       ❹
}
  1. We already explained AnnotationType, AnnotationTypeVersion and PivotId above.
  2. OperationNumber is an auto-incremented number for each new operation.
  3. OperationStatus — An operation goes through three phases: Started, Finished and Canceled.
  4. IsActive — Whether an operation and its associated annotations are active and searchable.

As you can see from the data model, the producer of an annotation has to choose an AnnotationOperationKey, which lets them define how they want to upsert annotations in an AnnotationOperation. Within the AnnotationOperationKey, the important field is pivotId and how it is generated.

Cassandra Tables

Our source of truth for all objects in Marken is Cassandra. To store annotation operations we have the following main tables:

  • AnnotationOperationById — stores the AnnotationOperation objects.
  • AnnotationIdByAnnotationOperationId — stores the IDs of all annotations in an operation.

Since Cassandra is NoSQL, we have additional tables which act as reverse indices and support admin jobs, so that we can scan all annotation operations whenever there is a need.
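
As a rough, hypothetical sketch of what those two main tables could look like — the keyspace, table, and column names below are assumptions, not Marken’s actual schema — here they are expressed as CQL statements issued through the Python cassandra-driver:

from cassandra.cluster import Cluster

# Illustrative only: keyspace, table and column names are assumptions.
session = Cluster(["127.0.0.1"]).connect("marken")

session.execute("""
    CREATE TABLE IF NOT EXISTS annotation_operation_by_id (
        id uuid PRIMARY KEY,
        annotation_type text,
        annotation_type_version int,
        pivot_id text,
        operation_number int,
        operation_status text,
        is_active boolean
    )""")

session.execute("""
    CREATE TABLE IF NOT EXISTS annotation_id_by_annotation_operation_id (
        operation_id uuid,
        annotation_id uuid,
        PRIMARY KEY (operation_id, annotation_id)
    )""")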

ElasticSearch

Each annotation in Marken is also indexed in Elasticsearch to power various searches. To record the relationship between an annotation and its operation, we also index two fields:

  • annotationOperationId — The ID of the operation to which this annotation belongs
  • isAnnotationOperationActive — Whether the operation is in an ACTIVE state.

APIs

We provide three APIs to our users. In the following sections we describe the APIs and the state management done within them.

StartAnnotationOperation

When this API is called, we store the operation with its OperationKey (a tuple of annotationType, annotationTypeVersion and pivotId) in our database. This new operation is marked as being in the STARTED state. We store the IDs of all operations which are in the STARTED state in a distributed cache (EVCache) for fast access during searches.

StartAnnotationOperation
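
A minimal sketch of the state management in this API might look like the following. The store and started_ops_cache clients, and the next_operation_number helper, are hypothetical stand-ins; Marken’s real implementation is a backend service, not this Python code.

import uuid

def start_annotation_operation(store, started_ops_cache, annotation_type,
                               annotation_type_version, pivot_id):
    """Sketch: persist a new operation in STARTED state and cache its ID."""
    operation_id = str(uuid.uuid4())
    operation = {
        "id": operation_id,
        "annotationOperationKeys": [{
            "annotationType": annotation_type,
            "annotationTypeVersion": annotation_type_version,
            "pivotId": pivot_id,
            "operationNumber": store.next_operation_number(pivot_id),  # assumed helper
        }],
        "operationStatus": "STARTED",
        "isActive": True,
    }
    store.save_operation(operation)      # Cassandra is the source of truth
    started_ops_cache.add(operation_id)  # EVCache-like cache of STARTED operation IDs
    return operation_id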

UpsertAnnotationsInOperation

Users call this API to upsert the annotations in an operation. They pass annotations along with the OperationID. We store the annotations and also record the relationship between the annotation IDs and the Operation ID in Cassandra. During this phase the operation has isAnnotationOperationActive = ACTIVE and operationStatus = STARTED.

Note that a single operation run typically creates 2K to 5K annotations. Clients can call this API from many different machines or threads for fast upserts.

UpsertAnnotationsInOperation
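
On the client side, parallelizing those upsert calls could look roughly like this; marken_client and its upsert_annotations_in_operation method are hypothetical names used purely for illustration.

from concurrent.futures import ThreadPoolExecutor

def upsert_all(marken_client, operation_id, annotations, batch_size=100, workers=8):
    """Sketch: split annotations into batches and upsert them in parallel."""
    batches = [annotations[i:i + batch_size]
               for i in range(0, len(annotations), batch_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(marken_client.upsert_annotations_in_operation,
                               operation_id, batch)
                   for batch in batches]
        for future in futures:
            future.result()  # surface any failed batch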

FinishAnnotationOperation

Once the annotations have been created in an operation, clients call FinishAnnotationOperation, which does the following:

  • Marks the current operation (let’s say with ID2) as operationStatus = FINISHED and isAnnotationOperationActive=ACTIVE.
  • We remove ID2 from EVCache since it is no longer in the STARTED state.
  • Any previous operation (let’s say with ID1) which was ACTIVE is now marked isAnnotationOperationActive=FALSE in Cassandra.
  • Finally, we call the updateByQuery API in Elasticsearch. This API finds all Elasticsearch documents with ID1 and marks isAnnotationOperationActive=FALSE.
FinishAnnotationOperation
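
A sketch of that finish step, including the Elasticsearch updateByQuery call via the Python elasticsearch client — the index name, store and started_ops_cache objects are illustrative assumptions; only the two indexed fields come from the post:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # illustrative client and index name below

def finish_annotation_operation(store, started_ops_cache, new_op_id, previous_op_id):
    """Sketch: activate the new operation and retire the previous one."""
    store.update_operation(new_op_id, operation_status="FINISHED", is_active=True)  # Cassandra
    started_ops_cache.remove(new_op_id)  # drop from EVCache; no longer STARTED
    if previous_op_id is not None:
        store.update_operation(previous_op_id, is_active=False)
        # Mark every indexed annotation of the previous operation as inactive.
        es.update_by_query(
            index="annotations",
            body={
                "script": {"source": "ctx._source.isAnnotationOperationActive = false"},
                "query": {"term": {"annotationOperationId": previous_op_id}},
            },
        )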

Search API

This is the key part for our readers. When a client calls our search API we must exclude:

  • any annotations from operations where isAnnotationOperationActive=FALSE, and
  • any annotations whose operations are still in the STARTED state.

To achieve this:

  1. We add a filter to our ES query to exclude documents where isAnnotationOperationActive is FALSE.
  2. We query EVCache to find all operations which are in the STARTED state, then exclude all annotations whose annotationOperationId is in that set. Using EVCache allows us to keep latencies for our search low (most of our queries are less than 100ms). A sketch of the resulting filter is shown below.
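
Here is a hedged sketch of the resulting Elasticsearch filter, where started_op_ids is the set of operation IDs read from EVCache; apart from the two fields described above, the structure and names are illustrative:

def build_search_query(user_query, started_op_ids):
    """Sketch: exclude inactive operations and operations still in STARTED state."""
    return {
        "query": {
            "bool": {
                "must": [user_query],
                "filter": [
                    {"term": {"isAnnotationOperationActive": True}}
                ],
                "must_not": [
                    {"terms": {"annotationOperationId": list(started_op_ids)}}
                ],
            }
        }
    }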

Error handling

Cassandra is our source of truth, so if an error happens there we fail the client call. However, once we commit to Cassandra, we must handle Elasticsearch errors. In our experience, errors have only happened when the Elasticsearch cluster is having some issue. For that case, we created retry logic for updateByQuery calls to Elasticsearch: if the call fails, we push a message to SQS so we can retry in an automated fashion after some interval.
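
The retry path could look roughly like the sketch below, using boto3; the queue URL and message payload shape are placeholders, not our actual configuration:

import json
import boto3

sqs = boto3.client("sqs")
RETRY_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/000000000000/es-retry-queue"  # placeholder

def update_by_query_with_retry(es, index, body):
    """Sketch: try the ES call once; on failure, enqueue it for a delayed retry."""
    try:
        es.update_by_query(index=index, body=body)
    except Exception:
        sqs.send_message(
            QueueUrl=RETRY_QUEUE_URL,
            MessageBody=json.dumps({"index": index, "body": body}),
            DelaySeconds=60,  # retry after some interval
        )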

Future work

In the near term, we want to provide a single higher-level API which can be called by our clients instead of calling three APIs. For example, they could store the annotations in a blob storage like S3 and give us a link to the file as part of the single API call.



Scaling Media Machine Learning at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/scaling-media-machine-learning-at-netflix-f19b400243

By Gustavo Carmo, Elliot Chow, Nagendra Kamath, Akshay Modi, Jason Ge, Wenbing Bai, Jackson de Campos, Lingyi Liu, Pablo Delgado, Meenakshi Jindal, Boris Chen, Vi Iyengar, Kelli Griggs, Amir Ziai, Prasanna Padmanabhan, and Hossein Taghavi

Figure 1 – Media Machine Learning Infrastructure

Introduction

In 2007, Netflix started offering streaming alongside its DVD shipping services. As the catalog grew and users adopted streaming, so did the opportunities for creating and improving our recommendations. With a catalog spanning thousands of shows and a diverse member base spanning millions of accounts, recommending the right show to our members is crucial.

Why should members care about any particular show that we recommend? Trailers and artwork provide a glimpse of what to expect in that show. We have been leveraging machine learning (ML) models to personalize artwork and to help our creatives create promotional content efficiently.

Our goal in building a media-focused ML infrastructure is to reduce the time from ideation to productization for our media ML practitioners. We accomplish this by paving the path to:

  • Accessing and processing media data (e.g. video, image, audio, and text)
  • Training large-scale models efficiently
  • Productizing models in a self-serve fashion in order to execute on existing and newly arriving assets
  • Storing and serving model outputs for consumption in promotional content creation

In this post, we will describe some of the challenges of applying machine learning to media assets, and the infrastructure components that we have built to address them. We will then present a case study of using these components in order to optimize, scale, and solidify an existing pipeline. Finally, we’ll conclude with a brief discussion of the opportunities on the horizon.

Infrastructure challenges and components

In this section, we highlight some of the unique challenges faced by media ML practitioners, along with the infrastructure components that we have devised to address them.

Media Access: Jasper

In the early days of media ML efforts, it was very hard for researchers to access media data. Even after gaining access, one needed to deal with the lack of homogeneity across different assets in terms of decoding performance, size, metadata, and general formatting.

To streamline this process, we standardized media assets with pre-processing steps that create and store dedicated quality-controlled derivatives with associated snapshotted metadata. In addition, we provide a unified library that enables ML practitioners to seamlessly access video, audio, image, and various text-based assets.

Media Feature Storage: Amber Storage

Media feature computation tends to be expensive and time-consuming. Many ML practitioners independently computed identical features against the same asset in their ML pipelines.

To reduce costs and promote reuse, we have built a feature store in order to memoize features/embeddings tied to media entities. This feature store is equipped with a data replication system that enables copying data to different storage solutions depending on the required access patterns.

Compute Triggering and Orchestration: Amber Orchestration

Productized models must run over newly arriving assets for scoring. In order to satisfy this requirement, ML practitioners had to develop bespoke triggering and orchestration components per pipeline. Over time, these bespoke components became the source of many downstream errors and were difficult to maintain.

Amber is a suite of multiple infrastructure components that offers triggering capabilities to initiate the computation of algorithms with recursive dependency resolution.

Training Performance

Media model training poses multiple system challenges in storage, network, and GPUs. We have developed a large-scale GPU training cluster based on Ray, which supports multi-GPU / multi-node distributed training. We precompute the datasets, offload the preprocessing to CPU instances, optimize model operators within the framework, and utilize a high-performance file system to resolve the data loading bottleneck, increasing the entire training system throughput 3–5 times.

Serving and Searching

Media feature values can be optionally synchronized to other systems depending on necessary query patterns. One of these systems is Marken, a scalable service used to persist feature values as annotations, which are versioned and strongly typed constructs associated with Netflix media entities such as videos and artwork.

This service provides a user-friendly query DSL for applications to perform search operations over these annotations with specific filtering and grouping. Marken provides unique search capabilities on temporal and spatial data by time frames or region coordinates, as well as vector searches that are able to scale up to the entire catalog.

ML practitioners interact with this infrastructure mostly using Python, but there is a plethora of tools and platforms being used in the systems behind the scenes. These include, but are not limited to, Conductor, Dagobah, Metaflow, Titus, Iceberg, Trino, Cassandra, Elastic Search, Spark, Ray, MezzFS, S3, Baggins, FSx, and Java/Scala-based applications with Spring Boot.

Case study: scaling match cutting using the media ML infra

The Media Machine Learning Infrastructure is empowering various scenarios across Netflix, and some of them are described here. In this section, we showcase the use of this infrastructure through the case study of Match Cutting.

Background

Match Cutting is a video editing technique. It’s a transition between two shots that uses similar visual framing, composition, or action to fluidly bring the viewer from one scene to the next. It is a powerful visual storytelling tool used to create a connection between two scenes.

Figure 2 – a series of frame match cuts from Wednesday.

In an earlier post, we described how we’ve used machine learning to find candidate pairs. In this post, we will focus on the engineering and infrastructure challenges of delivering this feature.

Where we started

Initially, we built Match Cutting to find matches across a single title (i.e. either a movie or an episode within a show). An average title has 2k shots, which means that we need to enumerate and process ~2M pairs.

Figure 3 – The original Match Cutting pipeline before leveraging media ML infrastructure components.

This entire process was encapsulated in a single Metaflow flow. Each step was mapped to a Metaflow step, which allowed us to control the amount of resources used per step.

Step 1

We download a video file and produce shot boundary metadata. An example of this data is provided below:

SB = {0: [0, 20], 1: [20, 30], 2: [30, 85], …}

Each key in the SB dictionary is a shot index and each value represents the frame range corresponding to that shot index. For example, for the shot with index 1 (the second shot), the value captures the shot frame range [20, 30], where 20 is the start frame and 29 is the end frame (i.e. the end of the range is exclusive while the start is inclusive).

Using this data, we then materialized individual clip files (e.g. clip0.mp4, clip1.mp4, etc) corresponding to each shot so that they can be processed in Step 2.
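
As a rough sketch of that materialization step, assuming a known, constant frame rate (the actual pipeline’s tooling may differ), one could cut clips with ffmpeg like this:

import subprocess

def materialize_clips(video_path, shot_boundaries, fps=24.0):
    """Sketch: cut one clip per shot from its frame range (end frame exclusive)."""
    for shot_index, (start_frame, end_frame) in shot_boundaries.items():
        start_sec = start_frame / fps
        end_sec = end_frame / fps
        subprocess.run(
            ["ffmpeg", "-y", "-i", video_path,
             "-ss", f"{start_sec:.3f}", "-to", f"{end_sec:.3f}",
             f"clip{shot_index}.mp4"],
            check=True,
        )

# materialize_clips("title.mp4", {0: [0, 20], 1: [20, 30], 2: [30, 85]})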

Step 2

This step works with the individual files produced in Step 1 and the list of shot boundaries. We first extract a representation (aka embedding) of each file using a video encoder (i.e. an algorithm that converts a video to a fixed-size vector) and use that embedding to identify and remove duplicate shots.

In the following example SB_deduped is the result of deduplicating SB:

# the second shot (index 1) was removed and so was clip1.mp4
SB_deduped = {0: [0, 20], 2: [30, 85], …}

SB_deduped, along with the surviving files, is passed along to Step 3.
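
A simplified sketch of this deduplication step, assuming a hypothetical video_encoder that returns a fixed-size embedding per clip (the real pipeline’s encoder and threshold are not shown in the post):

import numpy as np

def dedupe_shots(shot_boundaries, clip_paths, video_encoder, threshold=0.95):
    """Sketch: drop shots whose embedding is too similar to an already kept shot."""
    kept, kept_embeddings = {}, []
    for shot_index, frame_range in shot_boundaries.items():
        emb = video_encoder(clip_paths[shot_index])  # fixed-size vector
        emb = emb / np.linalg.norm(emb)
        if any(float(np.dot(emb, prev)) > threshold for prev in kept_embeddings):
            continue  # near-duplicate of a kept shot, skip it
        kept[shot_index] = frame_range
        kept_embeddings.append(emb)
    return kept  # e.g. {0: [0, 20], 2: [30, 85], ...}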

Step 3

We compute another representation per shot, depending on the flavor of match cutting.

Step 4

We enumerate all pairs and compute a score for each pair of representations. These scores are stored along with the shot metadata:

[
  # shots with indices 12 and 729 have a high matching score
  {shot1: 12, shot2: 729, score: 0.96},
  # shots with indices 58 and 410 have a low matching score
  {shot1: 58, shot2: 410, score: 0.02},
  …
]

Step 5

Finally, we sort the results by score in descending order and surface the top-K pairs, where K is a parameter.
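
Taken together, Steps 4 and 5 boil down to something like the sketch below, with a hypothetical score function over per-shot representations:

import heapq
from itertools import combinations

def top_k_matches(representations, score, k=100):
    """Sketch: score every shot pair and keep the K highest-scoring ones."""
    scored = (
        {"shot1": a, "shot2": b, "score": score(representations[a], representations[b])}
        for a, b in combinations(sorted(representations), 2)
    )
    return heapq.nlargest(k, scored, key=lambda pair: pair["score"])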

The problems we faced

This pattern works well for a single flavor of match cutting and finding matches within the same title. As we started venturing beyond single-title and added more flavors, we quickly faced a few problems.

Lack of standardization

The representations we extract in Steps 2 and 3 are sensitive to the characteristics of the input video files. In some cases, such as instance segmentation, the output representation in Step 3 is a function of the dimensions of the input file.

Not having a standardized input file format (e.g. same encoding recipes and dimensions) created matching quality issues when representations across titles with different input files needed to be processed together (e.g. multi-title match cutting).

Wasteful repeated computations

Segmentation at the shot level is a common task used across many media ML pipelines. Also, deduplicating similar shots is a common step that a subset of those pipelines shares.

We realized that memoizing these computations not only reduces waste but also allows for congruence between algo pipelines that share the same preprocessing step. In other words, having a single source of truth for shot boundaries helps us guarantee additional properties for the data generated downstream. As a concrete example, knowing that algo A and algo B both used the same shot boundary detection step, we know that shot index i has identical frame ranges in both. Without this knowledge, we’ll have to check if this is actually true.

Gaps in media-focused pipeline triggering and orchestration

Our stakeholders (i.e. video editors using match cutting) need to start working on titles as quickly as the video files land. Therefore, we built a mechanism to trigger the computation upon the landing of new video files. This triggering logic turned out to present two issues:

  1. Lack of standardization meant that the computation was sometimes re-triggered for the same video file due to changes in metadata, without any content change.
  2. Many pipelines independently developed similar bespoke components for triggering computation, which created inconsistencies.

Additionally, decomposing the pipeline into modular pieces and orchestrating computation with dependency semantics did not map out of the box onto existing workflow orchestrators such as Conductor and Meson. The media machine learning domain needed its own level of coupling between media asset metadata, media access, feature storage, feature compute, and feature compute triggering, in a way that new algorithms could be easily plugged in against predefined standards.

This is where Amber comes in, offering a Media Machine Learning Feature Development and Productization Suite that glues together all aspects of shipping algorithms while permitting the interdependency and composability of the many smaller parts required to devise a complex system.

Each part is in itself an algorithm, which we call an Amber Feature, with its own scope of computation, storage, and triggering. Using dependency semantics, an Amber Feature can be plugged into other Amber Features, allowing for the composition of a complex mesh of interrelated algorithms.

Match Cutting across titles

Step 4 entails a computation that is quadratic in the number of shots. For instance, matching across a series with 10 episodes and an average of 2K shots per episode translates into 200M comparisons. Matching across 1,000 files (across multiple shows) would take approximately 2 trillion comparisons.

Momentarily setting aside the sheer number of computations required, editors may be interested in considering any subset of shows for matching. The naive approach is to pre-compute all possible subsets of shows. Even assuming that we only have 1,000 video files, this means that we would have to pre-compute 2¹⁰⁰⁰ subsets, which is more than the number of atoms in the observable universe!

Ideally, we want to use an approach that avoids both issues.

Where we landed

The Media Machine Learning Infrastructure provided many of the building blocks required for overcoming these hurdles.

Standardized video encodes

The entire Netflix catalog is pre-processed and stored for reuse in machine learning scenarios. Match Cutting benefits from this standardization as it relies on homogeneity across videos for proper matching.

Shot segmentation and deduplication reuse

Videos are matched at the shot level. Since breaking videos into shots is a very common task across many algorithms, the infrastructure team provides this canonical feature that can be used as a dependency for other algorithms. With this, we were able to reuse memoized feature values, saving on compute costs and guaranteeing coherence of shot segments across algos.

Orchestrating embedding computations

We have used Amber’s feature dependency semantics to tie the computation of embeddings to shot deduplication. Leveraging Amber’s triggering, we automatically initiate scoring for new videos as soon as the standardized video encodes are ready. Amber handles the computation in the dependency chain recursively.

Feature value storage

We store embeddings in Amber, which guarantees immutability, versioning, auditing, and various metrics on top of the feature values. This also allows other algorithms to be built on top of the Match Cutting output as well as all the intermediate embeddings.

Compute pairs and sink to Marken

We have also used Amber’s synchronization mechanisms to replicate data from the main feature value copies to Marken, which is used for serving.

Media Search Platform

The Media Search Platform is then used to serve high-scoring pairs to video editors in internal applications, via Marken.

The following figure depicts the new pipeline using the above-mentioned components:

Figure 4 – Match cutting pipeline built using media ML infrastructure components. Interactions between algorithms are expressed as a feature mesh, and each Amber Feature encapsulates triggering and compute.

Conclusion and Future Work

The intersection of media and ML holds numerous prospects for innovation and impact. We examined some of the unique challenges that media ML practitioners face and presented some of our early efforts in building a platform that accommodates the scaling of ML solutions.

In addition to the promotional media use cases we discussed, we are extending the infrastructure to facilitate a growing set of use cases. Here are just a few examples:

  • ML-based VFX tooling
  • Improving recommendations using a suite of content understanding models
  • Enriching content understanding ML and creative tooling by leveraging personalization signals and insights

In future posts, we’ll dive deeper into more details about the solutions built for each of the components we have briefly described in this post.

If you’re interested in media ML, we’re always looking for engineers and ML researchers and practitioners to join us!

Acknowledgments

Special thanks to Ben Klein, Fernando Amat Gil, Varun Sekhri, Guru Tahasildar, and Burak Bacioglu for contributing to ideas, designs, and discussions.



Discovering Creative Insights in Promotional Artwork

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/discovering-creative-insights-in-promotional-artwork-295e4d788db5

By Grace Tang, Aneesh Vartakavi, Julija Bagdonaite, Cristina Segalin, and Vi Iyengar

When members are shown a title on Netflix, the displayed artwork, trailers, and synopses are personalized. That means members see the assets that are most likely to help them make an informed choice. These assets are a critical source of information for the member to make a decision to watch, or not watch, a title. The stories on Netflix are multidimensional and there are many ways that a single story could appeal to different members. We want to show members the images, trailers, and synopses that are most helpful to them for making a watch decision.

In a previous blog post we explained how our artwork personalization algorithm can pick the best image for each member, but how do we create a good set of images to choose from? What data would you like to have if you were designing an asset suite?

In this blog post, we talk about two approaches to create effective artwork. Broadly, they are:

  1. The top-down approach, where we preemptively identify image properties to investigate, informed by our initial beliefs.
  2. The bottom-up approach, where we let the data naturally surface important trends.

The role of promotional artwork

Great promotional media helps viewers discover titles they’ll love. In addition to helping members quickly find titles already aligned with their tastes, it helps them discover new content. We want to make artwork that is compelling and personally relevant, but we also want to represent the title authentically. We don’t want to make clickbait.

Here’s an example: Purple Hearts is a film about an aspiring singer-songwriter who commits to a marriage of convenience with a soon-to-deploy Marine. This title has storylines that might appeal to both fans of romance as well as military and war themes. This is reflected in our artwork suite for this title.

Images for the title “Purple Hearts”

Creative Insights

To create suites that are relevant, attractive, and authentic, we’ve relied on creative strategists and designers with intimate knowledge of the titles to recommend and create the right art for upcoming titles. To supplement their domain expertise, we’ve built a suite of tools to help them look for trends. By inspecting past asset performance from thousands of titles that have already been launched on Netflix, we achieve a beautiful intersection of art & science. However, there are some downsides to this approach: It is tedious to manually scrub through this large collection of data, and looking for trends this way could be subjective and vulnerable to confirmation bias.

Creators often have years of experience and expert knowledge on what makes a good piece of art. However, it is still useful to test our assumptions, especially in the context of the specific canvases we use on the Netflix product. For example, certain traditional art styles that are effective in traditional media like movie posters might not translate well to the Netflix UI in your living room. Compared to a movie poster or physical billboard, Netflix artwork on TV screens and mobile phones has very different sizes, aspect ratios, and amounts of attention paid to it. As a consequence, we need to conduct research into the effectiveness of artwork on our unique user interfaces instead of extrapolating from established design principles.

Given these challenges, we develop data-driven recommendations and surface them to creators in an actionable, user-friendly way. These insights complement their extensive domain expertise in order to help them to create more effective asset suites. We do this in two ways, a top-down approach that can find known features that have worked well in the past, and a bottom-up approach that surfaces groups of images with no prior knowledge or assumptions.

Top-down approach

In our top-down approach, we describe an image using attributes and find features that make images successful. We collaborate with experts to identify a large set of features based on their prior knowledge and experience, and model them using Computer Vision and Machine Learning techniques. These features range from low level features like color and texture, to higher level features like the number of faces, composition, and facial expressions.

Examples of the features we might capture for this image include: number of people (two), where they’re facing (facing each other), emotion (neutral to positive), saturation (low), and objects present (military uniform).

We can use pre-trained models/APIs to create some of these features, like face detection and object labeling. We also build internal datasets and models for features where pre-trained models are not sufficient. For example, common Computer Vision models can tell us that an image contains two people facing each other with happy facial expressions — are they friends, or in a romantic relationship? We have built human-in-the-loop tools to help experts train ML models rapidly and efficiently, enabling them to build custom models for subjective and complex attributes.

Once we describe an image with features, we employ various predictive and causal methods to extract insights about which features are most important for effective artwork, and these are leveraged to create artwork for upcoming titles. One example insight: looking across the catalog, we found that single-person portraits tend to perform better than images featuring more than one person.

Single Character Portraits

Bottom-up approach

The top-down approach can deliver clear actionable insights supported by data, but these insights are limited to the features we are able to identify beforehand and model computationally. We balance this using a bottom-up approach where we do not make any prior guesses, and let the data surface patterns and features. In practice, we surface clusters of similar images and have our creative experts derive insights, patterns and inspiration from these groups.

One such method we use for image clustering is leveraging large pre-trained convolutional neural networks to model image similarity. Features from the early layers often model low-level similarity like colors, edges, textures, and shapes, while features from the final layers group images depending on the task (e.g. similar objects if the model is trained for object detection). We can then use an unsupervised clustering algorithm (like k-means) to find clusters within these images.
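
As a simplified sketch — assuming the image embeddings have already been extracted with a pre-trained network, which is not shown here — the clustering itself can be as simple as:

import numpy as np
from sklearn.cluster import KMeans

def cluster_images(image_embeddings, n_clusters=50):
    """Sketch: group images by k-means over their pre-computed CNN embeddings.

    image_embeddings: array of shape (num_images, embedding_dim).
    """
    embeddings = np.asarray(image_embeddings)
    labels = KMeans(n_clusters=n_clusters, n_init=10, random_state=0).fit_predict(embeddings)
    clusters = {}
    for image_index, label in enumerate(labels):
        clusters.setdefault(int(label), []).append(image_index)
    return clusters  # cluster id -> list of image indices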

Using our example title above, one of the characters in Purple Hearts is in the Marines. Looking at clusters of images from similar titles, we see a cluster that contains imagery commonly associated with images of military and war, featuring characters in military uniform.

An example cluster of imagery related to military and war.

Sampling some images from the cluster above, we see many examples of soldiers or officers in uniform, some holding weapons, with serious facial expressions, looking off camera. A creator could find this pattern of images within the cluster below, confirm that the pattern has worked well in the past using performance data, and use this as inspiration to create final artwork.

A creator can draw inspiration from images in the cluster to the left, and use this to create effective artwork for new titles, such as the image for Purple Hearts on the right.

Similarly, the title has a romance storyline, so we find a cluster of images that show romance. From such a cluster, a creator could infer that showing close physical proximity and body language convey romance, and use this as inspiration to create the artwork below.

On the flip side, creatives can also use these clusters to learn what not to do. For example, here are images from the same military-and-war cluster shown above. If, hypothetically speaking, they were presented with historical evidence that these kinds of images didn’t perform well for a given canvas, a creative strategist could infer that highly saturated silhouettes don’t work as well in this context, confirm it with a test to establish a causal relationship, and decide not to use them for their title.

A creator can also spot patterns that didn’t work in the past, and avoid using it for future titles.

Member clustering

Another complementary technique is member clustering, where we group members based on their preferences. We can group them by viewing behavior, or also leverage our image personalization algorithm to find groups of members that positively responded to the same image asset. As we observe these patterns across many titles, we can learn to predict which user clusters might be interested in a title, and we can also learn which assets might resonate with these user clusters.

As an example, let’s say we are able to cluster Netflix members into two broad clusters — one that likes romance, and another that enjoys action. We can look at how these two groups of members responded to a title after its release. We might find that 80% of viewers of Purple Hearts belong to the romance cluster, while 20% belong to the action cluster. Furthermore, we might find that a representative romance fan (eg. the cluster centroid) responds most positively to images featuring the star couple in an embrace. Meanwhile, viewers in the action cluster respond most strongly to images featuring a soldier on the battlefield. As we observe these patterns across many titles, we can learn to predict which user clusters might be interested in similar upcoming titles, and we can also learn which themes might resonate with these user clusters. Insights like these can guide artwork creation strategy for future titles.

Conclusion

Our goal is to empower creatives with data-driven insights to create better artwork. Top-down and bottom-up methods approach this goal from different angles, and provide insights with different tradeoffs.

Top-down features have the benefit of being clearly explainable and testable. On the other hand, it is relatively difficult to model the effects of interactions and combinations of features. It is also challenging to capture complex image features, requiring custom models. For example, there are many visually distinct ways to convey a theme of “love”: heart emojis, two people holding hands, people gazing into each other’s eyes, and so on. Another challenge with top-down approaches is that our lower-level features could miss the true underlying trend. For example, we might detect that the colors green and blue are effective features for nature documentaries, but what is really driving effectiveness may be the portrayal of natural settings like forests or oceans.

In contrast, bottom-up methods model complex high-level features and their combinations, but their insights are less explainable and subjective. Two users may look at the same cluster of images and extract different insights. However, bottom-up methods are valuable because they can surface unexpected patterns, providing inspiration and leaving room for creative exploration and interpretation without being prescriptive.

The two approaches are complementary. Unsupervised clusters can give rise to observable trends that we can then use to create new testable top-down hypotheses. Conversely, top-down labels can be used to describe unsupervised clusters to expose common themes within clusters that we might not have spotted at first glance. Our users synthesize information from both sources to design better artwork.

There are many other important considerations that our current models don’t account for. For example, there are factors outside of the image itself that might affect its effectiveness, like how popular a celebrity is locally, cultural differences in aesthetic preferences or how certain themes are portrayed, what device a member is using at the time and so on. As our member base becomes increasingly global and diverse, these are factors we need to account for in order to create an inclusive and personalized experience.

Acknowledgements

This work would not have been possible without our cross-functional partners in the creative innovation space. We would like to specifically thank Ben Klein and Amir Ziai for helping to build the technology we describe here.



Scalable Annotation Service — Marken

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/scalable-annotation-service-marken-f5ba9266d428

Scalable Annotation Service — Marken

by Varun Sekhri, Meenakshi Jindal

Introduction

At Netflix, we have hundreds of microservices, each with its own data model or entities. For example, we have a service that stores a movie entity’s metadata or a service that stores metadata about images. At some point, all of these services want to annotate their objects or entities. Our team, Asset Management Platform, decided to create a generic service called Marken which allows any microservice at Netflix to annotate its entities.

Annotations

Sometimes people describe annotations as tags, but that is a limited definition. In Marken, an annotation is a piece of metadata which can be attached to an object from any domain. There are many different kinds of annotations our client applications want to generate. A simple annotation, like the one below, would state that a particular movie has violence.

  • Movie Entity with id 1234 has violence.

But there are more interesting cases where users want to store temporal (time-based) or spatial data. In Pic 1 below, we have an example of an application which is used by editors to review their work. They want to change the color of the gloves to rich black, so they want to be able to mark up that area, in this case using a blue circle, and store a comment for it. This is a typical use case for a creative review application.

An example of storing both time- and space-based data would be an ML algorithm that can identify characters in a frame and wants to store the following for a video:

  • In a particular frame (time)
  • In some area in image (space)
  • A character name (annotation data)
Pic 1 : Editors requesting changes by drawing shapes like the blue circle shown above.

Goals for Marken

We wanted to create an annotation service with the following goals:

  • Allow annotating any entity. Teams should be able to define their own data model for annotations.
  • Annotations can be versioned.
  • The service should be able to serve real-time (i.e. UI) applications, so CRUD and search operations must be achievable with low latency.
  • All data should also be available for offline analytics in Hive/Iceberg.

Schema

Since the annotation service would be used by anyone at Netflix, we needed to support different data models for the annotation object. A data model in Marken can be described using a schema — just like how we create schemas for database tables.

Our team, Asset Management Platform, owns a different service that has a JSON-based DSL to describe the schema of a media asset. We extended this service to also describe the schema of an annotation object.

{
  "type": "BOUNDING_BOX", ❶
  "version": 0, ❷
  "description": "Schema describing a bounding box",
  "keys": {
    "properties": { ❸
      "boundingBox": {
        "type": "bounding_box",
        "mandatory": true
      },
      "boxTimeRange": {
        "type": "time_range",
        "mandatory": true
      }
    }
  }
}

In the above example, the application wants to represent in a video a rectangular area which spans a range of time.

  1. The schema’s name is BOUNDING_BOX.
  2. Schemas can have versions. This allows users to add or remove properties in their data model. We don’t allow incompatible changes; for example, users cannot change the data type of a property.
  3. The data stored is represented in the “properties” section. In this case, there are two properties:
  4. boundingBox, with type “bounding_box”. This is basically a rectangular area.
  5. boxTimeRange, with type “time_range”. This allows us to specify start and end times for this annotation.

Geometry Objects

To represent spatial data in an annotation we use the Well-Known Text (WKT) format. We support the following objects:

  • Point
  • Line
  • MultiLine
  • BoundingBox
  • LinearRing

Our model is extensible, allowing us to easily add more geometry objects as needed.
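
For reference, WKT represents geometries as plain strings. As a small illustration of standard WKT for a point and a line (parsed here with the shapely library purely for demonstration; Marken itself is not tied to any particular client library):

from shapely import wkt

# Standard WKT strings for a point and a polyline.
point = wkt.loads("POINT (30 10)")
line = wkt.loads("LINESTRING (30 10, 10 30, 40 40)")

print(point.x, point.y)  # 30.0 10.0
print(line.length)       # total length of the polyline in coordinate units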

Temporal Objects

Several applications need to store annotations for videos that carry time information. We allow applications to store time as frame numbers or as nanoseconds.

To store data in frames, clients must also store the frame rate. We call this a SampleData, with the following components (a small illustrative example follows the list):

  • sampleNumber aka frame number
  • sampleNumerator
  • sampleDenominator
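
A hypothetical example of such a frame-based timestamp, assuming the numerator/denominator pair encodes the frame rate as a rational number (the exact field layout here is an assumption, not Marken’s actual payload):

# Frame 120 of a video running at 24000/1001 (≈23.976) frames per second.
sample_data = {
    "sampleNumber": 120,        # frame number
    "sampleNumerator": 24000,   # frame-rate numerator
    "sampleDenominator": 1001,  # frame-rate denominator
}

# Converting the frame number back to seconds under that assumption:
seconds = sample_data["sampleNumber"] * sample_data["sampleDenominator"] / sample_data["sampleNumerator"]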

Annotation Object

Just like the schema, an annotation object is also represented in JSON. Here is an example annotation for the BOUNDING_BOX schema which we discussed above.

{
  "annotationId": { ❶
    "id": "188c5b05-e648-4707-bf85-dada805b8f87",
    "version": "0"
  },
  "associatedId": { ❷
    "entityType": "MOVIE_ID",
    "id": "1234"
  },
  "annotationType": "ANNOTATION_BOUNDINGBOX", ❸
  "annotationTypeVersion": 1,
  "metadata": { ❹
    "fileId": "identityOfSomeFile",
    "boundingBox": {
      "topLeftCoordinates": {
        "x": 20,
        "y": 30
      },
      "bottomRightCoordinates": {
        "x": 40,
        "y": 60
      }
    },
    "boxTimeRange": {
      "startTimeInNanoSec": 566280000000,
      "endTimeInNanoSec": 567680000000
    }
  }
}
  1. The first component is the unique ID of this annotation. An annotation is an immutable object, so the identity of an annotation always includes a version. Whenever someone updates an annotation, we automatically increment its version.
  2. An annotation must be associated with some entity which belongs to some microservice. In this case, this annotation was created for a movie with id "1234".
  3. We then specify the schema type of the annotation. In this case it is BOUNDING_BOX.
  4. The actual data is stored in the metadata section of the JSON. As discussed above, there is a bounding box and a time range in nanoseconds.

Base schemas

Just like in Object Oriented Programming, our schema service allows schemas to be inherited from each other. This allows our clients to create an “is-a-type-of” relationship between schemas. Unlike Java, we support multiple inheritance as well.

We have several ML algorithms which scan Netflix media assets (images and videos) and create very interesting data for example identifying characters in frames or identifying match cuts. This data is then stored as annotations in our service.

As a platform service, we created a set of base schemas to ease creating schemas for different ML algorithms. One base schema (TEMPORAL_SPATIAL_BASE) has the following optional properties. This base schema can be used by any derived schema and is not limited to ML algorithms.

  • Temporal (time related data)
  • Spatial (geometry data)

Another base schema, BASE_ALGORITHM_ANNOTATION, has the following optional properties, which are typically used by ML algorithms.

  • label (String)
  • confidenceScore (double) — denotes the confidence of the generated data from the algorithm.
  • algorithmVersion (String) — version of the ML algorithm.

By using multiple inheritance, a typical ML algorithm schema derives from both TEMPORAL_SPATIAL_BASE and BASE_ALGORITHM_ANNOTATION schemas.

{
  "type": "BASE_ALGORITHM_ANNOTATION",
  "version": 0,
  "description": "Base Schema for Algorithm based Annotations",
  "keys": {
    "properties": {
      "confidenceScore": {
        "type": "decimal",
        "mandatory": false,
        "description": "Confidence Score"
      },
      "label": {
        "type": "string",
        "mandatory": false,
        "description": "Annotation Tag"
      },
      "algorithmVersion": {
        "type": "string",
        "description": "Algorithm Version"
      }
    }
  }
}

Architecture

Given the goals of the service, we had to keep the following in mind:

  • Our service will be used by a lot of internal UI applications hence the latency for CRUD and search operations must be low.
  • Besides application data, we also store ML algorithm data, some of which can be at the frame level for videos, so the amount of data stored can be large. The databases we pick should be able to scale horizontally.
  • We also anticipated that the service will have high RPS.

Some other goals came from search requirements.

  • Ability to search the temporal and spatial data.
  • Ability to search with different associated and additional associated Ids as described in our Annotation Object data model.
  • Full text searches on many different fields in the Annotation Object
  • Stem search support

As time progressed the requirements for search only increased and we will discuss these requirements in detail in a different section.

Given the requirements and the expertise in our team, we decided to choose Cassandra as the source of truth for storing annotations. To support the different search requirements, we chose Elasticsearch. In addition, to support various features we have a bunch of internal auxiliary services, e.g. a ZooKeeper service, an internationalization service, etc.

Marken architecture

The picture above shows the block diagram of the architecture of our service. On the left we show data pipelines which are created by several of our client teams to automatically ingest new data into our service. The most important such data pipeline is created by the Machine Learning team.

One of the key initiatives at Netflix, the Media Search Platform, now uses Marken to store annotations and perform the various searches explained below. Our architecture makes it possible to easily onboard and ingest data from Media algorithms. This data is used by various teams, e.g. creators of promotional media (trailers, banner images), to improve their workflows.

Search

The success of the annotation service (data labels) depends on effective search over those labels without needing to know much about the details of the input algorithms. As mentioned above, we use base schemas for every new annotation type (depending on the algorithm) indexed into the service. This helps our clients search across the different annotation types consistently. Annotations can be searched either by data labels alone or with additional filters like movie ID.

We have defined a custom query DSL to support searching, sorting and grouping of the annotation results. Different types of search queries are supported using Elasticsearch as the backend search engine.

  • Full Text Search — Clients may not know the exact labels created by the ML algorithms. As an example, a label can be ‘shower curtain’. With full text search, clients can find the annotation by searching for the label ‘curtain’. We also support fuzzy search on the label values. For example, if a client wants to search for ‘curtain’ but types ‘curtian’, annotations with the ‘curtain’ label will still be returned.
  • Stem Search — With global Netflix content supported in different languages, our clients have a requirement to support stem search across languages. The Marken service contains subtitles for the full catalog of Netflix titles, which can be in many different languages. As an example of stem search, `clothing` and `clothes` can be stemmed to the same root word `cloth`. We use Elasticsearch to support stem search for 34 different languages.
  • Temporal Annotations Search — Annotations for videos are more relevant when they are defined along with temporal information (a time range with start and end times). Time ranges within a video are also mapped to frame numbers. We support label search for temporal annotations within a provided time range or frame range as well.
  • Spatial Annotation Search — Annotations for a video or image can also include spatial information, for example a bounding box which defines the location of the labeled object in the annotation.
  • Temporal and Spatial Search — Annotations for a video can have both a time range and spatial coordinates. Hence, we support queries which search annotations within a provided time range and spatial coordinate range.
  • Semantics Search — Annotations can be searched after understanding the intent of the user-provided query. This type of search provides results based on conceptually similar matches to the text in the query, unlike traditional tag-based search, which expects exact keyword matches with the annotation labels. ML algorithms can also ingest annotations with vectors instead of actual labels to support this type of search. User-provided text is converted into a vector using the same ML model, and a search is then performed to find the vectors closest to the query vector. Based on client feedback, such searches provide more relevant results and don’t return empty results when no annotations exactly match the user-provided query labels. We support semantic search using Open Distro for Elasticsearch. We will cover more details on semantic search support in a future blog article.
Semantic search
  • Range Intersection — We recently started supporting range intersection queries across multiple annotation types for a specific title in real time. This allows clients to search with multiple data labels (resulting from different algorithms, so they are different annotation types) within a specific time range of a video, or the complete video, and get the list of time ranges or frames where the provided set of data labels is present. A common example of this query is to find `James in the indoor shot drinking wine`. For such queries, the query processor finds the results for both data labels (James, indoor shot) and the vector search (drinking wine), and then finds the intersection of the resulting frames in memory. A small sketch of this intersection step is shown below.
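
A minimal sketch of that in-memory intersection over frame ranges, treating ranges as half-open [start, end) intervals; this is purely illustrative and not the query processor’s actual code:

def intersect_ranges(ranges_a, ranges_b):
    """Sketch: intersect two sorted lists of half-open frame ranges."""
    result, i, j = [], 0, 0
    while i < len(ranges_a) and j < len(ranges_b):
        start = max(ranges_a[i][0], ranges_b[j][0])
        end = min(ranges_a[i][1], ranges_b[j][1])
        if start < end:
            result.append((start, end))
        # advance whichever range finishes first
        if ranges_a[i][1] < ranges_b[j][1]:
            i += 1
        else:
            j += 1
    return result

# frames where the "James" and "indoor shot" labels overlap with "drinking wine" matches:
# intersect_ranges(intersect_ranges(james_frames, indoor_frames), drinking_wine_frames)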

Search Latency

Our client applications are studio UI applications, so they expect low latency for search queries. As highlighted above, we support such queries using Elasticsearch. To keep latency low, we have to make sure that all the annotation indices are balanced and that no hotspot is created by algorithm backfill data ingestion for older movies. We followed the rollover indices strategy to avoid such hotspots (as described in our blog on the asset management application) in the cluster, which can cause spikes in CPU utilization and slow down query responses. Search latency for generic text queries is in milliseconds. Semantic search queries have comparatively higher latency than generic text searches. The following graph shows the average search latency for generic search and semantic search (including KNN and ANN search).

Average search latency
Semantic search latency

Scaling

One of the key challenges while designing the annotation service is handling the scaling requirements of the growing Netflix movie catalog and ML algorithms. Video content analysis plays a crucial role in how content is used across studio applications for movie production and promotion. We expect the number of algorithm types to grow widely in the coming years. With the growing number of annotations and their usage across studio applications, prioritizing scalability becomes essential.

Data ingestion from the ML data pipelines generally arrives in bulk, specifically when a new algorithm is designed and annotations are generated for the full catalog. We have set up a separate stack (fleet of instances) to control the data ingestion flow and hence provide consistent search latency to our consumers. In this stack, we control the write throughput to our backend databases using Java thread pool configurations.

Cassandra and Elasticsearch backend databases support horizontal scaling of the service with growing data size and query volume. We started with a 12-node Cassandra cluster and scaled up to 24 nodes to support the current data size. This year, annotations were added for approximately the full Netflix catalog. Some titles have more than 3M annotations (most of them related to subtitles). Currently the service holds around 1.9 billion annotations with a data size of 2.6 TB.

Analytics

Annotations can be searched in bulk across multiple annotation types to build data facts for a title or across multiple titles. For such use cases, we persist all the annotation data in Iceberg tables so that annotations can be queried in bulk across different dimensions without impacting the CRUD operation latency of real-time applications.

One of the common use cases is when the media algorithm teams read subtitle data in different languages (annotations containing subtitles on a per frame basis) in bulk so that they can refine the ML models they have created.

Future work

There is a lot of interesting future work in this area.

  1. Our data footprint keeps increasing with time. Algorithms are often revised, and the annotations related to the new version are the more accurate ones in use, so we need to clean up large amounts of older data without affecting the service.
  2. Intersection queries over a large scale of data and returning results with low latency is an area where we want to invest more time.

Acknowledgements

Burak Bacioglu and other members of the Asset Management Platform contributed to the design and development of Marken.



Ready-to-go sample data pipelines with Dataflow

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/ready-to-go-sample-data-pipelines-with-dataflow-17440a9e141d

by Jasmine Omeke, Obi-Ike Nwoke, Olek Gorajek

Intro

This post is for all data practitioners, who are interested in learning about bootstrapping, standardization and automation of batch data pipelines at Netflix.

You may remember Dataflow from the post we wrote last year titled Data pipeline asset management with Dataflow. That article was a deep dive into one of the more technical aspects of Dataflow and didn’t properly introduce the tool in the first place. This time we’ll try to do the intro justice and then focus on one of the very first features Dataflow came with. That feature is called sample workflows, but before we dive in, let’s have a quick look at Dataflow in general.

Dataflow

Dataflow

Dataflow is a command line utility built to improve experience and to streamline the data pipeline development at Netflix. Check out this high level Dataflow help command output below:

$ dataflow --help
Usage: dataflow [OPTIONS] COMMAND [ARGS]...

Options:
  --docker-image TEXT  Url of the docker image to run in.
  --run-in-docker      Run dataflow in a docker container.
  -v, --verbose        Enables verbose mode.
  --version            Show the version and exit.
  --help               Show this message and exit.

Commands:
  migration  Manage schema migration.
  mock       Generate or validate mock datasets.
  project    Manage a Dataflow project.
  sample     Generate fully functional sample workflows.

As you can see, the Dataflow CLI is divided into four main subject areas (or commands). The most commonly used one is dataflow project, which helps folks manage their data pipeline repositories through creation, testing, deployment and a few other activities.

The dataflow migration command is a special feature, developed single-handedly by Stephen Huenneke, to fully automate the communication and tracking of data warehouse table changes. Thanks to the Netflix internal lineage system (built by Girish Lingappa), Dataflow migration can help you identify downstream usage of the table in question. And finally it can help you craft a message to all the owners of these dependencies. After your migration has started, Dataflow will also keep track of its progress and help you communicate with the downstream users.

The dataflow mock command is another standalone feature. It lets you create YAML-formatted mock data files based on selected tables, columns and a few rows of data from the Netflix data warehouse. Its main purpose is to enable easy unit testing of your data pipelines, but it can technically be used in any other situation as a readable data format for small data sets.

All the above commands are very likely to be described in separate future blog posts, but right now let’s focus on the dataflow sample command.

Sample workflows

Dataflow sample workflows is a set of templates anyone can use to bootstrap their data pipeline project. And by “sample” we mean “an example”, like food samples in your local grocery store. One of the main reasons this feature exists is just like with food samples, to give you “a taste” of the production quality ETL code that you could encounter inside the Netflix data ecosystem.

All the code you get with the Dataflow sample workflows is fully functional, adjusted to your environment and isolated from the sample workflows that others have generated. This pipeline is safe to run the moment it shows up in your directory. It will not only build a nice example aggregate table and fill it up with real data, but it will also present you with a complete set of recommended components:

  • clean DDL code,
  • proper table metadata settings,
  • transformation job (in a language of choice) wrapped in an optional WAP (Write, Audit, Publish) pattern,
  • sample set of data audits for the generated data,
  • and a fully functional unit test for your transformation logic.

And last, but not least, these sample workflows are being tested continuously as part of the Dataflow code change protocol, so you can be sure that what you get is working. This is one way to build trust with our internal user base.

Next, let’s have a look at the actual business logic of these sample workflows.

Business Logic

There are several variants of the sample workflow you can get from Dataflow, but all of them share the same business logic. This was a conscious decision in order to clearly illustrate the difference between the various languages in which your ETL could be written. Obviously not all tools are made with the same use case in mind, so we are planning to add more code samples for other (than classical batch ETL) data processing purposes, e.g. machine learning model building and scoring.

The example business logic we use in our template computes the top hundred movies/shows in every country where Netflix operates on a daily basis. This is not an actual production pipeline running at Netflix, because the code is highly simplified, but it serves the purpose of illustrating a batch ETL job with various transformation stages well. Let’s review the transformation steps below.

Step 1: on a daily basis, incrementally, sum up all viewing time of all movies and shows in every country

WITH STEP_1 AS (
    SELECT
        title_id
        , country_code
        , SUM(view_hours) AS view_hours
    FROM some_db.source_table
    WHERE playback_date = CURRENT_DATE
    GROUP BY
        title_id
        , country_code
)

Step 2: rank all titles from most watched to least in every country

WITH STEP_2 AS (
    SELECT
        title_id
        , country_code
        , view_hours
        , RANK() OVER (
            PARTITION BY country_code
            ORDER BY view_hours DESC
        ) AS title_rank
    FROM STEP_1
)

Step 3: filter all titles to the top 100

WITH STEP_3 AS (
    SELECT
        title_id
        , country_code
        , view_hours
        , title_rank
    FROM STEP_2
    WHERE title_rank <= 100
)

Now, using the above simple 3-step transformation we will produce data that can be written to the following Iceberg table:

CREATE TABLE IF NOT EXISTS ${TARGET_DB}.dataflow_sample_results (
    title_id INT COMMENT "Title ID of the movie or show."
    , country_code STRING COMMENT "Country code of the playback session."
    , title_rank INT COMMENT "Rank of a given title in a given country."
    , view_hours DOUBLE COMMENT "Total viewing hours of a given title in a given country."
)
COMMENT
    "Example dataset brought to you by Dataflow. For more information on this
    and other examples please visit the Dataflow documentation page."
PARTITIONED BY (
    date DATE COMMENT "Playback date."
)
STORED AS ICEBERG;

As you can infer from the above table structure we are going to load about 19,000 rows into this table on a daily basis. And they will look something like this:

sql> SELECT * FROM foo.dataflow_sample_results
     WHERE date = 20220101 and country_code = 'US'
     ORDER BY title_rank LIMIT 5;

 title_id | country_code | title_rank | view_hours |   date
----------+--------------+------------+------------+----------
 11111111 | US           |          1 |        123 | 20220101
 44444444 | US           |          2 |        111 | 20220101
 33333333 | US           |          3 |         98 | 20220101
 55555555 | US           |          4 |         55 | 20220101
 22222222 | US           |          5 |         11 | 20220101
(5 rows)

With the business logic out of the way, we can now start talking about the components, or the boiler-plate, of our sample workflows.

Components

Let’s have a look at the most common workflow components that we use at Netflix. These components may not fit into every ETL use case, but are used often enough to be included in every template (or sample workflow). The workflow author, after all, has the final word on whether they want to use all of these patterns or keep only some. Either way they are here to start with, ready to go, if needed.

Workflow Definitions

Below you can see a typical file structure of a sample workflow package written in SparkSQL.

.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py

The three workflow definition files above (backfill.sch.yaml, daily.sch.yaml and main.sch.yaml) define a series of steps (a.k.a. jobs), their cadence, dependencies, and the sequence in which they should be executed.

This is one way we can tie components together into a cohesive workflow. In every sample workflow package there are three workflow definition files that work together to provide flexible functionality. The sample workflow code assumes a daily execution pattern, but it is very easy to adjust it to run at a different cadence. For workflow orchestration we use Netflix’s homegrown Maestro scheduler.

The main workflow definition file holds the logic of a single run, in this case one day’s worth of data. This logic consists of the following parts: DDL code, table metadata information, data transformation and a few audit steps. It’s designed to run for a single date, and is meant to be called from the daily or backfill workflows. This main workflow can also be called manually during development with arbitrary run-time parameters to get a feel for the workflow in action.

The daily workflow executes the main one on a daily basis for the predefined number of previous days. This is sometimes necessary for the purpose of catching up on late-arriving data. This is where we define a trigger schedule and notification schemes, and update the “high water mark” timestamps on our target table.

The backfill workflow executes the main workflow for a specified range of days. This is useful for restating data, most often because of a transformation logic change, but sometimes as a response to upstream data updates.

DDL

Often, the first step in a data pipeline is to define the target table structure and column metadata via a DDL statement. We understand that some folks choose to have their output schema be an implicit result of the transform code itself, but the explicit statement of the output schema is not only useful for adding table (and column) level comments, but also serves as one way to validate the transform logic.

.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py

Generally, we prefer to execute DDL commands as part of the workflow itself, instead of running outside of the schedule, because it simplifies the development process. See below example of hooking the table creation SQL file into the main workflow definition.

- job:
    id: ddl
    type: Spark
    spark:
      script: $S3{./ddl/dataflow_sparksql_sample.sql}
      parameters:
        TARGET_DB: ${TARGET_DB}

Metadata

The metadata step provides context on the output table itself as well as the data contained within. Attributes are set via Metacat, which is a Netflix internal metadata management platform. Below is an example of plugging that metadata step into the main workflow definition:

- job:
    id: metadata
    type: Metadata
    metacat:
      tables:
        - ${CATALOG}/${TARGET_DB}/${TARGET_TABLE}
      owner: ${username}
      tags:
        - dataflow
        - sample
      lifetime: 123
      column_types:
        date: pk
        country_code: pk
        rank: pk

Transformation

The transformation step (or steps) can be executed in the developer’s language of choice. The example below is using SparkSQL.

.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py

Optionally, this step can use the Write-Audit-Publish pattern to ensure that data is correct before it is made available to the rest of the company. See example below:

- template:
    id: wap
    type: wap
    tables:
      - ${CATALOG}/${DATABASE}/${TABLE}
    write_jobs:
      - job:
          id: write
          type: Spark
          spark:
            script: $S3{./src/sparksql_write.sql}

Audits

Audit steps can be defined to verify data quality. If a “blocking” audit fails, the job will halt and the write step is not committed, so invalid data will not be exposed to users. This step is optional and configurable; see a partial example of an audit from the main workflow below.

data_auditor:
  audits:
    - function: columns_should_not_have_nulls
      blocking: true
      params:
        table: ${TARGET_TABLE}
        columns:
          - title_id

High-Water-Mark Timestamp

A successful write will typically be followed by a metadata call to set the valid time (or high-water mark) of a dataset. This allows other processes, consuming our table, to be notified and start their processing. See an example high water mark job from the main workflow definition.

- job:
    id: hwm
    type: HWM
    metacat:
      table: ${CATALOG}/${TARGET_DB}/${TARGET_TABLE}
      hwm_datetime: ${EXECUTION_DATE}
      hwm_timezone: ${EXECUTION_TIMEZONE}

Unit Tests

Unit test artifacts are also generated as part of the sample workflow structure. They consist of data mocks, the actual test code, and a simple execution harness, depending on the workflow language. See the test file (test_sparksql_write.py) in the structure below.

.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py

These unit tests are intended to test one “unit” of data transform in isolation. They can be run during development to quickly capture code typos and syntax issues, or during automated testing/deployment phase, to make sure that code changes have not broken any tests.

We want unit tests to run quickly so that we can have continuous feedback and fast iterations during the development cycle. Running code against a production database can be slow, especially with the overhead required for distributed data processing systems like Apache Spark. Mocks allow you to run tests locally against a small sample of “real” data to validate your transformation code functionality.
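As a rough illustration of what such a test can look like, here is a generic pytest plus local SparkSession sketch. It is not the exact Dataflow test harness, and the handful of mocked rows simply stand in for the YAML data mocks described above.

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[1]").appName("unit-test").getOrCreate()


def test_top_titles_ranking(spark):
    # Tiny stand-in for some_db.source_table.
    rows = [
        ("tt1", "US", 10.0),
        ("tt2", "US", 30.0),
        ("tt3", "US", 20.0),
        ("tt1", "PL", 5.0),
    ]
    source = spark.createDataFrame(rows, ["title_id", "country_code", "view_hours"])
    source.createOrReplaceTempView("source_table")

    # Exercise the same ranking logic as the sample transformation.
    result = spark.sql("""
        SELECT title_id, country_code,
               RANK() OVER (PARTITION BY country_code
                            ORDER BY view_hours DESC) AS title_rank
        FROM source_table
    """)

    us_ranks = {r.title_id: r.title_rank for r in result.where("country_code = 'US'").collect()}
    assert us_ranks == {"tt2": 1, "tt3": 2, "tt1": 3}

Because everything runs against a local Spark session and a handful of rows, a test like this finishes in seconds and provides the fast feedback loop described above.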

Languages

Over time, the extraction of data from Netflix’s source systems has grown to encompass a wider range of end-users, such as engineers, data scientists, analysts, marketers, and other stakeholders. Focusing on convenience, Dataflow allows these differing personas to go about their work seamlessly. A large number of our data users employ SparkSQL, pyspark, and Scala. A small but growing contingent of data scientists and analytics engineers use R, backed by the sparklyr interface or other data processing tools, like Metaflow.

With an understanding that the data landscape and the technologies employed by end-users are not homogeneous, Dataflow creates a malleable path forward. It solidifies different recipes or repeatable templates for data extraction. Within this section, we’ll preview a few methods, starting with the SparkSQL and PySpark ways of creating data pipelines with Dataflow. Then we’ll segue into the Scala and R use cases.

To begin, after installing Dataflow, a user can run the following command to understand how to get started.

$ dataflow sample workflow --help
Dataflow (0.6.16)

Usage: dataflow sample workflow [OPTIONS] RECIPE [TARGET_PATH]

  Create a sample workflow based on selected RECIPE and land it in the
  specified TARGET_PATH.

  Currently supported workflow RECIPEs are: spark-sql, pyspark,
  scala and sparklyr.

  If TARGET_PATH:
    - if not specified, current directory is assumed
    - points to a directory, it will be used as the target location

Options:
  --source-path TEXT         Source path of the sample workflows.
  --workflow-shortname TEXT  Workflow short name.
  --workflow-id TEXT         Workflow ID.
  --skip-info                Skip the info about the workflow sample.
  --help                     Show this message and exit.

Once again, let’s assume we have a directory called stranger-data in which the user creates workflow templates in all four languages that Dataflow offers. To better illustrate how to generate the sample workflows using Dataflow, let’s look at the full command one would use to create one of these workflows, e.g:

$ cd stranger-data
$ dataflow sample workflow spark-sql ./sparksql-workflow

By repeating the above command for each type of transformation language we can arrive at the following directory structure

.
├── pyspark-workflow
│   ├── main.sch.yaml
│   ├── daily.sch.yaml
│   ├── backfill.sch.yaml
│   ├── ddl
│   │   └── ...
│   ├── src
│   │   └── ...
│   └── tox.ini
├── scala-workflow
│   ├── build.gradle
│   └── ...
├── sparklyR-workflow
│   └── ...
└── sparksql-workflow
    └── ...

Earlier we talked about the business logic of these sample workflows and we showed the Spark SQL version of that example data transformation. Now let’s discuss different approaches to writing the data in other languages.

PySpark

The partial PySpark code below has the same functionality as the SparkSQL example above, but it utilizes the Spark DataFrame Python interface.

from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, rank
from pyspark.sql.window import Window


def main(args, spark):
    # some_db, source_table, date and target_table come from the workflow
    # parameters and are omitted in this partial snippet.
    source_table_df = spark.table(f"{some_db}.{source_table}")

    # Aggregate daily viewing hours per title and country.
    viewing_by_title_country = (
        source_table_df.select("title_id", "country_code", "view_hours")
        .filter(col("date") == date)
        .filter("title_id IS NOT NULL AND view_hours > 0")
        .groupBy("title_id", "country_code")
        .agg(F.sum("view_hours").alias("view_hours"))
    )

    # Rank titles by viewing hours within each country.
    window = Window.partitionBy("country_code").orderBy(col("view_hours").desc())

    ranked_viewing_by_title_country = viewing_by_title_country.withColumn(
        "title_rank", rank().over(window)
    )

    # Keep the top 100 titles per country and write them to the target table.
    ranked_viewing_by_title_country.filter(
        col("title_rank") <= 100
    ).withColumn(
        "date", lit(int(date))
    ).select(
        "title_id",
        "country_code",
        "title_rank",
        "view_hours",
        "date",
    ).repartition(1).write.byName().insertInto(
        target_table, overwrite=True
    )

Scala

Scala is another Dataflow supported recipe that offers the same business logic in a sample workflow out of the box.

package com.netflix.spark

import org.apache.spark.sql.{DataFrame, SparkSession, functions => F}
import org.apache.spark.sql.expressions.Window

object ExampleApp {

  val spark: SparkSession = SparkSession.builder().getOrCreate()
  import spark.implicits._

  def readSourceTable(sourceDb: String, dataDate: String): DataFrame =
    spark
      .table(s"$sourceDb.source_table")
      .filter($"playback_start_date" === dataDate)

  def viewingByTitleCountry(sourceTableDF: DataFrame): DataFrame = {
    sourceTableDF
      .select($"title_id", $"country_code", $"view_hours")
      .filter($"title_id".isNotNull)
      .filter($"view_hours" > 0)
      .groupBy($"title_id", $"country_code")
      .agg(F.sum($"view_hours").as("view_hours"))
  }

  def addTitleRank(viewingDF: DataFrame): DataFrame = {
    viewingDF.withColumn(
      "title_rank", F.rank().over(
        Window.partitionBy($"country_code").orderBy($"view_hours".desc)
      )
    )
  }

  def writeViewing(viewingDF: DataFrame, targetTable: String, dataDate: String): Unit = {
    viewingDF
      .select($"title_id", $"country_code", $"title_rank", $"view_hours")
      .filter($"title_rank" <= 100)
      .repartition(1)
      .withColumn("date", F.lit(dataDate.toInt))
      .writeTo(targetTable)
      .overwritePartitions()
  }

  def main(args: Array[String]): Unit = {
    // The table name and date below are illustrative placeholders.
    val sourceTableDF = readSourceTable("some_db", "20200101")
    val viewingDF     = viewingByTitleCountry(sourceTableDF)
    val titleRankedDF = addTitleRank(viewingDF)
    writeViewing(titleRankedDF, "some_db.dataflow_sample_results", "20200101")
  }
}

R / sparklyR

As Netflix has a growing cohort of R users, R is the latest recipe available in Dataflow.

suppressPackageStartupMessages({
  library(sparklyr)
  library(dplyr)
})

...

main <- function(args, spark) {
  title_df <- tbl(spark, g("{some_db}.{source_table}"))

  title_activity_by_country <- title_df |>
    filter(title_date == date) |>
    filter(!is.null(title_id) & event_count > 0) |>
    select(title_id, country_code, event_type) |>
    group_by(title_id, country_code) |>
    summarize(event_count = sum(event_type, na.rm = TRUE))

  ranked_title_activity_by_country <- title_activity_by_country |>
    group_by(country_code) |>
    mutate(title_rank = rank(desc(event_count)))

  top_25_title_by_country <- ranked_title_activity_by_country |>
    ungroup() |>
    filter(title_rank <= 25) |>
    mutate(date = as.integer(date)) |>
    select(
      title_id,
      country_code,
      title_rank,
      event_count,
      date
    )

  top_25_title_by_country |>
    sdf_repartition(partitions = 1) |>
    spark_insert_table(target_table, mode = "overwrite")
}

main(args = args, spark = spark)

Conclusions

As you can see, we try to make the life of Netflix data engineers easier by offering paved paths and suggestions on how to structure their code, while keeping the variety of options wide enough so they can pick and choose what works best for them in any particular case.

Having a well-defined set of defaults for data pipeline creation across Netflix makes onboarding easier, provides standardization, and centralizes best practices. Let’s review these benefits below.

Onboarding

Ramping up on a new team or a business vertical always takes some effort, especially in a “highly aligned, loosely coupled” culture. Having a well-documented starting point removes some of the struggle that comes with starting from scratch and considerably speeds up the first iteration of the development cycle.

Standardization

Standardization makes life easier for new team members as well as those already familiar with the domain and tech stack.

Some transfer of work between people or teams is inevitable. Having standardized layout and patterns removes friction from this exchange. Also, code reviews and suggestions are easier to manage when working from a similar baseline.

Standardization also makes project layout more intuitive and minimizes risk of human error as the codebase evolves.

Centralized Best Practices

Data infrastructure evolves continually. Having easy access to a centralized set of good defaults is critical to ensure that best practices evolve along with the technology, and that users are aware of what’s the latest on the tech-stack menu.

Even better, Dataflow offers executable best practices, which present these concepts in the context of an actual use case. Instead of reading documentation, you can initialize a “real” project, change it as needed, and iterate from there.

Credits

Special thanks to Daniel Watson, Jim Hester, Stephen Huenneke, Girish Lingappa for their contributions to Dataflow sample workflows and to Andrea Hairston for the Dataflow logo design.

Next Episode

Hopefully you won’t need to wait another year to read about other features of Dataflow. Here are a few topics that we could write about next. Please have a look at the subjects below and, if you feel strongly about any of them, let us know in the comments section:

  • Branch driven deployment — to explain how Dataflow lets anyone customize their CI/CD jobs based on the git branch for easy testing in isolated environments.
  • Local SparkSQL unit testing — to clarify how Dataflow helps in making robust unit tests for Spark SQL transform code with ease.
  • Data migrations made easy — to show how Dataflow can be used to plan a table migration, support the communication with downstream users and help in monitoring it to completion.


Ready-to-go sample data pipelines with Dataflow was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

For your eyes only: improving Netflix video quality with neural networks

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/for-your-eyes-only-improving-netflix-video-quality-with-neural-networks-5b8d032da09c

by Christos G. Bampis, Li-Heng Chen and Zhi Li

When you are binge-watching the latest season of Stranger Things or Ozark, we strive to deliver the best possible video quality to your eyes. To do so, we continuously push the boundaries of streaming video quality and leverage the best video technologies. For example, we invest in next-generation, royalty-free codecs and sophisticated video encoding optimizations. Recently, we added another powerful tool to our arsenal: neural networks for video downscaling. In this tech blog, we describe how we improved Netflix video quality with neural networks, the challenges we faced and what lies ahead.

How can neural networks fit into Netflix video encoding?

There are, roughly speaking, two steps to encode a video in our pipeline:

  1. Video preprocessing, which encompasses any transformation applied to the high-quality source video prior to encoding. Video downscaling is the most pertinent example herein, which tailors our encoding to screen resolutions of different devices and optimizes picture quality under varying network conditions. With video downscaling, multiple resolutions of a source video are produced. For example, a 4K source video will be downscaled to 1080p, 720p, 540p and so on. This is typically done by a conventional resampling filter, like Lanczos.
  2. Video encoding using a conventional video codec, like AV1. Encoding drastically reduces the amount of video data that needs to be streamed to your device, by leveraging spatial and temporal redundancies that exist in a video.

We identified that we can leverage neural networks (NN) to improve Netflix video quality, by replacing conventional video downscaling with a neural network-based one. This approach, which we dub “deep downscaler,” has a few key advantages:

  • A learned approach for downscaling can improve video quality and be tailored to Netflix content.
  • It can be integrated as a drop-in solution, i.e., we do not need any other changes on the Netflix encoding side or the client device side. Millions of devices that support Netflix streaming automatically benefit from this solution.
  • A distinct, NN-based, video processing block can evolve independently, be used beyond video downscaling and be combined with different codecs.

Of course, we believe in the transformative potential of NN throughout video applications, beyond video downscaling. While conventional video codecs remain prevalent, NN-based video encoding tools are flourishing and closing the performance gap in terms of compression efficiency. The deep downscaler is our pragmatic approach to improving video quality with neural networks.

Our approach to NN-based video downscaling

The deep downscaler is a neural network architecture designed to improve the end-to-end video quality by learning a higher-quality video downscaler. It consists of two building blocks, a preprocessing block and a resizing block. The preprocessing block aims to prefilter the video signal prior to the subsequent resizing operation. The resizing block yields the lower-resolution video signal that serves as input to an encoder. We employed an adaptive network design that is applicable to the wide variety of resolutions we use for encoding.

Architecture of the deep downscaler model, consisting of a preprocessing block followed by a resizing block.

During training, our goal is to generate the best downsampled representation such that, after upscaling, the mean squared error is minimized. Since we cannot directly optimize for a conventional video codec, which is non-differentiable, we exclude the effect of lossy compression in the loop. We focus on a robust downscaler that is trained given a conventional upscaler, like bicubic. Our training approach is intuitive and results in a downscaler that is not tied to a specific encoder or encoding implementation. Nevertheless, it requires a thorough evaluation to demonstrate its potential for broad use for Netflix encoding.
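To make that objective concrete, here is a minimal, purely illustrative PyTorch-style sketch: a toy downscaler network produces the low-resolution signal, a fixed bicubic upscaler brings it back to the original resolution, and the mean squared error against the source frame is minimized. The architecture, layer sizes, and names below are stand-ins, not the actual deep downscaler.

import torch
import torch.nn as nn
import torch.nn.functional as F


class ToyDownscaler(nn.Module):
    """Stand-in for the deep downscaler: a small prefilter followed by a resize."""

    def __init__(self, scale: float = 0.5):
        super().__init__()
        self.scale = scale
        self.prefilter = nn.Sequential(
            nn.Conv2d(1, 16, 3, padding=1), nn.ReLU(),
            nn.Conv2d(16, 1, 3, padding=1),
        )

    def forward(self, x):
        x = self.prefilter(x)  # preprocessing block
        return F.interpolate(x, scale_factor=self.scale, mode="bilinear")  # resizing block


model = ToyDownscaler()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

frames = torch.rand(8, 1, 128, 128)  # toy luma frames
low_res = model(frames)
# Fixed conventional upscaler (bicubic), as in the training setup described above.
upscaled = F.interpolate(low_res, size=frames.shape[-2:], mode="bicubic")
loss = F.mse_loss(upscaled, frames)
loss.backward()
optimizer.step()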

Improving Netflix video quality with neural networks

The goal of the deep downscaler is to improve the end-to-end video quality for the Netflix member. Through our experimentation, involving objective measurements and subjective visual tests, we found that the deep downscaler improves quality across various conventional video codecs and encoding configurations.

For example, for VP9 encoding and assuming a bicubic upscaler, we measured an average VMAF Bjøntegaard-Delta (BD) rate gain of ~5.4% over the traditional Lanczos downscaling. We have also measured a ~4.4% BD rate gain for VMAF-NEG. We showcase an example result from one of our Netflix titles below. The deep downscaler (red points) delivered higher VMAF at similar bitrate or yielded comparable VMAF scores at a lower bitrate.

Besides objective measurements, we also conducted human subject studies to validate the visual improvements of the deep downscaler. In our preference-based visual tests, we found that the deep downscaler was preferred by ~77% of test subjects, across a wide range of encoding recipes and upscaling algorithms. Subjects reported a better detail preservation and sharper visual look. A visual example is shown below.

Left: Lanczos downscaling; right: deep downscaler. Both videos are encoded with VP9 at the same bitrate and were upscaled to FHD resolution (1920×1080). You may need to zoom in to see the visual difference.

We also performed A/B testing to understand the overall streaming impact of the deep downscaler, and detect any device playback issues. Our A/B tests showed QoE improvements without any adverse streaming impact. This shows the benefit of deploying the deep downscaler for all devices streaming Netflix, without playback risks or quality degradation for our members.

How do we apply neural networks at scale efficiently?

Given our scale, applying neural networks can lead to a significant increase in encoding costs. In order to have a viable solution, we took several steps to improve efficiency.

  • The neural network architecture was designed to be computationally efficient and also avoid any negative visual quality impact. For example, we found that just a few neural network layers were sufficient for our needs. To reduce the input channels even further, we only apply NN-based scaling on luma and scale chroma with a standard Lanczos filter.
  • We implemented the deep downscaler as an FFmpeg-based filter that runs together with other video transformations, like pixel format conversions. Our filter can run on both CPU and GPU. On a CPU, we leveraged oneDNN to further reduce latency.

Integrating neural networks into our next-generation encoding platform

The Encoding Technologies and Media Cloud Engineering teams at Netflix have jointly innovated to bring Cosmos, our next-generation encoding platform, to life. Our deep downscaler effort was an excellent opportunity to showcase how Cosmos can drive future media innovation at Netflix. The following diagram shows a top-down view of how the deep downscaler was integrated within a Cosmos encoding microservice.

A top-down view of integrating the deep downscaler into Cosmos.

A Cosmos encoding microservice can serve multiple encoding workflows. For example, a service can be called to perform complexity analysis for a high-quality input video, or generate encodes meant for the actual Netflix streaming. Within a service, a Stratum function is a serverless layer dedicated to running stateless and computationally-intensive functions. Within a Stratum function invocation, our deep downscaler is applied prior to encoding. Fueled by Cosmos, we can leverage the underlying Titus infrastructure and run the deep downscaler on all our multi-CPU/GPU environments at scale.

What lies ahead

The deep downscaler paves the path for more NN applications for video encoding at Netflix. But our journey is not finished yet and we strive to improve and innovate. For example, we are studying a few other use cases, such as video denoising. We are also looking at more efficient solutions to applying neural networks at scale. We are interested in how NN-based tools can shine as part of next-generation codecs. At the end of the day, we are passionate about using new technologies to improve Netflix video quality. For your eyes only!

Acknowledgments

We would like to acknowledge the following individuals for their help with the deep downscaler project:

Lishan Zhu, Liwei Guo, Aditya Mavlankar, Kyle Swanson and Anush Moorthy (Video Image and Encoding team), Mariana Afonso and Lukas Krasula (Video Codecs and Quality team), Ameya Vasani (Media Cloud Engineering team), Prudhvi Kumar Chaganti (Streaming Encoding Pipeline team), Chris Pham and Andy Rhines (Data Science and Engineering team), Amer Ather (Netflix performance team), the Netflix Metaflow team and Prof. Alan Bovik (University of Texas at Austin).


For your eyes only: improving Netflix video quality with neural networks was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Match Cutting at Netflix: Finding Cuts with Smooth Visual Transitions

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/match-cutting-at-netflix-finding-cuts-with-smooth-visual-transitions-31c3fc14ae59

By Boris Chen, Kelli Griggs, Amir Ziai, Yuchen Xie, Becky Tucker, Vi Iyengar, Ritwik Kumar

Creating Media with Machine Learning episode 1

Introduction

At Netflix, part of what we do is build tools to help our creatives make exciting videos to share with the world. Today, we’d like to share some of the work we’ve been doing on match cuts.

In film, a match cut is a transition between two shots that uses similar visual framing, composition, or action to fluidly bring the viewer from one scene to the next. It is a powerful visual storytelling tool used to create a connection between two scenes.

[Spoiler alert] consider this scene from Squid Game:

The players voted to leave the game after red-light green-light, and are back in the real world. After a rough night, Gi-hun finds another calling card and considers returning to the game. As he waits for the van, a series of powerful match cuts begins, showing the other characters doing the exact same thing. We never see their stories, but because of the way it was edited, we instinctively understand that they made the same decision. This creates an emotional bond between these characters and ties them together.

A more common example is a cut from an older person to a younger person (or vice versa), usually used to signify a flashback (or flashforward). This is sometimes used to develop the story of a character. This could be done with words verbalized by a narrator or a character, but that could ruin the flow of a film, and it is not nearly as elegant as a single well executed match cut.

An example from Oldboy. A child wipes their eyes on a train, which cuts to a flashback of a younger child also wiping their eyes. We as the viewer understand that the next scene must be from this child’s upbringing.
A flashforward from a young Indiana Jones to an older Indiana Jones conveys to the viewer that what we just saw about his childhood makes him the person he is today.

Here is one of the most famous examples from Stanley Kubrick’s 2001: A Space Odyssey. A bone is thrown into the air. As it spins, a single instantaneous cut brings the viewer from the prehistoric first act of the film into the futuristic second act. This highly artistic cut suggests that mankind’s evolution from primates to space technology is natural and inevitable.

Match cutting is also widely used outside of film. Match cuts can be found in trailers, like this sequence of shots from the trailer for Firefly Lane.

Match cutting is considered one of the most difficult video editing techniques, because finding a pair of shots that match can take days, if not weeks. An editor typically watches one or more long-form videos and relies on memory or manual tagging to identify shots that would match to a reference shot observed earlier.

A typical two hour movie might have around 2,000 shots, which means there are roughly 2 million pairs of shots to compare. It quickly becomes impossible to do this many comparisons manually, especially when trying to find match cuts across a 10 episode series, or multiple seasons of a show, or across multiple different shows.

What’s needed in the art of match cutting is tools to help editors find shots that match well together, which is what we’ve started building.

Our Initial Approach

Collecting training data is much more difficult compared to more common computer vision tasks. While some types of match cuts are more obvious, others are more subtle and subjective.

For instance, consider this match cut from Lawrence of Arabia. A man blows a match out, which cuts into a long, silent shot of a sunrise. It’s difficult to explain why this works, but many creatives recognize this as one of the greatest match cuts in film.

To avoid such complexities, we started with a more well-defined flavor of match cuts: ones where the visual framing of a person is aligned, aka frame matching. This came from the intuition of our video editors, who said that a large percentage of match cuts are centered around matching the silhouettes of people.

Frame matches from Stranger Things.

We tried several approaches, but ultimately what worked well for frame matching was instance segmentation. The output of segmentation models gives us a pixel mask of which pixels belong to which objects. We take the segmentation output of two different frames, and compute intersection over union (IoU) between the two. We then rank pairs using IoU and surface high-scoring pairs as candidates.
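As a minimal sketch of that scoring step, assuming two binary person masks represented as NumPy arrays (the segmentation model and any post-processing are omitted), the IoU computation looks like this:

import numpy as np


def mask_iou(mask_a: np.ndarray, mask_b: np.ndarray) -> float:
    """Intersection over union between two binary (H, W) masks."""
    intersection = np.logical_and(mask_a, mask_b).sum()
    union = np.logical_or(mask_a, mask_b).sum()
    return float(intersection) / union if union > 0 else 0.0


# Toy example: two person masks that mostly overlap.
a = np.zeros((1080, 1920), dtype=bool)
b = np.zeros((1080, 1920), dtype=bool)
a[200:800, 700:1200] = True
b[220:820, 720:1220] = True
print(round(mask_iou(a, b), 3))  # high IoU -> strong frame match candidate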

A few other details were added along the way. To avoid brute-forcing every single pair of frames, we only took the middle frame of each shot, since many frames look visually similar within a single shot. To deal with similar frames from different shots, we performed image deduplication upfront. In our early research, we simply discarded any mask that wasn’t a person to keep things simple. Later on, we added non-person masks back to be able to find frame match cuts of animals and objects.

A series of frame match cuts of animals from Our Planet.
Object frame match from Paddington 2.

Action and Motion

At this point, we decided to move on to a second flavor of match cutting: action matching. This type of match cut involves the continuation of object or person A’s motion into object or person B’s motion in another shot (A and B can be the same as long as the background, clothing, time of day, or some other attribute changes between the two shots).

An action match cut from Resident Evil.
A series of action match cuts from Extraction, Red Notice, Sandman, Glow, Arcane, Sea Beast, and Royalteen.

To capture this type of information, we had to move beyond image level and extend into video understanding, action recognition, and motion. Optical flow is a common technique used to capture motion, so that’s what we tried first.

Consider the following shots and the corresponding optical flow representations:

Shots from The Umbrella Academy.

A red pixel means the pixel is moving to the right. A blue pixel means the pixel is moving to the left. The intensity of the color represents the magnitude of the motion. The optical flow representations on the right show a temporal average of all the frames. While averaging can be a simple way to match the dimensionality of the data for clips of different duration, the downside is that some valuable information is lost.

When we substituted optical flow in as the shot representations (replacing instance segmentation masks) and used cosine similarity in place of IoU, we found some interesting results.
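A rough sketch of that setup, using OpenCV’s Farneback optical flow and a simple temporal average (the actual preprocessing and flow estimator may differ, and frames are assumed to be resized to a common resolution so the flattened vectors are comparable):

import cv2
import numpy as np


def shot_flow_representation(gray_frames):
    """Temporal average of dense optical flow over a shot, flattened to a vector."""
    flows = []
    for prev, nxt in zip(gray_frames, gray_frames[1:]):
        flow = cv2.calcOpticalFlowFarneback(
            prev, nxt, None, 0.5, 3, 15, 3, 5, 1.2, 0
        )  # (H, W, 2): per-pixel horizontal and vertical motion
        flows.append(flow)
    return np.mean(flows, axis=0).ravel()


def cosine_similarity(u, v):
    return float(np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v) + 1e-8))


# score = cosine_similarity(shot_flow_representation(shot_a_frames),
#                           shot_flow_representation(shot_b_frames))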

Shots from The Umbrella Academy.

We saw that a large percentage of the top matches were actually matching based on similar camera movement. In the example above, purple in the optical flow diagram means the pixel is moving up. This wasn’t what we were expecting, but it made sense after we saw the results. For most shots, the number of background pixels outnumbers the number of foreground pixels. Therefore, it’s not hard to see why a generic similarity metric giving equal weight to each pixel would surface many shots with similar camera movement.

Here are a couple of matches found using this method:

Camera movement match cut from Bridgerton.
Camera movement match cut from Blood & Water.

While this wasn’t what we were initially looking for, our video editors were delighted by this output, so we decided to ship this feature as is.

Our research into true action matching still remains as future work, where we hope to leverage action recognition and foreground-background segmentation.

Match cutting system

The two flavors of match cutting we explored share a number of common components. We realized that we can break the process of finding matching pairs into five steps.

System diagram for match cutting. The input is a video file (film or series episode) and the output is K match cut candidates of the desired flavor. Each colored square represents a different shot. The original input video is broken into a sequence of shots in step 1. In Step 2, duplicate shots are removed (in this example the fourth shot is removed). In step 3, we compute a representation of each shot depending on the flavor of match cutting that we’re interested in. In step 4 we enumerate all pairs and compute a score for each pair. Finally, in step 5, we sort pairs and extract the top K (e.g. K=3 in this illustration).

1- Shot segmentation

Movies, or episodes in a series, consist of a number of scenes. Scenes typically transpire in a single location and continuous time. Each scene can be one or many shots, where a shot is defined as a sequence of frames between two cuts. Shots are a very natural unit for match cutting, and our first task was to segment a movie into shots.

Stranger Things season 1 episode 1 broken down into scenes and shots.

Shots are typically a few seconds long, but can be much shorter (less than a second) or minutes long in rare cases. Detecting shot boundaries is largely a visual task and very accurate computer vision algorithms have been designed and are available. We used an in-house shot segmentation algorithm, but similar results can be achieved with open source solutions such as PySceneDetect and TransNet v2.
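For readers who want to experiment, a minimal shot segmentation sketch using the open source PySceneDetect library (not our in-house algorithm) might look like this:

from scenedetect import detect, ContentDetector

# Detect cuts in a local video file; each entry is a (start, end) timecode pair.
shots = detect("episode.mp4", ContentDetector())
for start, end in shots:
    print(f"shot from {start.get_timecode()} to {end.get_timecode()}")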

2- Shot deduplication

Our early attempts surfaced many near-duplicate shots. Imagine two people having a conversation in a scene. It’s common to cut back and forth as each character delivers a line.

A dialogue sequence from Stranger Things Season 1.

These near-duplicate shots are not very interesting for match cutting and we quickly realized that we need to filter them out. Given a sequence of shots, we identified groups of near-duplicate shots and only retained the earliest shot from each group.

Identifying near-duplicate shots

Given the following pair of shots, how do you determine if the two are near-duplicates?

Near-duplicate shots from Stranger Things.

You would probably inspect the two visually and look for differences in colors, presence of characters and objects, poses, and so on. We can use computer vision algorithms to mimic this approach. Given a shot, we can use an algorithm that’s been trained on a large dataset of videos (or images) and can describe it using a vector of numbers.

An encoder represents a shot from Stranger Things using a vector of numbers.

Given this algorithm (typically called an encoder in this context), we can extract a vector (aka embedding) for a pair of shots, and compute how similar they are. The vectors that such encoders produce tend to be high dimensional (hundreds or thousands of dimensions).

To build some intuition for this process, let’s look at a contrived example with 2 dimensional vectors.

Three shots from Stranger Things and the corresponding vector representations.

The following is a depiction of these vectors:

Shots 1 and 3 are near-duplicates. The vectors representing these shots are close to each other. All shots are from Stranger Things.

Shots 1 and 3 are near-duplicates and we see that vectors 1 and 3 are close to each other. We can quantify closeness between a pair of vectors using cosine similarity, which is a value between -1 and 1. Vectors with cosine similarity close to 1 are considered similar.

The following table shows the cosine similarity between pairs of shots:

Shots 1 and 3 have high cosine similarity (0.96) and are considered near-duplicates while shots 1 and 2 have a smaller cosine similarity value (0.42) and are not considered near-duplicates. Note that the cosine similarity of a vector with itself is 1 (i.e. it’s perfectly similar to itself) and that cosine similarity is commutative. All shots are from Stranger Things.

This approach helps us to formalize a concrete algorithmic notion of similarity.
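To make that concrete, here is a simplified deduplication pass, assuming we already have one embedding vector per shot; the 0.9 threshold is illustrative, not the value we use in production.

import numpy as np


def cosine_similarity(u, v):
    return float(np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v) + 1e-8))


def dedupe_shots(embeddings, threshold=0.9):
    """Return indices of shots to keep, retaining the earliest shot in each
    group of near-duplicates (pairs with cosine similarity above threshold)."""
    kept = []
    for i, emb in enumerate(embeddings):
        if all(cosine_similarity(emb, embeddings[j]) < threshold for j in kept):
            kept.append(i)
    return kept


# Toy 2-D example: shots 0 and 2 are near-duplicates, so shot 2 is dropped.
print(dedupe_shots([np.array([1.0, 0.1]), np.array([0.1, 1.0]), np.array([0.9, 0.2])]))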

3- Compute representations

Steps 1 and 2 are agnostic to the flavor of match cutting that we’re interested in finding. This step is meant for capturing the matching semantics that we are interested in. As we discussed earlier, for frame match cutting, this can be instance segmentation, and for camera movement, we can use optical flow.

However, there are many other possible options to represent each shot that can help us do the matching. These can be heuristically defined ahead of time based on our knowledge of the flavors, or can be learned from labeled data.

4- Compute pair scores

In this step, we compute a similarity score for all pairs. The similarity score function takes a pair of representations and produces a number. The higher this number, the more similar the pairs are deemed to be.

Steps 3 and 4 for a pair of shots from Stranger Things. In this example the representation is the person instance segmentation mask and the metric is IoU.

5- Extract top-K results

Similar to the first two steps, this step is also agnostic to the flavor. We simply rank pairs by the computed score in step 4, and take the top K (a parameter) pairs to be surfaced to our video editors.

Using this flexible abstraction, we have been able to explore many different options by picking different concrete implementations for steps 3 and 4.
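The whole abstraction can be summarized in a small skeleton like the one below, where the representation and scoring functions are placeholders for whichever concrete implementations of steps 3 and 4 we plug in (shot segmentation, step 1, is assumed to have already produced the list of shots):

from itertools import combinations


def find_match_cut_candidates(shots, dedupe, represent, score, k=10):
    """Steps 2-5: dedupe shots, compute representations, score all pairs, take top K."""
    shots = dedupe(shots)                                        # step 2: shot deduplication
    reps = [represent(shot) for shot in shots]                   # step 3: flavor-specific representation
    scored = [
        (i, j, score(reps[i], reps[j]))                          # step 4: pair score (e.g. IoU)
        for i, j in combinations(range(len(shots)), 2)
    ]
    return sorted(scored, key=lambda t: t[2], reverse=True)[:k]  # step 5: top-K results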

Dataset

How well does this system work? To answer this question, we decided to collect a labeled dataset of approximately 20k labeled pairs. Each pair was annotated by 3 video editors. For frame match cutting, the three video editors were in perfect agreement (i.e. all three selected the same label) 84% of the time. For motion match cutting, which is a more nuanced and subjective task, perfect agreement was 75%.

We then took the majority label for each pair and used it to evaluate our model.

We started with 100 movies, which produced 128k shots and 8.2 billion unique pairs. This diagram depicts the process of reducing this set down to the final set of 19,305 pairs that were annotated.

Evaluation

Binary classification with frozen embeddings

With the above dataset with binary labels, we are armed to train our first model. We extracted fixed embeddings from a variety of image, video, and audio encoders (a model or algorithm that extracts a representation given a video clip) for each pair and then aggregated the results into a single feature vector to learn a classifier on top of.

We extracted fixed embeddings using the same encoder for each shot. Then we aggregated the embeddings and passed the aggregation results to a classification model.

We surface top ranking pairs to video editors. A high quality match cutting system places match cuts at the top of the list by producing higher scores. We used Average Precision (AP) as our evaluation metric. AP is an information retrieval metric that is suitable for ranking scenarios such as ours. AP ranges between 0 and 1, where higher values reflect a higher quality model.
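As a small illustration of this metric (with made-up labels and scores, not our actual data), AP can be computed with scikit-learn:

import numpy as np
from sklearn.metrics import average_precision_score

# 1 = the editors labeled the pair a match cut, 0 = not a match cut.
labels = np.array([1, 0, 0, 1, 0, 0, 0, 1, 0, 0])
scores = np.array([0.93, 0.12, 0.48, 0.81, 0.07, 0.25, 0.60, 0.77, 0.30, 0.10])

print(f"AP = {average_precision_score(labels, scores):.3f}")
# A random ranking would score roughly the positive prevalence (0.3 here) in expectation.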

The following table summarizes our results:

Reporting AP on the test set. Baseline is a random ranking of the pairs, which for AP is equivalent to the positive prevalence of each task in expectation.

EfficientNet7 and R(2+1)D perform best for frame and motion respectively.

Metric learning

A second approach we considered was metric learning. This approach gives us transformed embeddings which can be indexed and retrieved using Approximate Nearest Neighbor (ANN) methods.

Reporting AP on the test set. Baseline is a random ranking of the pairs similar to the previous section.

Leveraging ANN, we have been able to find matches across hundreds of shows (on the order of tens of millions of shots) in seconds.
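As a sketch of that retrieval step, assuming L2-normalized metric-learning embeddings so that inner product equals cosine similarity (FAISS is shown here as one possible ANN library, not necessarily the one we use):

import numpy as np
import faiss

dim = 128
rng = np.random.default_rng(0)

# Pretend corpus of shot embeddings produced by metric learning, L2-normalized.
corpus = rng.standard_normal((100_000, dim)).astype("float32")
faiss.normalize_L2(corpus)

index = faiss.IndexFlatIP(dim)  # exact inner-product index; swap in IVF/HNSW variants at scale
index.add(corpus)

query = rng.standard_normal((1, dim)).astype("float32")
faiss.normalize_L2(query)

scores, ids = index.search(query, 10)  # top-10 most similar shots
print(ids[0], scores[0])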

If you’re interested in more technical details make sure you take a look at our preprint paper here.

Conclusion

There are many more ideas that have yet to be tried: other types of match cuts such as action, light, color, and sound, better representations, and end-to-end model training, just to name a few.

Match cuts from Partner Track.
An action match cut from Lost In Space and Cowboy Bebop.
A series of match cuts from 1899.

We’ve only scratched the surface of this work and will continue to build tools like this to empower our creatives. If this type of work interests you, we are always looking for collaboration opportunities and hiring great machine learning engineers, researchers, and interns to help build exciting tools.

We’ll leave you with this teaser for Firefly Lane, edited by Aly Parmelee, which was the first piece made with the help of the match cutting tool:


Match Cutting at Netflix: Finding Cuts with Smooth Visual Transitions was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Helping VFX studios pave a path to the cloud

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/helping-vfx-studios-pave-a-path-to-the-cloud-d308ee43c079

By: Peter Cioni (Netflix), Alex Schworer (Netflix), Mac Moore (Conductor Tech.), Rachel Kelley (AWS), Ranjit Raju (AWS)

Rendering is core to the VFX process

VFX studios around the world create amazing imagery for Netflix productions. Nearly every show that is produced today includes digital visual effects, from the creatures in Stranger Things, to recreating historic London in Bridgerton.

Netflix production teams work with a global roster of VFX studios (both large and small) and their artists to create this amazing imagery. But it’s not easy: to pull this off, VFX studios need to build and operate serious technical infrastructure (compute, storage, networking, and software licensing), otherwise known as a “render farm.”

Rendering is the final step in the VFX creation process, and processing on a render farm often can take several hours to complete just a single frame of a show, even when this process runs on the latest high-end hardware. Many shows have needs that exceed 100,000 frames, so aggregate rendering time can impact the timely delivery of a show on Netflix.

We’ve found that when VFX teams dedicate more compute capacity to rendering, they are able to handle more projects, iterations, and schedule crunches while maintaining on-time deliveries. This ultimately results in more compelling entertainment for Netflix members.

“Without cloud-based rendering, this ambitious project would not have met its targeted delivery date! Over the last four months, cloud-based rendering has accounted for over 50% of all frames rendered for the show, and has given the VFX vendor the ability to consistently deliver 4K UHD EXRs.” — Glenn Kelly, VFX Producer, Dance Monsters

The infrastructure challenge

For small and mid-sized VFX studios, scaling up and managing infrastructure is a perennial challenge: it takes considerable capital, technical talent, and scale to get the most out of a render farm, especially when building out infrastructure on-premises or in a local datacenter. At the same time, with VFX complexity and scale demands by studios like Netflix reaching new levels, it’s really hard for VFX teams to accurately estimate how much infrastructure is too much (or too little!).

Example rendering workload over a project’s lifespan vs the capability of on-premises infrastructure (in cores required)

Cloud infrastructure providers like AWS offer an attractive option for VFX rendering with pay as you go pricing, especially when combined with cost effective interruptible instances, like Amazon Elastic Compute Cloud (EC2) Spot instances.

But there’s a problem: while the cloud has potential to enable more creativity with enhanced flexibility, VFX studios often have trouble finding the technical resources they need in order to tap into the cloud. Every VFX studio has a slightly different architecture and workflow, and a one-size-fits-all solution often isn’t enough to bridge the gap.

Forging relationships to assist VFX studios

Netflix is proud to announce that we have teamed-up with key partners AWS and Conductor Technologies to provide our diverse roster of VFX studios around the globe with special access to essential resources that simplify the migration path to cloud infrastructure. As a result of these collaborations with AWS and Conductor Technologies, VFX studios working on Netflix projects receive dedicated technical support, hands-on solutions architecture engagement, and streamlined rate cards for both compute pricing and licensing costs. VFX studios of varying sizes and locations can leverage these solutions to meet the unique rendering needs of their productions.

AWS

AWS provides a suite of services that a VFX studio, regardless of size, can use to leverage the cloud, including AWS Thinkbox Deadline, Amazon File Cache, and Render Farm Deployment Kit on AWS (RFDK). Rendering on AWS provides the flexibility to control how quickly a project is completed. Once a rendering pipeline is integrated with AWS, studios can scale rendering workloads to thousands, or even tens of thousands, of cores in minutes. They can also scale down just as quickly as they scale up, providing incredible compute elasticity and cost control. Netflix is collaborating with AWS to help VFX studios by connecting them to the best AWS resources to help get their render workloads up and running on the cloud.

“Cloud technology has introduced new ways for studios and artists across the globe to create incredible content,” said Antony Passemard, general manager of Creative Tools at AWS. “We look forward to working alongside Netflix to enable access for more creators to streamlined infrastructure and high-performance compute power on the world’s leading cloud. This partnership will be compelling for any company that wants to bring the breadth and depth of the AWS portfolio into their workflows, built with the highest standards for security, speed, and resilience.”

Conductor Technologies

Netflix also has a collaboration with Conductor Technologies, a SaaS platform that enables VFX studios to leverage cloud rendering infrastructure by streamlining the transition to cloud-based workflows. Conductor works on three simple principles: ease of use, collaboration, and optimizing turnaround time. It supports the industry’s most widely used software applications — via direct plug-ins, and is available on multi-cloud platform services. Additionally, Conductor supports render management systems — including AWS Thinkbox Deadline and Pixar’s Tractor.

The collaboration is designed to help studios of all sizes within Netflix’s ecosystem tap into near-infinite compute in a matter of minutes, offload their render workloads, and reduce the overhead of compute and licensing costs with the Netflix and Conductor pricing agreement.

“Netflix has repeatedly proven itself a pioneer in the entertainment industry, and embracing the cloud for rendering at scale furthers this pattern. We’re thrilled to help Netflix reduce barriers to digital content production and look forward to seeing what incredible worlds are brought to life as a result,” said Mac Moore, Conductor CEO.

Just the beginning

This announcement is just the start, and we hope to expand the scope of technical solutions that our VFX studios around the globe can access as part of our relationship with these vendor partners and others.

Ultimately, Netflix is committed to supporting a healthy VFX ecosystem. In establishing this initiative, our goal is to ensure that the studios that work on Netflix productions have access to the technical resources they need to create amazing stories for our members to enjoy. This program is just one example of the many ways Netflix strives to entertain the world.


Helping VFX studios pave a path to the cloud was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

New Series: Creating Media with Machine Learning

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/new-series-creating-media-with-machine-learning-5067ac110bcd

By Vi Iyengar, Keila Fong, Hossein Taghavi, Andy Yao, Kelli Griggs, Boris Chen, Cristina Segalin, Apurva Kansara, Grace Tang, Billur Engin, Amir Ziai, James Ray, Jonathan Solorzano-Hamilton

Welcome to the first post in our multi-part series on how Netflix is developing and using machine learning (ML) to help creators make better media — from TV shows to trailers to movies to promotional art and so much more.

Media is at the heart of Netflix. It’s our medium for delivering a range of emotions and experiences to our members. Through each engagement, media is how we bring our members continued joy.

This blog series will take you behind the scenes, showing you how we use the power of machine learning to create stunning media at a global scale.

At Netflix, we launch thousands of new TV shows and movies every year for our members across the globe. Each title is promoted with a custom set of artworks and video assets in support of helping each title find its audience of fans. Our goal is to empower creators with innovative tools that support them in effectively and efficiently creating the best media possible.

With media-focused ML algorithms, we’ve brought science and art together to revolutionize how content is made. Here are just a few examples:

  • We maintain a growing suite of video understanding models that categorize characters, storylines, emotions, and cinematography. These timecode tags enable efficient discovery, freeing our creators from hours of categorizing footage so they can focus on creative decisions instead.
  • We arm our creators with rich insights derived from our personalization system, helping them better understand our members and gain knowledge to produce content that maximizes their joy.
  • We invest in novel algorithms for bringing hard-to-execute editorial techniques easily to creators’ fingertips, such as match cutting and automated rotoscoping/matting.

One of our competitive advantages is the instant feedback we get from our members and creator teams, like the success of assets for content choosing experiences and internal asset creation tools. We use these measurements to constantly refine our research, examining which algorithms and creative strategies we invest in. The feedback we collect from our members also powers our causal machine learning algorithms, providing invaluable creative insights on asset generation.

In this blog series, we will explore our media-focused ML research, development, and opportunities related to the following areas:

  • Computer vision: video understanding search and match cut tools
  • VFX and Computer graphics: matting/rotoscopy, volumetric capture to digitize actors/props/sets, animation, and relighting
  • Audio and Speech
  • Content: understanding, extraction, and knowledge graphs
  • Infrastructure and paradigms

We are continuously investing in the future of media-focused ML. One area we are expanding into is multimodal content understanding — a fundamental area of ML research that uses multiple sources of information, or modalities (e.g., video, audio, closed captions, scripts), to capture the full meaning of media content. Our teams have demonstrated value and observed success by modeling different combinations of modalities, such as video and text, video and audio, script alone, as well as video, audio and scripts together. Multimodal content understanding is expected to solve the most challenging problems in content production, VFX, promo asset creation, and personalization.

We are also using ML to transform the way we create Netflix TV shows and movies. Our filmmakers are embracing Virtual Production (filming on specialized light and MoCap stages while being able to view a virtual environment and characters). Netflix is building prototype stages and developing deep learning algorithms that will maximize cost efficiency and adoption of this transformational tech. With virtual production, we can digitize characters and sets as 3D models, estimate lighting, easily relight scenes, optimize color renditions, and replace in-camera backgrounds via semantic segmentation.

Most importantly, in close collaboration with creators, we are building human-centric approaches to creative tools, from VFX to trailer editing. Context, not control, guides the work for data scientists and algorithm engineers at Netflix. Contributors enjoy a tremendous amount of latitude to come up with experiments and new approaches, rapidly test them in production contexts, and scale the impact of their work. Our leadership in this space hinges on our reliance on each individual’s ideas and drive towards a common goal — making Netflix the home of the best content and creative experience in the world.

Working on media ML at Netflix is a unique opportunity to push the boundaries of what’s technically and creatively possible. It’s a cutting edge and quickly evolving research area. The progress we’ve made so far is just the beginning. Our goal is to research and develop machine learning and computer vision tools that put power into the hands of creators and support them in making the best media possible.

We look forward to sharing our work with you across this blog series and beyond.

If these types of challenges interest you, please let us know! We are always looking for great people who are inspired by machine learning and computer vision to join our team.


Machine Learning for Fraud Detection in Streaming Services

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/machine-learning-for-fraud-detection-in-streaming-services-b0b4ef3be3f6

By Soheil Esmaeilzadeh, Negin Salajegheh, Amir Ziai, Jeff Boote

Introduction

Streaming services serve content to millions of users all over the world. These services allow users to stream or download content across a broad category of devices including mobile phones, laptops, and televisions. However, some restrictions are in place, such as the number of active devices, the number of streams, and the number of downloaded titles. Many users across many platforms make for a uniquely large attack surface that includes content fraud, account fraud, and abuse of terms of service. Detection of fraud and abuse at scale and in real-time is highly challenging.

Data analysis and machine learning techniques are great candidates to help secure large-scale streaming platforms. Even though such techniques can scale security solutions proportional to the service size, they bring their own set of challenges such as requiring labeled data samples, defining effective features, and finding appropriate algorithms. In this work, by relying on the knowledge and experience of streaming security experts, we define features based on the expected streaming behavior of the users and their interactions with devices. We present a systematic overview of the unexpected streaming behaviors together with a set of model-based and data-driven anomaly detection strategies to identify them.

Background on Anomaly Detection

Anomalies (also known as outliers) are defined as certain patterns (or incidents) in a set of data samples that do not conform to an agreed-upon notion of normal behavior in a given context.

There are two main anomaly detection approaches, namely, (i) rule-based, and (ii) model-based. Rule-based anomaly detection approaches use a set of rules which rely on the knowledge and experience of domain experts. Domain experts specify the characteristics of anomalous incidents in a given context and develop a set of rule-based functions to discover the anomalous incidents. As a result of this reliance, the deployment and use of rule-based anomaly detection methods become prohibitively expensive and time-consuming at scale, and such methods cannot be used for real-time analysis. Furthermore, rule-based anomaly detection approaches require constant supervision by experts in order to keep the underlying set of rules up-to-date for identifying novel threats. Reliance on experts can also make rule-based approaches biased or limited in scope and efficacy.

On the other hand, in model-based anomaly detection approaches, models are built and used to detect anomalous incidents in a fairly automated manner. Although model-based anomaly detection approaches are more scalable and suitable for real-time analysis, they rely heavily on the availability of (often labeled) context-specific data. Model-based anomaly detection approaches, in general, are of three kinds, namely, (i) supervised, (ii) semi-supervised, and (iii) unsupervised. Given a labeled dataset, a supervised anomaly detection model can be built to distinguish between anomalous and benign incidents. In semi-supervised anomaly detection models, only a set of benign examples is required for training. These models learn the distributions of benign samples and leverage that knowledge for identifying anomalous samples at inference time. Unsupervised anomaly detection models do not require any labeled data samples, but it is not straightforward to reliably evaluate their efficacy.

Figure 1. Schematic of a streaming service platform: (a) illustrates device types that can be used for streaming, (b) designates the set of authentication and authorization systems such as license and manifest servers for providing encrypted contents as well as decryption keys and manifests, and (c) shows the streaming service provider, as a surrogate entity for digital content providers, that interacts with the other two components.

Streaming Platforms

Commercial streaming platforms shown in Figure 1 mainly rely on Digital Rights Management (DRM) systems. DRM is a collection of access control technologies that are used for protecting the copyrights of digital media such as movies and music tracks. DRM helps the owners of digital products prevent illegal access, modification, and distribution of their copyrighted work. DRM systems provide continuous content protection against unauthorized actions on digital content and restrict it to streaming and in-time consumption. The backbone of DRM is the use of digital licenses, which specify a set of usage rights for the digital content and contain the permissions from the owner to stream the content via an on-demand streaming service.

On the client’s side, a request is sent to the streaming server to obtain the protected encrypted digital content. In order to stream the digital content, the user requests a license from the clearinghouse that verifies the user’s credentials. Once a license gets assigned to a user, using a Content Decryption Module (CDM), the protected content gets decrypted and becomes ready for preview according to the usage rights enforced by the license. A decryption key gets generated using the license, which is specific to a certain movie title, can only be used by a particular account on a given device, has a limited lifetime, and enforces a limit on how many concurrent streams are allowed.

Another relevant component involved in a streaming experience is the manifest. A manifest is a list of video, audio, subtitle, and other streams, which comes in the form of a few Uniform Resource Locators (URLs) that are used by the clients to get the movie streams. The manifest is requested by the client and gets delivered to the player before the license request, and it itemizes the available streams.

Data

Data Labeling

For the task of anomaly detection in streaming platforms, as we have neither an already trained model nor any labeled data samples, we use structural, a priori, domain-specific, rule-based assumptions for data labeling. Accordingly, we define a set of rule-based heuristics for identifying anomalous streaming behaviors of clients and labeling them as anomalous or benign. The fraud categories that we consider in this work are (i) content fraud, (ii) service fraud, and (iii) account fraud. With the help of security experts, we have designed and developed heuristic functions in order to discover a wide range of suspicious behaviors. We then use such heuristic functions for automatically labeling the data samples. In order to label a set of benign (non-anomalous) accounts, we use a group of vetted users that are highly trusted to be free of any form of fraud.

Next, we share three examples as a subset of our in-house heuristics that we have used for tagging anomalous accounts:

  • (i) Rapid license acquisition: a heuristic based on the fact that benign users usually watch one title at a time and it takes a while for them to move on to another title, resulting in a relatively low rate of license acquisition. Based on this reasoning, we tag all the accounts that acquire licenses very quickly as anomalous.
  • (ii) Too many failed attempts at streaming: a heuristic that relies on the fact that most devices stream without errors, while a device operating in trial-and-error mode to find the "right" parameters leaves a long trail of errors behind. Abnormally high levels of errors are an indicator of a fraud attempt.
  • (iii) Unusual combinations of device types and DRMs: a heuristic that is based on the fact that a device type (e.g., a browser) is normally matched with a certain DRM system (e.g., Widevine). Unusual combinations could be a sign of compromised devices that attempt to bypass security enforcements.

It should be noted that the heuristics, even though they work as a great proxy for embedding the knowledge of security experts in tagging anomalous accounts, may not be completely accurate and might wrongly tag accounts as anomalous (i.e., false-positive incidents), for example in the case of a buggy client or device. It is then up to the machine learning model to discover and avoid such false-positive incidents.
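To make these rules concrete, below is a minimal sketch of how such heuristics might be expressed over daily per-account aggregates. It is an illustration rather than the production logic: license_cnt and the *_pct columns follow the feature naming introduced in the next section (Table 1), while error_cnt and all thresholds are hypothetical stand-ins.

    import pandas as pd

    def label_anomalous(df: pd.DataFrame) -> pd.Series:
        # (i) rapid license acquisition: unusually many licenses in a day
        rapid_license = df["license_cnt"] > 50
        # (ii) too many failed attempts at streaming (error_cnt is a hypothetical column)
        many_failures = df["error_cnt"] > 100
        # (iii) unusual combination of device type and DRM usage shares
        odd_combo = (df["dev_type_a_pct"] > 0.9) & (df["drm_type_a_pct"] < 0.1)
        return rapid_license | many_failures | odd_combo

    # usage: df["is_anomalous"] = label_anomalous(df)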

Data Featurization

A complete list of features used in this work is presented in Table 1. The features mainly belong to two distinct classes. One class accounts for the number of distinct occurrences of a certain parameter/activity/usage in a day. For instance, the dist_title_cnt feature characterizes the number of distinct movie titles streamed by an account. The second class of features on the other hand captures the percentage of a certain parameter/activity/usage in a day.

Due to confidentiality reasons, we have partially obfuscated the features, for instance, dev_type_a_pct, drm_type_a_pct, and end_frmt_a_pct are intentionally obfuscated and we do not explicitly mention devices, DRM types, and encoding formats.

Table 1. The list of streaming related features with the suffixes pct and cnt respectively referring to percentage and count
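As a rough sketch of how the two feature classes could be derived from raw per-event logs, the snippet below computes one count-style and one percentage-style feature per account-day. The raw column names (account_id, date, title_id, dev_type) are hypothetical; dist_title_cnt and dev_type_a_pct follow the naming in Table 1.

    import pandas as pd

    def featurize(events: pd.DataFrame) -> pd.DataFrame:
        grouped = events.groupby(["account_id", "date"])
        return pd.DataFrame({
            # count-style feature: number of distinct titles streamed in a day
            "dist_title_cnt": grouped["title_id"].nunique(),
            # percentage-style feature: share of events from device type "a"
            "dev_type_a_pct": grouped["dev_type"].apply(lambda s: (s == "a").mean()),
        }).reset_index()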

Data Statistics

In this part, we present the statistics of the features presented in Table 1. Over 30 days, we have gathered 1,030,005 benign and 28,045 anomalous accounts. The anomalous accounts have been identified (labeled) using the heuristic-aware approach. Figure 2(a) shows the number of anomalous samples as a function of fraud categories with 8,741 (31%), 13,299 (47%), 6,005 (21%) data samples being tagged as content fraud, service fraud, and account fraud, respectively. Figure 2(b) shows that out of 28,045 data samples being tagged as anomalous by the heuristic functions, 23,838 (85%), 3,365 (12%), and 842 (3%) are respectively considered as incidents of one, two, and three fraud categories.

Figure 3 presents the correlation matrix of the 23 data features described in Table 1 for clean and anomalous data samples. As we can see in Figure 3 there are positive correlations between features that correspond to device signatures, e.g., dist_cdm_cnt and dist_dev_id_cnt, and between features that refer to title acquisition activities, e.g., dist_title_cnt and license_cnt.

Figure 2. Number of anomalous samples as a function of (a) fraud categories and (b) number of tagged categories.
Figure 3. Correlation matrix of the features presented in Table 1 for (a) clean and (b) anomalous data samples.

Label Imbalance Treatment

It is well known that class imbalance can compromise the accuracy and robustness of the classification models. Accordingly, in this work, we use the Synthetic Minority Over-sampling Technique (SMOTE) to over-sample the minority classes by creating a set of synthetic samples.

Figure 4 shows a high-level schematic of the Synthetic Minority Over-sampling Technique (SMOTE) with two classes shown in green and red, where the red class has a smaller number of samples, i.e., is the minority class, and gets synthetically upsampled.

Figure 4. Synthetic Minority Over-sampling Technique
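As a minimal sketch, the upsampling step could look like the following, assuming the imbalanced-learn library; the synthetic dataset stands in for the real features.

    from collections import Counter
    from sklearn.datasets import make_classification
    from imblearn.over_sampling import SMOTE

    # Synthetic stand-in for the 23 streaming features, with a 97:3 class imbalance
    X, y = make_classification(n_samples=10_000, n_features=23,
                               weights=[0.97, 0.03], random_state=42)

    X_res, y_res = SMOTE(random_state=42).fit_resample(X, y)
    print("before:", Counter(y), "after:", Counter(y_res))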

Evaluation Metrics

For evaluating the performance of the anomaly detection models we consider a set of evaluation metrics and report their values. For the one-class as well as binary anomaly detection task, such metrics are accuracy, precision, recall, f0.5, f1, and f2 scores, and area under the curve of the receiver operating characteristic (ROC AUC). For the multi-class multi-label task we consider accuracy, precision, recall, f0.5, f1, and f2 scores together with a set of additional metrics, namely, exact match ratio (EMR) score, Hamming loss, and Hamming score.
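Most of these metrics are available off the shelf in scikit-learn; a small sketch with placeholder labels (ROC AUC uses predicted scores rather than hard labels):

    from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                                 fbeta_score, roc_auc_score, hamming_loss)

    y_true = [0, 0, 1, 1, 0, 1]                # placeholder ground truth
    y_pred = [0, 1, 1, 1, 0, 0]                # placeholder hard predictions
    y_score = [0.1, 0.6, 0.8, 0.9, 0.2, 0.4]   # placeholder scores for ROC AUC

    print("accuracy :", accuracy_score(y_true, y_pred))
    print("precision:", precision_score(y_true, y_pred))
    print("recall   :", recall_score(y_true, y_pred))
    print("f0.5     :", fbeta_score(y_true, y_pred, beta=0.5))
    print("f1       :", fbeta_score(y_true, y_pred, beta=1.0))
    print("f2       :", fbeta_score(y_true, y_pred, beta=2.0))
    print("ROC AUC  :", roc_auc_score(y_true, y_score))
    print("Hamming loss:", hamming_loss(y_true, y_pred))
    # For the multi-label case, accuracy_score on full label sets gives the exact match ratio.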

Model Based Anomaly Detection

In this section, we briefly describe the modeling approaches that are used in this work for anomaly detection. We consider two model-based anomaly detection approaches, namely, (i) semi-supervised, and (ii) supervised as presented in Figure 5.

Figure 5. Model-based anomaly detection approaches: (a) semi-supervised and (b) supervised.

Semi-Supervised Anomaly Detection

The key point about the semi-supervised model is that at the training step the model is supposed to learn the distribution of the benign data samples so that at inference time it would be able to distinguish between the benign samples (which it has been trained on) and the anomalous samples (which it has not observed). Then at the inference stage, the anomalous samples would simply be those that fall outside the distribution of the benign samples. The performance of One-Class methods can become sub-optimal when dealing with complex and high-dimensional datasets. However, as supported by the literature, deep neural autoencoders can perform better than One-Class methods on complex and high-dimensional anomaly detection tasks.

For the One-Class anomaly detection approaches, in addition to a deep auto-encoder, we use the One-Class SVM, Isolation Forest, Elliptic Envelope, and Local Outlier Factor methods.
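A sketch of fitting these One-Class baselines on benign-only data with scikit-learn; the contamination/nu values are illustrative, the data is synthetic, and the deep auto-encoder is omitted:

    import numpy as np
    from sklearn.svm import OneClassSVM
    from sklearn.ensemble import IsolationForest
    from sklearn.covariance import EllipticEnvelope
    from sklearn.neighbors import LocalOutlierFactor

    rng = np.random.default_rng(0)
    X_benign = rng.normal(size=(5_000, 23))   # stand-in for benign-only training data
    X_test = rng.normal(size=(1_000, 23))     # would contain benign and anomalous samples

    models = {
        "one_class_svm": OneClassSVM(nu=0.01, kernel="rbf"),
        "isolation_forest": IsolationForest(contamination=0.01, random_state=0),
        "elliptic_envelope": EllipticEnvelope(contamination=0.01, random_state=0),
        "local_outlier_factor": LocalOutlierFactor(novelty=True, contamination=0.01),
    }
    for name, model in models.items():
        model.fit(X_benign)            # learn the distribution of benign samples
        pred = model.predict(X_test)   # +1 = benign (inlier), -1 = anomalous (outlier)
        print(name, "flagged", int((pred == -1).sum()), "samples")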

Supervised Anomaly Detection

Binary Classification: In the anomaly detection task using binary classification, we consider only two classes of samples, namely benign and anomalous, and we do not make distinctions between the types of the anomalous samples, i.e., the three fraud categories. For the binary classification task we use multiple supervised classification approaches, namely, (i) Support Vector Classification (SVC), (ii) K-Nearest Neighbors classification, (iii) Decision Tree classification, (iv) Random Forest classification, (v) Gradient Boosting, (vi) AdaBoost, (vii) Nearest Centroid classification, (viii) Quadratic Discriminant Analysis (QDA) classification, (ix) Gaussian Naive Bayes classification, (x) Gaussian Process Classifier, (xi) Label Propagation classification, and (xii) XGBoost. Finally, upon doing stratified k-fold cross-validation, we carry out an efficient grid search to tune the hyper-parameters in each of the aforementioned models for the binary classification task and only report the performance metrics for the optimally tuned hyper-parameters.
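As a sketch of this tuning setup with one of the listed classifiers (synthetic data and an illustrative parameter grid, not the grids used in the paper):

    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import GridSearchCV, StratifiedKFold

    X, y = make_classification(n_samples=5_000, n_features=23,
                               weights=[0.9, 0.1], random_state=0)

    param_grid = {"n_estimators": [100, 300], "max_depth": [None, 10, 20]}
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
    search = GridSearchCV(RandomForestClassifier(random_state=0),
                          param_grid, cv=cv, scoring="f1", n_jobs=-1)
    search.fit(X, y)
    print("best params:", search.best_params_, "best f1:", round(search.best_score_, 3))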

Multi-Class Multi-Label Classification: In the anomaly detection task using multi-class multi-label classification, we consider the three fraud categories as the possible anomalous classes (hence multi-class), and each data sample is assigned one or more than one of the fraud categories as its set of labels (hence multi-label) using the heuristic-aware data labeling strategy presented earlier. For the multi-class multi-label classification task we use multiple supervised classification techniques, namely, (i) K-Nearest Neighbors, (ii) Decision Tree, (iii) Extra Trees, (iv) Random Forest, and (v) XGBoost.
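In the multi-label setting, each account carries a binary indicator per fraud category. A minimal sketch with scikit-learn, using random placeholder data and a classifier that natively supports multi-label targets:

    import numpy as np
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, hamming_loss
    from sklearn.model_selection import train_test_split

    rng = np.random.default_rng(0)
    X = rng.normal(size=(5_000, 23))            # placeholder feature matrix
    y = rng.integers(0, 2, size=(5_000, 3))     # columns: content, service, account fraud

    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=0)
    clf = RandomForestClassifier(n_estimators=200, random_state=0).fit(X_tr, y_tr)
    y_pred = clf.predict(X_te)

    print("exact match ratio:", accuracy_score(y_te, y_pred))  # all labels correct at once
    print("hamming loss     :", hamming_loss(y_te, y_pred))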

Results and Discussion

Table 2 shows the values of the evaluation metrics for the semi-supervised anomaly detection methods. As we see from Table 2, the deep auto-encoder model performs the best among the semi-supervised anomaly detection approaches with an accuracy of around 96% and f1 score of 94%. Figure 6(a) shows the distribution of the Mean Squared Error (MSE) values for the anomalous and benign samples at the inference stage.

Table 2. The values of the evaluation metrics for a set of semi-supervised anomaly detection models.
Figure 6. For the deep auto-encoder model: (a) distribution of the Mean Squared Error (MSE) values for anomalous and benign samples at the inference stage — (b) confusion matrix across benign and anomalous samples- (c) Mean Squared Error (MSE) values averaged across the anomalous and benign samples for each of the 23 features.
Table 3. The values of the evaluation metrics for a set of supervised binary anomaly detection classifiers.
Table 4. The values of the evaluation metrics for a set of supervised multi-class multi-label anomaly detection approaches. The values in parenthesis refer to the performance of the models trained on the original (not upsampled) dataset.

Table 3 shows the values of the evaluation metrics for a set of supervised binary anomaly detection models. Table 4 shows the values of the evaluation metrics for a set of supervised multi-class multi-label anomaly detection models.

In Figure 7(a), for the content fraud category, the three most important features are the count of distinct encoding formats (dist_enc_frmt_cnt), the count of distinct devices (dist_dev_id_cnt), and the count of distinct DRMs (dist_drm_cnt). This implies that for content fraud the uses of multiple devices, as well as encoding formats, stand out from the other features. For the service fraud category in Figure 7(b) we see that the three most important features are the count of content licenses associated with an account (license_cnt), the count of distinct devices (dist_dev_id_cnt), and the percentage use of type (a) devices by an account (dev_type_a_pct). This shows that in the service fraud category the counts of content licenses and distinct devices of type (a) stand out from the other features. Finally, for the account fraud category in Figure 7(c), we see that the count of distinct devices (dist_dev_id_cnt) dominantly stands out from the other features.

Figure 7. The normalized feature importance values (NFIV) for the multi-class multi-label anomaly detection task using the XGBoost approach in Table 4 across the three anomaly classes, i.e., (a) content fraud, (b) service fraud, and (c) account fraud.

You can find more technical details in our paper here.

Are you interested in solving challenging problems at the intersection of machine learning and security? We are always looking for great people to join us.


Seeing through hardware counters: a journey to threefold performance increase

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/seeing-through-hardware-counters-a-journey-to-threefold-performance-increase-2721924a2822

By Vadim Filanovsky and Harshad Sane

In one of our previous blogposts, A Microscope on Microservices, we outlined three broad domains of observability (or "levels of magnification," as we referred to them) — Fleet-wide, Microservice and Instance. We described the tools and techniques we use to gain insight within each domain. There is, however, a class of problems that requires an even stronger level of magnification, going deeper down the stack to introspect CPU microarchitecture. In this blogpost we describe one such problem and the tools we used to solve it.

The problem

It started off as a routine migration. At Netflix, we periodically reevaluate our workloads to optimize utilization of available capacity. We decided to move one of our Java microservices — let’s call it GS2 — to a larger AWS instance size, from m5.4xl (16 vCPUs) to m5.12xl (48 vCPUs). The workload of GS2 is computationally heavy, with CPU being the limiting resource. While we understand it’s virtually impossible to achieve a linear increase in throughput as the number of vCPUs grows, a near-linear increase is attainable. Consolidating on the larger instances reduces the amortized cost of background tasks, freeing up additional resources for serving requests and potentially offsetting the sub-linear scaling. Thus, we expected to roughly triple throughput per instance from this migration, as 12xl instances have three times the number of vCPUs compared to 4xl instances. A quick canary test was free of errors and showed lower latency, which is expected given that our standard canary setup routes an equal amount of traffic to both the baseline running on 4xl and the canary on 12xl. As GS2 relies on AWS EC2 Auto Scaling to target-track CPU utilization, we thought we just had to redeploy the service on the larger instance type and wait for the ASG (Auto Scaling Group) to settle on the CPU target. Unfortunately, the initial results were far from our expectations:

The first graph above represents average per-node throughput overlaid with average CPU utilization, while the second graph shows average request latency. We can see that as we reached roughly the same CPU target of 55%, the throughput increased only by ~25% on average, falling far short of our desired goal. What’s worse, average latency degraded by more than 50%, with both CPU and latency patterns becoming more “choppy.” GS2 is a stateless service that receives traffic through a flavor of round-robin load balancer, so all nodes should receive nearly equal amounts of traffic. Indeed, the RPS (Requests Per Second) data shows very little variation in throughput between nodes:

But as we started looking at the breakdown of CPU and latency by node, a strange pattern emerged:

Although we confirmed fairly equal traffic distribution between nodes, CPU and latency metrics surprisingly demonstrated a very different, bimodal distribution pattern. There is a “lower band” of nodes exhibiting much lower CPU and latency with hardly any variation; and there is an “upper band” of nodes with significantly higher CPU/latency and wide variation. We noticed only ~12% of the nodes fall into the lower band, a figure that was suspiciously consistent over time. In both bands, performance characteristics remain consistent for the entire uptime of the JVM on the node, i.e. nodes never jumped the bands. This was our starting point for troubleshooting.

First attempt at solving it

Our first (and rather obvious) step at solving the problem was to compare flame graphs for the “slow” and “fast” nodes. While the flame graphs clearly reflected the difference in CPU utilization in the number of collected samples, the distribution across the stacks remained the same, thus leaving us with no additional insight. We turned to JVM-specific profiling, starting with the basic hotspot stats, and then switching to more detailed JFR (Java Flight Recorder) captures to compare the distribution of the events. Again, we came away empty-handed as there was no noticeable difference in the amount or the distribution of the events between the “slow” and “fast” nodes. Still suspecting something might be off with JIT behavior, we ran some basic stats against symbol maps obtained by perf-map-agent, only to hit another dead end.

False Sharing

Convinced we’re not missing anything on the app-, OS- and JVM- levels, we felt the answer might be hidden at a lower level. Luckily, the m5.12xl instance type exposes a set of core PMCs (Performance Monitoring Counters, a.k.a. PMU counters), so we started by collecting a baseline set of counters using PerfSpect:

In the table above, the nodes showing low CPU and low latency represent a “fast node”, while the nodes with higher CPU/latency represent a “slow node”. Aside from obvious CPU differences, we can see that the slow node has almost 3x the CPI (Cycles Per Instruction) of the fast node. We also see much higher L1 cache activity combined with a 4x higher count of MACHINE_CLEARS. One common cause of these symptoms is so-called “false sharing” — a usage pattern that occurs when 2 cores read from / write to unrelated variables that happen to share the same L1 cache line. A cache line is a concept similar to a memory page — a contiguous chunk of data (typically 64 bytes on x86 systems) transferred to and from the cache. This diagram illustrates it:

Each core in this diagram has its own private cache. Since both cores are accessing the same memory space, caches have to be consistent. This consistency is ensured with so-called “cache coherency protocol.” As Thread 0 writes to the “red” variable, coherency protocol marks the whole cache line as “modified” in Thread 0’s cache and as “invalidated” in Thread 1’s cache. Later, when Thread 1 reads the “blue” variable, even though the “blue” variable is not modified, coherency protocol forces the entire cache line to be reloaded from the cache that had the last modification — Thread 0’s cache in this example. Resolving coherency across private caches takes time and causes CPU stalls. Additionally, ping-ponging coherency traffic has to be monitored through the last level shared cache’s controller, which leads to even more stalls. We take CPU cache consistency for granted, but this “false sharing” pattern illustrates there’s a huge performance penalty for simply reading a variable that is neighboring with some other unrelated data.

Armed with this knowledge, we used Intel vTune to run microarchitecture profiling. Drilling down into “hot” methods and further into the assembly code showed us blocks of code with some instructions exceeding 100 CPI, which is extremely slow. This is the summary of our findings:

Numbered markers from 1 to 6 denote the same code/variables across the sources and vTune assembly view. The red arrow indicates that the CPI value likely belongs to the previous instruction — this is due to the profiling skid in absence of PEBS (Processor Event-Based Sampling), and usually it’s off by a single instruction. Based on the fact that (5) “repne scan” is a rather rare operation in the JVM codebase, we were able to link this snippet to the routine for subclass checking (the same code exists in JDK mainline as of the writing of this blogpost). Going into the details of subtype checking in HotSpot is far beyond the scope of this blogpost, but curious readers can learn more about it from the 2002 publication Fast Subtype Checking in the HotSpot JVM. Due to the nature of the class hierarchy used in this particular workload, we keep hitting the code path that keeps updating (6) the “_secondary_super_cache” field, which is a single-element cache for the last-found secondary superclass. Note how this field is adjacent to the “_secondary_supers”, which is a list of all superclasses and is being read (1) in the beginning of the scan. Multiple threads do these read-write operations, and if fields (1) and (6) fall into the same cache line, then we hit a false sharing use case. We highlighted these fields with red and blue colors to connect to the false sharing diagram above.

Note that since the cache line size is 64 bytes and the pointer size is 8 bytes, we have a 1 in 8 chance of these fields falling on separate cache lines, and a 7 in 8 chance of them sharing a cache line. This 1-in-8 chance is 12.5%, matching our previous observation on the proportion of the “fast” nodes. Fascinating!
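The 1-in-8 figure can be sanity-checked by enumerating the 8-byte-aligned slots at which the first field can sit within a 64-byte cache line; only the last slot pushes the second field onto the next line (a quick verification, not part of the original analysis):

    CACHE_LINE = 64
    FIELD_SIZE = 8

    offsets = range(0, CACHE_LINE, FIELD_SIZE)   # possible aligned positions of the first field
    separate = sum(1 for off in offsets if off + FIELD_SIZE >= CACHE_LINE)
    print(f"separate lines: {separate}/{len(offsets)} = {separate / len(offsets):.1%}")  # 1/8 = 12.5%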

Although the fix involved patching the JDK, it was a simple change. We inserted padding between “_secondary_super_cache” and “_secondary_supers” fields to ensure they never fall into the same cache line. Note that we did not change the functional aspect of JDK behavior, but rather the data layout:

The results of deploying the patch were immediately noticeable. The graph below is a breakdown of CPU by node. Here we can see a red-black deployment happening at noon, and the new ASG with the patched JDK taking over by 12:15:

Both CPU and latency (graph omitted for brevity) showed a similar picture — the “slow” band of nodes was gone!

True Sharing

We didn’t have much time to marvel at these results, however. As the autoscaling reached our CPU target, we noticed that we still couldn’t push more than ~150 RPS per node — well short of our goal of ~250 RPS. Another round of vTune profiling on the patched JDK version showed the same bottleneck around secondary superclass cache lookup. It was puzzling at first to see seemingly the same problem coming back right after we put in a fix, but upon closer inspection we realized we’re dealing with “true sharing” now. Unlike “false sharing,” where 2 independent variables share a cache line, “true sharing” refers to the same variable being read and written by multiple threads/cores. In this case, CPU-enforced memory ordering is the cause of slowdown. We reasoned that removing the obstacle of false sharing and increasing the overall throughput resulted in increased execution of the same JVM superclass caching code path. Essentially, we have higher execution concurrency, causing excessive pressure on the superclass cache due to CPU-enforced memory ordering protocols. The common way to resolve this is to avoid writing to the shared variable altogether, effectively bypassing the JVM’s secondary superclass cache. Since this change altered the behavior of the JDK, we gated it behind a command line flag. This is the entirety of our patch:

And here are the results of running with disabled superclass cache writes:

Our fix pushed the throughput to ~350 RPS at the same CPU autoscaling target of 55%. To put this in perspective, that’s a 3.5x improvement over the throughput we initially reached on m5.12xl, along with a reduction in both average and tail latency.

Future work

Disabling writes to the secondary superclass cache worked well in our case, and even though this might not be a desirable solution in all cases, we wanted to share our methodology, toolset and the fix in the hope that it would help others encountering similar symptoms. While working through this problem, we came across JDK-8180450 — a bug that’s been dormant for more than five years that describes exactly the problem we were facing. It seems ironic that we could not find this bug until we actually figured out the answer. We believe our findings complement the great work that has been done in diagnosing and remediating it.

Conclusion

We tend to think of modern JVMs as highly optimized runtime environments, in many cases rivaling more “performance-oriented” languages like C++. While it holds true for the majority of workloads, we were reminded that performance of certain workloads running within JVMs can be affected not only by the design and implementation of the application code, but also by the implementation of the JVM itself. In this blogpost we described how we were able to leverage PMCs in order to find a bottleneck in the JVM’s native code, patch it, and subsequently realize better than a threefold increase in throughput for the workload in question. When it comes to this class of performance issues, the ability to introspect the execution at the level of CPU microarchitecture proved to be the only solution. Intel vTune provides valuable insight even with the core set of PMCs, such as those exposed by m5.12xl instance type. Exposing a more comprehensive set of PMCs along with PEBS across all instance types and sizes in the cloud environment would pave the way for deeper performance analysis and potentially even larger performance gains.

Special thanks to Sandhya Viswanathan, Jennifer Dimatteo, Brendan Gregg, Susie Xia, Jason Koch, Mike Huang, Amer Ather, Chris Berry, Chris Sanden, and Guy Cirino


Consistent caching mechanism in Titus Gateway

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/consistent-caching-mechanism-in-titus-gateway-6cb89b9ce296

by Tomasz Bak and Fabio Kung

Introduction

Titus is the Netflix cloud container runtime that runs and manages containers at scale. In the time since it was first presented as an advanced Mesos framework, Titus has transparently evolved from being built on top of Mesos to Kubernetes, handling an ever-increasing volume of containers. As the number of Titus users increased over the years, the load and pressure on the system increased substantially. The original assumptions and architectural choices were no longer viable. This blog post presents how our current iteration of Titus deals with high API call volumes by scaling out horizontally.

We introduce a caching mechanism in the API gateway layer, allowing us to offload processing from singleton leader-elected controllers without giving up the strict data consistency and guarantees that clients observe. Titus API clients always see the latest (not stale) version of the data regardless of which gateway node serves their request, and in which order.

Overview

The figure below depicts a simplified high-level architecture of a single Titus cluster (a.k.a cell):

Titus Job Coordinator is a leader elected process managing the active state of the system. Active data includes jobs and tasks that are currently running. When a new leader is elected it loads all data from external storage. Mutations are first persisted to the active data store before in-memory state is changed. Data for completed jobs and tasks is moved to the archive store first, and only then removed from the active data store and from the leader memory.

Titus Gateway handles user requests. A user request could be a job creation request, a query to the active data store, or a query to the archive store (the latter handled directly in Titus Gateway). Requests are load balanced across all Titus Gateway nodes. All reads are consistent, so it does not matter which Titus Gateway instance is serving a query. For example, it is OK to send writes through one instance, and do reads from another one with full data read consistency guarantees. Titus Gateways always connect to the current Titus Job Coordinator leader. During leader failovers, all writes and reads of the active data are rejected until a connection to the active leader is re-established.

In the original version of the system, all queries to the active data set were forwarded to a singleton Titus Job Coordinator. The freshest data is served to all requests, and clients never observe read-your-write or monotonic-read consistency issues¹:

Data consistency on the Titus API is highly desirable as it simplifies client implementation. Causal consistency, which includes read-your-writes and monotonic-reads, frees clients from implementing client-side synchronization mechanisms. In PACELC terms we choose PC/EC and have the same level of availability for writes of our previous system while improving our theoretical availability for reads.

For example, a batch workflow orchestration system may create multiple jobs which are part of a single workflow execution. After the jobs are created, it monitors their execution progress. If the system creates a new job, followed immediately by a query to get its status, and there is a data propagation lag, it might decide that the job was lost and a replacement must be created. In that scenario, the system would need to deal with the data propagation latency directly, for example, by use of timeouts or client-originated update tracking mechanisms. As Titus API reads are always consistently reflecting the up-to-date state, such workarounds are not needed.

With traffic growth, a single leader node handling all request volume started becoming overloaded. We started seeing increased response latencies and leader servers running at dangerously high utilization. To mitigate this issue we decided to handle all query requests directly from Titus Gateway nodes but still preserve the original consistency guarantees:

The state from Titus Job Coordinator is replicated over a persistent stream connection, with low event propagation latencies. A new wire protocol provided by Titus Job Coordinator allows monitoring of the cache consistency level and guarantees that clients always receive the latest data version. The cache is kept in sync with the current leader process. When there is a failover (because of node failures with the current leader or a system upgrade), a new snapshot from the freshly elected leader is loaded, replacing the previous cache state. Titus Gateways handling client requests can now be horizontally scaled out. The details and workings of these mechanisms are the primary topics of this blog post.

How do I know that my cache is up to date?

It is an easy answer for systems that were built from the beginning with a consistent data versioning scheme and can depend on clients to follow the established protocol. Kubernetes is a good example here. Each object and each collection read from the Kubernetes cluster has a unique revision which is a monotonically increasing number. A user may request all changes since the last received revision. For more details, see Kubernetes API Concepts and the Shared Informer Pattern.

In our case, we did not want to change the API contract and impose additional constraints and requirements on our users. Doing so would require a substantial migration effort to move all clients off the old API with questionable value to the affected teams (except for helping us solve Titus' internal scalability problems). In our experience, such migrations require a nontrivial amount of work, particularly with the migration timeline not fully in our control.

To fulfill the existing API contract, we had to guarantee that for a request received at a time T₀, the data returned to the client is read from a cache that contains all state updates in Titus Job Coordinator up to time T₀.

The path over which data travels from Titus Job Coordinator to a Titus Gateway cache can be described as a sequence of event queues with different processing speeds:

A message generated by the event source may be buffered at any stage. Furthermore, as each event stream subscription from Titus Gateway to Titus Job Coordinator establishes a different instance of the processing pipeline, the state of the cache in each gateway instance may be vastly different.

Let’s assume a sequence of events E₁…E₁₀, and their location within the pipeline of two Titus Gateway instances at time T₁:

If a client makes a call to Titus Gateway 2 at the time T₁, it will read version E₈ of the data. If it immediately makes a request to Titus Gateway 1, the cache there is behind with respect to the other gateway so the client might read an older version of the data.

In both cases, data is not up to date in the caches. If a client created a new object at time T₀, and the object value is captured by an event update E₁₀, this object will be missing in both gateways at time T₁. This would be a surprise to a client who successfully completed a create request, only to have a follow-up query return a not-found error (a read-your-write consistency violation).

The solution is to flush all the events created up to time T₁ and force clients to wait for the cache to receive them all. This work can be split into two different steps each with its own unique solution.

Implementation details

We solved the cache synchronization problem (as stated above) with a combination of two strategies:

  • Titus Gateway <-> Titus Job Coordinator synchronization protocol over the wire.
  • Usage of high-resolution monotonic time sources like Java’s nano time within a single server process. Java’s nano time is used as a logical time within a JVM to define an order for events happening in the JVM process. An alternative solution based on an atomic integer value generator to order the events would suffice as well. Having a local logical time source avoids issues with distributed clock synchronization.

If Titus Gateways subscribed to the Titus Job Coordinator event stream without synchronization steps, the amount of data staleness would be impossible to estimate. To guarantee that a Titus Gateway received all state updates that happened until some time Tₙ an explicit synchronization between the two services must happen. Here is what the protocol we implemented looks like:

  1. Titus Gateway receives a client request (queryₐ).
  2. Titus Gateway makes a request to the local cache to fetch the latest version of the data.
  3. The local cache in Titus Gateway records the local logical time and sends it to Titus Job Coordinator in a keep-alive message (keep-aliveₐ).
  4. Titus Job Coordinator saves the keep-alive request together with the local logical time Tₐ of the request arrival in a local queue (KAₐ, Tₐ).
  5. Titus Job Coordinator sends state updates to Titus Gateway until the former observes a state update (event) with a timestamp past the recorded local logical time (E1, E2).
  6. At that time, Titus Job Coordinator sends an acknowledgment event for the keep-alive message (KAₐ keep-alive ACK).
  7. Titus Gateway receives the keep-alive acknowledgment and consequently knows that its local cache contains all state changes that happened up to the time when the keep-alive request was sent.
  8. At this point the original client request can be handled from the local cache, guaranteeing that the client will get a fresh enough version of the data (responseₐ).

This process is illustrated by the figure below:

The procedure above explains how to synchronize a Titus Gateway cache with the source of truth in Titus Job Coordinator, but it does not address how the internal queues in Titus Job Coordinator are drained to the point where all relevant messages are processed. The solution here is to add a logical timestamp to each event and guarantee a minimum time interval between messages emitted inside the event stream. If not enough events are created because of data updates, a dummy message is generated and inserted into the stream. Dummy messages guarantee that each keep-alive request is acknowledged within a bounded time, and does not wait indefinitely until some change in the system happens. For example:

Ta, Tb, Tc, Td, and Te are high-resolution monotonic logical timestamps. At time Td a dummy message is inserted, so the interval between two consecutive events in the event stream is always below a configurable threshold. These timestamp values are compared with keep-alive request arrival timestamps to know when a keep-alive acknowledgment can be sent.
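A highly simplified, single-process sketch of the gateway-side half of this protocol; the class, method names, and transport hook are hypothetical stand-ins rather than the actual Titus implementation. The step numbers in the comments refer to the protocol listed above.

    import threading
    import time

    logical_clock = time.monotonic_ns        # local monotonic logical time source

    class GatewayCache:
        """Hypothetical local cache kept in sync with the leader's event stream."""

        def __init__(self):
            self._acked_up_to = 0            # highest keep-alive timestamp acknowledged
            self._cond = threading.Condition()
            self.data = {}                   # job_id -> latest known state

        def on_event(self, job_id, state):   # state updates streamed from the leader
            with self._cond:
                self.data[job_id] = state

        def on_keep_alive_ack(self, ka_timestamp):   # steps 6-7: leader's acknowledgment
            with self._cond:
                self._acked_up_to = max(self._acked_up_to, ka_timestamp)
                self._cond.notify_all()

        def consistent_read(self, job_id, send_keep_alive):
            t_request = logical_clock()      # steps 1-3: record local logical time
            send_keep_alive(t_request)       # and send it to the leader as a keep-alive
            with self._cond:                 # step 7: wait for the matching acknowledgment
                self._cond.wait_for(lambda: self._acked_up_to >= t_request)
                return self.data.get(job_id)  # step 8: serve the request from the cache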

There are a few optimization techniques that can be used. Here are those implemented in Titus:

  • Before sending a keep-alive request for each new client request, wait a fixed interval and send a single keep-alive request for all requests that arrived during that time. So the maximum rate of keep-alive requests is constrained by 1 / max_interval. For example, if max_interval is set to 5ms, the max keep alive request rate is 200 req / sec.
  • Collapse multiple keep-alive requests in Titus Job Coordinator, sending a response to the latest one which has the arrival timestamp less than that of the timestamp of the last event sent over the network. On the Titus Gateway side, a keep-alive response with a given timestamp acknowledges all pending requests with keep-alive timestamps earlier or equal to the received one.
  • Do not wait for cache synchronization on requests that do not have ordering requirements, serving data from the local cache on each Titus Gateway. Clients that can tolerate eventual consistency can opt into this new API for lower response times and increased availability.

Given the mechanism described so far, let’s try to estimate the maximum wait time of a client request that arrived at Titus Gateway for different scenarios. Let’s assume that the maximum keep alive interval is 5ms, and the maximum interval between events emitted in Titus Job Coordinator is 2ms.

Assuming that the system runs idle (no changes made to the data), and the client request arrives at a time when a new keep-alive request wait time starts, the cache update latency is equal to 7 milliseconds + network propagation delay + processing time. If we ignore the processing time and assume that the network propagation delay is <1ms given we have to only send back a small keep-alive response, we should expect an 8ms delay in the typical case. If the client request does not have to wait for the keep-alive to be sent, and the keep-alive request is acknowledged immediately in Titus Job Coordinator, the delay is equal to network propagation delay + processing time, which we estimated to be <1ms. The average delay introduced by cache synchronization is around 4ms.

Network propagation delays and stream processing times start to become a more important factor as the number of state change events and client requests increases. However, Titus Job Coordinator can now dedicate its capacity for serving high bandwidth streams to a finite number of Titus Gateways, relying on the gateway instances to serve client requests, instead of serving payloads to all client requests itself. Titus Gateways can then be scaled out to match client request volumes.

We ran empirical tests for scenarios of low and high request volumes, and the results are presented in the next section.

Performance test results

To show how the system performs with and without the caching mechanism, we ran two tests:

  • A test with a low/moderate load showing a median latency increase due to overhead from the cache synchronization mechanism, but better 99th percentile latencies.
  • A test with load close to the peak of Titus Job Coordinator capacity, above which the original system collapses. Previous results hold, showing better scalability with the caching solution.

A single request in the tests below consists of one query. The query is of a moderate size, which is a collection of 100 records, with a serialized response size of ~256KB. The total payload (request size times the number of concurrently running requests) requires a network bandwidth of ~2Gbps in the first test and ~8Gbps in the second one.

Moderate load level

This test shows the impact of cache synchronization on query latency in a moderately loaded system. The query rate in this test is set to 1K requests/second.

Median latency without caching is half of what we observe with the introduction of the caching mechanism, due to the added synchronization delays. In exchange, the worst-case 99th percentile latencies are 90% lower, dropping from 292 milliseconds without a cache to 30 milliseconds with the cache.

Load level close to Titus Job Coordinator maximum

If Titus Job Coordinator has to handle all query requests (when the cache is not enabled), it handles the traffic well up to 4K test queries / second, and breaks down (sharp latency increase and a rapid drop of throughput) at around 4.5K queries/sec. The maximum load test is thus kept at 4K queries/second.

Without caching enabled the 99th percentile hovers around 1000ms, and the 80th percentile is around 336ms, compared with the cache-enabled 99th percentile at 46ms and 80th percentile at 22ms. The median still looks better on the setup with no cache at 17ms vs 19ms when the cache is enabled. It should be noted however that the system with caching enabled scales out linearly to more request load while keeping the same latency percentiles, while the no-cache setup collapses with a mere ~15% additional load increase.

Doubling the load when the caching is enabled does not increase the latencies at all. Here are latency percentiles when running 8K query requests/second:

Conclusion

After reaching the limit of vertical scaling of our previous system, we were pleased to implement a real solution that provides (in a practical sense) unlimited scalability of Titus read-only API. We were able to achieve better tail latencies with a minor sacrifice in median latencies when traffic is low, and gained the ability to horizontally scale out our API gateway processing layer to handle growth in traffic without changes to API clients. The upgrade process was completely transparent, and no single client observed any abnormalities or changes in API behavior during and after the migration.

The mechanism described here can be applied to any system relying on a singleton leader elected component as the source of truth for managed data, where the data fits in memory and latency is low.

As for prior art, there is ample coverage of cache coherence protocols in the literature, both in the context of multiprocessor architectures (Adve & Gharachorloo, 1996) and distributed systems (Gwertzman & Seltzer, 1996). Our work fits within mechanisms of client polling and invalidation protocols explored by Gwertzman and Seltzer (1996) in their survey paper. Central timestamping to facilitate linearizability in read replicas is similar to the Calvin system (example real-world implementations in systems like FoundationDB) as well as the replica watermarking in AWS Aurora.

¹ Designing Data-Intensive Applications is an excellent book that goes into detail about consistency models discussed in this blog post.

² Adve, S. V., & Gharachorloo, K. (1996). Shared memory consistency models: A tutorial. Computer, 29(12), 66–76.

³ Gwertzman, J., & Seltzer, M. I. (1996, January). World Wide Web Cache Consistency. In USENIX annual technical conference (Vol. 141, p. 152).


Orchestrating Data/ML Workflows at Scale With Netflix Maestro

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/orchestrating-data-ml-workflows-at-scale-with-netflix-maestro-aaa2b41b800c

by Jun He, Akash Dwivedi, Natallia Dzenisenka, Snehal Chennuru, Praneeth Yenugutala, Pawan Dixit

At Netflix, Data and Machine Learning (ML) pipelines are widely used and have become central for the business, representing diverse use cases that go beyond recommendations, predictions and data transformations. A large number of batch workflows run daily to serve various business needs. These include ETL pipelines, ML model training workflows, batch jobs, etc. As Big data and ML became more prevalent and impactful, the scalability, reliability, and usability of the orchestrating ecosystem have increasingly become more important for our data scientists and the company.

In this blog post, we introduce and share learnings on Maestro, a workflow orchestrator that can schedule and manage workflows at a massive scale.

Motivation

Scalability and usability are essential to enable large-scale workflows and support a wide range of use cases. Our existing orchestrator (Meson) has worked well for several years. It schedules around 70 thousand workflows and half a million jobs per day. Due to its popularity, the number of workflows managed by the system has grown exponentially. We started seeing signs of scale issues, like:

  • Slowness during peak traffic moments like 12 AM UTC, leading to increased operational burden. The scheduler on-call has to closely monitor the system during non-business hours.
  • Meson was based on a single leader architecture with high availability. As the usage increased, we had to vertically scale the system to keep up and were approaching AWS instance type limits.

With the high growth of workflows in the past few years (increasing at more than 100% a year), the need for a scalable data workflow orchestrator has become paramount for Netflix’s business needs. After surveying the current landscape of workflow orchestrators, we decided to develop a next-generation system that can scale horizontally to spread the jobs across a cluster consisting of hundreds of nodes. It addresses the key challenges we face with Meson and achieves operational excellence.

Challenges in Workflow Orchestration

Scalability

The orchestrator has to schedule hundreds of thousands of workflows and millions of jobs every day, and operate with a strict SLO of less than 1 minute of scheduler-introduced delay even when there are spikes in the traffic. At Netflix, the peak traffic load can be a few orders of magnitude higher than the average load. For example, a lot of our workflows are run around midnight UTC. Hence, the system has to withstand bursts in traffic while still maintaining the SLO requirements. Additionally, we would like to have a single scheduler cluster to manage most user workflows for operational and usability reasons.

Another dimension of scalability to consider is the size of the workflow. In the data domain, it is common to have a super large number of jobs within a single workflow. For example, a workflow to backfill hourly data for the past five years can lead to 43800 jobs (24 * 365 * 5), each of which processes data for an hour. Similarly, ML model training workflows usually consist of tens of thousands of training jobs within a single workflow. Those large-scale workflows might create hotspots and overwhelm the orchestrator and downstream systems. Therefore, the orchestrator has to manage a workflow consisting of hundreds of thousands of jobs in a performant way, which is also quite challenging.

Usability

Netflix is a data-driven company, where key decisions are driven by data insights, from the pixel color used on the landing page to the renewal of a TV-series. Data scientists, engineers, non-engineers, and even content producers all run their data pipelines to get the necessary insights. Given the diverse backgrounds, usability is a cornerstone of a successful orchestrator at Netflix.

We would like our users to focus on their business logic and let the orchestrator solve cross-cutting concerns like scheduling, processing, error handling, security etc. It needs to provide different grains of abstractions for solving similar problems, high-level to cater to non-engineers and low-level for engineers to solve their specific problems. It should also provide all the knobs for configuring their workflows to suit their needs. In addition, it is critical for the system to be debuggable and surface all the errors for users to troubleshoot, as they improve the UX and reduce the operational burden.

Providing abstractions for the users is also needed to save valuable time on creating workflows and jobs. We want users to rely on shared templates and reuse their workflow definitions across their team, saving time and effort on creating the same functionality. Using job templates across the company also helps with upgrades and fixes: when the change is made in a template it’s automatically updated for all workflows that use it.

However, usability is challenging as it is often opinionated. Different users have different preferences and might ask for different features. Sometimes, the users might ask for the opposite features or ask for some niche cases, which might not necessarily be useful for a broader audience.

Introducing Maestro

Maestro is the next generation Data Workflow Orchestration platform to meet the current and future needs of Netflix. It is a general-purpose workflow orchestrator that provides a fully managed workflow-as-a-service (WAAS) to the data platform at Netflix. It serves thousands of users, including data scientists, data engineers, machine learning engineers, software engineers, content producers, and business analysts, for various use cases.

Maestro is highly scalable and extensible to support existing and new use cases and offers enhanced usability to end users. Figure 1 shows the high-level architecture.

Figure 1. Maestro high level architecture

In Maestro, a workflow is a DAG (Directed Acyclic Graph) of individual units of job definition called Steps. Steps can have dependencies, triggers, workflow parameters, metadata, step parameters, configurations, and branches (conditional or unconditional). In this blog, we use step and job interchangeably. A workflow instance is an execution of a workflow; similarly, an execution of a step is called a step instance. Instance data include the evaluated parameters and other information collected at runtime to provide different kinds of execution insights. The system consists of three main microservices, which we will expand upon in the following sections.

Maestro ensures the business logic runs in isolation. Maestro launches each unit of work (a.k.a. a step) in a container, and ensures the container is launched with the user's or application's identity. Launching with identity ensures the work is performed on behalf of the user or application; that identity is later used by downstream systems to validate whether an operation is allowed. For example, the data warehouse checks the user or application identity to validate whether a table read or write is permitted.

Workflow Engine

The workflow engine is the core component. It manages workflow definitions, the lifecycle of workflow instances, and step instances. It provides rich features to support:

  • Any valid DAG patterns
  • Popular data flow constructs like sub workflow, foreach, conditional branching etc.
  • Multiple failure modes to handle step failures with different error retry policies
  • Flexible concurrency control to throttle the number of executions at workflow/step level
  • Step templates for common job patterns like running a Spark query or moving data to Google Sheets
  • Parameter code injection using a customized expression language
  • Workflow definition and ownership management
  • Timeline including all state changes and related debug info

We use the Netflix open source project Conductor as a library to manage the workflow state machine in Maestro. It ensures that each step defined in a workflow is enqueued and dequeued with an at-least-once guarantee.

Time-Based Scheduling Service

The time-based scheduling service starts new workflow instances at the scheduled time specified in workflow definitions. Users can define the schedule using a cron expression or using periodic schedule templates like hourly, weekly, etc. This service is lightweight and provides an at-least-once scheduling guarantee. The Maestro engine service deduplicates the triggering requests to achieve an exactly-once guarantee when scheduling workflows.
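To illustrate how an at-least-once trigger plus deduplication can add up to exactly-once behavior, here is a minimal, hypothetical Python sketch; the key construction and in-memory store are assumptions for illustration, not Maestro's actual implementation (which would back this with a durable store).

import hashlib

class TriggerDeduplicator:
    """Drops duplicate trigger requests so each (workflow, scheduled time)
    starts at most one workflow instance, even if the scheduler fires twice."""

    def __init__(self):
        # In a real system this would be a durable store with a unique-key
        # constraint (e.g. a database table), not an in-memory set.
        self._seen = set()

    def dedup_key(self, workflow_id: str, scheduled_time_ms: int) -> str:
        raw = f"{workflow_id}:{scheduled_time_ms}"
        return hashlib.sha256(raw.encode()).hexdigest()

    def should_start(self, workflow_id: str, scheduled_time_ms: int) -> bool:
        key = self.dedup_key(workflow_id, scheduled_time_ms)
        if key in self._seen:
            return False        # duplicate trigger: ignore
        self._seen.add(key)     # first time we see this trigger: accept it
        return True

dedup = TriggerDeduplicator()
assert dedup.should_start("demo.pipeline", 1661990400000) is True
assert dedup.should_start("demo.pipeline", 1661990400000) is False  # retried trigger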

Time-based triggering is popular due to its simplicity and ease of management. But sometimes, it is not efficient. For example, the daily workflow should process the data when the data partition is ready, not always at midnight. Therefore, on top of manual and time-based triggering, we also provide event-driven triggering.

Signal Service

Maestro supports event-driven triggering over signals, which are pieces of messages carrying information such as parameter values. Signal triggering is efficient and accurate because we don't waste resources checking whether the workflow is ready to run; instead, we only execute the workflow when a condition is met.

Signals are used in two ways:

  • A trigger to start new workflow instances
  • A gating function to conditionally start a step (e.g., data partition readiness)

Signal service goals are to

  • Collect and index signals
  • Register and handle workflow trigger subscriptions
  • Register and handle the step gating functions
  • Capture the lineage of workflow triggers and steps unblocked by a signal
Figure 2. Signal service high level architecture

The Maestro signal service consumes all the signals from different sources, e.g. warehouse table updates, S3 events, or a workflow releasing a signal, and then generates the corresponding triggers by correlating a signal with its subscribed workflows. In addition to the transformation between external signals and workflow triggers, this service is also responsible for step dependencies by looking up the received signals in the history. Like the scheduling service, the signal service together with the Maestro engine achieves exactly-once triggering guarantees.

The signal service also provides signal lineage, which is useful in many cases. For example, a table updated by a workflow could lead to a chain of downstream workflow executions. Because these workflows are usually owned by different teams, the signal lineage helps the upstream and downstream workflow owners see who depends on whom.

Orchestration at Scale

All services in the Maestro system are stateless and can be horizontally scaled out. All the requests are processed via distributed queues for message passing. By having a shared nothing architecture, Maestro can horizontally scale to manage the states of millions of workflow and step instances at the same time.

CockroachDB is used for persisting workflow definitions and instance state. We chose CockroachDB because it is an open-source distributed SQL database that provides strong consistency guarantees and can be scaled horizontally without much operational overhead.

It is hard to support super large workflows in general. For example, a workflow definition can explicitly define a DAG consisting of millions of nodes, and with that many nodes the UI cannot render it well. We have to enforce some constraints while still supporting valid use cases consisting of hundreds of thousands (or even millions) of step instances in a workflow instance.

Based on our findings and user feedback, in practice:

  • Users don't want to manually write the definitions for thousands of steps in a single workflow definition, which is hard to manage and navigate in the UI. When such a use case exists, it is always feasible to decompose the workflow into smaller sub workflows.
  • Users expect to repeatedly run a certain part of DAG hundreds of thousands (or even millions) times with different parameter settings in a given workflow instance. So at runtime, a workflow instance might include millions of step instances.

Therefore, we enforce a workflow DAG size limit (e.g. 1K) and provide a foreach pattern that allows users to define a sub DAG within a foreach block and iterate that sub DAG with a much larger limit (e.g. 100K). Note that a foreach can be nested within another foreach, so users can run millions or billions of steps in a single workflow instance.

In Maestro, foreach itself is a step in the original workflow definition. Foreach is internally treated as another workflow, which scales in the same way as any other Maestro workflow based on the number of step executions in the foreach loop. The execution of the sub DAG within foreach is delegated to separate workflow instances. The foreach step then monitors and collects the status of those foreach workflow instances, each of which manages the execution of one iteration.

Figure 3. Maestro's scalable foreach design to support super large iterations

With this design, the foreach pattern supports sequential and nested loops with high scalability. It is easy to manage and troubleshoot, as users can see the overall loop status at the foreach step or view each iteration separately.

Workflow Platform for Everyone

We aim to make Maestro user friendly and easy to learn for users with different backgrounds. Rather than assuming a particular level of programming proficiency, we let users bring their business logic in multiple ways, including, but not limited to, a bash script, a Jupyter notebook, a Java jar, a Docker image, a SQL statement, or a few clicks in the UI using parameterized workflow templates.

User Interfaces

Maestro provides multiple domain specific languages (DSLs), including YAML, Python, and Java, for end users to define their workflows, which are decoupled from their business logic. Users can also talk directly to the Maestro API to create workflows using the JSON data model. We found that a human readable DSL is popular and plays an important role in supporting different use cases. The YAML DSL is the most popular one due to its simplicity and readability.

Here is an example workflow defined by different DSLs.

Figure 4. An example workflow defined by YAML, Python, and Java DSLs

Additionally, users can generate certain types of workflows in the UI or use other libraries, e.g.:

  • In the Notebook UI, users can directly schedule the chosen notebook to run periodically.
  • In the Maestro UI, users can directly schedule periodic data movement from one source (e.g. a data table or a spreadsheet) to another.
  • Users can use the Metaflow library to create workflows in Maestro to execute DAGs consisting of arbitrary Python code.

Parameterized Workflows

Often, users want to define a dynamic workflow to adapt to different scenarios. Based on our experience, a completely dynamic workflow is less favorable and hard to maintain and troubleshoot. Instead, Maestro provides three features to help users define a parameterized workflow:

  • Conditional branching
  • Sub-workflow
  • Output parameters

Instead of dynamically changing the workflow DAG at runtime, users can define those changes as sub workflows and then invoke the appropriate sub workflow at runtime, because the sub workflow id is a parameter that is evaluated at runtime. Additionally, using output parameters, users can produce different results from an upstream step and then iterate through them within a foreach, pass them to a sub workflow, or use them in downstream steps.

Here is an example (using the YAML DSL) of a backfill workflow with two steps. In step1, the step computes the backfill date range and returns the dates. Next, the foreach step uses the dates from step1 to create the foreach iterations. Finally, each of the backfill jobs gets the date from the foreach and backfills the data based on that date.

Workflow:
  id: demo.pipeline
  jobs:
    - job:
        id: step1
        type: NoOp
        '!dates': return new int[]{20220101,20220102,20220103}; #SEL
    - foreach:
        id: step2
        params:
          date: ${dates@step1} # reference an upstream step parameter
        jobs:
          - job:
              id: backfill
              type: Notebook
              notebook:
                input_path: s3://path/to/notebook.ipynb
                arg1: $date # pass the foreach parameter into the notebook
Figure 5. An example of using parameterized workflow for backfill data

The parameter system in Maestro is completely dynamic, with code injection support. Users can write code in Java syntax as the parameter definition. We developed our own secured expression language (SEL) to ensure security: it only exposes limited functionality and includes additional checks (e.g. on the number of iterations in a loop statement) in the language parser.

Execution Abstractions

Maestro provides multiple levels of execution abstractions. Users can choose to use a provided step type and set its parameters. This encapsulates the business logic of commonly used operations, making it very easy for users to create jobs. For example, for the Spark step type, all users have to do is specify the needed parameters, like the Spark SQL query and memory requirements, and Maestro does everything behind the scenes to create the step. If we have to make a change in the business logic of a certain step type, we can do so seamlessly for its users.

If the provided step types are not enough, users can also develop their own business logic in a Jupyter notebook and then pass it to Maestro. Advanced users can develop their own well-tuned Docker image and let Maestro handle the scheduling and execution.

Additionally, we abstract common functions and reusable patterns from various use cases and add them to Maestro in a loosely coupled way by introducing job templates, which are parameterized notebooks. This is different from step types, as templates provide a combination of various steps. Advanced users also leverage this feature to ship common patterns for their own teams. While creating a new template, users can define the list of required and optional parameters with their types and register the template with Maestro. Maestro validates the parameters and types at push and run time. In the future, we plan to extend this functionality to make it very easy for users to define templates for their teams and for all employees. In some cases, sub-workflows are also used to define common sub DAGs to achieve multi-step functions.
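As a hedged illustration of what push- and run-time parameter validation can look like, here is a small Python sketch; the schema shape, parameter names, and helper function are hypothetical, not Maestro's actual template API.

from typing import Any, Dict

# Hypothetical template schema: parameter name -> (expected type, required?)
BACKFILL_TEMPLATE_PARAMS = {
    "input_path": (str, True),
    "start_date": (int, True),
    "end_date": (int, True),
    "dry_run": (bool, False),
}

def validate_params(schema: Dict[str, tuple], params: Dict[str, Any]) -> list:
    """Return a list of validation errors; an empty list means the params are valid.
    The same check can run when a workflow is pushed and again at run time,
    once parameter values have been evaluated."""
    errors = []
    for name, (expected_type, required) in schema.items():
        if name not in params:
            if required:
                errors.append(f"missing required parameter '{name}'")
            continue
        if not isinstance(params[name], expected_type):
            errors.append(
                f"parameter '{name}' should be {expected_type.__name__}, "
                f"got {type(params[name]).__name__}"
            )
    return errors

print(validate_params(BACKFILL_TEMPLATE_PARAMS,
                      {"input_path": "s3://bucket/data", "start_date": 20220101}))
# ["missing required parameter 'end_date'"]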

Moving Forward

We are taking Big Data Orchestration to the next level and constantly solving new problems and challenges, so please stay tuned. If you are motivated to solve large scale orchestration problems, please join us; we are hiring.


Orchestrating Data/ML Workflows at Scale With Netflix Maestro was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How Product Teams Can Build Empathy Through Experimentation

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-product-teams-can-build-empathy-through-experimentation-6253603880a6

A conversation between Travis Brooks, Netflix Product Manager for Experimentation Platform, and George Khachatryan, OfferFit CEO

Note: I’ve known George for a little while now, and as we’ve talked a lot about the philosophy of experimentation, he kindly invited me to their office (virtually) for their virtual speaker series. We had a fun conversation with his team, and we realized that some parts of it might make a good blog post as well. So we jointly edited a bit for length and clarity, and are posting here as well as on OfferFit’s blog. Hope you enjoy the result. — Travis B.

George Khachatryan: Travis, could you tell us a bit about your background and how you came to your current role?

Travis Brooks: I’m the product manager (PM) for the experimentation platform. So my job is to make sure that all the tooling and infrastructure we have at Netflix for experimentation does what it needs to do, and to set the road map for the next year or more for what we’re building.

I started out in physics, but ended up not doing that. Instead, I started leading an information resource for particle physics literature. One of the things we ran up against was we didn’t really have enough users to run experiments. We were all experimental physicists at heart and we wanted to make decisions on some sort of principled basis, but we didn’t actually have enough users to get statistical significance.

At the same time, I had an opportunity to go join Yelp as the first product manager there for search, where there were many more users. And so I did that and spent some time building out search algorithms and recommendation engines at Yelp.

I came to Netflix about three years ago, and first led a team of data scientists responsible for front end experimentation — basically everything you see on the Netflix platform. And then in the last year, I’ve been the PM for all of our experimentation infrastructure and platform.

George Khachatryan: So over the last decade, a lot of tech companies have been increasingly embracing user centric design — it’s kind of become the accepted wisdom. And a lot of non-tech companies also are increasingly trying to be customer centric in their thinking. How would you define user centric design and what role do you think experimentation plays in it?

Travis Brooks: Let me say first that I’m talking here about my own experiences. I’m not speaking for Netflix.

But what I can say is that broadly, I think user centric design is really about empathy. And as a person who’s been both a user facing PM and a tools PM, having empathy for your user is one of the core traits that defines good product management. So when we say “user centric”, we’re just saying, “Hey, really lean into empathy.”

When you’re building things, whether you’re a visual designer, or a designer of an API, or a PM, or anybody who’s building something, lean into trying to put yourself in the shoes of the user. And if you can do that, not just at the beginning when you write down the specs, but all the way through the process, you make a better product in the end.

In the act of building we tend to get really entranced by the technical problem and solving that problem. And in fact, it’s probably necessary to lose sight of what the end user is going to experience in order to build the best technical solution. So to build products that are effective for the user we need that user perspective brought back into focus pretty regularly — “Oh, wait, here’s what the end user is going to experience” Or, “Oh yeah, actually we don’t even need to solve that really challenging, interesting technical problem over there, because the end user is only going to experience this part over here.” To me, that’s what user centric design means.

How do we make sure that in all aspects, whether it’s an API, or the front-end visual design, we’re centering the user? How are they going to experience this product? What are their pain points? Is what we’re doing actually connected to that end user?

George Khachatryan: And what role does experimentation play, if any, in building empathy?

Travis Brooks: This is really a PM’s role — to ensure that the team that’s building something is maintaining that level of user empathy. But then you have to ask, “How does the PM know what users want?” Right? They’re not magic. A good PM doesn’t spring fully formed from the head of Zeus with all the knowledge of what users want. How do they get that knowledge? I think there are four ways.

1. One way is if you’re PM-ing a product that you yourself use. It’s the cheapest and maybe the lowest fidelity way of building empathy. “Okay, well, I’m a user so I know what users feel because I use the product”. It’s low fidelity because it’s an N of 1, and you’re certainly not a typical user. You’re a PM. You have a way different way of interacting with products than most people.

2. Typically the next thing people do is they start talking to users. And if they’re smart, they start talking to people who are not like them. “Hey, how do you use this product? What do you value? What do you find painful about it? How often do you use it? Why don’t you use it more? When was the last time you used it? What were you trying to do? Did you achieve that?” — all those typical user research questions that PMs ask. Really good user researchers get into this sort of qualitative research, and that’s a great way to build broader empathy, at a higher fidelity level, than just, “I use my product.”

3. Then you get to a scale where you have a lot of users, and talking to them becomes an art of “How do I get a representative sample from this broad population?” And you start to worry that maybe their memory isn’t quite perfect. Users are self-reporting how they use things, but that’s not actually how they use things. We know people have a lot of cognitive biases in that way. So then you start getting into observational data, and you say, “well, okay, people report that they use the product once a week. If I go look at data, I can see people use the product three times a week, so I can tell that what they report isn’t quite what happened.” Adding this observational data layer makes user research much higher fidelity. Of course, it’s higher cost and may take some time and some effort and investment.

4. But even that observational data layer doesn’t really help you understand how people use the product at the level of a deep causal connection. The end game of trying to understand the user is, “if I do X, users respond this way”. And the only way to establish that causal connection — maybe not the only way, but the most reliable way, the highest fidelity way — is to show a random sample of your users X and see how they differ from the rest of your users who didn’t see X. That’s the core of experimentation: a high cost, high fidelity, arguably lower speed, way to build empathy. It’s probably not the first place you’re going to turn to build empathy, but you’re going to get there and you’ll eventually need to have it in your arsenal.

Different scales of usage demand different methods of learning
Different methods of learning work optimally at different scales; having all of them in your arsenal is useful.

George Khachatryan: Yeah. So you talk about the importance of building an experimentation culture. Can you explain what the main elements of such a culture would be, in your view?

Travis Brooks: I think having a sense of humility is super important. If you read our blog posts, or posts from anybody who does massive scale testing such as Microsoft, you see that they test, and most of their treatments fail. And those are treatments from expert designers and PMs and engineers who have the best context, the best user research. Especially as your product matures, it is hard to improve upon. Even a less mature product is hard to improve upon, because it turns out our intuition is pretty good. We understand what users need. It’s just not a very reliable mechanism. So most treatments that we come up with fail, which means you have to have a lot of humility.

You can’t get married to your ideas and say, “I’m going to do an experiment; this is going to blow the world away.” And you end up wasting a lot of time and effort trying to show that your treatment is good, even if it’s not. You miss the bigger picture, which is, “Hey, you tried something. It didn’t work. What can you learn from that experience to inform the next treatment?”

The cultures where people can be really successful in experimentation involve a lot of humility, which encourages that sort of iterative approach. “I’m guessing this is not going to work because I can see from history most of these things don’t work. What I’m going to do is put it out there and I’m going to learn from it. Maybe I’ll get lucky and it’ll work right off the bat, but maybe I won’t. I’ll learn from the next two tests, and I’ll get to someplace where I can actually solve this problem.”

The other thing I think is important is having a culture of open debate, where decisions are made out in the open. The more open your decision-making, the louder a voice data has. When decision-making gets closed, into one person’s office or one person’s head, it’s hard. Often when people debate and they can’t agree, they turn to data, because it’s a lot harder to disagree with that. And so if you want an experimentation culture, if you want data, have open debate. Have open decision making. Then people more clearly see that they need data, that they need to experiment.

So yes, I think that humility and open decision-making are really important.


How Product Teams Can Build Empathy Through Experimentation was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support…

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/timestone-netflixs-high-throughput-low-latency-priority-queueing-system-with-built-in-support-1abf249ba95f

Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support for Non-Parallelizable Workloads

by Kostas Christidis

Introduction

Timestone is a high-throughput, low-latency priority queueing system we built in-house to support the needs of Cosmos, our media encoding platform. Over the past 2.5 years, its usage has increased, and Timestone is now also the priority queueing engine backing Conductor, our general-purpose workflow orchestration engine, and BDP Scheduler, the scheduler for large-scale data pipelines. All in all, millions of critical workflows within Netflix now flow through Timestone on a daily basis.

Timestone clients can create queues, enqueue messages with user-defined deadlines and metadata, then dequeue these messages in an earliest-deadline-first (EDF) fashion. Filtering for EDF messages with criteria (e.g. “messages that belong to queue X and have metadata Y”) is also supported.

One of the things that make Timestone different from other priority queues is its support for a construct we call exclusive queues — this is a means to mark chunks of work as non-parallelizable, without requiring any locking or coordination on the consumer side; everything is taken care of by the exclusive queue in the background. We explain the concept in detail in the sections that follow.

Why Timestone

When designing the successor to Reloaded — our media encoding system — back in 2018 (see “Background” section in The Netflix Cosmos Platform), we needed a priority queueing system that would provide queues between the three components in Cosmos (Figure 1):

  1. the API framework (Optimus),
  2. the forward chaining rule engine (Plato), and
  3. the serverless computing layer (Stratum)
Figure 1. A video encoding application built on top of Cosmos. Notice the three Cosmos subsystems: Optimus, an API layer mapping external requests to internal business models, Plato, a workflow layer for business rule modeling, and Stratum, the serverless layer for running stateless and computational-intensive functions. Source: The Netflix Cosmos Platform

Some of the key requirements this priority queueing system would need to satisfy:

1. A message can only be assigned to one worker at any given time. The work that tends to happen in Cosmos is resource-intensive, and can fan out to thousands of actions. Assume then, that there is replication lag between the replicas in our data store, and we present as dequeueable to worker B the message that was just dequeued by worker A via a different node. When we do that, we waste significant compute cycles. This requirement then throws eventually consistent solutions out of the window, and means we want linearizable consistency at the queue level.

2. Allow for non-parallelizable work.

Given that Plato is continuously polling all workflow queues for more work to execute —

While Plato is executing a workflow for a given project (a request for work on a given service) —

Then Plato should not be able to dequeue additional requests for work for that project on that workflow. Otherwise Plato’s inference engine will evaluate the workflow prematurely, and may move the workflow to an incorrect state.

There exists, then, a certain type of work in Cosmos that should not be parallelizable, and the ask is for the queueing system to support this type of access pattern natively. This requirement gave birth to the exclusive queue concept. We explain how exclusive queues work in Timestone in the "Key Concepts" section.

3. Allow for dequeueing and queue depth querying using filters (metadata key-value pairs)

4. Allow for the automatic creation of a queue upon message ingestion

5. Render a message dequeueable within a second of ingestion

We built Timestone because we could not find an off-the-shelf solution that meets these requirements.

System Architecture

Timestone is a gRPC-based service. We use protocol buffers to define the interface of our service and the structure of our request and response messages. The system diagram for the application is shown in Figure 2.

Figure 2. Timestone system diagram. Arrows link all the components touched during a typical Timestone client-server interaction. Numbers in red indicate sequence steps. Identical numbers identify concurrent steps.

System of record

The system of record is a durable Redis cluster. Every write request (see Step 1 — note that this includes dequeue requests since they alter the state of the queue) that reaches the cluster (Step 2) is persisted to a transaction log before a response is sent back to the server (Step 3).

Inside the database, we represent each queue with a sorted set where we rank message IDs (see “Message” section) according to priority. We persist messages and queue configurations (see “Queues” section) in Redis as hashes. All data structures related to a queue — from the messages it contains to the in-memory secondary indexes needed to support dequeue-by-filter — are placed in the same Redis shard. We achieve this by having them share a common prefix, specific to the queue in question. We then codify this prefix as a Redis hash tag. Each message carries a payload (see “Message” section) that can weigh up to 32 KiB.
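To make this data layout concrete, here is a minimal redis-py sketch of an enqueue along these lines; the key names, hash-tag scheme, and fields are simplified assumptions, not Timestone's actual schema.

import json
import redis

r = redis.Redis(host="localhost", port=6379)

def enqueue(queue: str, message_id: str, priority_ms: int, payload: dict) -> None:
    # A shared "{queue}" hash tag forces every key for this queue onto the
    # same Redis Cluster shard, so scripts can touch them together.
    prefix = f"{{{queue}}}"
    # Rank message IDs by priority in a sorted set (lower score = higher priority).
    r.zadd(f"{prefix}:pending", {message_id: priority_ms})
    # Persist the message body and metadata as a hash.
    r.hset(f"{prefix}:msg:{message_id}", mapping={
        "payload": json.dumps(payload),
        "priority": priority_ms,
        "attempts_left": 3,
    })

enqueue("encode-jobs", "msg-1", 1661990400000, {"title_id": 123})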

Almost all of the interactions between Timestone and Redis (see “Message States” section) are codified as Lua scripts. In most of these Lua scripts, we tend to update a number of data structures. Since Redis guarantees that each script is executed atomically, a successful script execution is guaranteed to leave the system in a consistent (in the ACID sense) state.
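As a hedged illustration of the Lua-script approach (not Timestone's actual scripts), the sketch below atomically pops the earliest-deadline message from a pending set and marks it running in a single script execution; key names mirror the hypothetical layout above.

import redis

r = redis.Redis(host="localhost", port=6379)

DEQUEUE_LUA = """
-- KEYS[1] = pending sorted set, KEYS[2] = running sorted set
-- ARGV[1] = lease expiry timestamp (ms)
local popped = redis.call('ZRANGE', KEYS[1], 0, 0)
if #popped == 0 then
  return nil
end
local msg_id = popped[1]
-- Move the message from pending to running; Redis runs the whole script as
-- one unit, so no other client can dequeue the same message in between.
redis.call('ZREM', KEYS[1], msg_id)
redis.call('ZADD', KEYS[2], ARGV[1], msg_id)
return msg_id
"""

dequeue_script = r.register_script(DEQUEUE_LUA)

def dequeue(queue: str, lease_expiry_ms: int):
    prefix = f"{{{queue}}}"
    return dequeue_script(keys=[f"{prefix}:pending", f"{prefix}:running"],
                          args=[lease_expiry_ms])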

All API operations are queue-scoped. All API operations that modify state are idempotent.

Secondary indexes

For observability purposes, we capture information about incoming messages and their transition between states in two secondary indexes maintained on Elasticsearch.

When we get a write response from Redis, we concurrently (a) return the response to the client, and (b) convert this response into an event that we post to a Kafka cluster, as shown in Step 4. Two Flink jobs — one for each type of index we maintain — consume the events from the corresponding Kafka topics, and update the indexes in Elasticsearch.

One index (“current”) gives users a best-effort view into the current state of the system, while the other index (“historic”) gives users a best effort longitudinal view for messages, allowing them to trace the messages as they flow through Timestone, and answer questions such as time spent in a state, and number of processing errors. We maintain a version counter for each message; every write operation increments that counter. We rely on that version counter to order the events in the historic index.

Events are stored in the Elasticsearch cluster for a finite number of days.

Current Usage at Netflix

The system is dequeue heavy. We see 30K dequeue requests per second (RPS) with a P99 latency of 45ms. In comparison, we see 1.2K enqueue RPS at 25ms P99 latency. We regularly see 5K RPS enqueue bursts at 85ms P99 latency.

15B messages have been enqueued to Timestone since the beginning of the year; these messages have been dequeued 400B times. Pending messages regularly reach 10M.

Usage is expected to double next year, as we migrate the rest of Reloaded, our legacy media encoding system, to Cosmos.

Key Concepts

Message

A message carries an opaque payload, a user-defined priority (see “Priority” section), an optional (mandatory for exclusive queues) set of metadata key-value pairs that can be used for filter-based dequeueing, and an optional invisibility duration.

Any message that is placed into a queue can be dequeued a finite number of times. We call these attempts; each dequeue invocation on a message decreases the attempts left on it.

Priority

The priority of a message is expressed as an integer value; the lower the value, the higher the priority. While an application is free to use whatever range they see fit, the norm is to use Unix timestamps in milliseconds (e.g. 1661990400000 for 9/1/2022 midnight UTC).

Figure 3. A snippet from the PriorityClass enum used by a streaming encoding pipeline in Cosmos. The values in parentheses indicate the offset in days.

It is also entirely up to the application to define its own priority levels. For instance a streaming encoding pipeline within Cosmos uses mail priority classes, as shown in Figure 3. Messages belonging to the standard class use the time of enqueue as their priority, while all other classes have their priority values adjusted in multiples of ∼10 years. The priority is set at the workflow rule level, but can be overridden if the request carries a studio tag, such as DAY_OF_BROADCAST.
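As a hedged illustration of this convention (the class names and offsets below are hypothetical, not the actual Cosmos values), priorities of this style can be derived from the enqueue time plus a per-class offset:

import time
from enum import Enum
from typing import Optional

DAY_MS = 24 * 60 * 60 * 1000

class PriorityClass(Enum):
    # Hypothetical offsets, expressed in days; lower numeric priority values
    # are treated as higher priority by the queue.
    DAY_OF_BROADCAST = -30
    STANDARD = 0
    BACKFILL = 10 * 365

def priority_for(klass: PriorityClass, enqueue_time_ms: Optional[int] = None) -> int:
    if enqueue_time_ms is None:
        enqueue_time_ms = int(time.time() * 1000)
    return enqueue_time_ms + klass.value * DAY_MS

# A standard-class message keeps its enqueue time as its priority.
print(priority_for(PriorityClass.STANDARD, 1661990400000))  # 1661990400000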

Message States

Within a queue, a Timestone message belongs to one of six states (Figure 4):

  1. invisible
  2. pending
  3. running
  4. completed
  5. canceled
  6. errored

In general, a message can be enqueued with or without invisibility, which makes the message invisible or pending respectively. Invisible messages become pending when their invisibility window elapses.

A worker can dequeue a pending earliest-deadline-first message from a queue by specifying the amount of time (lease duration) they will be processing it for. Dequeueing messages in batch is also supported. This moves the message to the running state.

The same worker can then issue a complete call to Timestone within the allotted lease window to move the message to the completed state, or issue a lease extension call if they want to maintain control of the message. (A worker can also move a message, typically a running one, to the canceled state to signal it is no longer needed for processing.)

If none of these calls are issued on time, the message becomes dequeueable again, and this attempt on the message is spent. If there are no attempts left on the message, it is moved automatically to the errored state.

The terminal states (completed, errored, and canceled) are garbage-collected periodically by a background process.

Messages can move states either when a worker invokes an API operation, or when Timestone runs its background processes (Figure 4, marked in red — these run periodically). Figure 4 shows the complete state transition diagram.

Figure 4. State transition diagram for Timestone messages.

Queues

All incoming messages are stored in queues. Within a queue, messages are sorted by their priority date. Timestone can host an arbitrary number of user-created queues, and offers a set of API operations for queue management, all revolving around a queue configuration object. Data we store in this object includes the queue's type (see the rest of this section), the lease duration that applies to dequeued messages, the invisibility duration that applies to enqueued messages, the number of times a message can be dequeued, and whether enqueueing or dequeueing is temporarily blocked. Note that a message producer can override the default lease or invisibility duration by setting it at the message level during enqueue.

All queues in Timestone fall into two types, simple, and exclusive.

When an exclusive queue is created, it is associated with a user-defined exclusivity key — for example project. All messages posted to that queue must carry this key in their metadata. For instance, a message with project=foo will be accepted into the queue; a message without the project key will not be. In this example, we call foo, the value that corresponds to the exclusivity key, the message’s exclusivity value.

The contract for exclusive queues is that at any point in time, there can be only up to one consumer per exclusivity value. Therefore, if the project-based exclusive queue in our example has two messages with the key-value pair project=foo in it, and one of them is already leased out to a worker, the other one is not dequeueable. This is depicted in Figure 5.

Figure 5. When worker_2 issues a dequeue call, they lease msg_2 instead of msg_1, even though msg_1 has a higher priority. That happens because the queue is exclusive, and the exclusive value foo is already leased out.

In a simple queue no such contract applies, and there is no tight coupling with message metadata keys. A simple queue works as your typical priority queue, simply ordering messages in an earliest-deadline-first fashion.
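To contrast the two queue types, here is a simplified in-memory Python sketch of the exclusive-queue dequeue rule (not Timestone's implementation, which enforces this inside Redis); a simple queue would just pop the head of the pending list.

from dataclasses import dataclass, field

@dataclass(order=True)
class Message:
    priority: int
    msg_id: str = field(compare=False)
    exclusivity_value: str = field(compare=False)   # e.g. the "project" value

@dataclass
class ExclusiveQueue:
    pending: list = field(default_factory=list)      # kept sorted by priority
    leased_values: set = field(default_factory=set)  # exclusivity values in flight

    def enqueue(self, msg: Message) -> None:
        self.pending.append(msg)
        self.pending.sort()                           # earliest deadline first

    def dequeue(self):
        for i, msg in enumerate(self.pending):
            # Skip messages whose exclusivity value is already leased out.
            if msg.exclusivity_value in self.leased_values:
                continue
            self.leased_values.add(msg.exclusivity_value)
            return self.pending.pop(i)
        return None                                   # nothing dequeueable right now

    def complete(self, msg: Message) -> None:
        self.leased_values.discard(msg.exclusivity_value)

q = ExclusiveQueue()
q.enqueue(Message(1, "msg_1", "foo"))
q.enqueue(Message(2, "msg_2", "foo"))
q.enqueue(Message(3, "msg_3", "bar"))
first = q.dequeue()    # msg_1: highest priority, "foo" is now leased
second = q.dequeue()   # msg_3: msg_2 is skipped because "foo" is exclusive and leased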

What We Are Working On

Some of the things we’re working on:

  1. As the usage of Timestone within Cosmos grows, so does the need to support a range of queue depth queries. To solve this, we are building a dedicated query service that uses a distinct query model.
  2. As noted above (see "System of record" section), a queue and its contents can currently only occupy one Redis shard. Hot queues however can grow big, especially when compute capacity is scarce. We want to support arbitrarily large queues, which has us building support for queue sharding.
  3. Messages can carry up to 4 key-value pairs. We currently use all of these key-value pairs to populate the secondary indexes used during dequeue-by-filter. This operation is exponentially complex in both time and space (O(2^n)). We are switching to lexicographical ordering on sorted sets to drop the number of indexes by half, and handle metadata in a more cost-efficient manner.

We may be covering our work on the above in follow-up posts. If these kinds of problems sound interesting to you, and if you like the challenges of building distributed systems for the Netflix Content and Studio ecosystem at scale in general, you should consider joining us.

Acknowledgements

Poorna Reddy, Kostas Christidis, Aravindan Ramkumar, Surafel Korse, Jiaofen Xu, Anoop Panicker, and Kishore Banala have contributed to this project. We thank Charles Zhao, Olof Johansson, Frank San Miguel, Dmitry Vasilyev, Prudhvi Chaganti, and the rest of the Cosmos team for their constructive feedback while developing and operating Timestone.


Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support… was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Reinforcement Learning for Budget Constrained Recommendations

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/reinforcement-learning-for-budget-constrained-recommendations-6cbc5263a32a

by Ehtsham Elahi
with
James McInerney, Nathan Kallus, Dario Garcia Garcia and Justin Basilico

Introduction

This writeup is about using reinforcement learning to construct an optimal list of recommendations when the user has a finite time budget to make a decision from the list of recommendations. Working within the time budget introduces an extra resource constraint for the recommender system. It is similar to many other decision problems (e.g., in economics and operations research) where the entity making the decision has to find tradeoffs in the face of finite resources and multiple (possibly conflicting) objectives. Although time is the most important and finite resource, we think that it is an often ignored aspect of recommendation problems.

In addition to relevance of the recommendations, time budget also determines whether users will accept a recommendation or abandon their search. Consider the scenario that a user comes to the Netflix homepage looking for something to watch. The Netflix homepage provides a large number of recommendations and the user has to evaluate them to choose what to play. The evaluation process may include trying to recognize the show from its box art, watching trailers, reading its synopsis or in some cases reading reviews for the show on some external website. This evaluation process incurs a cost that can be measured in units of time. Different shows will require different amounts of evaluation time. If it’s a popular show like Stranger Things then the user may already be aware of it and may incur very little cost before choosing to play it. Given the limited time budget, the recommendation model should construct a slate of recommendations by considering both the relevance of the items to the user and their evaluation cost. Balancing both of these aspects can be difficult as a highly relevant item may have a much higher evaluation cost and it may not fit within the user’s time budget. Having a successful slate therefore depends on the user’s time budget, relevance of each item as well as their evaluation cost. The goal for the recommendation algorithm therefore is to construct slates that have a higher chance of engagement from the user with a finite time budget. It is important to point out that the user’s time budget, like their preferences, may not be directly observable and the recommender system may have to learn that in addition to the user’s latent preferences.

A typical slate recommender system

We are interested in settings where the user is presented with a slate of recommendations. Many recommender systems rely on a bandit style approach to slate construction. A bandit recommender system constructing a slate of K items may look like the following:

A bandit style recommender system for slate construction

To insert an element at slot k in the slate, the item scorer scores all of the available N items and may make use of the slate constructed so far (slate above) as additional context. The scores are then passed through a sampler (e.g. Epsilon-Greedy) to select an item from the available items. The item scorer and the sampling step are the main components of the recommender system.

Problem formulation

Let’s make the problem of budget constrained recommendations more concrete by considering the following (simplified) setting. The recommender system presents a one dimensional slate (a list) of K items and the user examines the slate sequentially from top to bottom.

A user with a fixed time budget evaluating a slate of recommendations with K items. Item 2 gets the click/response from the user. The item shaded in red falls outside of the user’s time budget.

The user has a time budget, which is some positive real valued number. Let's assume that each item has two features, relevance (a scalar; a higher value means the item is more relevant) and cost (measured in a unit of time). Evaluating each recommendation consumes from the user's time budget, and the user can no longer browse the slate once the time budget has been exhausted. For each item examined, the user makes a probabilistic decision to consume the recommendation by flipping a coin with probability of success proportional to the relevance of the video. Since we want to model the user's probability of consumption using the relevance feature, it is helpful to think of relevance as a probability directly (between 0 and 1). Clearly the probability of choosing something from the slate of recommendations depends not only on the relevance of the items but also on the number of items the user is able to examine. A recommendation system trying to maximize the user's engagement with the slate needs to pack in as many relevant items as possible within the user budget, by making a trade-off between relevance and cost.
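A minimal Python sketch of this user model, with hypothetical parameter values (the post's actual simulation lives in the attached notebook):

import numpy as np

rng = np.random.default_rng(0)

def simulate_user(slate, budget):
    """Scan the slate top to bottom; each item costs time to evaluate and is
    consumed with probability equal to its relevance. Returns the index of the
    consumed item, or None if the user abandons or runs out of budget."""
    for i, (relevance, cost) in enumerate(slate):
        if budget < cost:
            return None                      # item falls outside the time budget
        budget -= cost
        if rng.random() < relevance:         # Bernoulli(relevance) consumption
            return i
    return None                              # examined everything, chose nothing

# slate of (relevance, cost) pairs; the user has 5 "units" of time
slate = [(0.3, 2.0), (0.6, 1.5), (0.9, 4.0)]
print(simulate_user(slate, budget=5.0))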

Connection with the 0/1 Knapsack problem

Let’s look at it from another perspective. Consider the following definitions for the slate recommendation problem described above

Clearly the abandonment probability is small if the items are highly relevant (high relevance) or the list is long (since the abandonment probability is a product of probabilities). The abandonment option is sometimes referred to as the null choice/arm in bandit literature.

This problem has clear connections with the 0/1 Knapsack problem in theoretical computer science. The goal is to find the subset of items with the highest total utility such that the total cost of the subset is not greater than the user budget. If β_i and c_i are the utility and cost of the i-th item and u is the user budget, then the budget constrained recommendations can be formulated as

0/1 Knapsack formulation for Budget constrained recommendations
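The pictured formulation is not reproduced in the text; written out from the definitions above (utility β_i, cost c_i, user budget u, and x_i ∈ {0,1} indicating whether item i is included in the subset S), a standard statement is:

\[
\max_{x_1,\dots,x_N \,\in\, \{0,1\}} \; \sum_{i=1}^{N} \beta_i \, x_i
\quad \text{subject to} \quad \sum_{i=1}^{N} c_i \, x_i \;\le\; u
\]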

There is an additional requirement that the optimal subset S be sorted in descending order according to the relevance of the items in the subset.
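Setting the ordering requirement aside for a moment, here is a textbook dynamic-programming sketch of the underlying knapsack with integer costs; it is shown only as a reference point, not the approach developed in this post.

def knapsack(utilities, costs, budget):
    """Classic 0/1 knapsack DP: best[b] = max total utility achievable with
    total cost <= b. Runs in O(N * budget) time, hence pseudo-polynomial."""
    best = [0.0] * (budget + 1)
    for utility, cost in zip(utilities, costs):
        # Iterate budgets downwards so each item is used at most once.
        for b in range(budget, cost - 1, -1):
            best[b] = max(best[b], best[b - cost] + utility)
    return best[budget]

print(knapsack(utilities=[0.3, 0.6, 0.9], costs=[2, 1, 4], budget=5))  # 1.5

The DP is exact but pseudo-polynomial in the budget, and it ignores the sequential, probabilistic browsing behavior described above.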

The 0/1 Knapsack problem is a well studied problem and is known to be NP-Complete. There are many approximate solutions to the 0/1 Knapsack problem. In this writeup, we propose to model the budget constrained recommendation problem as a Markov Decision process and use algorithms from reinforcement learning (RL) to find a solution. It will become clear that the RL based solution to budget constrained recommendation problems fits well within the recommender system architecture for slate construction. To begin, we first model the budget constrained recommendation problem as a Markov Decision Process.

Budget constrained recommendations as a Markov Decision Process

In a Markov decision process, the key component is the state evolution of the environment as a function of the current state and the action taken by the agent. In the MDP formulation of this problem, the agent is the recommender system and the environment is the user interacting with the recommender system. The agent constructs a slate of K items by repeatedly selecting actions it deems appropriate at each slot in the slate. The state of the environment/user is characterized by the available time budget and the items examined in the slate at a particular step in the slate browsing process. Specifically, the following table defines the Markov Decision Process for the budget constrained recommendation problem,

Markov Decision Process for Budget constrained recommendations

In real world recommender systems, the user budget may not be observable. This problem can be solved by computing an estimate of the user budget from historical data (e.g. how long the user scrolled before abandoning in the historical data logs). In this writeup, we assume that the recommender system/agent has access to the user budget for sake of simplicity.

The slate generation task above is an episodic task, i.e. the recommender agent is tasked with choosing K items for the slate. The user provides feedback by choosing one or zero items from the slate. This can be viewed as a binary reward r per item in the slate. Let π be the recommender policy generating the slate and γ be the reward discount factor; we can then define the discounted return for each state, action pair as,

State, Action Value function estimation
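The pictured formula is not reproduced in the text; a standard form consistent with the description above (binary per-item reward r_t, discount factor γ, policy π, slate of K slots) is:

\[
q_\pi(s, a) \;=\; \mathbb{E}_\pi\!\left[\, \sum_{t=0}^{K-1} \gamma^{t}\, r_{t} \;\middle|\; s_{0} = s,\; a_{0} = a \right]
\]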

The reinforcement learning algorithm we employ is based on estimating this return using a model. Specifically, we use Temporal Difference learning TD(0) to estimate the value function. Temporal difference learning uses Bellman’s equation to define the value function of current state and action in terms of value function of future state and action.

Bellman’s equation for state, action value function
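A standard statement of this equation, consistent with the text (s' is the next state, and a' is the action the policy takes in it), is:

\[
q_\pi(s, a) \;=\; \mathbb{E}\big[\, r + \gamma \, q_\pi(s', a') \;\big|\; s,\, a \,\big]
\]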

Based on this Bellman’s equation, a squared loss for TD-Learning is,

Loss Function for TD(0) Learning
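Written out with model parameters θ, a squared TD(0) loss of this form is:

\[
\mathcal{L}(\theta) \;=\; \big(\, r + \gamma \, q(s', a'; \theta) \;-\; q(s, a; \theta) \,\big)^{2}
\]

with the bootstrapped target r + γ q(s', a'; θ) treated as a constant when differentiating.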

The loss function can be minimized using semi-gradient based methods. Once we have a model for q, we can use it as the item scorer in the above slate recommender system architecture. If the discount factor γ = 0, the return for each (state, action) pair is simply the immediate user feedback r, so q with γ = 0 corresponds to an item scorer for a contextual bandit agent, whereas for γ > 0 the recommender corresponds to a (value function based) RL agent. Simply using the model for the value function as the item scorer in the above system architecture therefore makes it very easy to use an RL based solution.
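A minimal semi-gradient TD(0) sketch with a linear value model, just to make the item-scorer connection concrete; the features, learning rate, and function shape here are hypothetical, not the post's actual model.

import numpy as np

def td0_update(theta, phi_sa, r, phi_next_sa, gamma=0.8, lr=0.01, terminal=False):
    """One semi-gradient TD(0) step for a linear model q(s,a) = theta . phi(s,a).
    With gamma=0 this reduces to a contextual-bandit style update on the
    immediate reward r."""
    q_sa = theta @ phi_sa
    q_next = 0.0 if terminal else theta @ phi_next_sa
    target = r + gamma * q_next        # bootstrapped target, held fixed
    td_error = target - q_sa
    return theta + lr * td_error * phi_sa

theta = np.zeros(4)
phi_sa = np.array([1.0, 0.5, 0.2, 0.0])      # hypothetical (state, action) features
phi_next = np.array([1.0, 0.4, 0.1, 1.0])
theta = td0_update(theta, phi_sa, r=1.0, phi_next_sa=phi_next)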

Budget constrained Recommendation Simulation

As in other applications of RL, we find simulations to be a helpful tool for studying this problem. Below we describe the generative process for the simulation data,

Generative model for simulated data

Note that, instead of sampling the per-item Bernoulli, we can alternatively sample once from a categorical distribution with relative relevances for items and a fixed weight for the null arm. The above generative process for simulated data depends on many hyper-parameters (loc, scale etc.). Each setting of these hyper-parameters results in a different simulated dataset and it’s easy to realize many simulated datasets in parallel. For the experiments below, we fix the hyper-parameters for the cost and relevance distributions and sweep over the initial user budget distribution’s location parameter. The attached notebook contains the exact settings of the hyper-parameters used for the simulations.

Metric

A slate recommendation algorithm generates slates, and then the user model is used to predict the success or failure of each slate. Given the simulation data, we can train various recommendation algorithms and compare their performance using a simple metric: the average number of successes of the generated slates (referred to as play-rate below). In addition to play-rate, we look at the effective-slate-size as well, which we define to be the number of items in the slate that fit the user's time budget. As mentioned earlier, one of the ways play-rate can be improved is by constructing larger effective slates (with relevant items, of course), so looking at this metric helps us understand the mechanism of the recommendation algorithms.

On-policy learning results

Given the flexibility of working in the simulation setting, we can learn to construct optimal slates in an on-policy manner. For this, we start with some initial random model for the value function, generate slates from it, get user feedback (using the user model) and then update the value function model using the feedback and keep repeating this loop until the value function model converges. This is known as the SARSA algorithm.

The following set of results shows how the learned recommender policies behave in terms of the metric of success, play-rate, for different settings of the user budget distribution's location parameter and the discount factor. In addition to the play rate, we also show the effective slate size, the average number of items that fit within the user budget. While the play rate changes are statistically insignificant (the shaded areas are the 95% confidence intervals estimated by bootstrapping the simulations 100 times), we see a clear trend of increasing effective slate size for the RL models (γ > 0) compared to the contextual bandit (γ = 0).

Play-Rate and Effective slate sizes for different User Budget distributions. The user budget distribution's location is on the same scale as the item cost, and we are looking for changes in the metrics as we make changes to the user budget distribution.

We can actually get a more statistically sensitive result by comparing the result of the contextual bandit with an RL model for each simulation setting (similar to a paired comparison in a paired t-test). Below we show the change in play rate (delta play rate) between an RL model (shown with γ = 0.8 below as an example) and a contextual bandit (γ = 0). We compare the change in this metric for different user budget distributions. By performing this paired comparison, we see a statistically significant lift in play rate for small to medium user budget ranges. This makes intuitive sense, as we would expect both approaches to work equally well when the user budget is very large (so the item's cost is irrelevant); the RL algorithm only outperforms the contextual bandit when the user budget is limited and finding the trade-off between relevance and cost is important. The increase in the effective slate size is even more dramatic. This result clearly shows that the RL agent performs better by minimizing the abandonment probability, packing more items within the user budget.

Paired comparison between RL and Contextual bandit. For limited user budget settings, we see statistically significant lift in play rate for the RL algorithm.

Off-policy learning results

So far the results have shown that in the budget constrained setting, reinforcement learning outperforms the contextual bandit. These results have been for the on-policy learning setting, which is very easy to simulate but difficult to execute in realistic recommender settings. In a realistic recommender, we have data generated by a different policy (called a behavior policy) and we want to learn a new and better policy from this data (called the target policy). This is called the off-policy setting. Q-Learning is one well known technique that allows us to learn the optimal value function in an off-policy setting. Its loss function is very similar to the TD(0) loss except that it uses Bellman's optimality equation instead.

Loss function for Q-Learning
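Written out in the same notation as the TD(0) loss above, the only change is the max over next actions, which makes the learned value function independent of the behavior policy that generated the data:

\[
\mathcal{L}(\theta) \;=\; \big(\, r + \gamma \max_{a'} q(s', a'; \theta) \;-\; q(s, a; \theta) \,\big)^{2}
\]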

This loss can again be minimized using semi-gradient techniques. We estimate the optimal value function using Q-Learning and compare its performance with the optimal policy learned using the on-policy SARSA setup. For this, we generate slates using the Q-Learning based optimal value function model and compare the play-rate with the slates generated using the optimal policy learned with SARSA. Below is the result of the paired comparison between SARSA and Q-Learning.

Paired comparison between Q-Learning and SARSA. Play rates are similar between the two approaches but effective slate sizes are very different.

In this result, the change in the play-rate between on-policy and off-policy models is close to zero (see the error bars crossing the zero-axis). This is a favorable result, as it shows that Q-Learning yields similar performance to the on-policy algorithm. However, the effective slate size is quite different between Q-Learning and SARSA. Q-Learning seems to generate very large effective slate sizes without much difference in the play rate. This is an intriguing result and needs a little more investigation to fully uncover. We hope to spend more time understanding this result in the future.

Conclusion

To conclude, in this writeup we presented the budget constrained recommendation problem and showed that, in order to generate slates with higher chances of success, a recommender system has to balance both the relevance and the cost of items so that more of the slate fits within the user's time budget. We showed that the problem of budget constrained recommendation can be modeled as a Markov Decision Process and that we can find a solution for optimal slate construction under budget constraints using reinforcement learning based methods. We showed that RL outperforms contextual bandits in this problem setting. Moreover, we compared the performance of on-policy and off-policy approaches and found the results to be comparable in terms of metrics of success.

Code

Github repo


Reinforcement Learning for Budget Constrained Recommendations was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Virtual Production — A Validation Framework For Unreal Engine

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/virtual-production-a-validation-framework-for-unreal-engine-aab780b2f8c8

Virtual Production — A Validation Framework For Unreal Engine

By Adam Davis, Jimmy Fusil, Bhanu Srikanth and Girish Balakrishnan

Game Engines in Virtual Production

The use of Virtual Production and real time technologies has markedly accelerated in the past few years. At Netflix, we are always thrilled to see technology enable new ways of telling stories, and the use of these techniques on some of our shows like 1899 and Super Giant Robot Brothers has given us a front row seat to this exciting evolution in filmmaking. Each production that deploys these methods is an opportunity for the crew, tech manufacturers and us–the Netflix Production Innovation team–to learn, innovate and collaborate towards a common goal: universally accessible workflows that will enable creative opportunities and technical success for all filmmakers regardless of the size, location or scope of their project.

Game engines have been a major part of driving the democratization of advanced virtual production techniques such as pre-visualization and in-camera visual effects. While these platforms were initially developed to build games, their open and versatile design and development environment offer other capabilities. In the hands of film and TV artists and technicians–combined with techniques and technologies such as LED Panels and tracking systems–they’ve become powerful creation tools for filmmakers.

The incredible depth and flexibility of game engines also means there are infinite permutations and combinations of configurations and settings available to the operator. But not getting the configuration right can have a significant impact on the results. Could we harness the benefits of this depth and flexibility while also supporting the predictability and repeatability expected from high pressure shooting environments? Or put more simply: could we offer productions a shortcut to achieving the right configuration for their scenario every time?

The Netflix Production Innovation Team is always asking itself versions of this question: “How do we ensure a high level of creative opportunity and flexibility, while achieving technical consistency and quality? How do we reduce the potential for accidents and errors, while making sure artists and technicians still have the agency to stay true to their vision and their practice?” One past approach to enabling excellence while mitigating risk has been creating knowledge resources for our productions and partners, while also working with the industry to develop standards and best practices. But in this instance, the open development framework of the game engine itself, as well as its central place in Virtual Production workflows, offered another compelling solution.

The Validation Framework as Helper, Teacher and Enabler

Many large scale operations use quality control methods and validation frameworks to ensure successful outcomes. Validation frameworks can act as a flexible and unobtrusive assistant, while offering a safety net for the operator. We’ve had success with this type of validation in the past with Photon, which acted as a library, reference implementation and validation tool for IMF packages, the delivery format for all of our Netflix original programming.

An excellent validation framework is designed with the following characteristics:

  1. It is informative and instructive, providing useful information as to why an issue has arisen, what problems will be caused if not resolved, and what can be done to resolve it.
  2. It acts as dynamic documentation, a checklist, and a co-pilot to allow the operator to focus on the creative aspects.
  3. It promotes standards and consistency across individuals and teams, regardless of how bespoke the workflow is, and allows them to operate along a common and agreed set of expectations and checks.
  4. It enhances efficiency in complex interconnected workflows by looking across multiple components, roles and responsibilities and reporting on the system as a whole.

With such a validation framework in place, we can avoid:

  • Locking users behind layers of confining procedures and tooling, which can limit the evolution of new ways of working by constraining the options available to the operator and creator to adapt to new scenarios, as well as preventing them from learning what’s really going on under the hood.
  • Creating piles of documentation, sticky notes, runbooks… While all of that material is often useful as learning aids or during prep, it’s rarely practical to be flicking through pages of documentation to solve a problem during production.

Furthermore, by incorporating the validation framework within the game engine itself, we could ensure that everything is set up for success, maintaining flexibility while minimizing the risk of error. Because when the camera rolls, creators don’t want to assume everything is ok; they need to know that all the possible steps were taken to prevent errors.

Netflix’s Unreal Engine Validation Framework

Image of the Netflix Unreal Engine Validation Framework User Interface

Functionality

Our Validation Framework, which we have developed as a plugin to Unreal Engine, is extensible and customizable. It hosts and manages automated validation checks and fixes, which help identify and address technical problems within a given workflow.

The Validation Framework builds upon Unreal’s EditorUtility functionality and provides a simple base–the Validation Base–from which all other validations are built. The core also provides a registration to find all validations built atop this base within the Unreal project itself. This allows us to execute the validations from either blueprints or from C++, serving a variety of users: from developers extending the execution into a CI/CD from the C++ side, to an artist executing via a widget or the UI in the editor.

Regardless of how many validations there are or where in the project they live, they are always accessible and can be run from either entry point thanks to the registration mechanics. This gives teams a lot of freedom and flexibility as to how they extend validations into their workflows. For example, a core library of validations can be shared across projects within a company, or across a production–including with other vendors. Or project-specific validations can be deployed, encapsulated within an Unreal project until it is delivered.

While we aim to keep as much exposed in blueprints as possible, having a thin underlying C++ layer is a convenient way to grant the framework access to some of the objects, settings and parameters which are inaccessible via blueprinting alone.

All of the Validations provide two simple hooks: the Validation itself, which checks something, and a Fix, which can apply a correction. What the validation framework checks for, and what gets fixed and how, is entirely up to the artists and developers!

Validations can be grouped to a dedicated Scope, specifically either Level or Project. This enables a hierarchy in the assignment of validations, as some will be applied on Project settings and configurations, while others will be inspecting the content of the Levels.

A second layer of organizational tagging can then be applied to create Workflows. While the validations can only apply to a single Scope (either Level or Project), they can belong to multiple workflows, each with its own set of validations. Several workflow “presets” are available in the initial release, along with the capability for users to define their own workflows.

Users can also define new validations, but the tool requires that they provide a description of what they are checking for, as well as a description of how non-conformities (invalid settings or configurations) can be corrected. While we can’t police it, we encourage users to describe not only what the new validation checks, but also why, as well as how failing to resolve non-conformities will impact the workflow results. This helps ensure that the validation framework is useful not only as a risk management tool but also as an educational resource.
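
The actual framework is implemented in Unreal Engine C++ and Blueprints; the Python sketch below is only meant to illustrate the contract described above — a validation base with validate/fix hooks, a Scope, workflow tags, required descriptions, and a registry that lets any entry point discover the checks. All class and field names here are hypothetical.

    from dataclasses import dataclass
    from enum import Enum

    class Scope(Enum):
        LEVEL = "Level"
        PROJECT = "Project"

    @dataclass
    class ValidationResult:
        passed: bool
        message: str = ""

    REGISTRY = []  # every validation registers itself so UI, CI/CD or scripts can discover it

    class ValidationBase:
        name = "UnnamedValidation"
        description = ""        # what is checked, why it matters, what breaks if ignored
        fix_description = ""    # how non-conformities are corrected
        scope = Scope.PROJECT   # a validation applies to a single scope...
        workflows = ["ICVFX"]   # ...but can belong to several workflows

        def __init_subclass__(cls, **kwargs):
            super().__init_subclass__(**kwargs)
            REGISTRY.append(cls)

        def validate(self, context) -> ValidationResult:
            raise NotImplementedError

        def fix(self, context) -> ValidationResult:
            raise NotImplementedError

    class FrameRateMatchesCamera(ValidationBase):
        name = "FrameRateMatchesCamera"
        description = "The project frame rate should match the camera frame rate."
        fix_description = "Set the project frame rate to the camera frame rate."
        scope = Scope.PROJECT

        def validate(self, context):
            ok = context["project_fps"] == context["camera_fps"]
            return ValidationResult(ok, "" if ok else "Frame rates differ.")

        def fix(self, context):
            context["project_fps"] = context["camera_fps"]
            return ValidationResult(True, "Project frame rate updated.")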

The UI is built almost entirely in blueprints, and utilizes a few helper blueprint nodes implemented in C++. Again, this allows users to use what’s there, replace it with their own, or integrate it into their existing UIs.

Finally, the framework generates validation reports, in either JSON or CSV format. Each time it runs, it writes a validation report for the project/level that was just inspected. This allows users to share results with others, such as support teams, or create a record of configurations for reporting or archival purposes.

We focused our initial development on a validation tool within Epic’s Unreal Engine because of its rapid adoption across our global slate of movies and shows, and the vendors that support them. This plug-in approach is also well aligned with Epic Games’ toolkit philosophy: being open to productions of all shapes, sizes and experience levels. Given this wide applicability, we saw a lot of benefit in an easy-to-implement utility that addresses the most prevalent “gotchas” in Unreal project set-ups for Virtual Production projects.

In-Camera VFX Validations

The plugin ships with a set of validations for the most commonly required checks and fixes for inspecting configurations aimed at executing ICVFX techniques. This set of validations is the result of a collaborative effort between the Netflix Production Innovation team and Epic Games; it is built from our collective knowledge and experience on ICVFX productions.

We initially focused only on LED In-Camera VFX (ICVFX) applications. The goal is to help production teams catch and resolve common issues which can cause unexpected render results, data outputs, and potential performance or resource allocation challenges when working with Unreal Engine. Many of the issues this tool mitigates can be hard to spot, particularly in the complex, high-pressure environment of a virtual production set.

That said, the Validation Framework is not solely for use on-set; it can also be utilized during the prep and content build phases, ensuring issues are identified and handled early, before moving further along the production pipeline. The framework can also easily be extended by users to encompass bespoke workflows with custom validations and fixes, allowing teams to create and perform their own unique checks.

Current Usage and Next Steps

The Validation Framework is currently being used on a number of Netflix shows, and has already significantly reduced the amount of time needed for system setup and troubleshooting. We have also received great feedback from our crews, which has led us to add new checks and fixes. Some of our partners have even begun adapting and extending this framework to better suit their proprietary workflows.

In the future, we would like to see such frameworks become commonly used in virtual production environments in two ways:

  1. Continued implementation and extension of this particular validation framework by Virtual Production crews and service providers;
  2. Adaptation of this automated validation approach to assist with the entire stage environment, even where Unreal is not part of the workflow: checking the settings on media servers, LED processors, workstations, tracking systems… all of which play a critical part in a Virtual Production system. In order for that to happen, we need to ensure that this wider ecosystem of devices can also be queried and monitored procedurally, which in turn will free crews to focus on the creative work, and not worry so much about the tech.

Availability

The validation framework can be found at: https://github.com/Netflix-Skunkworks/UnrealValidationFramework

The system can be integrated into pipelines and additional tooling around CI/CD to generate validation reports.

We look forward to receiving your feedback and suggestions via the “Issues” tab on our git repo (above).


Virtual Production — A Validation Framework For Unreal Engine was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data Mesh — A Data Movement and Processing Platform @ Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-mesh-a-data-movement-and-processing-platform-netflix-1288bcab2873

Data Mesh — A Data Movement and Processing Platform @ Netflix

By Bo Lei, Guilherme Pires, James Shao, Kasturi Chatterjee, Sujay Jain, Vlad Sydorenko

Background

Real-time processing technology (a.k.a. stream processing) is one of the key factors that enable Netflix to maintain its leading position in the highly competitive business of entertaining our users. Our previous generation streaming pipeline solution, Keystone, has a proven track record of serving many of our key business needs. However, as we expand our offerings and try out new ideas, there’s a growing need to unlock other emerging use cases that are not yet covered by Keystone. After evaluating the options, the team decided to create Data Mesh as our next generation data pipeline solution.

Last year we wrote a blog post about how Data Mesh helped our Studio team enable data movement use cases. A year has passed; Data Mesh has reached its first major milestone and its scope keeps increasing. As a growing number of use cases onboard to it, we have a lot more to share. We will deliver a series of articles that cover different aspects of Data Mesh and what we have learned from our journey. This article gives an overview of the system. The following ones will dive deeper into different aspects of it.

Data Mesh Overview

A New Definition Of Data Mesh

Previously, we defined Data Mesh as a fully managed, streaming data pipeline product used for enabling Change Data Capture (CDC) use cases. As the system evolves to solve more and more use cases, we have expanded its scope to handle not only CDC use cases but also more general data movement and processing use cases, so that:

  • Events can be sourced from more generic applications (not only databases).
  • The catalog of available DB connectors is growing (CockroachDB and Cassandra, for example).
  • More processing patterns such as filter, projection, union, and join are supported.

As a result, today we define Data Mesh as a general purpose data movement and processing platform for moving data between Netflix systems at scale.

Overall Architecture

The Data Mesh system can be divided into the control plane (Data Mesh Controller) and the data plane (Data Mesh Pipeline). The controller receives user requests, deploys and orchestrates pipelines. Once deployed, the pipeline performs the actual heavy lifting data processing work. Provisioning a pipeline involves different resources. The controller delegates the responsibility to the corresponding microservices to manage their life cycle.

Pipelines

A Data Mesh pipeline reads data from various sources, applies transformations on the incoming events and eventually sinks them into the destination data store. A pipeline can be created from the UI or via our declarative API. On a creation or update request, the controller figures out the resources associated with the pipeline and calculates the proper configuration for each of them.

Connectors

A source connector is a Data Mesh managed producer. It monitors the source database’s bin log and produces CDC events to the Data Mesh source fronting Kafka topic. It is able to talk to the Data Mesh controller to automatically create/update the sources.

Previously we only had RDS source connectors to listen to MySQL and Postgres using the DBLog library; now we have added CockroachDB source connectors and Cassandra source connectors. They use different mechanisms to stream events out of the source databases. We’ll publish blog posts that dive deeper into them.

In addition to managed connectors, application owners can emit events via a common library, which can be used in circumstances where a DB connector is not yet available or there is a preference to emit domain events without coupling with a DB schema.

Sources

Application developers can expose their domain data in a centralized catalog of Sources. This allows data sharing as multiple teams at Netflix may be interested in receiving changes for an entity. In addition, a Source can be defined as a result of a series of processing steps — for example an enriched Movie entity with several dimensions (such as the list of Talents) that further can be indexed to fulfill search use cases.

Processors

A processor is a Flink Job. It contains a reusable unit of data processing logic. It reads events from the upstream transports and applies some business logic to each of them. An intermediate processor writes data to another transport. A sink processor writes data to an external system such as Iceberg, ElasticSearch, or a separate discoverable Kafka topic.

We have provided a Processor SDK to help advanced users develop their own processors. Processors developed by Netflix developers outside our team can also be registered to the platform and work with other processors in a pipeline. Once a processor is registered, the platform also automatically sets up a default alert UI and metrics dashboard.

Transports

We use Kafka as the transportation layer for the interconnected processors to communicate. The output events of the upstream processor are written to a Kafka topic, and the downstream processors read their input events from there.

Kafka topics can also be shared across pipelines. A topic in pipeline #1 that holds the output of its upstream processor can be used as the source in pipeline #2. We frequently see use cases where some intermediate output data is needed by different consumers. This design enables us to reuse and share data as much as possible. We have also implemented the features to track the data lineage so that our users can have a better picture of the overall data usage.
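
To make the transport pattern concrete, here is a deliberately simplified Python sketch of an intermediate processor reading from an upstream Kafka topic, applying a toy filter-and-projection step, and writing to a downstream topic. In Data Mesh the processors are Flink jobs and the topics are managed by the platform; the kafka-python client, topic names, and event fields below are illustrative assumptions.

    import json
    from kafka import KafkaConsumer, KafkaProducer

    # Hypothetical topic names; in Data Mesh these transports are provisioned by the controller.
    UPSTREAM_TOPIC = "pipeline1.movie-events"
    DOWNSTREAM_TOPIC = "pipeline1.movie-events.filtered"

    consumer = KafkaConsumer(
        UPSTREAM_TOPIC,
        bootstrap_servers="localhost:9092",
        value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
    )
    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda obj: json.dumps(obj).encode("utf-8"),
    )

    for record in consumer:
        event = record.value
        # Toy "filter + projection" step; a real processor holds a reusable unit of business logic.
        if event.get("country") == "US":
            producer.send(DOWNSTREAM_TOPIC, {"movieId": event.get("movieId"),
                                             "title": event.get("title")})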

Schema

Data Mesh enforces schema on all the pipelines, meaning we require all the events passing through the pipelines to conform to a predefined template. We’re using Avro as a shared format for all our schemas, as it’s simple, powerful, and widely adopted by the community.

We make schema a first-class citizen in Data Mesh for the following reasons:

  • Better data quality: Only events that comply with the schema can be encoded, which gives consumers more confidence.
  • Finer granularity of data lineage: The platform is able to track how fields are consumed by different consumers and surface it on the UI.
  • Data discovery: Schema describes data sets and enables the users to browse different data sets and find the dataset of interest.

On pipeline creation, each processor in that pipeline needs to define what schema it consumes and produces. The platform handles the schema validation and compatibility check. We have also built automation around handling schema evolution. If the schema is changed at the source, the platform tries to upgrade the consuming pipelines automatically without human intervention.
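
As a small illustration of schema enforcement, the sketch below defines a hypothetical Avro schema and validates an event against it using the fastavro library; the real schemas, registry, and compatibility checks are managed by the platform and are not shown here.

    from fastavro import parse_schema
    from fastavro.validation import validate

    # Hypothetical event schema; field names are illustrative only.
    movie_event_schema = parse_schema({
        "type": "record",
        "name": "MovieEvent",
        "fields": [
            {"name": "movieId", "type": "long"},
            {"name": "title", "type": "string"},
            {"name": "country", "type": ["null", "string"], "default": None},
        ],
    })

    event = {"movieId": 42, "title": "Example Title", "country": "US"}

    # Only events that conform to the schema are allowed into the pipeline.
    assert validate(event, movie_event_schema)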

Future

Data Mesh initially started as a project to solve our Change Data Capture needs. Over the past year, we have observed increasing demand from other domains such as Machine Learning, Logging, etc. Today, Data Mesh is still in its early stages and there are many interesting problems yet to be solved. Below are the highlights of some of the high priority tasks on our roadmap.

Making Data Mesh The Paved Path (Recommended Solution) For Data Movement And Processing

As mentioned above, Data Mesh is meant to be the next generation of Netflix’s real-time data pipeline solution. As of now, we still have several specialized internal systems serving their own use cases. To streamline the offering, it makes sense to gradually migrate those use cases onto Data Mesh. We are currently working hard to make sure that Data Mesh can achieve feature parity with Delta and Keystone. In addition, we also want to add support for more sources and sinks to unlock a wide range of data integration use cases.

More Processing Patterns And Better Efficiency

People use Data Mesh not only to move data; they often also want to process or transform their data along the way. Another high priority task for us is to make more common processing patterns available to our users. Since by default a processor is a Flink job, having each simple processor do its work in its own Flink job can be inefficient. We are also exploring ways to merge multiple processing patterns into a single Flink job.

Broader support for Connectors

We are frequently asked by our users if Data Mesh is able to get data out of datastore X and land it into datastore Y. Today we support certain sources and sinks, but it’s far from enough. The demand for more types of connectors is enormous; we see a big opportunity ahead of us, and that’s definitely something we want to invest in.

Data Mesh is a complex yet powerful system. We believe that as it matures, it will be instrumental to Netflix’s future success. Again, we are still at the beginning of our journey and we are excited about the upcoming opportunities. In the following months, we’ll publish more articles discussing different aspects of Data Mesh. Please stay tuned!

The Team

Data Mesh wouldn’t be possible without the hard work and great contributions from the team. Special thanks should go to our stunning colleagues:

Bronwyn Dunn, Jordan Hunt, Kevin Zhu, Pradeep Kumar Vikraman, Santosh Kalidindi, Satyajit Thadeshwar, Tom Lee, Wei Liu


Data Mesh — A Data Movement and Processing Platform @ Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Formulating ‘Out of Memory Kill’ Prediction on the Netflix App as a Machine Learning Problem

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/formulating-out-of-memory-kill-prediction-on-the-netflix-app-as-a-machine-learning-problem-989599029109

by Aryan Mehra
with
Farnaz Karimdady Sharifabad, Prasanna Vijayanathan, Chaïna Wade, Vishal Sharma and Mike Schassberger

Aim and Purpose — Problem Statement

The purpose of this article is to give insights into analyzing and predicting “out of memory” or OOM kills on the Netflix App. Unlike powerful compute devices, TVs and set-top boxes usually have tighter memory constraints. More importantly, low resource availability, or the “out of memory” scenario, is one of the common reasons for crashes/kills. We at Netflix, as a streaming service running on millions of devices, have a tremendous amount of data about device capabilities/characteristics and runtime data in our big data platform. With large data comes the opportunity to leverage it for predictive and classification based analysis. Specifically, if we are able to predict or analyze Out of Memory kills, we can take device specific actions to pre-emptively lower the performance in favor of not crashing — aiming to give the user the ultimate Netflix Experience within the “performance vs pre-emptive action” tradeoff limitations. A major advantage of prediction and taking pre-emptive action is that we can act to better the user experience.

This is done by first elaborating on the dataset curation stage — especially focusing on device capabilities and OOM kill related memory readings. We also highlight steps and guidelines for exploratory analysis and prediction to understand Out of Memory kills on a sample set of devices. Since memory management is not something one usually associates with classification problems, this blog focuses on formulating the problem as an ML problem and the data engineering that goes along with it. We also explore graphical analysis of the labeled dataset and suggest some feature engineering and accuracy measures for future exploration.

Challenges of Dataset Curation and Labeling

Unlike other Machine Learning tasks, OOM kill prediction is tricky because the dataset is pulled from different sources — device characteristics come from our on-field knowledge and runtime memory data comes from real-time user data pushed to our servers.

Secondly, and more importantly, the sheer volume of the runtime data is enormous. Several devices running Netflix will log memory usage at fixed intervals. Since the Netflix App does not get killed very often (fortunately!), this means most of these entries represent normal/ideal/as-expected runtime states. The dataset will thus be very biased/skewed. We will soon see how we actually label which entries are erroneous and which are not.

Dataset Features and Components

The schema figure above describes the two components of the dataset — device capabilities/characteristics and runtime memory data. The two are joined together based on attributes that uniquely match a memory entry with its device’s capabilities. These attributes may be different for different streaming services — for us at Netflix, this is a combination of the device type, app session ID and software development kit version (SDK version). We now explore each of these components individually, while highlighting the nuances of the data pipeline and pre-processing.

Device Capabilities

Not all the device capabilities reside in one source table, so multiple if not several joins are required to gather the data. While creating the device capability table, we decided to index it with a composite primary key of (device type ID, SDK version). So given these two attributes, Netflix can uniquely identify several of the device capabilities. Some nuances while creating this dataset come from the on-field domain knowledge of our engineers. Some features (as an example) include Device Type ID, SDK Version, Buffer Sizes, Cache Capacities, UI resolution, Chipset Manufacturer and Brand.

Major Milestones in Data Engineering for Device Characteristics

Structuring the data in an ML-consumable format: The device capability data needed for the prediction was distributed in over three different schemas across the Big Data Platform. Joining them together and building a single indexable schema that can directly become a part of a bigger data pipeline is a big milestone.

Dealing with ambiguities and missing data: Sometimes the entries in BDP are contaminated with testing entries and NULL values, along with ambiguous values that have no meaning or simply contradictory values due to unrealistic test environments. We deal with all of this with a simple majority vote (statistical mode) over the view that is indexed by the device type ID and SDK version from the user query. We thus verify the hypothesis that actual device characteristics are always in the majority in the data lake.
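
A minimal pandas sketch of this majority-vote cleanup, assuming hypothetical column names and toy values, might look like the following.

    import pandas as pd

    # Hypothetical capability rows pulled from the data lake, including a bogus test entry.
    capabilities = pd.DataFrame({
        "device_type_id": [101, 101, 101, 202],
        "sdk_version":    ["5.1", "5.1", "5.1", "6.0"],
        "buffer_size_mb": [256, 256, 9999, 512],
        "chipset":        ["A", "A", None, "B"],
    })

    def column_mode(series):
        # Statistical mode per column, ignoring NULLs.
        modes = series.dropna().mode()
        return modes.iloc[0] if not modes.empty else None

    # Majority vote per (device type ID, SDK version) key.
    cleaned = (capabilities
               .groupby(["device_type_id", "sdk_version"])
               .agg(column_mode)
               .reset_index())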

Incorporating On-site and field knowledge of devices and engineers: This is probably the single most important achievement of the task because some of the features mentioned above (and some of the ones redacted) involved engineering the features manually. Example: Missing values or NULL values might mean the absence of a flag or feature in some attribute, while it might require extra tasks in others. So if we have a missing value for a feature flag, that might mean “False”, whereas a missing value in some buffer size feature might mean that we need subqueries to fetch and fill the missing data.

Runtime Memory, OOM Kill Data and ground truth labeling

Runtime data is always increasing and constantly evolving. The tables and views we use are refreshed every 24 hours, and joining any two such tables consumes tremendous compute and time resources. In order to curate this part of the dataset, we suggest the tips given below (written from the point of view of SparkSQL-like distributed query processors):

  • Filtering the entries (applying conditions) before the JOIN, using WHERE and LEFT JOIN clauses carefully. Conditions that eliminate entries after the join operation are much more expensive than elimination before the join. Filtering early also prevents the system from running out of memory during execution of the query (see the sketch after this list).
  • Restricting testing and analysis to one day and one device at a time. It is always good to pick a single high-frequency day like New Year’s or Memorial Day to increase frequency counts and get normalized distributions across various features.
  • Striking a balance between driver and executor memory configurations in SparkSQL-like systems. Allocations that are too high may fail and restrict system processes; allocations that are too low may fail at the time of a local collect or when the driver tries to accumulate the results.
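
As a sketch of the first tip, here is how filtering before the join might look in PySpark; the table names, columns, and dates are hypothetical placeholders rather than our actual runtime and OOM kill tables.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical table and column names.
    memory = (spark.table("runtime_memory_readings")
              .filter(F.col("ds") == "2022-01-01")          # one day at a time
              .filter(F.col("device_type_id") == 101))      # one device at a time

    kills = (spark.table("oom_kill_events")
             .filter(F.col("ds") == "2022-01-01")
             .filter(F.col("device_type_id") == 101))

    # Filter first, join second: far less data is shuffled than joining
    # the full tables and filtering afterwards.
    joined = memory.join(kills, on=["device_type_id", "app_session_id"], how="left")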

Labeling the data — Ground Truth

An important aspect of the dataset is to understand what features will be available to us at inference time. Thus memory data (which contains the navigational level and memory reading) can be labeled using the OOM kill data, but the latter cannot be reflected in the input features. The best way to do this is to use a sliding window approach where we label the memory readings of the sessions in a fixed window before the OOM kill as erroneous, and the rest of the entries as non-erroneous. In order to make the labeling more granular, and to bring in more variation than a binary classification model, we propose a graded window approach as explained by the image below. Basically, it assigns higher levels to memory readings closer to the OOM kill, making it a multi-class classification problem. Level 4 is closest to the OOM kill (within a 2 minute range), whereas Level 0 is beyond 5 minutes of any OOM kill ahead of it. We note here that the device and session of the OOM kill instance and of the memory reading need to match for the sanity of the labeling. The confusion matrix and the model’s results can later be reduced to binary if need be.
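
A small pandas sketch of this graded labeling could look like the following. The 2-minute and 5-minute bounds match the description above, while the intermediate bands and column names are illustrative assumptions.

    import pandas as pd

    def graded_label(minutes_to_next_kill):
        # Grade a memory reading by how soon the next OOM kill in the same
        # device/session occurs after it. Level 4 is closest to the kill;
        # Level 0 means no kill within the next 5 minutes.
        if pd.isna(minutes_to_next_kill) or minutes_to_next_kill > 5:
            return 0
        if minutes_to_next_kill <= 2:
            return 4
        if minutes_to_next_kill <= 3:
            return 3
        if minutes_to_next_kill <= 4:
            return 2
        return 1

    readings = pd.DataFrame({"minutes_to_next_kill": [0.5, 2.5, 4.2, 7.0, None]})
    readings["label"] = readings["minutes_to_next_kill"].apply(graded_label)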

Summary of OOM Prediction — Problem Formulation

The dataset now consists of several entries — each of which has certain runtime features (navigational level and memory reading in our case) and device characteristics (a mix of over 15 features that may be numerical, boolean or categorical). The output variable is the graded or ungraded classification variable, which is labeled in accordance with the section above — primarily based on the nearness of the memory reading’s timestamp to the OOM kill. Now we can use any multi-class classification algorithm — ANNs, XGBoost, AdaBoost, ElasticNet with softmax, etc. Thus we have successfully formulated the problem of OOM kill prediction for a device streaming Netflix.
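
As one possible instantiation of this formulation, the sketch below trains a gradient-boosted multi-class classifier (one of the options listed above) on the labeled dataset; the file path, feature names, and hyperparameters are placeholders rather than our production setup.

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from xgboost import XGBClassifier

    # Hypothetical joined dataset: runtime features + device characteristics + graded label.
    df = pd.read_parquet("oom_training_data.parquet")

    X = pd.get_dummies(
        df[["memory_reading_mb", "navigational_level", "buffer_size_mb", "chipset"]],
        columns=["navigational_level", "chipset"],   # one-hot encode categorical features
    )
    y = df["label"]                                  # graded labels 0-4

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=0)

    model = XGBClassifier(objective="multi:softprob", eval_metric="mlogloss")
    model.fit(X_train, y_train)
    print(model.score(X_test, y_test))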

Data Analysis and Observations

Without diving very deep into the actual devices and results of the classification, we now show some examples of how we could use the structured data for some preliminary analysis and make observations. We do so by just looking at the peak of OOM kills in a distribution over the memory readings within 5 minutes prior to the kill.

Different device types

From the graph above, we show how even without doing any modeling, the structured data can give us immense knowledge about the memory domain. For example, the early peaks (marked in red) are mostly crashes not visible to users, but were marked erroneously as user-facing crashes. The peaks marked in green are real user-facing crashes. Device 2 is an example of a sharp peak towards the higher memory range, with a decline that is sharp and almost no entries after the peak ends. Hence, for Devices 1 and 2, the task of OOM prediction is relatively easier, after which we can start taking pre-emptive action to lower our memory usage. In the case of Device 3, we have a normalized Gaussian-like distribution — indicating that the OOM kills occur throughout the memory range, the decline is not very sharp, and the crashes are spread in an approximately normal fashion.

Feature Engineering, Accuracy Measures and Future Work Directions

We leave the reader with some ideas to engineer more features and accuracy measures specific to the memory usage context in a streaming environment for a device.

  • We could manually engineer features on memory to utilize the time-series nature of the memory value when aggregated over a user’s session. Suggestions include a running mean of the last 3 values, or the difference between the current entry and a running exponential average (see the sketch after this list). The analysis of the growth of memory by the user could give insights into whether the kill was caused by in-app streaming demand, or due to external factors.
  • Another feature could be the time spent in different navigational levels. Internally, the app caches several pre-fetched data, images, descriptions etc, and the time spent in the level could indicate whether or not those caches are cleared.
  • When deciding on accuracy measures for the problem, it is important to analyze the distinction between false positives and false negatives. The dataset (fortunately for Netflix!) will be highly biased — as an example, over 99.1% of entries are non-kill related. In general, false negatives (not predicting the kill when actually the app is killed) are more detrimental than false positives (predicting a kill even though the app could have survived). Because the kill happens rarely (0.9% in this example), even if we end up lowering memory and performance 2% of the time and catch almost all of the 0.9% OOM kills, we will have eliminated approximately all OOM kills, with the tradeoff of lowering the performance/clearing the cache an extra 1.1% of the time (false positives).
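
For the first suggestion in the list above, a pandas sketch of such session-level rolling features (with made-up readings) could look like this.

    import pandas as pd

    # Hypothetical per-session memory readings, ordered by time.
    session = pd.DataFrame({"memory_mb": [310, 322, 335, 340, 361, 358, 390]})

    # Running mean of the last 3 readings.
    session["mem_rolling_mean_3"] = session["memory_mb"].rolling(window=3).mean()

    # Difference between the current reading and a running exponential average.
    session["mem_minus_ewm"] = (
        session["memory_mb"] - session["memory_mb"].ewm(alpha=0.3, adjust=False).mean()
    )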

Summary

This post has focused on shedding light on dataset curation and engineering when dealing with memory and low-resource crashes for streaming services on devices. We also cover the distinction between non-changing attributes and runtime attributes, and strategies to join them to make one cohesive dataset for OOM kill prediction. We covered labeling strategies that involved graded window based approaches and explored some graphical analysis on the structured dataset. Finally, we ended with some future directions and possibilities for feature engineering and accuracy measurements in the memory context.

Stay tuned for further posts on memory management and the use of ML modeling to deal with systemic and low latency data collected at the device level. We will try to post results of our models on the dataset we have created soon.

Acknowledgements
I would like to thank the members of various teams — Partner Engineering (Mihir Daftari, Akshay Garg), TVUI team (Andrew Eichacker, Jason Munning), Streaming Data Team, Big Data Platform Team, Device Ecosystem Team and Data Science Engineering Team (Chris Pham), for all their support.


Formulating ‘Out of Memory Kill’ Prediction on the Netflix App as a Machine Learning Problem was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How Netflix Content Engineering makes a federated graph searchable (Part 2)

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-netflix-content-engineering-makes-a-federated-graph-searchable-part-2-49348511c06c

By Alex Hutter, Falguni Jhaveri, and Senthil Sayeebaba

In a previous post, we described the indexing architecture of Studio Search and how we scaled the architecture by building a config-driven self-service platform that allowed teams in Content Engineering to spin up search indices easily.

This post will discuss how Studio Search supports querying the data available in these indices.

Data consumption from Studio Search DGS

Introduction

When we say Content Engineering teams are interested in searching against the federated graph, the use-case is mainly focused on known-item search (a user has an item or items in mind they are trying to view or navigate to, but needs to use an external information system to locate them) and data retrieval (typically the data is structured and there is no ambiguity as to whether a particular record matches the given search criteria, except in the case of textual fields where there is limited ambiguity), within a vertical search experience (the focus is on enabling search for a specific sub-graph within the big federated graph).

Query Language

Given the above scope of the search (vertical search experience with a focus on known-item search and data retrieval), one of the first things we had to design was a language that users can use to easily express their search criteria. With a goal of abstracting users away from the complexity of interacting with Elasticsearch directly, we landed on a custom Studio Search DSL reminiscent of SQL.

The DSL supports specifying the search criteria as comparison expressions or inclusion/exclusion filters. The filter expressions can be combined together through logical operators (AND, OR, NOT) and grouped together through parentheses.

Sample Syntax

For example, to find all comedies from France or Spain, the query would be:

(genre == ‘comedy’) AND (country ANY [‘FR’, ‘SP’])

We used ANTLR to build the grammar for the Query DSL. From the grammar, ANTLR generates a parser that can walk the parse tree. By extending the ANTLR generated parse tree visitor, we were able to implement an Elasticsearch Query Builder component with the logic to generate the Elasticsearch query corresponding to the custom search query.
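
To give a feel for the kind of translation the Elasticsearch Query Builder performs, here is a hand-written Python sketch that maps the sample query above onto an Elasticsearch bool query; the real builder is driven by the ANTLR-generated parse tree visitor rather than helper functions like these.

    def comparison(field, value):
        # `field == value` becomes an Elasticsearch term query.
        return {"term": {field: value}}

    def inclusion(field, values):
        # `field ANY [v1, v2]` becomes an Elasticsearch terms query.
        return {"terms": {field: values}}

    def logical_and(*clauses):
        return {"bool": {"must": list(clauses)}}

    # (genre == 'comedy') AND (country ANY ['FR', 'SP'])
    es_query = {
        "query": logical_and(
            comparison("genre", "comedy"),
            inclusion("country", ["FR", "SP"]),
        )
    }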

If you are familiar with Elasticsearch, then you might be familiar with how complicated it can be to build up the correct Elasticsearch query for complex queries, especially if the index includes nested JSON documents which add an additional layer of complexity with respect to building nested queries (Incorrectly constructed nested queries can lead to Elasticsearch quietly returning wrong results). By exposing just a generic query language to the users and isolating the complexity to just our Elasticsearch Query Builder, we have been able to empower users to write search queries without requiring familiarity with Elasticsearch. This also leaves the possibility of swapping Elasticsearch with a different search engine in the future.

One other challenge for the users when writing the search queries is to understand the fields that are available in the index and the associated types. Since we index the data as-is from the federated graph, the indexing query itself acts as self-documentation. For example, given the indexing query –

Sample GraphQL query

To find movies based on the actors’ roles, the query filter is simply

`actors.role == ‘actor’`

Text Search

While the search DSL provides a powerful way to help narrow the scope of the search queries, users can also find documents in the index through free form text — either with just the input text or in combination with a filter expression in the search DSL. Behind the scenes during the indexing process, we have configured the Elasticsearch index with the appropriate analyzers to ensure that the most relevant matches for the input text are returned in the results.

Hydration through Federation

Given the wide adoption of the federated gateway within Content Engineering, we decided to implement the Studio Search service as a DGS (Domain Graph Service) that integrated with the federated gateway. The search APIs (besides search, we have other APIs to support faceted search, typeahead suggestions, etc) are exposed as GraphQL queries within the federated graph.

This integration with the federation gateway allows the search DGS to just return the matching entity keys from the search index instead of the whole matching document(s). Through the power of federation, users are then able to hydrate the search results with any data available in the federated graph. This allows the search indices to be lean by indexing only the fields necessary for the search experience and at the same time provides complete flexibility for the users to fetch any data available in the federated graph instead of being restricted to just the data available in the search index.

Example

Sample Search query

In the above example, users are able to fetch the production schedule as part of the search results even though the search index doesn’t hold that data.

Authorization

With the API to query the data in the search indices in place, the next thing we needed to tackle was figuring out how to secure access to the data in the indices. With several of the indices including sensitive data, and the source teams already having restrictive access policies in place to secure the data they own, the search indices which hosted a secondary copy of the source data needed to be secured as well.

We chose to apply “late binding” (or “query time”) security — on every incoming search query, we make an API call to the centralized access policy server with context including the identity of the caller making the request and the search index they are trying to access. The policy server evaluates the access policies defined by the source teams and returns a set of constraints. Ex. The caller has access to Movies where the type is ‘licensed’ (The caller does not have access to Netflix-produced content, but just the licensed content). The constraints are then translated to a set of filter expressions in the search query DSL format (Ex. movie.type == ‘licensed’) and combined with the user-specified search filter with a logical AND operator to form a new search query that then gets executed against the index.

By adding on the access constraints as additional filters before executing the query, we ensure that the user gets back only the data they have access to from the underlying search index. This also allows source teams to evolve their access policies independently knowing that the correct constraints will be applied at query time.
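
Conceptually, the rewrite is just an AND of the caller's query with the policy constraints, as in this hypothetical Python sketch; the actual policy evaluation and DSL handling in Studio Search are more involved.

    def apply_access_constraints(user_query, constraints):
        # Combine the caller's query with the constraints returned by the
        # policy server, assumed to already be expressed in the search DSL,
        # e.g. "movie.type == 'licensed'".
        if not constraints:
            return user_query
        constraint_expr = " AND ".join("({0})".format(c) for c in constraints)
        return "({0}) AND {1}".format(user_query, constraint_expr)

    secured = apply_access_constraints(
        "(genre == 'comedy') AND (country ANY ['FR', 'SP'])",
        ["movie.type == 'licensed'"],
    )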

Customizing Search

With the decision to build Studio Search as a GraphQL service using the DGS framework and relying on federation for hydrating results, onboarding new search indices required updating various portions of the GraphQL schema (the enum of available indices, the union of all federated result types, etc.) manually and registering the updated schema with the federated gateway schema registry before the new index was available for querying through the GraphQL API.

Additionally, users can provide further configuration while onboarding a new index to customize the search behavior for their applications — including scripts to tune the relevance scoring algorithm, fields for faceted search, and configuration to control the behavior of typeahead suggestions. These configurations were initially stored in our source control repository, which meant any change to the configuration of any index required a deployment for the changes to take effect.

Recently, we automated this process as well by moving all the configurations to a persistence store and leveraging the power of dynamic schemas in the DGS framework. Users can now use an API to create/update search index configuration and we are able to validate the provided configuration, generate the updated DGS schema dynamically and register the updated schema with the federated gateway schema registry immediately. All configuration changes are reflected immediately in subsequent search queries.

Example configuration:

Sample Search configuration

UI Components

While the primary goal of Studio Search was to build an easy-to-use self-service platform to enable searching against the federated graph, another important goal was to help the Content Engineering teams deliver a visually consistent search experience to the users of their tools and workflows. To that end, we partnered with our UI/UX teams to build a robust set of opinionated presentational components. Studio Search’s offering of drop-in UI components based on our Hawkins design system for typeahead suggestion, faceted search, and extensive filtering ensure visual and behavioral consistency across the suite of applications within Content Engineering. Below are a couple of examples.

Typeahead Search Component

Faceted Search Component

What’s Next?

As a config-driven, self-serve platform, Studio Search has already been able to empower Content Engineering teams to quickly enable the functionality to search against the Content federated graph within their suite of applications. But we are not quite done yet! There are several upcoming features in various stages of development, including:

  • Leveraging the percolate query functionality in Elasticsearch to support a notifications feature (users save their search criteria and are notified when documents are updated in the index that matches their search criteria)
  • Add support for metrics aggregation in our APIs
  • Leverage the managed delivery functionality in Spinnaker to move to a declarative model for onboarding the search indices
  • And, plenty more

If this sounds interesting to you, connect with us on LinkedIn.

Credits

Thanks to Anoop Panicker, Bo Lei, Charles Zhao, Chris Dhanaraj, Hemamalini Kannan, Jim Isaacs, Johnny Chang, Kasturi Chatterjee, Kishore Banala, Kevin Zhu, Tom Lee, Tongliang Liu, Utkarsh Shrivastava, Vince Bello, Vinod Viswanathan, Yucheng Zeng


How Netflix Content Engineering makes a federated graph searchable (Part 2) was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.