Tag Archives: data-mesh

Build a multi-Region and highly resilient modern data architecture using AWS Glue and AWS Lake Formation

Post Syndicated from Vivek Shrivastava original https://aws.amazon.com/blogs/big-data/build-a-multi-region-and-highly-resilient-modern-data-architecture-using-aws-glue-and-aws-lake-formation/

AWS Lake Formation helps with enterprise data governance and is important for a data mesh architecture. It works with the AWS Glue Data Catalog to enforce data access and governance. Both services provide reliable data storage, but some customers want replicated storage, catalog, and permissions for compliance purposes.

This post explains how to create a design that automatically backs up Amazon Simple Storage Service (Amazon S3), the AWS Glue Data Catalog, and Lake Formation permissions in different Regions and provides backup and restore options for disaster recovery. These mechanisms can be customized for your organization’s processes. The utility for cloning and experimentation is available in the open-sourced GitHub repository.

This solution only replicates metadata in the Data Catalog, not the actual underlying data. To have a redundant data lake using Lake Formation and AWS Glue in an additional Region, we recommend replicating the Amazon S3-based storage using S3 replication, S3 sync, aws-s3-copy-sync-using-batch or S3 Batch replication process. This ensures that the data lake will still be functional in another Region if Lake Formation has an availability issue. The Data Catalog setup (tables, databases, resource links) and Lake Formation setup (permissions, settings) must also be replicated in the backup Region.

Solution overview

This post shows how to create a backup of the Lake Formation permissions and AWS Glue Data Catalog from one Region to another in the same account. The solution doesn’t create or modify AWS Identity and Access Management (IAM) roles, which are available in all Regions. There are three steps to creating a multi-Region data lake:

  1. Migrate Lake Formation data permissions.
  2. Migrate AWS Glue databases and tables.
  3. Migrate Amazon S3 data.

In the following sections, we look at each migration step in more detail.

Lake Formation permissions

In Lake Formation, there are two types of permissions: metadata access and data access.

Metadata access permissions allow users to create, read, update, and delete metadata databases and tables in the Data Catalog.

Data access permissions allow users to read and write data to specific locations in Amazon S3. Data access permissions are managed using data location permissions, which allow users to create and alter metadata databases and tables that point to specific Amazon S3 locations.

When data is migrated from one Region to another, only the metadata access permissions are replicated. This means that if data is moved from a bucket in the source Region to another bucket in the target Region, the data access permissions need to be reapplied in the target Region.

AWS Glue Data Catalog

The AWS Glue Data Catalog is a central repository of metadata about data stored in your data lake. It contains references to data that is used as sources and targets in AWS Glue ETL (extract, transform, and load) jobs, and stores information about the location, schema, and runtime metrics of your data. The Data Catalog organizes this information in the form of metadata tables and databases. A table in the Data Catalog is a metadata definition that represents the data in a data lake, and databases are used to organize these metadata tables.

Lake Formation permissions can only be applied to objects that already exist in the Data Catalog in the target Region. Therefore, in order to apply these permissions, the underlying Data Catalog databases and tables must already exist in the target Region. To meet this requirement, this utility migrates both the AWS Glue databases and tables from the source Region to the target Region.

Amazon S3 data

The data that underlies an AWS Glue table can be stored in an S3 bucket in any Region, so replication of the data itself isn’t necessary. However, if the data has already been replicated to the target Region, this utility has the option to update the table’s location to point to the replicated data in the target Region. If the location of the data is changed, the utility updates the S3 bucket name and keeps the rest of the prefix hierarchy unchanged.

This utility doesn’t include the migration of data from the source Region to the target Region. Data migration must be performed separately using methods such as S3 replication, S3 sync, aws-s3-copy-sync-using-batch or S3 Batch replication.

This utility has two modes for replicating Lake Formation and Data Catalog metadata: on-demand and real-time. The on-demand mode is a batch replication that takes a snapshot of the metadata at a specific point in time and uses it to synchronize the metadata. The real-time mode replicates changes made to the Lake Formation permissions or Data Catalog in near-real time.

The on-demand mode of this utility is recommended for creating existing Lake Formation permissions and Data Catalogs because it replicates a snapshot of the metadata. After the Lake Formation and Data Catalogs are synchronized, you can use real-time mode to replicate any ongoing changes. This creates a mirror image of the source Region in the target Region and keeps it up to date as changes are made in the source Region. These two modes can be used independently of each other, and the operations are idempotent.

The code for the on-demand and real-time modes is available in the GitHub repository. Let’s look at each mode in more detail.

On-demand mode

