Tag Archives: Elasticsearch

Reverse Searching Netflix’s Federated Graph

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/reverse-searching-netflixs-federated-graph-222ac5d23576

By Ricky Gardiner, Alex Hutter, and Katie Lefevre

Since our previous posts regarding Content Engineering’s role in enabling search functionality within Netflix’s federated graph (the first post, where we identify the issue and elaborate on the indexing architecture, and the second post, where we detail how we facilitate querying) there have been significant developments. We’ve opened up Studio Search beyond Content Engineering to the entirety of the Engineering organization at Netflix and renamed it Graph Search. There are over 100 applications integrated with Graph Search and nearly 50 indices we support. We continue to add functionality to the service. As promised in the previous post, we’ll share how we partnered with one of our Studio Engineering teams to build reverse search. Reverse search inverts the standard querying pattern: rather than finding documents that match a query, it finds queries that match a document.


Tiffany is a Netflix Post Production Coordinator who oversees a slate of nearly a dozen movies in various states of pre-production, production, and post-production. Tiffany and her team work with various cross-functional partners, including Legal, Creative, and Title Launch Management, tracking the progression and health of her movies.

So Tiffany subscribes to notifications and calendar updates specific to certain areas of concern, like “movies shooting in Mexico City which don’t have a key role assigned”, or “movies that are at risk of not being ready by their launch date”.

Tiffany is not subscribing to updates of particular movies, but subscribing to queries that return a dynamic subset of movies. This poses an issue for those of us responsible for sending her those notifications. When a movie changes, we don’t know who to notify, since there’s no association between employees and the movies they’re interested in.

We could save these searches, and then repeatedly query for the results of every search, but because we’re part of a large federated graph, this would have heavy traffic implications for every service we’re connected to. We’d have to decide if we wanted timely notifications or less load on our graph.

If we could answer the question “would this movie be returned by this query”, we could re-query based on change events with laser precision and not impact the broader ecosystem.

The Solution

Graph Search is built on top of Elasticsearch, which has the exact capabilities we require:

Instead of taking a search (like “spanish-language movies shot in Mexico City”) and returning the documents that match (One for Roma, one for Familia), a percolate query takes a document (one for Roma) and returns the searches that match that document, like “spanish-language movies” and “scripted dramas”.

We’ve communicated this functionality as the ability to save a search, called SavedSearches, which is a persisted filter on an existing index.

type SavedSearch {
id: ID!
filter: String
index: SearchIndex!

That filter, written in Graph Search DSL, is converted to an Elasticsearch query and indexed in a percolator field. To learn more about Graph Search DSL and why we created it rather than using Elasticsearch query language directly, see the Query Language section of “How Netflix Content Engineering makes a federated graph searchable (Part 2)”.

We’ve called the process of finding matching saved searches ReverseSearch. This is the most straightforward part of this offering. We added a new resolver to the Domain Graph Service (DGS) for Graph Search. It takes the index of interest and a document, and returns all the saved searches that match the document by issuing a percolate query.

Query for retrieving all the registered saved searches, in a given index,
based on a provided document. The document in this case is an ElasticSearch
document that is generated based on the configuration of the index.
after: String,
document: JSON!,
first: Int!,
index: SearchIndex!): SavedSearchConnection

Persisting a SavedSearch is implemented as a new mutation on the Graph Search DGS. This ultimately triggers the indexing of an Elasticsearch query in a percolator field.

Mutation for registering and updating a saved search. They need to be updated
any time a user adjusts their search criteria.
upsertSavedSearch(input: UpsertSavedSearchInput!): UpsertSavedSearchPayload

Supporting percolator fields fundamentally changed how we provision the indexing pipelines for Graph Search (see Architecture section of How Netflix Content Engineering makes a federated graph searchable). Rather than having a single indexing pipeline per Graph Search index we now have two: one to index documents and one to index saved searches to a percolate index. We chose to add percolator fields to a separate index in order to tune performance for the two types of queries separately.

Elasticsearch requires the percolate index to have a mapping that matches the structure of the queries it stores and therefore must match the mapping of the document index. Index templates define mappings that are applied when creating new indices. By using the index_patterns functionality of index templates, we’re able to share the mapping for the document index between the two. index_patterns also gives us an easy way to add a percolator field to every percolate index we create.

Example of document index mapping

Index pattern — application_*