On-demand mode is used to copy the Lake Formation permissions and Data Catalog at a specific point in time. The code is deployed using the AWS Cloud Development Kit (AWS CDK). The following diagram shows the solution architecture for this mode.

The AWS CDK deploys an AWS Glue job to perform the replication. The job retrieves configuration information from a file stored in an S3 bucket. This file includes details such as the source and target Regions, an optional list of databases to replicate, and options for moving data to a different S3 bucket. More information about these options and deployment instructions is available in the GitHub repository.

The AWS Glue job retrieves the Lake Formation permissions and Data Catalog object metadata from the source Region and stores it in a JSON file in an S3 bucket. The same job then uses this file to create the Lake Formation permissions and Data Catalog databases and tables in the target Region.

This tool can be run on demand by running the AWS Glue job. It copies the Lake Formation permissions and Data Catalog object metadata from the source Region to the target Region. If you run the tool again after making changes to the target Region, the changes are replaced with the latest Lake Formation permissions and Data Catalog from the source Region.

This utility can detect any changes made to the Data Catalog metadata, databases, tables, and columns while replicating the Data Catalog from the source to the target Region. If a change is detected in the source Region, the latest version of the AWS Glue object is applied to the target Region. The utility reports the number of objects modified during its run.

The Lake Formation permissions are copied from the source to the target Region, so any new permissions are replicated in the target Region. If a permission is removed from the source Region, it is not removed from the target Region.

Real-time mode

Real-time mode replicates the Lake Formation permissions and Data Catalog at a regular interval. The default interval is 1 minute, but it can be modified during deployment. The code is deployed using the AWS CDK. The following diagram shows the solution architecture for this mode.

The AWS CDK deploys two AWS Lambda jobs and creates an Amazon DynamoDB table to store AWS CloudTrail events and an Amazon EventBridge rule to run the replication at a regular interval. The Lambda jobs retrieve the configuration information from a file stored in an S3 bucket. This file includes details such as the source and target Regions, options for moving data to a different S3 bucket, and the lookback period for CloudTrail in hours. More information about these options and deployment instructions is available in the GitHub repository.

The EventBridge rule triggers a Lambda job at a fixed interval. This job retrieves the configuration information and queries CloudTrail events related to the Data Catalog and Lake Formation that occurred in the past hour (the duration is configurable). All relevant events are then stored in a DynamoDB table.

After the event information is inserted into the DynamoDB table, another Lambda job is triggered. This job retrieves the configuration information and queries the DynamoDB table. It then applies all the changes to the target Region. If the tool is run again after making changes to the target Region, the changes are replaced with the latest Lake Formation permissions and Data Catalog from the source Region. Unlike on-demand mode, this utility also removes any Lake Formation permissions that were removed from the source Region from the target Region.

Limitations

This utility is designed to replicate permissions within a single account only. The on-demand mode replicates a snapshot and doesn’t remove existing permissions, so it doesn’t perform delete operations. The API currently doesn’t support replicating changes to row and column permissions.

Conclusion

In this post, we showed how you can use this utility to migrate the AWS Glue Data Catalog and Lake Formation permissions from one Region to another. It can also keep the source and target Regions synchronized if any changes are made to the Data Catalog or the Lake Formation permissions. Implementing it across Regions (multi-Region) is a good option if you are looking for the most separation and complete independence of your globally diverse data workloads. Also consider the trade-offs. Implementing and operating this strategy, particularly using multi-Region, can be more complicated and more expensive, than other DR strategies.

To get started, checkout the github repo. For more resources, refer to the following:


About the authors

Vivek Shrivastava is a Principal Data Architect, Data Lake in AWS Professional Services. He is a Bigdata enthusiast and holds 13 AWS Certifications. He is passionate about helping customers build scalable and high-performance data analytics solutions in the cloud. In his spare time, he loves reading and finds areas for home automation

Raza Hafeez is a Senior Data Architect within the Shared Delivery Practice of AWS Professional Services. He has over 12 years of professional experience building and optimizing enterprise data warehouses and is passionate about enabling customers to realize the power of their data. He specializes in migrating enterprise data warehouses to AWS Modern Data Architecture.

Nivas Shankar  is a Principal Product Manager for AWS Lake Formation. He works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure and access data lake. Also leads several data and analytics initiatives within AWS including support for Data Mesh.

Let’s Architect! Modern data architectures

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-modern-data-architectures/

With the rapid growth in data coming from data platforms and applications, and the continuous improvements in state-of-the-art machine learning algorithms, data are becoming key assets for companies.

Modern data architectures include data mesh—a recent style that represents a paradigm shift, in which data is treated as a product and data architectures are designed around business domains. This type of approach supports the idea of distributed data, where each business domain focuses on the quality of the data it produces and exposes to the consumers.

In this edition of Let’s Architect!, we focus on data mesh and how it is designed on AWS, plus other approaches to adopt modern architectural patterns.

Design a data mesh architecture using AWS Lake Formation and AWS Glue

Domain Driven Design (DDD) is a software design approach where a solution is divided into domains aligned with business capabilities, software, and organizational boundaries. Unlike software architectures, most data architectures are often designed around technologies rather than business domains.

In this blog, you can learn about data mesh, an architectural pattern that applies the principles of DDD to data architectures. Data are organized into domains and considered the product that each team owns and offers for consumption.

A data mesh design organizes around data domains. Each domain owns multiple data products with their own data and technology stacks

A data mesh design organizes around data domains. Each domain owns multiple data products with their own data and technology stacks

Building Data Mesh Architectures on AWS

In this video, discover how to use the data mesh approach in AWS. Specifically, how to implement certain design patterns for building a data mesh architecture with AWS services in the cloud.

This is a pragmatic presentation to get a quick understanding of data mesh fundamentals, the benefits/challenges, and the AWS services that you can use to build it. This video provides additional context to the aforementioned blog post and includes several examples on the benefits of modern data architectures.

This diagram demonstrates the pattern for sharing data catalogs between producer domains and consumer domains

This diagram demonstrates the pattern for sharing data catalogs between producer domains and consumer domains

Build a modern data architecture on AWS with Amazon AppFlow, AWS Lake Formation, and Amazon Redshift

In this blog, you can learn how to build a modern data strategy using AWS managed services to ingest data from sources like Salesforce. Also discussed is how to automatically create metadata catalogs and share data seamlessly between the data lake and data warehouse, plus creating alerts in the event of an orchestrated data workflow failure.

The second part of the post explains how a data warehouse can be built by using an agile data modeling pattern, as well as how ELT jobs were quickly developed, orchestrated, and configured to perform automated data quality testing.

A data platform architecture and the subcomponents used to build it

A data platform architecture and the subcomponents used to build it

AWS Lake Formation Workshop

With a modern data architecture on AWS, architects and engineers can rapidly build scalable data lakes; use a broad and deep collection of purpose-built data services; and ensure compliance via unified data access, security, and governance. As data mesh is a modern architectural pattern, you can build it using a service like AWS Lake Formation.

Familiarize yourself with new technologies and services by not only learning how they work, but also to building prototypes and projects to gain hands-on experience. This workshop allows builders to become familiar with the features of AWS Lake Formation and its integrations with other AWS services.

A data catalog is a key component in a data mesh architecture. AWS Glue crawlers interact with data stores and other elements to populate the data catalog

A data catalog is a key component in a data mesh architecture. AWS Glue crawlers interact with data stores and other elements to populate the data catalog

See you next time!

Thanks for joining our discussion on data mesh! See you in a couple of weeks when we talk more about architectures and the challenges that we face every day while working with distributed systems.

Other posts in this series

Looking for more architecture content?

AWS Architecture Center provides reference architecture diagrams, vetted architecture solutions, Well-Architected best practices, patterns, icons, and more!

How Netflix Content Engineering makes a federated graph searchable

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-netflix-content-engineering-makes-a-federated-graph-searchable-5c0c1c7d7eaf

By Alex Hutter, Falguni Jhaveri and Senthil Sayeebaba

Over the past few years Content Engineering at Netflix has been transitioning many of its services to use a federated GraphQL platform. GraphQL federation enables domain teams to independently build and operate their own Domain Graph Services (DGS) and, at the same time, connect their domain with other domains in a unified GraphQL schema exposed by a federated gateway.

As an example, let’s examine three core entities of the graph, each owned by separate engineering teams:

  1. Movie: At Netflix, we make titles (shows, films, shorts etc.). For simplicity, let’s assume each title is a Movie object.
  2. Production: Each Movie is associated with a Studio Production. A Production object tracks everything needed to make a Movie including shooting location, vendors, and more.
  3. Talent: the people working on a Movie are the Talent, including actors, directors, and so on.
Sample GraphQL Schema

Once entities like the above are available in the graph, it’s very common for folks to want to query for a particular entity based on attributes of related entities, e.g. give me all movies that are currently in photography with Ryan Reynolds as an actor.

In a federated graph architecture, how can we answer such a query given that each entity is served by its own service? The Movie service would need to provide an endpoint that accepts a query and filters that may apply to data the service does not own, and use those to identify the appropriate Movie entities to return.

In fact, every entity owning service could be required to do this work.

This common problem of making a federated graph searchable led to the creation of Studio Search.