"order": 1,
"index_patterns": ["application_*"],
"mappings": {
"properties": {
"movieTitle": {
"type": "keyword"
"isArchived": {
"type": "boolean"

Example of percolate index mappings

Index pattern — *_percolate

"order": 2,
"index_patterns": ["*_percolate*"],
"mappings": {
"properties": {
"percolate_query": {
"type": "percolator"

Example of generated mapping

Percolate index name is application_v1_percolate

"application_v1_percolate": {
"mappings": {
"_doc": {
"properties": {
"movieTitle": {
"type": "keyword"
"isArchived": {
"type": "boolean"
"percolate_query": {
"type": "percolator"

Percolate Indexing Pipeline

The percolate index isn’t as simple as taking the input from the GraphQL mutation, translating it to an Elasticsearch query, and indexing it. Versioning, which we’ll talk more about shortly, reared its ugly head and made things a bit more complicated. Here is the way the percolate indexing pipeline is set up.

See Data Mesh — A Data Movement and Processing Platform @ Netflix to learn more about Data Mesh.
  1. When SavedSearches are modified, we store them in our CockroachDB, and the source connector for the Cockroach database emits CDC events.
  2. A single table is shared for the storage of all SavedSearches, so the next step is filtering down to just those that are for *this* index using a filter processor.
  3. As previously mentioned, what is stored in the database is our custom Graph Search filter DSL, which is not the same as the Elasticsearch DSL, so we cannot directly index the event to the percolate index. Instead, we issue a mutation to the Graph Search DGS. The Graph Search DGS translates the DSL to an Elasticsearch query.
  4. Then we index the Elasticsearch query as a percolate field in the appropriate percolate index.
  5. The success or failure of the indexing of the SavedSearch is returned. On failure, the SavedSearch events are sent to a Dead Letter Queue (DLQ) that can be used to address any failures, such as fields referenced in the search query being removed from the index.

Now a bit on versioning to explain why the above is necessary. Imagine we’ve started tagging movies that have animals. If we want users to be able to create views of “movies with animals”, we need to add this new field to the existing search index to flag movies as such. However, the mapping in the current index doesn’t include it, so we can’t filter on it. To solve for this we have index versions.

Dalia & Forrest from the series Baby Animal Cam

When a change is made to an index definition that necessitates a new mapping, like when we add the animal tag, Graph Search creates a new version of the Elasticsearch index and a new pipeline to populate it. This new pipeline reads from a log-compacted Kafka topic in Data Mesh — this is how we can reindex the entire corpus without asking the data sources to resend all the old events. The new pipeline and the old pipeline run side by side, until the new pipeline has processed the backlog, at which point Graph Search cuts over to the version using Elasticsearch index aliases.

Creating a new index for our documents means we also need to create a new percolate index for our queries so they can have consistent index mappings. This new percolate index also needs to be backfilled when we change versions. This is why the pipeline works the way it does — we can again utilize the log compacted topics in Data Mesh to reindex the corpus of SavedSearches when we spin up a new percolate indexing pipeline.

We persist the user provided filter DSL to the database rather than immediately translating it to Elasticsearch query language. This enables us to make changes or fixes when we translate the saved search DSL to an Elasticsearch query . We can deploy those changes by creating a new version of the index as the bootstrapping process will re-translate every saved search.

Another Use Case

We hoped reverse search functionality would eventually be useful for other engineering teams. We were approached almost immediately with a problem that reverse searching could solve.

The way you make a movie can be very different based on the type of movie it is. One movie might go through a set of phases that are not applicable to another, or might need to schedule certain events that another movie doesn’t require. Instead of manually configuring the workflow for a movie based on its classifications, we should be able to define the means of classifying movies and use that to automatically assign them to workflows. But determining the classification of a movie is challenging: you could define these movie classifications based on genre alone, like “Action” or “Comedy”, but you likely require more complex definitions. Maybe it’s defined by the genre, region, format, language, or some nuanced combination thereof. The Movie Matching service provides a way to classify a movie based on any combination of matching criteria. Under the hood, the matching criteria are stored as reverse searches, and to determine which criteria a movie matches against, the movie’s document is submitted to the reverse search endpoint.

In short, reverse search is powering an externalized criteria matcher. It’s being used for movie criteria now, but since every Graph Search index is now reverse-search capable, any index could use this pattern.

A Possible Future: Subscriptions

Reverse searches also look like a promising foundation for creating more responsive UIs. Rather than fetching results once as a query, the search results could be provided via a GraphQL subscription. These subscriptions could be associated with a SavedSearch and, as index changes come in, reverse search can be used to determine when to update the set of keys returned by the subscription.

Reverse Searching Netflix’s Federated Graph was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Elasticsearch Indexing Strategy in Asset Management Platform (AMP)

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/elasticsearch-indexing-strategy-in-asset-management-platform-amp-99332231e541

By Burak Bacioglu, Meenakshi Jindal

Asset Management at Netflix

At Netflix, all of our digital media assets (images, videos, text, etc.) are stored in secure storage layers. We built an asset management platform (AMP), codenamed Amsterdam, in order to easily organize and manage the metadata, schema, relations and permissions of these assets. It is also responsible for asset discovery, validation, sharing, and for triggering workflows.

Amsterdam service utilizes various solutions such as Cassandra, Kafka, Zookeeper, EvCache etc. In this blog, we will be focusing on how we utilize Elasticsearch for indexing and search the assets.

Amsterdam is built on top of three storage layers.

The first layer, Cassandra, is the source of truth for us. It consists of close to a hundred tables (column families) , the majority of which are reverse indices to help query the assets in a more optimized way.

The second layer is Elasticsearch, which is used to discover assets based on user queries. This is the layer we’d like to focus on in this blog. And more specifically, how we index and query over 7TB of data in a read-heavy and continuously growing environment and keep our Elasticsearch cluster healthy.

And finally, we have an Apache Iceberg layer which stores assets in a denormalized fashion to help answer heavy queries for analytics use cases.

Elasticsearch Integration

Elasticsearch is one of the best and widely adopted distributed, open source search and analytics engines for all types of data, including textual, numerical, geospatial, structured or unstructured data. It provides simple APIs for creating indices, indexing or searching documents, which makes it easy to integrate. No matter whether you use in-house deployments or hosted solutions, you can quickly stand up an Elasticsearch cluster, and start integrating it from your application using one of the clients provided based on your programming language (Elasticsearch has a rich set of languages it supports; Java, Python, .Net, Ruby, Perl etc.).

One of the first decisions when integrating with Elasticsearch is designing the indices, their settings and mappings. Settings include index specific properties like number of shards, analyzers, etc. Mapping is used to define how documents and their fields are supposed to be stored and indexed. You define the data types for each field, or use dynamic mapping for unknown fields. You can find more information on settings and mappings on Elasticsearch website.

Most applications in content and studio engineering at Netflix deal with assets; such as videos, images, text, etc. These applications are built on a microservices architecture, and the Asset Management Platform provides asset management to those dozens of services for various asset types. Each asset type is defined in a centralized schema registry service responsible for storing asset type taxonomies and relationships. Therefore, it initially seemed natural to create a different index for each asset type. When creating index mappings in Elasticsearch, one has to define the data type for each field. Since different asset types could potentially have fields with the same name but with different data types; having a separate index for each type would prevent such type collisions. Therefore we created around a dozen indices per asset type with fields mapping based on the asset type schema. As we onboarded new applications to our platform, we kept creating new indices for the new asset types. We have a schema management microservice which is used to store the taxonomy of each asset type; and this programmatically created new indices whenever new asset types were created in this service. All the assets of a specific type use the specific index defined for that asset type to create or update the asset document.

Fig 1. Indices based on Asset Types

As Netflix is now producing significantly more originals than it used to when we started this project a few years ago, not only did the number of assets grow dramatically but also the number of asset types grew from dozens to several thousands. Hence the number of Elasticsearch indices (per asset type) as well as asset document indexing or searching RPS (requests per second) grew over time. Although this indexing strategy worked smoothly for a while, interesting challenges started coming up and we started to notice performance issues over time. We started to observe CPU spikes, long running queries, instances going yellow/red in status.

Usually the first thing to try is to scale up the Elasticsearch cluster horizontally by increasing the number of nodes or vertically by upgrading instance types. We tried both, and in many cases it helps, but sometimes it is a short term fix and the performance problems come back after a while; and it did for us. You know it is time to dig deeper to understand the root cause of it.

It was time to take a step back and reevaluate our ES data indexing and sharding strategy. Each index was assigned a fixed number of 6 shards and 2 replicas (defined in the template of the index). With the increase in the number of asset types, we ended up having approximately 900 indices (thus 16200 shards). Some of these indices had millions of documents, whereas many of them were very small with only thousands of documents. We found the root cause of the CPU spike was unbalanced shards size. Elasticsearch nodes storing those large shards became hot spots and queries hitting those instances were timing out or very slow due to busy threads.

We changed our indexing strategy and decided to create indices based on time buckets, rather than asset types. What this means is, assets created between t1 and t2 would go to the T1 bucket, assets created between t2 and t3 would go to the T2 bucket, and so on. So instead of persisting assets based on their asset types, we would use their ids (thus its creation time; because the asset id is a time based uuid generated at the asset creation) to determine which time bucket the document should be persisted to. Elasticsearch recommends each shard to be under 65GB (AWS recommends them to be under 50GB), so we could create time based indices where each index holds somewhere between 16–20GB of data, giving some buffer for data growth. Existing assets can be redistributed appropriately to these precreated shards, and new assets would always go to the current index. Once the size of the current index exceeds a certain threshold (16GB), we would create a new index for the next bucket (minute/hour/day) and start indexing assets to the new index created. We created an index template in Elasticsearch so that the new indices always use the same settings and mappings stored in the template.

We chose to index all versions of an asset in the the same bucket – the one that keeps the first version. Therefore, even though new assets can never be persisted to an old index (due to our time based id generation logic, they always go to the latest/current index); existing assets can be updated, causing additional documents for those new asset versions to be created in those older indices. Therefore we chose a lower threshold for the roll over so that older shards would still be well under 50GB even after those updates.

Fig 2. Indices based on Time Buckets

For searching purposes, we have a single read alias that points to all indices created. When performing a query, we always execute it on the alias. This ensures that no matter where documents are, all documents matching the query will be returned. For indexing/updating documents, though, we cannot use an alias, we use the exact index name to perform index operations.

To avoid the ES query for the list of indices for every indexing request, we keep the list of indices in a distributed cache. We refresh this cache whenever a new index is created for the next time bucket, so that new assets will be indexed appropriately. For every asset indexing request, we look at the cache to determine the corresponding time bucket index for the asset. The cache stores all time-based indices in a sorted order (for simplicity we named our indices based on their starting time in the format yyyyMMddHHmmss) so that we can easily determine exactly which index should be used for asset indexing based on the asset creation time. Without using the time bucket strategy, the same asset could have been indexed into multiple indices because Elasticsearch doc id is unique per index and not the cluster. Or we would have to perform two API calls, first to identify the specific index and then to perform the asset update/delete operation on that specific index.

It is still possible to exceed 50GB in those older indices if millions of updates occur within that time bucket index. To address this issue, we added an API that would split an old index into two programmatically. In order to split a given bucket T1 (which stores all assets between t1 and t2) into two, we choose a time t1.5 between t1 and t2, create a new bucket T1_5, and reindex all assets created between t1.5 and t2 from T1 into this new bucket. While the reindexing is happening, queries / reads are still answered by T1, so any new document created (via asset updates) would be dual-written into T1 and T1.5, provided that their timestamp falls between t1.5 and t2. Finally, once the reindexing is complete, we enable reads from T1_5, stop the dual write and delete reindexed documents from T1.

In fact, Elasticsearch provides an index rollover feature to handle the growing indicex problem https://www.elastic.co/guide/en/elasticsearch/reference/6.0/indices-rollover-index.html. With this feature, a new index is created when the current index size hits a threshold, and through a write alias, the index calls will point to the new index created. That means, all future index calls would go to the new index created. However, this would create a problem for our update flow use case, because we would have to query multiple indices to determine which index contains a particular document so that we can update it appropriately. Because the calls to Elasticsearch may not be sequential, meaning, an asset a1 created at T1 can be indexed after another asset a2 created at T2 where T2>T1, the older asset a1 can end up in the newer index while the newer asset a2 is persisted in the old index. In our current implementation, however, by simply looking at the asset id (and asset creation time), we can easily find out which index to go to and it is always deterministic.

One thing to mention is, Elasticsearch has a default limit of 1000 fields per index. If we index all types to a single index, wouldn’t we easily exceed this number? And what about the data type collisions we mentioned above? Having a single index for all data types could potentially cause collisions when two asset types define different data types for the same field. We also changed our mapping strategy to overcome these issues. Instead of creating a separate Elasticsearch field for each metadata field defined in an asset type, we created a single nested type with a mandatory field called `key`, which represents the name of the field on the asset type, and a handful of data-type specific fields, such as: `string_value`, `long_value`, `date_value`, etc. We would populate the corresponding data-type specific field based on the actual data type of the value. Below you can see a part of the index mapping defined in our template, and an example from a document (asset) which has four metadata fields:

Fig 3. Snippet of the index mapping
Fig 4. Snippet of nested metadata field on a stored document

As you see above, all asset properties go under the same nested field `metadata` with a mandatory `key` field, and the corresponding data-type specific field. This ensures that no matter how many asset types or properties are indexed, we would always have a fixed number of fields defined in the mapping. When searching for these fields, instead of querying for a single value (cameraId == 42323243), we perform a nested query where we query for both key and the value (key == cameraId AND long_value == 42323243). For more information on nested queries, please refer to this link.

Fig 5. Search/Indexing RPS

After these changes, the indices we created are now balanced in terms of data size. CPU utilization is down from an average of 70% to 10%. In addition, we are able to reduce the refresh interval time on these indices from our earlier setting 30 seconds to 1 sec in order to support use cases like read after write, which enables users to search and get a document after a second it was created

Fig 6. CPU Spike with Old indexing strategy
Fig 7. CPU Usage with New indexing strategy

We had to do a one time migration of the existing documents to the new indices. Thankfully we already have a framework in place that can query all assets from Cassandra and index them in Elasticsearch. Since doing full table scans in Cassandra is not generally recommended on large tables (due to potential timeouts), our cassandra schema contains several reverse indices that help us query all data efficiently. We also utilize Kafka to process these assets asynchronously without impacting our real time traffic. This infrastructure is used not only to index assets to Elasticsearch, but also to perform administrative operations on all or some assets, such as bulk updating assets, scanning / fixing problems on them, etc. Since we only focused on Elasticsearch indexing in this blog, we are planning to create another blog to talk about this infrastructure later.

Elasticsearch Indexing Strategy in Asset Management Platform (AMP) was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data Reprocessing Pipeline in Asset Management Platform @Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-reprocessing-pipeline-in-asset-management-platform-netflix-46fe225c35c9

By Meenakshi Jindal


At Netflix, we built the asset management platform (AMP) as a centralized service to organize, store and discover the digital media assets created during the movie production. Studio applications use this service to store their media assets, which then goes through an asset cycle of schema validation, versioning, access control, sharing, triggering configured workflows like inspection, proxy generation etc. This platform has evolved from supporting studio applications to data science applications, machine-learning applications to discover the assets metadata, and build various data facts.

During this evolution, quite often we receive requests to update the existing assets metadata or add new metadata for the new features added. This pattern grows over time when we need to access and update the existing assets metadata. Hence we built the data pipeline that can be used to extract the existing assets metadata and process it specifically to each new use case. This framework allowed us to evolve and adapt the application to any unpredictable inevitable changes requested by our platform clients without any downtime. Production assets operations are performed in parallel with older data reprocessing without any service downtime. Some of the common supported data reprocessing use cases are listed below.

Production Use Cases

  • Real-Time APIs (backed by the Cassandra database) for asset metadata access don’t fit analytics use cases by data science or machine learning teams. We build the data pipeline to persist the assets data in the iceberg in parallel with cassandra and elasticsearch DB. But to build the data facts, we need the complete data set in the iceberg and not just the new. Hence the existing assets data was read and copied to the iceberg tables without any production downtime.
  • Asset versioning scheme is evolved to support the major and minor version of assets metadata and relations update. This feature support required a significant update in the data table design (which includes new tables and updating existing table columns). Existing data got updated to be backward compatible without impacting the existing running production traffic.
  • Elasticsearch version upgrade which includes backward incompatible changes, so all the assets data is read from the primary source of truth and reindexed again in the new indices.
  • Data Sharding strategy in elasticsearch is updated to provide low search latency (as described in blog post)
  • Design of new Cassandra reverse indices to support different sets of queries.
  • Automated workflows are configured for media assets (like inspection) and these workflows are required to be triggered for old existing assets too.
  • Assets Schema got evolved that required reindexing all assets data again in ElasticSearch to support search/stats queries on new fields.
  • Bulk deletion of assets related to titles for which license is expired.
  • Updating or Adding metadata to existing assets because of some regressions in client application/within service itself.

Data Reprocessing Pipeline Flow

Figure 1. Data Reprocessing Pipeline Flow

Data Extractor

Cassandra is the primary data store of the asset management service. With SQL datastore, it was easy to access the existing data with pagination regardless of the data size. But there is no such concept of pagination with No-SQL datastores like Cassandra. Some features are provided by Cassandra (with newer versions) to support pagination like pagingstate, COPY, but each one of them has some limitations. To avoid dependency on data store limitations, we designed our data tables such that the data can be read with pagination in a performant way.

Mainly we read the assets data either by asset schema types or time bucket based on asset creation time. Data sharding completely based on the asset type may have created the wide rows considering some types like VIDEO may have many more assets compared to others like TEXT. Hence, we used the asset types and time buckets based on asset creation date for data sharding across the Cassandra nodes. Following is the example of tables primary and clustering keys defined:

Figure 2. Cassandra Table Design

Based on the asset type, first time buckets are fetched which depends on the creation time of assets. Then using the time buckets and asset types, a list of assets ids in those buckets are fetched. Asset Id is defined as a cassandra Timeuuid data type. We use Timeuuids for AssetId because it can be sorted and then used to support pagination. Any sortable Id can be used as the table primary key to support the pagination. Based on the page size e.g. N, first N rows are fetched from the table. Next page is fetched from the table with limit N and asset id < last asset id fetched.

Figure 3. Cassandra Data Fetch Query

Data layers can be designed based on different business specific entities which can be used to read the data by those buckets. But the primary id of the table should be sortable to support the pagination.

Sometimes we have to reprocess a specific set of assets only based on some field in the payload. We can use Cassandra to read assets based on time or an asset type and then further filter from those assets which satisfy the user’s criteria. Instead we use Elasticsearch to search those assets which are more performant.

After reading the asset ids using one of the ways, an event is created per asset id to be processed synchronously or asynchronously based on the use case. For asynchronous processing, events are sent to Apache Kafka topics to be processed.

Data Processor

Data processor is designed to process the data differently based on the use case. Hence, different processors are defined which can be extended based on the evolving requirements. Data can be processed synchronously or asynchronously.

Synchronous Flow: Depending on the event type, the specific processor can be directly invoked on the filtered data. Generally, this flow is used for small datasets.

Asynchronous Flow: Data processor consumes the data events sent by the data extractor. Apache Kafka topic is configured as a message broker. Depending on the use case, we have to control the number of events processed in a time unit e.g. to reindex all the data in elasticsearch because of template change, it is preferred to re-index the data at certain RPS to avoid any impact on the running production workflow. Async processing has the benefit to control the flow of event processing with Kafka consumers count or with controlling thread pool size on each consumer. Event processing can also be stopped at any time by disabling the consumers in case production flow gets any impact with this parallel data processing. For fast processing of the events, we use different settings of Kafka consumer and Java executor thread pool. We poll records in bulk from Kafka topics, and process them asynchronously with multiple threads. Depending on the processor type, events can be processed at high scale with right settings of consumer poll size and thread pool.

Each of these use cases mentioned above looks different, but they all need the same reprocessing flow to extract the old data to be processed. Many applications design data pipelines for the processing of the new data; but setting up such a data processing pipeline for the existing data supports handling the new features by just implementing a new processor. This pipeline can be thoughtfully triggered anytime with the data filters and data processor type (which defines the actual action to be performed).

Error Handling

Errors are part of software development. But with this framework, it has to be designed more carefully as bulk data reprocessing will be done in parallel with the production traffic. We have set up the different clusters of data extractor and processor from the main Production cluster to process the older assets data to avoid any impact of the assets operations live in production. Such clusters may have different configurations of thread pools to read and write data from database, logging levels and connection configuration with external dependencies.

Figure 4: Processing clusters

Data processors are designed to continue processing the events even in case of some errors for eg. There are some unexpected payloads in old data. In case of any error in the processing of an event, Kafka consumers acknowledge that event is processed and send those events to a different queue after some retries. Otherwise Kafka consumers will continue trying to process the same message again and block the processing of other events in the topic. We reprocess data in the dead letter queue after fixing the root cause of the issue. We collect the failure metrics to be checked and fixed later. We have set up the alerts and continuously monitor the production traffic which can be impacted because of the bulk old data reprocessing. In case any impact is noticed, we should be able to slow down or stop the data reprocessing at any time. With different data processor clusters, this can be easily done by reducing the number of instances processing the events or reducing the cluster to 0 instances in case we need a complete halt.

Best Practices

  • Depending on existing data size and use case, processing may impact the production flow. So identify the optimal event processing limits and accordingly configure the consumer threads.
  • If the data processor is calling any external services, check the processing limits of those services because bulk data processing may create unexpected traffic to those services and cause scalability/availability issues.
  • Backend processing may take time from seconds to minutes. Update the Kafka consumer timeout settings accordingly otherwise different consumer may try to process the same event again after processing timeout.
  • Verify the data processor module with a small data set first, before trigger processing of the complete data set.
  • Collect the success and error processing metrics because sometimes old data may have some edge cases not handled correctly in the processors. We are using the Netflix Atlas framework to collect and monitor such metrics.


Burak Bacioglu and other members of the Asset Management platform team have contributed in the design and development of this data reprocessing pipeline.

Data Reprocessing Pipeline in Asset Management Platform @Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How GitHub Docs’ new search works

Post Syndicated from Peter Bengtsson original https://github.blog/2023-03-09-how-github-docs-new-search-works/

Until recently, the site-search on GitHub Docs was an in-memory solution. While it was a great starting point, we ultimately needed a solution that would scale with our growing needs, so we rewrote it in Elasticsearch. In this blog post, we share how the implementation works and how you can impress users with your site-search by doing the same.

How it started

Our previous solution couldn’t scale because it required loading all of the records into memory (the Node.js code that Express.js runs). This means that we’d need to scrape all the searchable text—title, headings, breadcrumbs, content, etc.—and store that data somewhere so it could quickly be loaded in the Node process runtime. To store the data, we used the Git repository itself, so when we built the Docker image run in Azure, it would have access to all the searchable text from disk.

It would take a little while to load in all the searchable text, so we generated a serialized index from the searchable text, and stored that on disk too. This solution is OKish if your data is small, but we have eight languages and five different versions per language.

Running Elasticsearch locally

The reason we picked Elasticsearch over other alternatives is a story for another day, but one compelling argument is that it’s possible to run it locally on your laptop. For the majority of contributors who make copy edits, the task of installing Elasticsearch locally isn’t necessary. So by default, our /api/search Express.js middleware looks something like this:

if (process.env.ELASTICSEARCH_URL) {
  router.use('/search', search)
} else {
      target: 'https://docs.github.com',

When someone uses http://localhost:4000/api/search on their laptop (or Codespaces) it simply forwards the Elasticsearch stuff to our production server. That way, engineers (like myself) who are debugging the search engine locally can start an Elasticsearch on http://localhost:9200 and set that in their .env file. Now engineers can quickly try new search query techniques entirely with their own local Elasticsearch.

The search implementation

A core idea in our search implementation is that we make only one single query to Elasticsearch, which contains our entire specification for how we want results ranked. Rather than sending a single request, we could try a more specific search query first, then, if there are too few results, we could attempt a second, less defined search query:

// NOTE! This is NOT what we do.

let result = await client.search({ index, body: searchQueryStrict })
if (result.hits.length === 0) {
  // nothing found when being strict, try again with a loose query
  result = await client.search({ index, body: searchQueryLoose })

To ensure we get the most exact results, we use boosts and a matrix of various matching techniques. In somewhat pseudo code it looks like this:

If the query is multiple terms (we’ll explain what _explicit means later):

  { match_phrase: { title_explicit: [Object] } },
  { match_phrase: { title: [Object] } },
  { match_phrase: { headings_explicit: [Object] } },
  { match_phrase: { headings: [Object] } },
  { match_phrase: { content: [Object] } },
  { match_phrase: { content_explicit: [Object] } },
  { match: { title_explicit: [Object] } },
  { match: { headings_explicit: [Object] } },
  { match: { content_explicit: [Object] } },
  { match: { title: [Object] } },
  { match: { headings: [Object] } },
  { match: { content: [Object] } },
  { match: { title_explicit: [Object] } },
  { match: { headings_explicit: [Object] } },
  { match: { content_explicit: [Object] } },
  { match: { title: [Object] } },
  { match: { headings: [Object] } },
  { match: { content: [Object] } },
  { fuzzy: { title: [Object] } }

If the query is a single term:

  { match: { title_explicit: [Object] } },
  { match: { headings_explicit: [Object] } },
  { match: { content_explicit: [Object] } },
  { match: { title: [Object] } },
  { match: { headings: [Object] } },
  { match: { content: [Object] } },
  { fuzzy: { title: [Object] } }

Sure, it can look like a handful of queries, but Elasticsearch is fast. Most of the total time is the network time to send the query and receive the results. In fact, the time it takes to execute the entire search query (excluding the networking) hovers quite steadily at 20 milliseconds on our Elasticsearch server.

About 55% of all searches on docs.github.com are multi-term queries, for example, actions rest. Meaning that in roughly half the cases, we can use the simplified queries because we can omit things like the match_phrase parts of the total query.

Before getting into the various combinations of searches, there’s a section later about what “explicit” means. Essentially, each field is indexed twice. E.g. title and title_explicit. It’s the same content underneath but it’s tokenized differently, which affects how it matches to queries—and that difference is exploited by having different boostings.

The nodes that make up the matrix are:


  • title (the <h1> text)
  • headings (the <h2> texts)
  • content (the bulk of the article text)


  • explicit (no stemming and no synonyms)
  • regular (full Snowball stemming and possibly synonyms)

Matches: (on multi-term queries)

  • match_phrase
  • match with OR (docs that contain “foo” OR “bar”)
  • match with AND (docs that contain “foo” AND “bar”)

Each of these combinations has a unique boost number, which boosts the ranking of the matched result. The actual number doesn’t matter much, but what matters is that the boost numbers are different from each other. For example, a match on the title has a slightly higher boost than a match on the content. And a match where all words are present has a slightly higher boost than when only some of the words match. Another example: if the search term is docker action, the users would prefer to see “Creating a Docker container action” ahead of “Publishing Docker images” or “Metadata syntax for GitHub Actions.”

Each of the above nodes has a boost calculation that looks something like this:

const BOOST_PHRASE = 10.0
const BOOST_TITLE = 4.0
const BOOST_HEADINGS = 3.0
const BOOST_CONTENT = 1.0
const BOOST_AND = 2.5
const BOOST_EXPLICIT = 3.5

match_phrase: { title_explicit: { boost: BOOST_EXPLICIT * BOOST_PHRASE * BOOST_TITLE, query } },
match: { headings: { boost: BOOST_HEADINGS * BOOST_AND, query, operator: 'AND' } },

If you exclusively print out what the boost value becomes for each node in the matrix you get:

  { match_phrase: { title_explicit: 140 } },
  { match_phrase: { title: 40 } },
  { match_phrase: { headings_explicit: 105 } },
  { match_phrase: { headings: 30 } },
  { match_phrase: { content: 10 } },
  { match_phrase: { content_explicit: 35 } },
  { match: { title_explicit: 35, operator: 'AND' } },
  { match: { headings_explicit: 26.25, operator: 'AND' } },
  { match: { content_explicit: 8.75, operator: 'AND' } },
  { match: { title: 10, operator: 'AND' } },
  { match: { headings: 7.5, operator: 'AND' } },
  { match: { content: 2.5, operator: 'AND' } },
  { match: { title_explicit: 14 } },
  { match: { headings_explicit: 10.5 } },
  { match: { content_explicit: 3.5 } },
  { match: { title: 4 } },
  { match: { headings: 3 } },
  { match: { content: 1 } },
  { fuzzy: { title: 0.1 } }

So that’s what we send to Elasticsearch. It’s like a complex wishlist saying, “I want a bicycle for Christmas. But if you have a pink one, even better. And a pink one with blue stripes is even better still. Actually, bestest would be a pink one with blue stripes and a brass bell.”

What’s important, in terms of an ideal implementation and search result for users, is that we use our human and contextual intelligence to define these parameters. Some of it is fairly obvious and some is more subtle. For example, we think a phrase match on the title without the need for stemming is the best possible match, so that gets the highest boost.

Why explicit boost is important

If someone types “creating repositories” it should definitely match a title like “Create a private GitHub repository” because of the stems creating => creat <= create and repositories => repository <= repository. We should definitely include those matches that take stemming into account. But if there’s an article that explicitly uses the words that match what the user typed, like “Creating private GitHub repositories,” then we want to boost that article’s ranking because we think that’s more relevant to the searcher.

Another good example of this is the special keyword working-directory which is an actual exact term that can appear inside the content. If someone searches for working-directory, we don’t want to let an (example) title like “Directories that work” overpower the rankings when working-directory and “Directories that work” are both deconstructed to the same two stems [ 'work', 'directori' ].

The solution we rely on is to make two matches: one with stemming and one without. Each one has a different boost. It’s similar to saying, “I’m looking for a ‘Peter’ but if there’s a ‘Petter’ or ‘Pierre’ or ‘Piotr’ that’ll also do. But ideally, ‘Peter’ first and foremost. In fact, give me all the results, but ‘Peter’ first.” So this is what we do. The stemming is great but it can potentially “overpower” search results. This helps with specific keywords that might appear to be English prose. For example “working-directory” even looks like a regular English expression but it’s actually a hardcoded specific keyword.

In terms of code, it looks like this:

// Creating the index...
await client.indices.create({
  mappings: {
    properties: {
      url: { type: 'keyword' },
      title: { type: 'text', analyzer: 'text_analyzer', norms: false },
      title_explicit: { type: 'text', analyzer: 'text_analyzer_explicit', norms: false },
      content: { type: 'text', analyzer: 'text_analyzer' },
      content_explicit: { type: 'text', analyzer: 'text_analyzer_explicit' },
      // ...snip...
    // ...snip...
// Searching...
    { match: { title_explicit: { boost: BOOST_EXPLICIT * BOOST_TITLE, query } } },
    { match: { content_explicit: { boost: BOOST_EXPLICIT * BOOST_CONTENT, query } } },
    { match: { title: { boost: BOOST_TITLE, query } } },
    { match: { content: { boost: BOOST_CONTENT, query } } },
    // ...snip...

Ranking is not that easy

Search is quickly becoming an art. It’s not enough to just match the terms and display a list of documents that match the terms of the input. For starters, the ranking is crucially important. This is especially true when a search term yields tens or hundreds of matching documents. Years of depending on Google has taught us all to expect the first search result to be the one we want.

To make this a great experience, we try to infer which document the user is genuinely looking for by using pageview metrics as a way to determine which page is most popular. Sure, if the most popular one is offered first, and gets the clicks, it’ll just get even more popular, but it’s a start. We get a lot of pageviews from users Googling something. But we also get regular pageview metrics from users simply navigating themselves to the pages they have figured out has the best information for them.

At the moment, we gather pageview metrics for the top 1,000 most popular URLs. Then we rank them and normalize it to a number between (and including) 0.0 to 1.0. Whatever the number is we add +1.0 and then we multiply this number in Elasticsearch with the match score.

Suppose a search query finds two documents that match, and their match score is 15.6 and 13.2 based on the search implementation mentioned above. Now, suppose that match of 13.2 is on a popular page, its popularity number might be 0.75, so it becomes 13.2 * (1 + 0.75) = 23.1. And the other one that matched a little bit better, has a popularity number of 0.44, so its final number becomes 15.6 * (1 + 0.44) = 22.5, which is less. In conclusion, it gives documents that might not be term-for-term as “matchy” as others a chance to rise above. This also ensures that a hugely popular document that only matched vaguely in the content won’t “overpower” other matches that might have matched in the title.

It’s a tricky challenge, but that’s what makes it so much fun. You have to bake in some human touch to the code as a way of trying to think like your users. It’s also an algorithm that will never reach perfection because even with more and better metrics, the landscape of users and where they come from, is constantly changing. But we’ll try to keep up.

What’s next?

Elasticsearch has a functionality where you can define aliases for words, which they call synonyms. (For example, repo = repository.) The challenge with this feature is how it’s managed and maintained by writers in a convenient and maintainable way.

Currently at GitHub, we base our popularity numbers by pageview metrics. It would be interesting to dig deeper into which page the user lands on. As an example, suppose the reader isn’t sure how to find what they need, so they start on a product landing page (we have about 20 of those) and slowly make their way deeper into the content and finally arrive on the page that supplied the knowledge or answer they want. This process would give each page an equal measure, and that’s not great.

Another exciting idea is to record all the times a search result URL is not clicked on, especially when it shows up higher in the ranking. That would decrease the risk of popular listings only getting more popular. If you record when something is “corrected” by humans, that could be a very powerful signal.

You could also tailor the underlying search based on other contextual variables. For example, if a user is currently inside the REST API docs you could infer that REST API-related docs are slightly preferred when searching for something ambiguous like “billing” (i.e. prefer REST API “About billing” not Billing and payments “Setting your billing email.”).

What are you looking for next? Have you tried the search on https://docs.github.com recently and found that it wasn’t giving you the best search result? Please let us know and get in touch.

How Netflix Content Engineering makes a federated graph searchable (Part 2)

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-netflix-content-engineering-makes-a-federated-graph-searchable-part-2-49348511c06c

By Alex Hutter, Falguni Jhaveri, and Senthil Sayeebaba

In a previous post, we described the indexing architecture of Studio Search and how we scaled the architecture by building a config-driven self-service platform that allowed teams in Content Engineering to spin up search indices easily.

This post will discuss how Studio Search supports querying the data available in these indices.

Data consumption from Studio Search DGS


When we say Content Engineering teams are interested in searching against the federated graph, the use-case is mainly focused on known-item search (a user has an item or items in mind they are trying to view or navigate to but need to use an external information system to locate them) and data retrieval (typically the data is structured and there is no ambiguity as to whether a particular record matches the given search criteria except in the case of textual fields where there is limited ambiguity) within a vertical search experience (focus on enabling search for a specific sub-graph within the big federated graph)

Query Language

Given the above scope of the search (vertical search experience with a focus on known-item search and data retrieval), one of the first things we had to design was a language that users can use to easily express their search criteria. With a goal of abstracting users away from the complexity of interacting with Elasticsearch directly, we landed on a custom Studio Search DSL reminiscent of SQL.

The DSL supports specifying the search criteria as comparison expressions or inclusion/exclusion filters. The filter expressions can be combined together through logical operators (AND, OR, NOT) and grouped together through parentheses.

Sample Syntax

For example, to find all comedies from France or Spain, the query would be:

(genre == ‘comedy’) AND (country ANY [‘FR’, ‘SP’])

We used ANTLR to build the grammar for the Query DSL. From the grammar, ANTLR generates a parser that can walk the parse tree. By extending the ANTLR generated parse tree visitor, we were able to implement an Elasticsearch Query Builder component with the logic to generate the Elasticsearch query corresponding to the custom search query.

If you are familiar with Elasticsearch, then you might be familiar with how complicated it can be to build up the correct Elasticsearch query for complex queries, especially if the index includes nested JSON documents which add an additional layer of complexity with respect to building nested queries (Incorrectly constructed nested queries can lead to Elasticsearch quietly returning wrong results). By exposing just a generic query language to the users and isolating the complexity to just our Elasticsearch Query Builder, we have been able to empower users to write search queries without requiring familiarity with Elasticsearch. This also leaves the possibility of swapping Elasticsearch with a different search engine in the future.

One other challenge for the users when writing the search queries is to understand the fields that are available in the index and the associated types. Since we index the data as-is from the federated graph, the indexing query itself acts as self-documentation. For example, given the indexing query –

Sample GraphQL query

To find movies based on the actors’ roles, the query filter is simply

`actors.role == ‘actor’`

Text Search

While the search DSL provides a powerful way to help narrow the scope of the search queries, users can also find documents in the index through free form text — either with just the input text or in combination with a filter expression in the search DSL. Behind the scenes during the indexing process, we have configured the Elasticsearch index with the appropriate analyzers to ensure that the most relevant matches for the input text are returned in the results.

Hydration through Federation

Given the wide adoption of the federated gateway within Content Engineering, we decided to implement the Studio Search service as a DGS (Domain Graph Service) that integrated with the federated gateway. The search APIs (besides search, we have other APIs to support faceted search, typeahead suggestions, etc) are exposed as GraphQL queries within the federated graph.

This integration with the federation gateway allows the search DGS to just return the matching entity keys from the search index instead of the whole matching document(s). Through the power of federation, users are then able to hydrate the search results with any data available in the federated graph. This allows the search indices to be lean by indexing only the fields necessary for the search experience and at the same time provides complete flexibility for the users to fetch any data available in the federated graph instead of being restricted to just the data available in the search index.


Sample Search query

In the above example, users are able to fetch the production schedule as part of the search results even though the search index doesn’t hold that data.


With the API to query the data in the search indices in place, the next thing we needed to tackle was figuring out how to secure access to the data in the indices. With several of the indices including sensitive data, and the source teams already having restrictive access policies in place to secure the data they own, the search indices which hosted a secondary copy of the source data needed to be secured as well.

We chose to apply “late binding” (or “query time”) security — on every incoming search query, we make an API call to the centralized access policy server with context including the identity of the caller making the request and the search index they are trying to access. The policy server evaluates the access policies defined by the source teams and returns a set of constraints. Ex. The caller has access to Movies where the type is ‘licensed’ (The caller does not have access to Netflix-produced content, but just the licensed content). The constraints are then translated to a set of filter expressions in the search query DSL format (Ex. movie.type == ‘licensed’) and combined with the user-specified search filter with a logical AND operator to form a new search query that then gets executed against the index.

By adding on the access constraints as additional filters before executing the query, we ensure that the user gets back only the data they have access to from the underlying search index. This also allows source teams to evolve their access policies independently knowing that the correct constraints will be applied at query time.

Customizing Search

With the decision to build Studio Search as a GraphQL service using the DGS framework and relying on federation for hydrating results, onboarding new search indices required updating various portions of the GraphQL schema (the enum of available indices, the union of all federated result types, etc.) manually and registering the updated schema with the federated gateway schema registry before the new index was available for querying through the GraphQL API.

Additionally, there are additional configurations that users can provide while onboarding a new index to customize the search behavior for their applications — including scripts to tune the relevance scoring algorithm, configuring fields for faceted search, and configuration to control the behavior of typeahead suggestions, etc. These configurations were initially stored in our source control repository which meant any changes to the configuration of any index required a deployment for the changes to take effect.

Recently, we automated this process as well by moving all the configurations to a persistence store and leveraging the power of dynamic schemas in the DGS framework. Users can now use an API to create/update search index configuration and we are able to validate the provided configuration, generate the updated DGS schema dynamically and register the updated schema with the federated gateway schema registry immediately. All configuration changes are reflected immediately in subsequent search queries.

Example configuration:

Sample Search configuration

UI Components

While the primary goal of Studio Search was to build an easy-to-use self-service platform to enable searching against the federated graph, another important goal was to help the Content Engineering teams deliver a visually consistent search experience to the users of their tools and workflows. To that end, we partnered with our UI/UX teams to build a robust set of opinionated presentational components. Studio Search’s offering of drop-in UI components based on our Hawkins design system for typeahead suggestion, faceted search, and extensive filtering ensure visual and behavioral consistency across the suite of applications within Content Engineering. Below are a couple of examples.

Typeahead Search Component

Faceted Search Component

What’s Next?

As a config-driven, self-serve platform, Studio Search has already been able to empower Content Engineering teams to quickly enable the functionality to search against the Content federated graph within their suite of applications. But, we are not quite done yet! There are several upcoming features that are in various stages of development including

  • Leveraging the percolate query functionality in Elasticsearch to support a notifications feature (users save their search criteria and are notified when documents are updated in the index that matches their search criteria)
  • Add support for metrics aggregation in our APIs
  • Leverage the managed delivery functionality in Spinnaker to move to a declarative model for onboarding the search indices
  • And, plenty more

If this sounds interesting to you, connect with us on LinkedIn.


Thanks to Anoop Panicker, Bo Lei, Charles Zhao, Chris Dhanaraj, Hemamalini Kannan, Jim Isaacs, Johnny Chang, Kasturi Chatterjee, Kishore Banala, Kevin Zhu, Tom Lee, Tongliang Liu, Utkarsh Shrivastava, Vince Bello, Vinod Viswanathan, Yucheng Zeng

How Netflix Content Engineering makes a federated graph searchable (Part 2) was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How Netflix Content Engineering makes a federated graph searchable

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-netflix-content-engineering-makes-a-federated-graph-searchable-5c0c1c7d7eaf

By Alex Hutter, Falguni Jhaveri and Senthil Sayeebaba

Over the past few years Content Engineering at Netflix has been transitioning many of its services to use a federated GraphQL platform. GraphQL federation enables domain teams to independently build and operate their own Domain Graph Services (DGS) and, at the same time, connect their domain with other domains in a unified GraphQL schema exposed by a federated gateway.

As an example, let’s examine three core entities of the graph, each owned by separate engineering teams:

  1. Movie: At Netflix, we make titles (shows, films, shorts etc.). For simplicity, let’s assume each title is a Movie object.
  2. Production: Each Movie is associated with a Studio Production. A Production object tracks everything needed to make a Movie including shooting location, vendors, and more.
  3. Talent: the people working on a Movie are the Talent, including actors, directors, and so on.
Sample GraphQL Schema

Once entities like the above are available in the graph, it’s very common for folks to want to query for a particular entity based on attributes of related entities, e.g. give me all movies that are currently in photography with Ryan Reynolds as an actor.

In a federated graph architecture, how can we answer such a query given that each entity is served by its own service? The Movie service would need to provide an endpoint that accepts a query and filters that may apply to data the service does not own, and use those to identify the appropriate Movie entities to return.

In fact, every entity owning service could be required to do this work.

This common problem of making a federated graph searchable led to the creation of Studio Search.

The Studio Search platform was designed to take a portion of the federated graph, a subgraph rooted at an entity of interest, and make it searchable. The entities of the subgraph can be queried with text input, filtered, ranked, and faceted. In the next section, we’ll discuss how we made this possible.

Introducing Studio Search

When hearing that we want to enable teams to search something, your mind likely goes to building an index of some kind. Ours did too! So we need to build an index of a portion of the federated graph.

How do our users tell us which portion and, even more critically, given that the portion of the graph of interest will almost definitely span data exposed by many services, how do we keep the index current with all these various services?

We chose Elasticsearch as the underlying technology for our index and determined that there were three main pieces of information required to build out an indexing pipeline:

  • A definition of their subgraph of interest rooted at the entity they primarily will be searching for
  • Events to notify the platform of changes to entities in the subgraph
  • Index specific configuration such as whether a field should be used for full text queries or whether a sub-document is nested

In short, our solution was to build an index for the subgraphs of interest. This index needs to be kept up-to-date with the data exposed by the various services in the federated graph in near-real time.

GraphQL gives us a straightforward way to define the subgraph — a single templated GraphQL query that pulls all of the data the user is interested in using in their searches.

Here’s an example GraphQL query template. It’s pulling data for Movies and their related Productions and Talent.

Sample GraphQL query

To keep the index up to date, events are used to trigger a reindexing operation for individual entities when they change. Change Data Capture (CDC) events are the preferred events for triggering these operations — most teams produce them using Netflix’s CDC connectors — however, application events are also supported when necessary.

All data to be indexed is being fetched from the federated graph so all that is needed in the events is an entity id; the id can be substituted into the GraphQL query template to fetch the entity and any related data.

Using the type information present in the GraphQL query template and the user specified index configuration we were able to create an index template with a set of custom Elasticsearch text analyzers that generalized well across domains.

Given these inputs, a Data Mesh pipeline can be created that consists of the user provided CDC event source, a processor to enrich those events using the user provided GraphQL query and a sink to Elasticsearch.


Putting this all together, below you can see a simplified view of the architecture.

Studio Search Indexing Architecture
  1. Studio applications produce events to schematized Kafka streams within Data Mesh.

a. By transacting with a database which is monitored by a CDC connector that creates events, or

b. By directly creating events using a Data Mesh client.

2. The schematized events are consumed by Data Mesh processors implemented in the Apache Flink framework. Some entities have multiple events for their changes so we leverage union processors to combine data from multiple Kafka streams.

a. A GraphQL processor executes the user provided GraphQL query to fetch documents from the federated gateway.

b. The federated gateway, in turn, fetches data from the Studio applications.

3. The documents fetched from the federated gateway are put onto another schematized Kafka topic before being processed by an Elasticsearch sink in Data Mesh that indexes them into Elasticsearch index configured with an indexing template created specifically for the fields and types present in the document.

Reverse lookups

You may have noticed something missing in the above explanation. If the index is being populated based on Movie id events, how does it stay up to date when a Production or Talent changes? Our solution to this is a reverse lookup — when a change to a related entity is made, we need to look up all of the primary entities that could be affected and trigger events for those. We do this by consulting the index itself and querying for all primary entities related to the entity that has changed.

For instance if our index has a document that looks like this:

Sample Elasticsearch document

And the pipeline observes a change to the Production with ptpId “abc”, we can query the index for all documents with production.ptpId == “abc” and extract the movieId. Then, we can pass that movieId down into the rest of the indexing pipeline.

Scaling the Solution

The solution we came up with worked quite well. Teams were easily able to share the requirements for their subgraph’s index via a GraphQL query template and could use existing tooling to generate the events to enable the index to be kept up to date in near real-time. Reusing the index itself to power reverse lookups enabled us to keep all the logic for handling related entities contained within our systems and shield our users from that complexity. In fact it worked so well that we became inundated with requests to integrate with Studio Search — it began to power a significant portion of the user experience for many applications within Content Engineering.

Early on, we did integrations by hand but as adoption of Studio Search took off this did not scale. We needed to build tools to help us automate as much of the provisioning of the pipelines as possible. In order to get there we identified four main problems we needed to solve:

  • How to collect all the required configuration for the pipeline from users.
  • Data Mesh streams are schematized with Avro. In the previous architecture diagram, in 3) there is a stream carrying the results of the GraphQL query to the Elasticsearch sink. The response from GraphQL can contain 10s of fields, often nested. Writing an Avro schema for such a document is time consuming and error prone to do by hand. We needed to make this step much easier.
  • Similarly the generation of the Elasticsearch template was time consuming and error prone. We needed to determine how to generate one based on the users’ configuration.
  • Finally, creating Data Mesh pipelines manually was time consuming and error prone as well due to the volume of configuration required.


For collecting the indexing pipeline configuration from users we defined a single configuration file that enabled users to provide a high level description of their pipeline that we can use to programmatically create the indexing pipeline in Data Mesh. By using this high-level description we were able to greatly simplify the pipeline creation process for users by filling in common yet required configuration for the Data Mesh pipeline.

Sample .yaml configuration

Avro schema & Elasticsearch index template generation

The approach for both schema and index template generation was very similar. Essentially it required taking the user provided GraphQL query template and generating JSON from it. This was done using graphql-java. The steps required are enumerated below:

  • Introspect the federated graph’s schema and use the response to build a GraphQLSchema object
  • Parse and validate the user provided GraphQL query template against the schema
  • Visit the nodes of the query using utilities provided by graphql-java and collect the results into a JSON object — this generated object is the schema/template


The previous steps centralized all the configuration in a single file and provided tools to generate additional configuration for the pipeline’s dependencies. Now all that was required was an entry point for users to provide their configuration file for orchestrating the provisioning of the indexing pipeline. Given our user base was other engineers we decided to provide a command line interface (CLI) written in Python. Using Python we were able to get the first version of the CLI to our users quickly. Netflix provides tooling that makes the CLI auto-update which makes the CLI easy to iterate on. The CLI performs the following tasks:

  • Validates the provided configuration file
  • Calls a service to generate the Avro schema & Elasticsearch index template
  • Assembles the logical plan for the Data Mesh pipeline and creates it using Data Mesh APIs

A CLI is just a step towards a better self-service deployment process. We’re currently exploring options for treating these indices and their pipelines as declarative infrastructure managed within the application that consumes them.

Current Challenges

Using the federated graph to provide the documents for indexing simplifies much of the indexing process but it also creates its own set of challenges. If the challenges below sound exciting to you, come join us!


Bootstrapping a new index for the addition or removal of attributes or refreshing an established index both add considerable additional and spiky load to the federated gateway and the component DGSes. Depending on the cardinality of the index and the complexity of its query we may need to coordinate with service owners and/or run backfills off peak. We continue to manage tradeoffs between reindexing speed and load.

Reverse Lookups

Reverse lookups, while convenient, are not particularly user friendly. They introduce a circular dependency in the pipeline — you can’t create the indexing pipeline without reverse lookups and reverse lookups need the index to function — which we’ve mitigated although it still creates some confusion. They also require the definer of the index to have detailed knowledge of the eventing for related entities they want to include and that may cover many different domains depending on the index — we have one index covering eight domains.

Index consistency

As an index becomes more complex it is likely to depend on more DGSes and the likelihood of errors increases when fetching the required documents from the federated graph. These errors can lead to documents in the index being out of date or even missing altogether. The owner of the index is often required to follow up with other domain teams regarding errors in related entities and be in the unenviable position of not being able to do much to resolve the issues independently. When the errors are resolved, the process of replaying the failed events is manual and there can be a lag when the service is again successfully returning data but the index does not match it.

Stay Tuned

In this post, we described how our indexing infrastructure moves data for any given subgraph of the Netflix Content federated graph to Elasticsearch and keeps that data in sync with the source of truth. In an upcoming post, we’ll describe how this data can be queried without actually needing to know anything about Elasticsearch.


Thanks to Anoop Panicker, Bo Lei, Charles Zhao, Chris Dhanaraj, Hemamalini Kannan, Jim Isaacs, Johnny Chang, Kasturi Chatterjee, Kishore Banala, Kevin Zhu, Tom Lee, Tongliang Liu, Utkarsh Shrivastava, Vince Bello, Vinod Viswanathan, Yucheng Zeng

How Netflix Content Engineering makes a federated graph searchable was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Connecting to Kibana Within an AWS VPC

Post Syndicated from Bozho original https://techblog.bozho.net/connecting-to-kibana-within-an-aws-vpc/

When you use the managed Elasticsearch service on AWS, you usually choose an encrypted connection (via KMS-managed keys), which means you can’t use just any tool to connect to your Elasticsearch cluster. In fact, in order to manually execute commands the easiest option is to use the built-in Kibana and its dev tools.

However, connecting to Kibana is also not trivial due to typical security precautions. Elasticsearch can be run outside or inside a VPC. If you run it outside a VPC, you have to modify its access policy to allow connections from a set of IPs (e.g. your office network).

But if you run it inside a VPC (which is recommended), you have to connect to the VPC. And you have a lot of options for that, but all of them are rather complicated and sometimes even introduce additional cost.

A much simpler approach is to connect via SSH to a machine in the VPC (typically your bastion/jump host) and use it as a SOCKS proxy for your browser. The steps are:

  1. Open an SSH tunnel. If you are using Windows, you can do it with PuTTy. On Linux, you can use ssh$ ssh -D 1337 -q -C -N [email protected]
  2. Set the SOCKS proxy in the browser. On Firefox, open Options and type “SOCKS”, you’ll have only one option (in Network options) to choose, and then set localhost, 1337 (or whatever port you’ve chosen). Here are the instruction for Chrome (or you can use a plugin)
  3. Open the Kibana URL in the browser. Note that now all your browser traffic will go through your VPC, so depending on the VPC configuration other websites might not work.

That’s it, a quick tip that might potentially save a lot of time trying to get a VPC connection to work.

The post Connecting to Kibana Within an AWS VPC appeared first on Bozho's tech blog.

Elasticsearch – Scalability and Multitenancy [slides]

Post Syndicated from Bozho original https://techblog.bozho.net/elasticsearch-scalability-and-multitenancy-slides/

Last week I gave a talk in a local tech group about my experience with Elasticsearch at LogSentinel, and how we achieve multitenancy and scalability.

Obviously, the topic of scalability is huge and it can’t be fully covered in 45 minutes, but I tried presenting the main aspects from the application perspective (I entirely skipped the Ops perspective, as it was a developer audience). The list of resources at the end of the slides show some of the sources of my “research” on the topic, which I recommend going through.

Below are the slides (the talk was not in English):

I hope it’s a useful intro to the topic and the main conclusion is – it’s counterintuitive if you are used to relational databases, and some internals (shards, Lucene segments) “leak” through the abstractions to influence the application design (as per the law of leaky abstractions).

The post Elasticsearch – Scalability and Multitenancy [slides] appeared first on Bozho's tech blog.

Masking field values with Amazon Elasticsearch Service

Post Syndicated from Prashant Agrawal original https://aws.amazon.com/blogs/security/masking-field-values-with-amazon-elasticsearch-service/

Amazon Elasticsearch Service (Amazon ES) is a fully managed service that you can use to deploy, secure, and run Elasticsearch cost-effectively at scale. The service provides support for open-source Elasticsearch APIs, managed Kibana, and integration with Logstash and other AWS services. Amazon ES provides a deep security model that spans many layers of interaction and supports fine-grained access control at the cluster, index, document, and field level, on a per-user basis. The service’s security plugin integrates with federated identity providers for Kibana login.

A common use case for Amazon ES is log analytics. Customers configure their applications to store log data to the Elasticsearch cluster, where the data can be queried for insights into the functionality and use of the applications over time. In many cases, users reviewing those insights should not have access to all the details from the log data. The log data for a web application, for example, might include the source IP addresses of incoming requests. Privacy rules in many countries require that those details be masked, wholly or in part. This post explains how to set up field masking within your Amazon ES domain.

Field masking is an alternative to field-level security that lets you anonymize the data in a field rather than remove it altogether. When creating a role, add a list of fields to mask. Field masking affects whether you can see the contents of a field when you search. You can use field masking to either perform a random hash or pattern-based substitution of sensitive information from users, who shouldn’t have access to that information.

When you use field masking, Amazon ES creates a hash of the actual field values before returning the search results. You can apply field masking on a per-role basis, supporting different levels of visibility depending on the identity of the user making the query. Currently, field masking is only available for string-based fields. A search result with a masked field (clientIP) looks like this:

  "_index": "web_logs",
  "_type": "_doc",
  "_id": "1",
  "_score": 1,
  "_source": {
    "agent": "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1",
    "bytes": 0,
    "clientIP": "7e4df8d4df7086ee9c05efe1e21cce8ff017a711ee9addf1155608ca45d38219",
    "host": "www.example.com",
    "extension": "txt",
    "geo": {
      "src": "EG",
      "dest": "CN",
      "coordinates": {
        "lat": 35.98531194,
        "lon": -85.80931806
    "machine": {
      "ram": 17179869184,
      "os": "win 7"

To follow along in this post, make sure you have an Amazon ES domain with Elasticsearch version 6.7 or higher, sample data loaded (this example uses the web logs data supplied by Kibana), and access to Kibana through a role with administrator privileges for the domain.

Configure field masking

Field masking is managed by defining specific access controls within the Kibana visualization system. You’ll need to create a new Kibana role, define the fine-grained access-control privileges for that role, specify which fields to mask, and apply that role to specific users.

You can use either the Kibana console or direct-to-API calls to set up field masking. In our first example, we’ll use the Kibana console.

To configure field masking in the Kibana console

  1. Log in to Kibana, choose the Security pane, and then choose Roles, as shown in Figure 1.

    Figure 1: Choose security roles

    Figure 1: Choose security roles

  2. Choose the plus sign (+) to create a new role, as shown in Figure 2.

    Figure 2: Create role

    Figure 2: Create role

  3. Choose the Index Permissions tab, and then choose Add index permissions, as shown in Figure 3.

    Figure 3: Set index permissions

    Figure 3: Set index permissions

  4. Add index patterns and appropriate permissions for data access. See the Amazon ES documentation for details on configuring fine-grained access control.
  5. Once you’ve set Index Patterns, Permissions: Action Groups, Document Level Security Query, and Include or exclude fields, you can use the Anonymize fields entry to mask the clientIP, as shown in Figure 4.

    Figure 4: Anonymize field

    Figure 4: Anonymize field

  6. Choose Save Role Definition.
  7. Next, you need to create one or more users and apply the role to the new users. Go back to the Security page and choose Internal User Database, as shown in Figure 5.

    Figure 5: Select Internal User Database

    Figure 5: Select Internal User Database

  8. Choose the plus sign (+) to create a new user, as shown in Figure 6.

    Figure 6: Create user

    Figure 6: Create user

  9. Add a username and password, and under Open Distro Security Roles, select the role es-mask-role, as shown in Figure 7.

    Figure 7: Select the username, password, and roles

    Figure 7: Select the username, password, and roles

  10. Choose Submit.

If you prefer, you can perform the same task by using the Amazon ES REST API using Kibana dev tools.

Use the following API to create a role as described in below snippet and shown in Figure 8.

PUT _opendistro/_security/api/roles/es-mask-role
  "cluster_permissions": [],
  "index_permissions": [
      "index_patterns": [
      "dls": "",
      "fls": [],
      "masked_fields": [
      "allowed_actions": [

Sample response:

  "status": "CREATED",
  "message": "'es-mask-role' created."
Figure 8: API to create Role

Figure 8: API to create Role

Use the following API to create a user with the role as described in below snippet and shown in Figure 9.

PUT _opendistro/_security/api/internalusers/es-mask-user
  "password": "xxxxxxxxxxx",
  "opendistro_security_roles": [

Sample response:

  "status": "CREATED",
  "message": "'es-mask-user' created."
Figure 9: API to create User

Figure 9: API to create User

Verify field masking

You can verify field masking by running a simple search query using Kibana dev tools (GET web_logs/_search) and retrieving the data first by using the kibana_user (with no field masking), and then by using the es-mask-user (with field masking) you just created.

Query responses run by the kibana_user (all access) have the original values in all fields, as shown in Figure 10.

Figure 10: Retrieval of the full clientIP data with kibana_user

Figure 10: Retrieval of the full clientIP data with kibana_user

Figure 11, following, shows an example of what you would see if you logged in as the es-mask-user. In this case, the clientIP field is hidden due to the es-mask-role you created.

Figure 11: Retrieval of the masked clientIP data with es-mask-user

Figure 11: Retrieval of the masked clientIP data with es-mask-user

Use pattern-based field masking

Rather than creating a hash, you can use one or more regular expressions and replacement strings to mask a field. The syntax is <field>::/<regular-expression>/::<replacement-string>.

You can use either the Kibana console or direct-to-API calls to set up pattern-based field masking. In the following example, clientIP is masked in such a way that the last three parts of the IP address are masked by xxx using the pattern is clientIP::/[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}$/::xxx.xxx.xxx>. You see only the first part of the IP address, as shown in Figure 12.

Figure 12: Anonymize the field with a pattern

Figure 12: Anonymize the field with a pattern

Run the search query to verify that the last three parts of clientIP are masked by custom characters and only the first part is shown to the requester, as shown in Figure 13.

Figure 13: Retrieval of the masked clientIP (according to the defined pattern) with es-mask-user

Figure 13: Retrieval of the masked clientIP (according to the defined pattern) with es-mask-user


Field level security should be the primary approach for ensuring data access security – however if there are specific business requirements that cannot be met with this approach, then field masking may offer a viable alternative. By using field masking, you can selectively allow or prevent your users from seeing private information such as personally identifying information (PII) or personal healthcare information (PHI). For more information about fine-grained access control, see the Amazon Elasticsearch Service Developer Guide.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the Amazon Elasticsearch Service forum or contact AWS Support.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.


Prashant Agrawal

Prashant is a Search Specialist Solutions Architect with Amazon Elasticsearch Service. He works closely with team members to help customers migrate their workloads to the cloud. Before joining AWS, he helped various customers use Elasticsearch for their search and analytics use cases.

ElasticSearch Multitenancy With Routing

Post Syndicated from Bozho original https://techblog.bozho.net/elasticsearch-multitenancy-with-routing/

Elasticsearch is great, but optimizing it for high load is always tricky. This won’t be yet another “Tips and tricks for optimizing Elasticsearch” article – there are many great ones out there. I’m going to focus on one narrow use-case – multitenant systems, i.e. those that support multiple customers/users (tenants).

You can build a multitenant search engine in three different ways:

  • Cluster per tenant – this is the hardest to manage and requires a lot of devops automation. Depending on the types of customers it may be worth it to completely isolate them, but that’s rarely the case
  • Index per tenant – this can be fine initially, and requires little additional coding (you just parameterize the “index” parameter in the URL of the queries), but it’s likely to cause problems as the customer base grows. Also, supporting consistent mappings and settings across indexes may be trickier than it sounds (e.g. some may reject an update and others may not depending on what’s indexed). Moving data to colder indexes also becomes more complex.
  • Tenant-based routing – this means you put everything in one cluster but you configure your search routing to be tenant-specific, which allows you to logically isolate data within a single index.

The last one seems to be the preferred option in general. What is routing? The Elasticsearch blog has a good overview and documentation. The idea lies in the way Elasticsearch handles indexing and searching – it splits data into shards (each shard is a separate Lucene index and can be replicated on more than one node). A shard is a logical grouping within a single Elasticsearch node. When no custom routing is used, and an index request comes, the ID is used to determine which shard is going to be used to store the data. However, during search, Elasticsearch doesn’t know which shards have the data, so it has ask multiple shards and gather the results. Related to that, there’s the newly introduced adaptive replica selection, where the proper shard replica is selected intelligently, rather than using round-robin.

Custom routing allows you to specify a routing value when indexing a document and then a search can be directed only to the shard that has the same routing value. For example, at LogSentinel when we index a log entry, we use the data source id (applicationId) for routing. So each application (data source) that generates logs has a separate identifier which allows us to query only that data source’s shard. That way, even though we may have a thousand clients with a hundred data sources each, a query will be precisely targeted to where the data for that particular customer’s data source lies.

This is key for horizontally scaling multitenant applications. When there’s terabytes of data and billions of documents, many shards will be needed (in order to avoid large and heavy shards that cause performance issues). Finding data in this haystack requires the ability to know where to look.

Note that you can (and probably should) make routing required in these cases – each indexed document must be required to have a routing key, otherwise an implementation oversight may lead to a slow index.

Using custom routing you are practically turning one large Elasticsearch cluster into smaller sections, logically separated based on meaningful identifiers. In our case, it is not a userId/customerId, but one level deeper – there are multiple shards per customer, but depending on the use-case, it can be one shard per customer, using the userId/customerId. Using more than one shard per customer may complicate things a little – for example having too many shards per customer may require searches that span too many shards, but that’s not necessarily worse than not using routing.

There are some caveats – the isolation of customer data has to be handled in the application layer (whereas for the first two approaches data is segregated operationally). If there’s an application bug or lack of proper access checks, one user can query data from other users’ shards by specifying their routing key. It’s the role of the application in front of Elasticsearch to only allow queries with routing keys belonging to the currently authenticated user.

There are cases when the first two approaches to multitenancy are viable (e.g. a few very large customers), but in general the routing approach is the most scalable one.

The post ElasticSearch Multitenancy With Routing appeared first on Bozho's tech blog.