The Studio Search platform was designed to take a portion of the federated graph, a subgraph rooted at an entity of interest, and make it searchable. The entities of the subgraph can be queried with text input, filtered, ranked, and faceted. In the next section, we’ll discuss how we made this possible.

Introducing Studio Search

When hearing that we want to enable teams to search something, your mind likely goes to building an index of some kind. Ours did too! So we need to build an index of a portion of the federated graph.

How do our users tell us which portion and, even more critically, given that the portion of the graph of interest will almost definitely span data exposed by many services, how do we keep the index current with all these various services?

We chose Elasticsearch as the underlying technology for our index and determined that there were three main pieces of information required to build out an indexing pipeline:

  • A definition of their subgraph of interest rooted at the entity they primarily will be searching for
  • Events to notify the platform of changes to entities in the subgraph
  • Index specific configuration such as whether a field should be used for full text queries or whether a sub-document is nested

In short, our solution was to build an index for the subgraphs of interest. This index needs to be kept up-to-date with the data exposed by the various services in the federated graph in near-real time.

GraphQL gives us a straightforward way to define the subgraph — a single templated GraphQL query that pulls all of the data the user is interested in using in their searches.

Here’s an example GraphQL query template. It’s pulling data for Movies and their related Productions and Talent.

Sample GraphQL query

To keep the index up to date, events are used to trigger a reindexing operation for individual entities when they change. Change Data Capture (CDC) events are the preferred events for triggering these operations — most teams produce them using Netflix’s CDC connectors — however, application events are also supported when necessary.

All data to be indexed is being fetched from the federated graph so all that is needed in the events is an entity id; the id can be substituted into the GraphQL query template to fetch the entity and any related data.

Using the type information present in the GraphQL query template and the user specified index configuration we were able to create an index template with a set of custom Elasticsearch text analyzers that generalized well across domains.

Given these inputs, a Data Mesh pipeline can be created that consists of the user provided CDC event source, a processor to enrich those events using the user provided GraphQL query and a sink to Elasticsearch.

Architecture

Putting this all together, below you can see a simplified view of the architecture.

Studio Search Indexing Architecture
  1. Studio applications produce events to schematized Kafka streams within Data Mesh.

a. By transacting with a database which is monitored by a CDC connector that creates events, or

b. By directly creating events using a Data Mesh client.

2. The schematized events are consumed by Data Mesh processors implemented in the Apache Flink framework. Some entities have multiple events for their changes so we leverage union processors to combine data from multiple Kafka streams.

a. A GraphQL processor executes the user provided GraphQL query to fetch documents from the federated gateway.

b. The federated gateway, in turn, fetches data from the Studio applications.

3. The documents fetched from the federated gateway are put onto another schematized Kafka topic before being processed by an Elasticsearch sink in Data Mesh that indexes them into Elasticsearch index configured with an indexing template created specifically for the fields and types present in the document.

Reverse lookups

You may have noticed something missing in the above explanation. If the index is being populated based on Movie id events, how does it stay up to date when a Production or Talent changes? Our solution to this is a reverse lookup — when a change to a related entity is made, we need to look up all of the primary entities that could be affected and trigger events for those. We do this by consulting the index itself and querying for all primary entities related to the entity that has changed.

For instance if our index has a document that looks like this:

Sample Elasticsearch document

And the pipeline observes a change to the Production with ptpId “abc”, we can query the index for all documents with production.ptpId == “abc” and extract the movieId. Then, we can pass that movieId down into the rest of the indexing pipeline.

Scaling the Solution

The solution we came up with worked quite well. Teams were easily able to share the requirements for their subgraph’s index via a GraphQL query template and could use existing tooling to generate the events to enable the index to be kept up to date in near real-time. Reusing the index itself to power reverse lookups enabled us to keep all the logic for handling related entities contained within our systems and shield our users from that complexity. In fact it worked so well that we became inundated with requests to integrate with Studio Search — it began to power a significant portion of the user experience for many applications within Content Engineering.

Early on, we did integrations by hand but as adoption of Studio Search took off this did not scale. We needed to build tools to help us automate as much of the provisioning of the pipelines as possible. In order to get there we identified four main problems we needed to solve:

  • How to collect all the required configuration for the pipeline from users.
  • Data Mesh streams are schematized with Avro. In the previous architecture diagram, in 3) there is a stream carrying the results of the GraphQL query to the Elasticsearch sink. The response from GraphQL can contain 10s of fields, often nested. Writing an Avro schema for such a document is time consuming and error prone to do by hand. We needed to make this step much easier.
  • Similarly the generation of the Elasticsearch template was time consuming and error prone. We needed to determine how to generate one based on the users’ configuration.
  • Finally, creating Data Mesh pipelines manually was time consuming and error prone as well due to the volume of configuration required.

Configuration

For collecting the indexing pipeline configuration from users we defined a single configuration file that enabled users to provide a high level description of their pipeline that we can use to programmatically create the indexing pipeline in Data Mesh. By using this high-level description we were able to greatly simplify the pipeline creation process for users by filling in common yet required configuration for the Data Mesh pipeline.

Sample .yaml configuration

Avro schema & Elasticsearch index template generation

The approach for both schema and index template generation was very similar. Essentially it required taking the user provided GraphQL query template and generating JSON from it. This was done using graphql-java. The steps required are enumerated below:

  • Introspect the federated graph’s schema and use the response to build a GraphQLSchema object
  • Parse and validate the user provided GraphQL query template against the schema
  • Visit the nodes of the query using utilities provided by graphql-java and collect the results into a JSON object — this generated object is the schema/template

Deployment

The previous steps centralized all the configuration in a single file and provided tools to generate additional configuration for the pipeline’s dependencies. Now all that was required was an entry point for users to provide their configuration file for orchestrating the provisioning of the indexing pipeline. Given our user base was other engineers we decided to provide a command line interface (CLI) written in Python. Using Python we were able to get the first version of the CLI to our users quickly. Netflix provides tooling that makes the CLI auto-update which makes the CLI easy to iterate on. The CLI performs the following tasks:

  • Validates the provided configuration file
  • Calls a service to generate the Avro schema & Elasticsearch index template
  • Assembles the logical plan for the Data Mesh pipeline and creates it using Data Mesh APIs

A CLI is just a step towards a better self-service deployment process. We’re currently exploring options for treating these indices and their pipelines as declarative infrastructure managed within the application that consumes them.

Current Challenges

Using the federated graph to provide the documents for indexing simplifies much of the indexing process but it also creates its own set of challenges. If the challenges below sound exciting to you, come join us!

Backfill

Bootstrapping a new index for the addition or removal of attributes or refreshing an established index both add considerable additional and spiky load to the federated gateway and the component DGSes. Depending on the cardinality of the index and the complexity of its query we may need to coordinate with service owners and/or run backfills off peak. We continue to manage tradeoffs between reindexing speed and load.

Reverse Lookups

Reverse lookups, while convenient, are not particularly user friendly. They introduce a circular dependency in the pipeline — you can’t create the indexing pipeline without reverse lookups and reverse lookups need the index to function — which we’ve mitigated although it still creates some confusion. They also require the definer of the index to have detailed knowledge of the eventing for related entities they want to include and that may cover many different domains depending on the index — we have one index covering eight domains.

Index consistency

As an index becomes more complex it is likely to depend on more DGSes and the likelihood of errors increases when fetching the required documents from the federated graph. These errors can lead to documents in the index being out of date or even missing altogether. The owner of the index is often required to follow up with other domain teams regarding errors in related entities and be in the unenviable position of not being able to do much to resolve the issues independently. When the errors are resolved, the process of replaying the failed events is manual and there can be a lag when the service is again successfully returning data but the index does not match it.

Stay Tuned

In this post, we described how our indexing infrastructure moves data for any given subgraph of the Netflix Content federated graph to Elasticsearch and keeps that data in sync with the source of truth. In an upcoming post, we’ll describe how this data can be queried without actually needing to know anything about Elasticsearch.

Credits

Thanks to Anoop Panicker, Bo Lei, Charles Zhao, Chris Dhanaraj, Hemamalini Kannan, Jim Isaacs, Johnny Chang, Kasturi Chatterjee, Kishore Banala, Kevin Zhu, Tom Lee, Tongliang Liu, Utkarsh Shrivastava, Vince Bello, Vinod Viswanathan, Yucheng Zeng


How Netflix Content Engineering makes a federated graph searchable was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data Movement in Netflix Studio via Data Mesh

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-movement-in-netflix-studio-via-data-mesh-3fddcceb1059

By Andrew Nguonly, Armando Magalhães, Obi-Ike Nwoke, Shervin Afshar, Sreyashi Das, Tongliang Liu, Wei Liu, Yucheng Zeng

Background

Over the next few years, most content on Netflix will come from Netflix’s own Studio. From the moment a Netflix film or series is pitched and long before it becomes available on Netflix, it goes through many phases. This happens at an unprecedented scale and introduces many interesting challenges; one of the challenges is how to provide visibility of Studio data across multiple phases and systems to facilitate operational excellence and empower decision making. Netflix is known for its loosely coupled microservice architecture and with a global studio footprint, surfacing and connecting the data from microservices into a studio data catalog in real time has become more important than ever.

Operational Reporting is a reporting paradigm specialized in covering high-resolution, low-latency data sets, serving detailed day-to-day activities¹ and processes of a business domain. Such a paradigm aspires to assist front-line operations personnel and stakeholders in “running the business”²; performing their tasks through means such as ad hoc analysis, decision-support, and tracking (of tasks, assets, schedules, etc). The paradigm spans across methods, tools, and technologies and is usually defined in contrast to analytical reporting and predictive modeling which are more strategic (vs. tactical) in nature.

At Netflix Studio, teams build various views of business data to provide visibility for day-to-day decision making. With dependable near real-time data, Studio teams are able to track and react better to the ever-changing pace of productions and improve efficiency of global business operations using the most up-to-date information. Data connectivity across Netflix Studio and availability of Operational Reporting tools also incentivizes studio users to avoid forming data silos.

The Journey

In the past few years, Netflix Studio has gone through few iterations of data movement approaches. In the initial stage, data consumers set up ETL pipelines directly pulling data from databases. With this batch style approach, several issues have surfaced like data movement is tightly coupled with database tables, database schema is not an exact mapping of business data model, and data being stale given it is not real time etc. Later on, we moved to event driven streaming data pipelines (powered by Delta), which solved some problems compared to the batch style, but had its own pain points, such as a high learning curve of stream processing technologies, manual pipeline setup, a lack of schema evolution support, inefficiency of onboarding new entities, inconsistent security access models, etc.

With the latest Data Mesh Platform, data movement in Netflix Studio reaches a new stage. This configuration driven platform decreases the significant lead time when creating a new pipeline, while offering new support features like end-to-end schema evolution, self-serve UI and secure data access. The high level diagram below indicates the latest version of data movement for Operational Reporting.

Operational Reporting Architecture Overview
Operational Reporting Architecture Overview

For data delivery, we leverage the Data Mesh platform to power the data movement. Netflix Studio applications expose GraphQL queries via Studio Edge, which is a unified graph that connects all data in Netflix Studio and provides consistent data retrieval. Change Data Capture(CDC) source connector reads from studio applications’ database transaction logs and emits the change events. The CDC events are passed on to the Data Mesh enrichment processor, which issues GraphQL queries to Studio Edge to enrich the data. Once the data has landed in the Iceberg tables in Netflix Data Warehouse, they could be used for ad-hoc or scheduled querying and reporting. Centralized data will be moved to third party services such as Google Sheets and Airtable for the stakeholders. We will deep dive into Data Delivery and Data Consumption in the following sections.

Data Delivery via Data Mesh

What is Data Mesh?

Data Mesh is a fully managed, streaming data pipeline product used for enabling Change Data Capture (CDC) use cases. In Data Mesh, users create sources and construct pipelines. Sources mimic the state of an externally managed source — as changes occur in the external source, corresponding CDC messages are produced to the Data Mesh source. Pipelines can be configured to transform and store data to externally managed sinks.

Data Mesh provides a drag-and-drop, self-service user interface for exploring sources and creating pipelines so that users can focus on delivering business value without having to worry about managing and scaling complex data streaming infrastructure.

CDC and data source

Change data capture or CDC, is a semantic for processing changes in a source for the purpose of replicating those changes to a sink. The table changes could be row changes (insert row, update row, delete row) or schema changes (add column, alter column, drop column). As of now, CDC sources have been implemented for data stores at Netflix (MySQL, Postgres). CDC events can also be sent to Data Mesh via a Java Client Producer Library.

Reusable Processors and Configuration Driven

In Data Mesh, a processor is a configurable data processing application that consumes, transforms, and produces CDC events. A processor has 1 or more inputs and 0 or more outputs. Processors with 0 outputs are sink connectors; which write events to externally managed sinks (e.g. Iceberg, ElasticSearch, etc).

Processors with Different Inputs/Outputs
Processors with Different Inputs/Outputs

Data Mesh allows developers to contribute processors to the platform. Processors are not necessarily centrally developed and managed. However, the Data Mesh platform team strives to provide and manage the most highly leveraged processors (e.g. source connectors and sink connectors)

Processors are reusable. The same processor image package is used multiple times for all instances of the processor. Each instance is configured to fit each use case. For example, a GraphQL enrichment processor can be provisioned to query GraphQL Services to enrich data in different pipelines; an Iceberg sink processor can be initialized multiple times to write data to different databases/tables with different schema.

End-to-End Schema Evolution

Schema is a key component of Data Mesh. When an upstream schema evolves (e.g. schema change in the MySQL table), Data Mesh detects the change, checks the compatibility and applies the change to the downstream. With schema evolution, Data Mesh ensures the Operational Reporting pipelines always produce data with the latest schema.

We will cover a few core concepts in the Data Mesh Schema domain.

Consumer schema
Consumer schema defines how data is consumed by the downstream processors. See example below.

Consumer Schema Example
Consumer Schema Example

Schema Compatibility
Data Mesh uses Consumer Schema compatibility to achieve flexible yet safe schema evolution. If a field consumed by an Operational Reporting pipeline is removed from CDC source, Data Mesh categorizes this change as incompatible, pauses the pipeline processing and notifies the pipeline owner. On the other hand, if a required field is not consumed by any consumer, dropping such fields would be compatible.

Two Types of Processors
1. Pass through all fields from upstream to downstream.

  • Example: Filter Processor, Sink Processors
Opt in to schema Evolution example

2. Only uses a subset of fields from upstream.

  • Example: Project Processor, Enrichment Processor
Opt out to schema Evolution example

In Data Mesh, we introduce the Opt-in to Schema Evolution boolean flag to differentiate those two types of use cases.

  • Opt in: All the upstream fields will be propagated to the processor. For example, when a new field is added upstream, it will be propagated automatically.
  • Opt out: Only a subset of fields (defined using ‘Is Consumed’ checkboxes) is propagated and used in the processor. Upstream changes to the rest of the fields won’t affect this processor.

Schema Propagation
After the Schema Compatibility is checked, Data Mesh Platform will propagate the schema change based on the end user’s intention. With the opt-in to schema Evolution flag, Operational Reporting pipelines can keep the schema up-to-date with upstream data stores. As part of schema propagation, the platform also syncs the schema from the pipeline to the Iceberg sink.

Schema Evolution Diagram

Enrichment Processor via GraphQL

In the current Data Mesh Operational Reporting pipelines, the most commonly used intermediate processor is the GraphQL Enrichment Processor. It takes in the column value from CDC events coming from Source Connector as GraphQL query input, then submits a query to Studio Edge to enrich the data. With Studio Edge’s single data model, it centralizes data modeling efforts, which is highly leveraged by Studio UI Apps, Backend services and Search platforms. Enriching the data via Studio Edge helps us achieve consistent data modeling across the whole ecosystem for Operational Reporting.

Here is the example of GraphQL processor configuration, pipeline builder only need config the following fields to provision an enrichment processor:

GraphQL Enrichment Processor Configuration Example

The image below is a sample Operational Reporting pipeline in the production environment to sink the Movie related data. Teams who want to move their data no longer need to learn and write customized Stream Processing jobs. Instead they just need to configure the pipeline topology in the UI while getting other features like schema evolution and secure data access out of the box.

Operational Reporting Pipeline Example

Iceberg Sink

Apache Iceberg is an open source table format for huge analytics datasets. Data Mesh leverages Iceberg tables as data warehouse sinks for downstream analytics use cases. Currently Iceberg sink is appended only. Views are built on top of the raw Iceberg tables to retrieve the latest record for every primary key based on the operational timestamp, which indicates when the record is produced in the sink. Current pipeline consumers are directly consuming Views instead of raw tables.

The compaction process is needed to optimize the performance of downstream queries on the business view as well as lower costs of S3 GET OBJECT operations. A daily process ranks the records by timestamp to generate a data frame of compacted records. Old data files are overwritten with a set of new data files that contain only the compacted data.

Data Quality

Data Mesh provides metrics and dashboards at both the processor and pipeline level for operational observability. Operational Reporting pipeline owners will get alerts if something goes wrong with their pipelines. We also have two types of auditing on the data tables generated from Data Mesh pipelines to guarantee data quality: end-to-end auditing and synthetic events.

Most of the business views created on top of the Iceberg tables can tolerate a few minutes of latency. However, it is paramount that we validate the complete set of identifiers such as a list of movie ids across producers and consumers for higher overall confidence in the data transport layer of choice. For end-to-end audits, the objective is to run the audits hourly via Big data Platform Scheduler, which is a centralized and integrated tool provided by Netflix data platform for running workflows in an efficient, reliable and reproducible way. The audits check for equality (i.e. query results should be the same), the symmetric difference between two data sets should be empty across multiple runs, and the eventual consistency within the SLA. An hourly notification is sent when a set of primary keys consistently do not match between source of truth and target Data Mesh tables.

End to End (Black Box) Auditing Example

Synthetic events audits are artificially triggered change events to imitate common CUD operations of services. It is generating heartbeat signals at a constant frequency with the objective of using them as a baseline to verify the health of the pipeline regardless of traffic patterns or occasional silences.

Data Consumption

Our studio partners rely on data to make informed decisions and to collaborate during all the phases related to production. The Studio Tech Solutions team provides near real-time reports in some data tool of choice, which we call trackers to empower the decision making.

For the past few years, many of these trackers were powered by hand-curated SQL scripts and API calls being managed by CRON schedulers implemented in a Java Service called Lego. Lego was the main tool for the STS team, and at its peak, Lego managed 300+ trackers.

This strategy had its own set of challenges: being schema-less and treating every report column like a string not always worked out, the volatile reliance on direct RDS connections and rate limits from third party APIs would often make jobs fail. We had a set of “core views” which would be specifically tailored for reports, but this caused queries that just required a very small subset of fields to be slow and expensive due to the view doing a huge amount of joining and aggregation work before being able to retrieve that small subset.

Besides the issues, this worked fine when we didn’t have many trackers to maintain, but as we created more trackers to the point of having many hundreds, we started having issues around maintenance, awareness, knowledge sharing and standardization. New team members had a hard time getting onboard, figuring out which SQL powered which tracker was tough, the lack of standards made every SQL look different and having to update trackers as the data sources changed was a nightmare.

With this in mind, the Studio Tech Solutions focused efforts in building Genesis, a Semantic Data Layer that allows the team to map data points in Data Source Definitions defined as YAML files and then use those to generate the SQL needed for the trackers, based on a selection of fields, filters and formatters specified in an Input Definition file. Genesis takes care of joining, aggregating, formatting and filtering data based on what is available in the Data Source Definitions and specified by the user through the Input Definition being executed.

Genesis Data Source and Input definition example

Genesis is a stateless CLI written in Node.js that reads everything it needs from the file system based on the paths specified in the arguments. This allows us to hook Genesis into Jenkins Jobs, providing a GitOps and CI experience to maintain existing trackers, as well as create new trackers. We can simply change the data layer, trigger an empty pull request, review the changes and have all our trackers up to date with the data source changes.

As of the date of writing, Genesis powers 240+ trackers and is growing everyday, empowering thousands of partners in our studios globally to collaborate, annotate and share information using near-real-time data.

Git-based Tracker management workflow powered by Genesis and the Big Data Scheduler

The generated queries are then used in Workflow Definitions for multiple trackers. The Netflix Data Warehouse offers support for users to create data movement workflows that are managed through our Big Data Scheduler, powered by Titus.

We use the scheduler to execute our queries and move the results to a data tool, which often is a Google Sheet Tab, Airtable base or Tableau dashboard. The scheduler offers templated jobs for moving data from a Presto SQL output to these tools, making it easy to create and maintain hundreds of data movement workflows.

The diagram below summarizes the data consumption flow when building trackers:

Data Consumption Overview

As of July 2021, the Studio Tech Solutions team is finishing a migration from all the trackers built in Lego to use Genesis and the Data Portal. This strategy has increased the Studio Tech Solutions team performance and stability. Trackers are now easy for the team to create, review, change, monitor and discover.

Now and Future

In conclusion, our studio partners have a tracker available to them, populated with near real-time data and tailored to their needs. They can manipulate, annotate, and collaborate using a flexible tool they are familiar with.

Along the journey, we have learned that evolving data movement in complex domains could take multiple iterations and needs to be driven by the business impact. The great cross-functional partnership and collaboration among all data stakeholders is crucial to shape the ideal data product.

However, our story doesn’t end here. We still have a long journey ahead of us to fulfill the vision of such ideal data product, especially in areas such as:

  • Self-servicing data pipelines provisioning via configuration
  • Providing toolings for data discoverability, understandability, usage visibility and change management
  • Enabling data domain orientation and ownership/governance management
  • Bootstrapping trackers in our Studio ecosystem instead of third party tools. Along the same line as the point above, this would allow us to maintain high standards of data governance, lineage, and security.
  • Read-write reports and trackers using GraphQL mutations

These are some of the interesting areas that Netflix Studio is planning to invest in. We will have follow up blog posts on these topics in future. Please stay tuned!

Endnotes

¹ Inmon, Bill. Operational and Informational Reporting, Information Management, July 1st, 2000.
² Dehghani, Zhamak. Data Mesh: Delivering Data-driven Value at Scale, O’Reilly Media, Inc., 2021.

Acknowledgements

Data Movement via Data Mesh has been a success in Netflix Studio owing to multiple teams’ efforts. We would like to acknowledge the following colleagues: Amanda Benhamou, Andreas Andreakis, Anthony Preza, Bo Lei, Charles Zhao, Justin Cunningham, Kasturi Chatterjee, Kevin Zhu, Stephanie Barreyro, Yoomi Koh.


Data Movement in Netflix Studio via Data Mesh was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.