Ready-to-go sample data pipelines with Dataflow

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/ready-to-go-sample-data-pipelines-with-dataflow-17440a9e141d

by Jasmine Omeke, Obi-Ike Nwoke, Olek Gorajek


This post is for all data practitioners who are interested in learning about the bootstrapping, standardization, and automation of batch data pipelines at Netflix.

You may remember Dataflow from the post we wrote last year titled Data pipeline asset management with Dataflow. That article was a deep dive into one of the more technical aspects of Dataflow and didn’t properly introduce the tool in the first place. This time we’ll try to do the introduction justice, and then we will focus on one of the very first features Dataflow came with. That feature is called sample workflows, but before we dive in, let’s have a quick look at Dataflow in general.



Dataflow is a command line utility built to improve the experience of, and to streamline, data pipeline development at Netflix. Check out the high-level output of the Dataflow help command below:

$ dataflow --help
Usage: dataflow [OPTIONS] COMMAND [ARGS]...

Options:
  --docker-image TEXT  Url of the docker image to run in.
  --run-in-docker      Run dataflow in a docker container.
  -v, --verbose        Enables verbose mode.
  --version            Show the version and exit.
  --help               Show this message and exit.

Commands:
  migration  Manage schema migration.
  mock       Generate or validate mock datasets.
  project    Manage a Dataflow project.
  sample     Generate fully functional sample workflows.

As you can see, the Dataflow CLI is divided into four main subject areas (or commands). The most commonly used one is dataflow project, which helps folks manage their data pipeline repositories through creation, testing, deployment, and a few other activities.

The dataflow migration command is a special feature, developed single-handedly by Stephen Huenneke, to fully automate the communication and tracking of data warehouse table changes. Thanks to the Netflix internal lineage system (built by Girish Lingappa), Dataflow migration can help you identify downstream usage of the table in question. It can then help you craft a message to all the owners of these dependencies. After your migration has started, Dataflow will also keep track of its progress and help you communicate with the downstream users.

The dataflow mock command is another standalone feature. It lets you create YAML-formatted mock data files based on selected tables, columns, and a few rows of data from the Netflix data warehouse. Its main purpose is to enable easy unit testing of your data pipelines, but it can technically be used in any other situation as a readable data format for small data sets.

All the above commands are very likely to be described in separate future blog posts, but right now let’s focus on the dataflow sample command.

Sample workflows

Dataflow sample workflows are a set of templates anyone can use to bootstrap their data pipeline project. By “sample” we mean “an example”, like food samples in your local grocery store. Just like with food samples, one of the main reasons this feature exists is to give you “a taste” of the production-quality ETL code that you could encounter inside the Netflix data ecosystem.

All the code you get with the Dataflow sample workflows is fully functional, adjusted to your environment, and isolated from the sample workflows that others have generated. The pipeline is safe to run the moment it shows up in your directory. It will not only build a nice example aggregate table and fill it with real data, but it will also present you with a complete set of recommended components:

  • clean DDL code,
  • proper table metadata settings,
  • transformation job (in a language of choice) wrapped in an optional WAP (Write, Audit, Publish) pattern,
  • sample set of data audits for the generated data,
  • and a fully functional unit test for your transformation logic.

And last, but not least, these sample workflows are being tested continuously as part of the Dataflow code change protocol, so you can be sure that what you get is working. This is one way to build trust with our internal user base.

Next, let’s have a look at the actual business logic of these sample workflows.

Business Logic

There are several variants of the sample workflow you can get from Dataflow, but all of them share the same business logic. This was a conscious decision, made to clearly illustrate the differences between the various languages in which your ETL could be written. Obviously not all tools are made with the same use case in mind, so we are planning to add more code samples for data processing purposes other than classical batch ETL, e.g. Machine Learning model building and scoring.

The example business logic we use in our template computes, on a daily basis, the top hundred movies/shows in every country where Netflix operates. This is not an actual production pipeline running at Netflix, because the code is highly simplified, but it serves well the purpose of illustrating a batch ETL job with various transformation stages. Let’s review the transformation steps below.

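Step 1: on a daily basis, incrementally, sum up all viewing time of all movies and shows in every country

SELECT
  title_id
, country_code
, SUM(view_hours) AS view_hours
FROM some_db.source_table
WHERE playback_date = CURRENT_DATE
GROUP BY
  title_id
, country_code

Step 2: rank all titles from most watched to least in every country

-- step_1 stands for the result of Step 1 (name assumed for illustration)
SELECT
  title_id
, country_code
, view_hours
, RANK() OVER (
    PARTITION BY country_code
    ORDER BY view_hours DESC
  ) AS title_rank
FROM step_1

Step 3: filter all titles to the top 100

-- step_2 stands for the result of Step 2 (name assumed for illustration)
SELECT
  title_id
, country_code
, view_hours
, title_rank
FROM step_2
WHERE title_rank <= 100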
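To make the three steps concrete, here is a minimal pure-Python sketch of the same logic on in-memory rows. It is only an illustration, not code generated by Dataflow: the function name top_titles and the tuple layout are assumptions, and ties share a rank in the way SQL RANK() would assign them.

```python
from collections import defaultdict


def top_titles(rows, top_n=100):
    """rows: iterable of (title_id, country_code, view_hours) for one day."""
    # Step 1: sum viewing hours per (title, country).
    totals = defaultdict(float)
    for title_id, country_code, view_hours in rows:
        totals[(title_id, country_code)] += view_hours

    # Step 2: rank titles within each country, most watched first.
    by_country = defaultdict(list)
    for (title_id, country_code), hours in totals.items():
        by_country[country_code].append((title_id, hours))

    results = []
    for country_code, titles in by_country.items():
        titles.sort(key=lambda item: item[1], reverse=True)
        rank, previous_hours = 0, None
        for position, (title_id, hours) in enumerate(titles, start=1):
            # Titles with equal hours share a rank, as with SQL RANK().
            if hours != previous_hours:
                rank, previous_hours = position, hours
            # Step 3: keep only the top N titles per country.
            if rank <= top_n:
                results.append((title_id, country_code, rank, hours))
    return results
```

Calling top_titles(rows, top_n=100) on one day of rows yields tuples in the shape (title_id, country_code, title_rank, view_hours).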

Now, using the above simple 3-step transformation we will produce data that can be written to the following Iceberg table:

CREATE TABLE IF NOT EXISTS ${TARGET_DB}.dataflow_sample_results (
  title_id INT COMMENT "Title ID of the movie or show."
, country_code STRING COMMENT "Country code of the playback session."
, title_rank INT COMMENT "Rank of a given title in a given country."
, view_hours DOUBLE COMMENT "Total viewing hours of a given title in a given country."
)
COMMENT
"Example dataset brought to you by Dataflow. For more information on this
and other examples please visit the Dataflow documentation page."
PARTITIONED BY (
  date DATE COMMENT "Playback date."
)

Since we keep the top 100 titles in each of the roughly 190 countries where Netflix operates, we are going to load about 19,000 rows into this table on a daily basis. They will look something like this:

sql> SELECT * FROM foo.dataflow_sample_results
     WHERE date = 20220101 AND country_code = 'US'
     ORDER BY title_rank LIMIT 5;

 title_id | country_code | title_rank | view_hours | date
----------+--------------+------------+------------+----------
 11111111 | US           |          1 |        123 | 20220101
 44444444 | US           |          2 |        111 | 20220101
 33333333 | US           |          3 |         98 | 20220101
 55555555 | US           |          4 |         55 | 20220101
 22222222 | US           |          5 |         11 | 20220101
(5 rows)

With the business logic out of the way, we can now start talking about the components, or the boiler-plate, of our sample workflows.

Components

Let’s have a look at the most common workflow components that we use at Netflix. These components may not fit into every ETL use case, but are used often enough to be included in every template (or sample workflow). The workflow author, after all, has the final word on whether they want to use all of these patterns or keep only some. Either way they are here to start with, ready to go, if needed.

Workflow Definitions

Below you can see a typical file structure of a sample workflow package written in SparkSQL.

├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py

The three .sch.yaml files above define a series of steps (a.k.a. jobs), their cadence, dependencies, and the sequence in which they should be executed.

This is one way we can tie components together into a cohesive workflow. In every sample workflow package there are three workflow definition files that work together to provide flexible functionality. The sample workflow code assumes a daily execution pattern, but it is very easy to adjust it to run at a different cadence. For workflow orchestration we use Netflix’s homegrown Maestro scheduler.

The main workflow definition file holds the logic of a single run, in this case one day’s worth of data. This logic consists of the following parts: DDL code, table metadata information, data transformation, and a few audit steps. It’s designed to run for a single date, and is meant to be called from the daily or backfill workflows. This main workflow can also be called manually during development with arbitrary run-time parameters to get a feel for the workflow in action.

The daily workflow executes the main one on a daily basis for a predefined number of previous days. This is sometimes necessary to catch up on late-arriving data. This is where we define a trigger schedule and notification schemes, and update the “high water mark” timestamps on our target table.

The backfill workflow executes the main one for a specified range of days. This is useful for restating data, most often because of a transformation logic change, but sometimes as a response to upstream data updates.
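The relationship between the three workflows can be sketched in a few lines of Python. This is a hypothetical illustration, not the Maestro definition format: the function names are invented here, and the daily lookback is assumed to include the run date itself.

```python
from datetime import date, timedelta


def daily_run_dates(run_date, num_days):
    """Dates the daily workflow hands to main: the num_days most recent days."""
    return [run_date - timedelta(days=n) for n in range(num_days - 1, -1, -1)]


def backfill_run_dates(start_date, end_date):
    """Dates the backfill workflow hands to main: every day in the range."""
    span = (end_date - start_date).days
    return [start_date + timedelta(days=n) for n in range(span + 1)]


def run_main(run_date):
    """Stand-in for one execution of the main workflow for a single date."""
    return f"main({run_date.isoformat()})"


# The daily workflow re-runs the last three days to pick up late-arriving data.
daily_calls = [run_main(d) for d in daily_run_dates(date(2022, 1, 3), 3)]
```

Both wrappers only decide which dates to process; all transformation logic stays in the single-date main workflow.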

DDL

Often, the first step in a data pipeline is to define the target table structure and column metadata via a DDL statement. We understand that some folks choose to have their output schema be an implicit result of the transform code itself, but the explicit statement of the output schema is not only useful for adding table (and column) level comments, but also serves as one way to validate the transform logic.

Generally, we prefer to execute DDL commands as part of the workflow itself, instead of running them outside of the schedule, because it simplifies the development process. Below is an example of hooking the table creation SQL file into the main workflow definition.

- job:
    id: ddl
    type: Spark
    script: $S3{./ddl/dataflow_sparksql_sample.sql}

Metadata

The metadata step provides context on the output table itself as well as the data contained within. Attributes are set via Metacat, which is a Netflix internal metadata management platform. Below is an example of plugging that metadata step into the main workflow definition.

- job:
    id: metadata
    type: Metadata
    owner: ${username}
      - dataflow
      - sample
    lifetime: 123
      date: pk
      country_code: pk
      rank: pk

Transformation

The transformation step (or steps) can be executed in the developer’s language of choice. The example below is using SparkSQL.

Optionally, this step can use the Write-Audit-Publish pattern to ensure that data is correct before it is made available to the rest of the company. See example below:

- template:
    id: wap
    type: wap
      - job:
          id: write
          type: Spark
          script: $S3{./src/sparksql_write.sql}
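The Write-Audit-Publish idea itself can be shown with a small in-memory sketch. This is an assumption-laden illustration and not the wap template's implementation: a Python list stands in for the published table, and the audits are plain callables.

```python
def write_audit_publish(rows, audits, published):
    """Stage rows, run every audit on them, and publish only if all pass.

    audits: list of (name, check) pairs, where check(staged_rows) returns bool.
    published: list standing in for the table that consumers read.
    """
    # Write: land the new data where consumers cannot see it yet.
    staged = list(rows)

    # Audit: collect the names of all checks that fail on the staged data.
    failures = [name for name, check in audits if not check(staged)]
    if failures:
        raise ValueError(f"audits failed: {failures}")

    # Publish: expose the audited data to consumers.
    published.extend(staged)
    return len(staged)
```

Because publishing happens last, a failed audit leaves the published data exactly as it was before the run.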

Audits

Audit steps can be defined to verify data quality. If a “blocking” audit fails, the job will halt and the write step is not committed, so invalid data will not be exposed to users. This step is optional and configurable. See a partial example of an audit from the main workflow below.

- function: columns_should_not_have_nulls
  blocking: true
  table: ${TARGET_TABLE}
    - title_id
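As a rough idea of what such an audit checks, here is a hypothetical Python version operating on rows held as dictionaries. Only the audit name comes from the example above; the implementation, the run_audit helper, and the exception type are assumptions made for illustration.

```python
def columns_should_not_have_nulls(rows, columns):
    """Return the columns (from the given list) that contain a None value."""
    return [c for c in columns if any(row.get(c) is None for row in rows)]


def run_audit(rows, columns, blocking=True):
    """Run the null check; a failing blocking audit halts the job."""
    bad_columns = columns_should_not_have_nulls(rows, columns)
    if bad_columns and blocking:
        raise RuntimeError(f"blocking audit failed, nulls found in: {bad_columns}")
    # A non-blocking audit only reports what it found.
    return bad_columns
```

A non-blocking audit can still be useful for alerting, since it returns the offending columns without stopping the run.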

High-Water-Mark Timestamp

A successful write will typically be followed by a metadata call to set the valid time (or high-water mark) of a dataset. This allows other processes, consuming our table, to be notified and start their processing. See an example high water mark job from the main workflow definition.

- job:
    id: hwm
    type: HWM
    hwm_datetime: ${EXECUTION_DATE}
    hwm_timezone: ${EXECUTION_TIMEZONE}
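How a downstream process might use a high-water mark can be sketched as follows. The HighWaterMarks class is hypothetical and keeps everything in memory; the choice to never move a mark backwards is a design decision of this sketch, not a documented behavior of the HWM job.

```python
from datetime import datetime, timezone


class HighWaterMarks:
    """In-memory registry of the latest valid time per table."""

    def __init__(self):
        self._marks = {}

    def set(self, table, hwm):
        # Only advance the mark, so a backfill of old dates cannot regress it.
        current = self._marks.get(table)
        if current is None or hwm > current:
            self._marks[table] = hwm

    def is_ready(self, table, needed):
        """True once the table's data is valid up to the needed time."""
        current = self._marks.get(table)
        return current is not None and current >= needed


hwm = HighWaterMarks()
hwm.set("dataflow_sample_results", datetime(2022, 1, 2, tzinfo=timezone.utc))
```

A consumer would call is_ready with the date it needs and start processing only when the call returns True.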

Unit Tests

Unit test artifacts are also generated as part of the sample workflow structure. They consist of data mocks, the actual test code, and a simple execution harness depending on the workflow language. In the file structure below, these are the files under src/mocks and the src/test_sparksql_write.py file.

├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py

These unit tests are intended to test one “unit” of data transform in isolation. They can be run during development to quickly capture code typos and syntax issues, or during automated testing/deployment phase, to make sure that code changes have not broken any tests.

We want unit tests to run quickly so that we can have continuous feedback and fast iterations during the development cycle. Running code against a production database can be slow, especially with the overhead required for distributed data processing systems like Apache Spark. Mocks allow you to run tests locally against a small sample of “real” data to validate your transformation code functionality.
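The shape of such a test can be illustrated with a tiny, self-contained Python example. It does not use the Dataflow test harness: the mock rows are written inline instead of being loaded from a YAML mock file, and sum_view_hours is a hypothetical stand-in for the transformation under test.

```python
# Hypothetical mock rows, standing in for a mock of some_db.source_table.
MOCK_SOURCE_ROWS = [
    {"title_id": 1, "country_code": "US", "view_hours": 2.0},
    {"title_id": 1, "country_code": "US", "view_hours": 3.0},
    {"title_id": 2, "country_code": "US", "view_hours": 1.5},
]


def sum_view_hours(rows):
    """The unit under test: total view hours per (title_id, country_code)."""
    totals = {}
    for row in rows:
        key = (row["title_id"], row["country_code"])
        totals[key] = totals.get(key, 0.0) + row["view_hours"]
    return totals


def test_sum_view_hours():
    """Runs in milliseconds because it never touches a real database."""
    expected = {(1, "US"): 5.0, (2, "US"): 1.5}
    assert sum_view_hours(MOCK_SOURCE_ROWS) == expected
```

A test runner such as pytest would pick up test_sum_view_hours by its name.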


Over time, the extraction of data from Netflix’s source systems has grown to encompass a wider range of end-users, such as engineers, data scientists, analysts, marketers, and other stakeholders. Focusing on convenience, Dataflow allows these differing personas to go about their work seamlessly. A large number of our data users employ SparkSQL, PySpark, and Scala. A small but growing contingent of data scientists and analytics engineers use R, backed by the sparklyr interface or other data processing tools, like Metaflow.

With an understanding that the data landscape and the technologies employed by end-users are not homogeneous, Dataflow creates a malleable path forward. It solidifies different recipes or repeatable templates for data extraction. Within this section, we’ll preview a few methods, starting with the SparkSQL and Python ways of creating data pipelines with Dataflow. Then we’ll segue into the Scala and R use cases.

To begin, after installing Dataflow, a user can run the following command to understand how to get started.

$ dataflow sample workflow --help
Dataflow (0.6.16)

Usage: dataflow sample workflow [OPTIONS] RECIPE [TARGET_PATH]

Create a sample workflow based on selected RECIPE and land it in the
specified TARGET_PATH.

Currently supported workflow RECIPEs are: spark-sql, pyspark,
scala and sparklyr.

- if not specified, current directory is assumed
- points to a directory, it will be used as the target location

Options:
  --source-path TEXT         Source path of the sample workflows.
  --workflow-shortname TEXT  Workflow short name.
  --workflow-id TEXT         Workflow ID.
  --skip-info                Skip the info about the workflow sample.
  --help                     Show this message and exit.

Let’s assume we have a directory called stranger-data in which the user creates workflow templates in all four languages that Dataflow offers. To better illustrate how to generate the sample workflows using Dataflow, let’s look at the full command one would use to create one of these workflows, e.g.:

$ cd stranger-data
$ dataflow sample workflow spark-sql ./sparksql-workflow

By repeating the above command for each type of transformation language we can arrive at the following directory structure:

├── pyspark-workflow
│   ├── main.sch.yaml
│   ├── daily.sch.yaml
│   ├── backfill.sch.yaml
│   ├── ddl
│   │   └── ...
│   ├── src
│   │   └── ...
│   └── tox.ini
├── scala-workflow
│   ├── build.gradle
│   └── ...
├── sparklyR-workflow
│   └── ...
└── sparksql-workflow
    └── ...
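For completeness, the four commands behind this directory structure can be spelled out with a short Python sketch. The recipe names come from the help output and the target paths from the tree above; the helper function is hypothetical and only builds the command strings without running anything.

```python
import shlex

# Recipe name -> target path, matching the directory names in the tree above.
RECIPE_TARGETS = {
    "spark-sql": "./sparksql-workflow",
    "pyspark": "./pyspark-workflow",
    "scala": "./scala-workflow",
    "sparklyr": "./sparklyR-workflow",
}


def sample_workflow_commands(targets):
    """Build one 'dataflow sample workflow' command line per recipe."""
    return [
        shlex.join(["dataflow", "sample", "workflow", recipe, path])
        for recipe, path in targets.items()
    ]


for command in sample_workflow_commands(RECIPE_TARGETS):
    print(command)
```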

Earlier we talked about the business logic of these sample workflows and we showed the Spark SQL version of that example data transformation. Now let’s discuss different approaches to writing the data in other languages.

PySpark

The partial PySpark code below has the same functionality as the SparkSQL example above, but it uses the Spark DataFrame Python interface.

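from pyspark.sql import Window
from pyspark.sql.functions import col, lit, rank, sum as sum_


def main(args, spark):
    # Assumption: the run-time parameters arrive as attributes on args.
    some_db, source_table = args.some_db, args.source_table
    target_table, date = args.target_table, args.date

    source_table_df = spark.table(f"{some_db}.{source_table}")

    # Step 1: sum the day's viewing hours per title and country.
    viewing_by_title_country = (
        source_table_df.filter(col("date") == date)
        .filter("title_id IS NOT NULL AND view_hours > 0")
        .select("title_id", "country_code", "view_hours")
        .groupBy("title_id", "country_code")
        .agg(sum_("view_hours").alias("view_hours"))
    )

    # Step 2: rank titles within each country, most watched first.
    window = Window.partitionBy("country_code").orderBy(col("view_hours").desc())
    ranked_viewing_by_title_country = viewing_by_title_country.withColumn(
        "title_rank", rank().over(window)
    )

    # Step 3: keep the top 100 and overwrite the data in the target table.
    (
        ranked_viewing_by_title_country.filter(col("title_rank") <= 100)
        .withColumn("date", lit(int(date)))
        .select("title_id", "country_code", "title_rank", "view_hours", "date")
        .write.insertInto(target_table, overwrite=True)
    )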

Scala

Scala is another Dataflow-supported recipe that offers the same business logic in a sample workflow out of the box.

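package com.netflix.spark

import org.apache.spark.sql.{DataFrame, SparkSession, functions => F}
import org.apache.spark.sql.expressions.Window

object ExampleApp {
  // Assumption: the session is obtained here; the sample may receive it differently.
  val spark: SparkSession = SparkSession.builder().getOrCreate()
  import spark.implicits._

  def readSourceTable(sourceDb: String, sourceTable: String, dataDate: String): DataFrame =
    spark
      .table(s"$sourceDb.$sourceTable")
      .filter($"playback_start_date" === dataDate)

  def viewingByTitleCountry(sourceTableDF: DataFrame): DataFrame =
    sourceTableDF
      .select($"title_id", $"country_code", $"view_hours")
      .filter($"view_hours" > 0)
      .groupBy($"title_id", $"country_code")
      .agg(F.sum($"view_hours").as("view_hours"))

  def addTitleRank(viewingDF: DataFrame): DataFrame =
    viewingDF.withColumn(
      "title_rank",
      F.rank().over(Window.partitionBy($"country_code").orderBy($"view_hours".desc))
    )

  def writeViewing(viewingDF: DataFrame, targetTable: String, dataDate: String): Unit =
    viewingDF
      .select($"title_id", $"country_code", $"title_rank", $"view_hours")
      .filter($"title_rank" <= 100)
      .withColumn("date", F.lit(dataDate.toInt))
      .write
      .mode("overwrite")
      .insertInto(targetTable)

  def main(args: Array[String]): Unit = {
    val sourceTableDF = readSourceTable("some_db", "source_table", "20200101")
    val viewingDF = viewingByTitleCountry(sourceTableDF)
    val titleRankedDF = addTitleRank(viewingDF)
    // "some_db.target_table" is a placeholder name used for illustration.
    writeViewing(titleRankedDF, "some_db.target_table", "20200101")
  }
}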

R / sparklyR

As Netflix has a growing cohort of R users, R is the latest recipe available in Dataflow.



library(sparklyr)
library(dplyr)

main <- function(args, spark) {
  # g() is assumed to be a glue-style string interpolation helper defined elsewhere.
  title_df <- tbl(spark, g("{some_db}.{source_table}"))

  title_activity_by_country <- title_df |>
    filter(title_date == date) |>
    filter(!is.null(title_id) & event_count > 0) |>
    select(title_id, country_code, event_type) |>
    group_by(title_id, country_code) |>
    summarize(event_count = sum(event_type, na.rm = TRUE))

  ranked_title_activity_by_country <- title_activity_by_country |>
    group_by(country_code) |>
    mutate(title_rank = rank(desc(event_count)))

  top_25_title_by_country <- ranked_title_activity_by_country |>
    ungroup() |>
    filter(title_rank <= 25) |>
    mutate(date = as.integer(date))

  top_25_title_by_country |>
    sdf_repartition(partitions = 1) |>
    spark_insert_table(target_table, mode = "overwrite")
}

main(args = args, spark = spark)


As you can see, we try to make the lives of Netflix data engineers easier by offering paved paths and suggestions on how to structure their code, while trying to keep the variety of options wide enough so they can pick and choose what works best for them in any particular case.

Having a well-defined set of defaults for data pipeline creation across Netflix makes onboarding easier, provides standardization, and centralizes best practices. Let’s review these benefits below.

Onboarding

Ramping up on a new team or a business vertical always takes some effort, especially in a “highly aligned, loosely coupled” culture. Having a well-documented starting point removes some of the struggle that comes with starting from scratch and considerably speeds up the first iteration of the development cycle.

Standardization

Standardization makes life easier for new team members as well as those already familiar with the domain and tech stack.

Some transfer of work between people or teams is inevitable. Having standardized layout and patterns removes friction from this exchange. Also, code reviews and suggestions are easier to manage when working from a similar baseline.

Standardization also makes project layout more intuitive and minimizes risk of human error as the codebase evolves.

Centralized Best Practices

Data infrastructure evolves continually. Having easy access to a centralized set of good defaults is critical to ensure that best practices evolve along with the technology, and that users are aware of what’s the latest on the tech-stack menu.

Even better, Dataflow offers executable best practices, which present these concepts in the context of an actual use case. Instead of reading documentation, you can initialize a “real” project, change it as needed, and iterate from there.


Special thanks to Daniel Watson, Jim Hester, Stephen Huenneke, Girish Lingappa for their contributions to Dataflow sample workflows and to Andrea Hairston for the Dataflow logo design.

Next Episode

Hopefully you won’t need to wait another year to read about other features of Dataflow. Here are a few topics that we could write about next. Please have a look at the subjects below and, if you feel strongly about any of them, let us know in the comments section:

  • Branch driven deployment — to explain how Dataflow lets anyone customize their CI/CD jobs based on the git branch for easy testing in isolated environments.
  • Local SparkSQL unit testing— to clarify how Dataflow helps in making robust unit tests for Spark SQL transform code, with ease.
  • Data migrations made easy — to show how Dataflow can be used to plan a table migration, support the communication with downstream users and help in monitoring it to completion.

Ready-to-go sample data pipelines with Dataflow was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Hybrid Cloud Architectures Using Self-hosted Apache Kafka and AWS Glue

Post Syndicated from Brandon Rubadou original https://aws.amazon.com/blogs/architecture/hybrid-cloud-architectures-using-self-hosted-apache-kafka-and-aws-glue/

Using analytics to gain insights from a variety of datasets is key to successful transformation. There are many options to consider to realize the full value and potential of our data in a hybrid cloud infrastructure. Common practice is to route data produced from on-premises to a central repository or data lake. Here it can be consumed by multiple applications.

You can use an Apache Kafka cluster for data movement from on-premises to the data lake, using Amazon Simple Storage Service (Amazon S3). But you must either replicate the topics onto a cloud cluster, or develop a custom connector to read and copy the topics to Amazon S3. This presents a challenge for many customers.

This blog presents another option: an architecture solution leveraging AWS Glue.

Kafka and ETL processing

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. You can use Kafka clusters as a system to move data between systems. Producers typically publish data (or push) to a Kafka topic, where an application can consume it. Consumers are usually custom applications that feed data into respective target applications. These targets can be a data warehouse, an Amazon OpenSearch Service cluster, or others.

AWS Glue offers the ability to create jobs that will extract, transform, and load (ETL) data. This allows you to consume from many sources, such as from Apache Kafka, Amazon Kinesis Data Streams, or Amazon Managed Streaming for Apache Kafka (Amazon MSK). The jobs cleanse and transform the data, and then load the results into Amazon S3 data lakes or JDBC data stores.

Hybrid solution and architecture design

In most cases, the first step in building a responsive and manageable architecture is to review the data itself. For example, if we are processing insurance policy data from a financial organization, our data may contain fields that identify customer data. These can be account ID, an insurance claim identifier, and the dollar amount of the specific claim. Glue provides the ability to change any of these field types into the expected data lake schema type for processing.

Figure 1. Data flow – Source to data lake target

Next, AWS Glue must be configured to connect to the on-premises Kafka server (see Figure 1). Private and secure connectivity to the on-premises environment can be established via AWS Direct Connect or a VPN solution. Traffic from the Amazon Virtual Private Cloud (Amazon VPC) is allowed to access the cluster directly. You can do this by creating a three-step streaming ETL job:

  1. Create a Glue connection to the on-premises Kafka source
  2. Create a Data Catalog table
  3. Create an ETL job, which saves to an S3 data lake

Configuring AWS Glue

  1. Create a connection. Using AWS Glue, create a secure SSL connection in the Data Catalog using the predefined Kafka connection type. Enter the hostname of the on-premises cluster and use the custom-managed certificate option for additional security. If you are in a development environment, you are required to generate a self-signed SSL certificate. Use your Kafka SSL endpoint to connect to Glue. (AWS Glue also supports client authentication for Apache Kafka streams.)
  2. Specify a security group. To allow AWS Glue to communicate between its components, specify a security group with a self-referencing inbound rule for all TCP ports. By creating this rule, you can restrict the source to the same security group in the Amazon VPC. Ensure you check the default security group for your VPC, as it could have a preconfigured self-referencing inbound rule for ALL traffic.
  3. Create the Data Catalog. Glue can auto-create the data schema. Since it’s a simple flat file, use the schema detection function of Glue. Set up the Kafka topic and refer to the connection.
  4. Define the job properties. Create the AWS Identity and Access Management (IAM) role to allow Glue to connect to S3 data. Select an S3 bucket and format. In this case, we use CSV and enable schema detection.

The Glue job can be scheduled, initiated manually, or triggered by an event-driven architecture. Note that Glue does not yet support the “test connection” option within the console. Make sure you set the “Job Timeout” and enter a duration in minutes because the default value is blank.

When the job runs, it pulls the latest topics from the source on-premises Kafka cluster. Glue supports checkpoints to ensure that all source data is processed. By default, AWS Glue processes and writes out data in 100-second windows. This allows data to be processed efficiently and permits aggregations to be performed on data arriving later. You can modify this window size to increase timeliness or aggregation accuracy. AWS Glue streaming jobs use checkpoints rather than job bookmarks to track the data that has been read. AWS Glue bills hourly for streaming ETL jobs only while they are running.
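To picture what processing in fixed windows means, here is a simplified Python sketch that groups records into 100-second buckets by arrival time. It is an illustration of the windowing idea only and says nothing about how AWS Glue implements it; the function name and the (timestamp, payload) record layout are assumptions.

```python
from collections import defaultdict


def group_into_windows(records, window_seconds=100):
    """Group (epoch_seconds, payload) records into fixed-size windows.

    Returns a dict mapping each window's start time to its payloads.
    """
    windows = defaultdict(list)
    for arrival_time, payload in records:
        # Round the arrival time down to the start of its window.
        window_start = arrival_time - (arrival_time % window_seconds)
        windows[window_start].append(payload)
    return dict(windows)


batches = group_into_windows([(0, "a"), (99, "b"), (100, "c"), (250, "d")])
```

A smaller window means results are written out sooner, while a larger window gives each batch more records to aggregate.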

Now that the connection is complete and the job is created, we can format the source data needed for the data lake. AWS Glue offers a set of built-in transforms that you can use to process your data using your ETL script. The transformed data is then placed in S3, where it can be leveraged as part of a larger data lake environment.

Many additional steps can be taken to render even more value from the information. For example, one team may choose to use a business intelligence tool like Amazon QuickSight to visualize and embed the data into an internal dashboard. Another team may want to use event driven architectures to notify financial analysts and initiate downstream actions when specific types of data are discovered. There are endless opportunities that should be determined by the business needs.


In this blog post, we have given an overview of an architecture that provides hybrid cloud data integration and analytics capability. Once the data is transformed and hosted in the S3 data lake, we can provide secure, reliable access to gain valuable insights. This solution allows for a variety of different producers and consumers, with the ability to handle increasing volumes of data.

AWS Glue along with Apache Kafka will ensure that your on-premises workloads are tightly integrated with your larger data lake solution.

If you have questions, post your thoughts in the comments section.

Address Modernization Tradeoffs with Lake House Architecture

Post Syndicated from Sukhomoy Basak original https://aws.amazon.com/blogs/architecture/address-modernization-tradeoffs-with-lake-house-architecture/

Many organizations are modernizing their applications to reduce costs and become more efficient. They must adapt to modern application requirements that provide 24×7 global access. The ability to scale up or down quickly to meet demand and process a large volume of data is critical. This is challenging while maintaining strict performance and availability. For many companies, modernization includes decomposing a monolith application into a set of independently developed, deployed, and managed microservices. The decoupled nature of a microservices environment allows each service to evolve agilely and independently. While there are many benefits for moving to a microservices-based architecture, there can be some tradeoffs. As your application monolith evolves into independent microservices, you must consider the implications to your data architecture.

In this blog post we will provide example use cases and show how Lake House Architecture on AWS can streamline your microservices architecture. A Lake House Architecture embraces the decentralized nature of microservices by facilitating data movement. These transfers can be between data stores, from data stores to data lake, and from data lake to data stores (Figure 1).

Figure 1. Integrating data lake, data warehouse, and all purpose-built stores into a coherent whole

Health and wellness application challenges

Our fictitious health and wellness customer has an application architecture comprised of several microservices backed by purpose-built data stores. User profiles, assessments, surveys, fitness plans, health preferences, and insurance claims are maintained in an Amazon Aurora MySQL-Compatible relational database. The event service monitors the number of steps walked, sleep pattern, pulse rate, and other behavioral data in Amazon DynamoDB, a NoSQL database (Figure 2).

Figure 2. Microservices architecture for health and wellness company

With this microservices architecture, it’s common to have data spread across various data stores. This is because each microservice uses a purpose-built data store suited to its usage patterns and performance requirements. While this provides agility, it also presents challenges to deriving needed insights.

Here are four challenges that different users might face:

  1. As a health practitioner, how do I efficiently combine the data from multiple data stores to give personalized recommendations that improve patient outcomes?
  2. As a sales and marketing professional, how do I get a 360 view of my customer, when data lives in multiple data stores? Profile and fitness data are in a relational data store, but important behavioral and clickstream data are in NoSQL data stores. It’s hard for me to run targeted marketing campaigns, which can lead to revenue loss.
  3. As a product owner, how do I optimize healthcare costs when designing wellbeing programs for patients?
  4. As a health coach, how do I find patients and help them with their wellness goals?

Our remaining subsections highlight AWS Lake House Architecture capabilities and features that allow data movement and the integration of purpose-built data stores.

1. Patient care use case

In this scenario, a health practitioner is interested in historical patient data to estimate the likelihood of a future outcome. To get the necessary insights and identify patterns, the health practitioner needs event data from Amazon DynamoDB and patient profile data from Aurora MySQL-Compatible. Our health practitioner will use Amazon Athena to run an ad-hoc analysis across these data stores.

Amazon Athena provides an interactive query service for both structured and unstructured data. The federated query functionality in Amazon Athena helps with running SQL queries across data stored in relational, NoSQL, and custom data sources. Amazon Athena uses Lambda-based data source connectors to run federated queries. Figure 3 illustrates the federated query architecture.

Figure 3. Amazon Athena federated query
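
Conceptually, a federated query pulls rows from each source through a connector and joins them on a shared key. The following is a minimal Python sketch of that idea only. The connector functions, field names, sample records, and the pulse threshold are hypothetical stand-ins, and none of this is the Amazon Athena API.

```python
# Conceptual sketch only: the connectors and records below are hypothetical
# stand-ins, not the Amazon Athena API or its Lambda-based connectors.

def profile_connector():
    """Stand-in for a relational source (patient profiles)."""
    return [
        {"patient_id": 1, "name": "A. Patient"},
        {"patient_id": 2, "name": "B. Patient"},
    ]

def event_connector():
    """Stand-in for a NoSQL source (device events)."""
    return [
        {"patient_id": 1, "pulse_rate": 72},
        {"patient_id": 1, "pulse_rate": 131},
        {"patient_id": 2, "pulse_rate": 68},
    ]

def federated_join(left_rows, right_rows, key):
    """Inner-join rows from two sources on a shared key."""
    index = {row[key]: row for row in left_rows}
    joined = []
    for row in right_rows:
        match = index.get(row[key])
        if match is not None:
            joined.append({**match, **row})
    return joined

rows = federated_join(profile_connector(), event_connector(), "patient_id")
# Hypothetical anomaly rule: flag readings above 120 beats per minute.
high_pulse = [r for r in rows if r["pulse_rate"] > 120]
```

In a real deployment the join would be expressed in SQL and pushed through the connectors, rather than materialized in application code as it is here.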

The patient care team could use an Amazon Athena federated query to find out if a patient needs urgent care. Such a query can detect anomalies in the combined datasets from claims processing, device data, and electronic health records (EHR), as shown in Figure 4.

Figure 4. Federated query result by combining data from claim, device, and EHR stores

Healthcare data from various sources, including EHRs and genetic data, helps improve personalized care. Machine learning (ML) is able to harness big data and perform predictive analytics. This creates opportunities for researchers to develop personalized treatments for various diseases, including cancer and depression.

To achieve this, you must move all the related data into a centralized repository such as an Amazon S3 data lake. For specific use cases, you must also move the data between the purpose-built data stores. Finally, you must build an ML solution that can predict the outcome. Amazon Redshift ML, combined with its federated query processing capabilities, enables data analysts and database developers to create a platform to detect patterns (Figure 5). With this platform, health practitioners are able to make more accurate, data-driven decisions.

Figure 5. Amazon Redshift federated query with Amazon Redshift ML

2. Sales and marketing use case

To run marketing campaigns, the sales and marketing team must search customer data from a relational database together with event data from a non-relational data store. We will move the data from Aurora MySQL-Compatible and Amazon DynamoDB to Amazon Elasticsearch Service (ES) to meet this requirement.

AWS Database Migration Service (DMS) helps move the change data from Aurora MySQL-Compatible to Amazon ES using Change Data Capture (CDC). AWS Lambda could be used to move the change data from DynamoDB streams to Amazon ES, as shown in Figure 6.

Figure 6. Moving and combining data from Aurora MySQL-Compatible and Amazon DynamoDB to Amazon Elasticsearch Service
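
The core of change data capture is replaying insert, update, and delete events against the target store. Below is a minimal Python sketch of that replay, with a dictionary standing in for the search index. The event shape and field names are assumptions made for illustration; real AWS DMS and DynamoDB Streams records look different.

```python
# Hypothetical change events; real CDC records from AWS DMS or DynamoDB
# Streams have a different, richer shape.

def apply_change(index, event):
    """Apply one insert/update/delete event to an in-memory document index."""
    doc_id = event["id"]
    if event["op"] == "delete":
        index.pop(doc_id, None)
    else:
        # Inserts and updates are both treated as upserts on the target.
        merged = dict(index.get(doc_id, {}))
        merged.update(event["data"])
        index[doc_id] = merged
    return index

search_index = {}
events = [
    {"op": "insert", "id": "u1", "data": {"name": "A. Customer"}},
    {"op": "update", "id": "u1", "data": {"steps": 8000}},
    {"op": "insert", "id": "u2", "data": {"name": "B. Customer"}},
    {"op": "delete", "id": "u2"},
]
for event in events:
    apply_change(search_index, event)
```

Because both sources write into the same document keyed by ID, profile fields and event fields end up side by side, which is what makes the combined search possible.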

The sales and marketing team can now run targeted marketing campaigns by querying data from Amazon Elasticsearch Service (see Figure 7). They can improve sales operations by visualizing data with Amazon QuickSight.

Figure 7. Personalized search experience for ad-tech marketing team

3. Healthcare product owner use case

In this scenario, the product owner must define the care delivery value chain. They must develop process maps for patient activity and estimate the cost of patient care. They must analyze these datasets by building business intelligence (BI) reporting and dashboards with a tool like Amazon QuickSight. Amazon Redshift, a cloud scale data warehousing platform, is a good choice for this. Figure 8 illustrates this pattern.

Figure 8. Moving data from Amazon Aurora and Amazon DynamoDB to Amazon Redshift

The product owners can use integrated business intelligence reports with Amazon Redshift to analyze their data. This way they can make more accurate and appropriate decisions (see Figure 9).

Figure 9. Business intelligence for patient care processes

4. Health coach use case

In this scenario, the health coach must find a patient based on certain criteria. They would then send personalized communication to connect with the patient to ensure they are following the proposed health plan. This proactive approach contributes to a positive patient outcome. It can also reduce healthcare costs incurred by insurance companies.

To search patient records with multiple data points, it is important to move data from Amazon DynamoDB to Amazon ES. This will also provide a fast and personalized search experience. The health coaches can be notified and will have the information they need to provide guidance to their patients. Figure 10 illustrates this pattern.

Figure 10. Moving Data from Amazon DynamoDB to Amazon ES

The health coaches can use Elasticsearch to search users based on specific criteria. This helps them with counseling and other health plans, as shown in Figure 11.

Figure 11. Simplified personalized search using patient device data


In this post, we highlight how Lake House Architecture on AWS helps with the challenges and tradeoffs of modernization. A Lake House architecture on AWS can help streamline the movement of data between the microservices data stores. This offers new capabilities for various analytics use cases.

For further reading on architectural patterns, and walkthroughs for building Lake House Architecture, see the following resources:

Processing ETL tasks with Ratchet

Post Syndicated from Grab Tech original https://engineering.grab.com/processing-etl-tasks-with-ratchet


At Grab, the Lending team is focused on building products that help finance various segments of users, such as Passengers, Drivers, or Merchants, based on their needs. The team builds products that enable users to avail funds in a seamless and hassle-free way. In order to achieve this, multiple lending microservices continuously interact with each other. Each microservice handles different responsibilities, such as providing offers, storing user information, disbursing availed amounts to a user’s account, and many more.

In this tech blog, we will discuss what Data and Extract, Transform and Load (ETL) pipelines are and how they are used for processing multiple tasks in the Lending Team at Grab. We will also discuss Ratchet, a Go library that helps us build data pipelines and handle ETL tasks. Let’s start by covering the basics of Data and ETL pipelines.

What is a Data Pipeline?

A Data pipeline is used to describe a system or a process that moves data from one platform to another. In between platforms, data passes through multiple steps based on defined requirements, where it may be subjected to some kind of modification. All the steps in a Data pipeline are automated, and the output from one step acts as an input for the next step.
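
The defining property, that each step consumes the previous step's output, can be sketched in a few lines of Python. The three steps below are hypothetical and chosen only to show the chaining.

```python
from functools import reduce

def run_pipeline(data, steps):
    """Feed the output of each step into the next one, in order."""
    return reduce(lambda value, step: step(value), steps, data)

# Hypothetical steps, chosen only to illustrate the chaining.
def strip_whitespace(records):
    return [r.strip() for r in records]

def drop_empty(records):
    return [r for r in records if r]

def to_upper(records):
    return [r.upper() for r in records]

result = run_pipeline([" a ", "", "b"], [strip_whitespace, drop_empty, to_upper])
```

Keeping each step a plain function of its input makes it easy to reorder, test, or replace individual steps without touching the rest of the pipeline.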

Data Pipeline (Source: Hazelcast)

What is an ETL Pipeline?

An ETL pipeline is a type of Data pipeline that consists of 3 major steps, namely extraction of data from a source, transformation of that data into the desired format, and finally loading the transformed data to the destination. The destination is also known as the sink.

Extract-Transform-Load (Source: TatvaSoft)

The combination of steps in an ETL pipeline provides functions to assure that the business requirements of the application are achieved.

Let’s briefly look at each of the steps involved in the ETL pipeline.

Data Extraction

Data extraction is used to fetch data from one or multiple sources with ease. The source of data can vary based on the requirement. Some of the commonly used data sources are:

  • Database
  • Web-based storage (S3, Google cloud, etc)
  • Files
  • User Feeds, CRM, etc.

The data format can also vary from one use case to another. Some of the most commonly used data formats are:

  • SQL
  • CSV
  • JSON
  • XML

Once data is extracted in the desired format, it is ready to be fed to the transformation step.

Data Transformation

Data transformation involves applying a set of rules and techniques to convert the extracted data into a more meaningful and structured format for use. The extracted data may not always be ready to use. In order to transform the data, one of the following techniques may be used:

  1. Filtering out unnecessary data.
  2. Preprocessing and cleaning of data.
  3. Performing validations on data.
  4. Deriving a new set of data from the existing one.
  5. Aggregating data from multiple sources into a single uniformly structured format.
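
As a rough illustration of the first four techniques, here is a small Python sketch that filters, cleans, validates, and derives fields in a single pass. The field names, the test-data flag, and the threshold of 1000 are hypothetical.

```python
def transform(rows):
    """Apply filtering, cleaning, validation and derivation to raw rows."""
    result = []
    for row in rows:
        # 1. Filter out unnecessary data (here: rows flagged as test data).
        if row.get("is_test"):
            continue
        # 2. Clean: normalise whitespace and casing.
        name = row.get("name", "").strip().title()
        # 3. Validate: skip rows with a missing name or a negative amount.
        amount = row.get("amount")
        if not name or amount is None or amount < 0:
            continue
        # 4. Derive a new field from the existing ones.
        result.append({"name": name, "amount": amount, "is_large": amount >= 1000})
    return result

raw = [
    {"name": "  alice ", "amount": 1500},
    {"name": "bob", "amount": -5},
    {"name": "carol", "amount": 20, "is_test": True},
]
clean = transform(raw)
```

Aggregation across sources (the fifth technique) would typically happen before this function, so that it receives rows in one uniform structure.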

Data Loading

The final step of an ETL pipeline involves moving the transformed data to a sink where it can be accessed for its use. Based on requirements, a sink can be one of the following:

  1. Database
  2. File
  3. Web-based storage (S3, Google cloud, etc)

An ETL pipeline may or may not have a load step, depending on its requirements. When the transformed data needs to be stored for further use, the load step moves the transformed data to the storage of choice. However, in some cases the transformed data is not needed for any further use, and the load step can be skipped.

Now that you understand the basics, let’s go over how we, in the Grab Lending team, use an ETL pipeline.

Why Use Ratchet?

At Grab, we use Golang for most of our backend services. Due to Golang’s simplicity, execution speed, and concurrency support, it is a great choice for building data pipeline systems to perform custom ETL tasks.

Given that Ratchet is also written in Go, it allows us to easily build custom data pipelines.

Go channels connect each stage of processing, so the syntax for sending data is intuitive for anyone familiar with Go. All data being sent and received is in JSON, providing a nice balance of flexibility and consistency.

Utilising Ratchet for ETL Tasks

We use Ratchet for multiple ETL tasks like batch processing, restructuring and rescheduling of loans, creating user profiles, and so on. One of the backend services, named Azkaban, is responsible for handling various ETL tasks.

Ratchet uses Data Processors for building a pipeline consisting of multiple stages. Data Processors each run in their own goroutine, so all of the data is processed concurrently. Data Processors are organised into stages, and those stages are run within a pipeline. For building an ETL pipeline, each of the three steps (Extract, Transform and Load) uses a Data Processor for implementation. Ratchet provides a set of useful built-in Data Processors, while also providing an interface to implement your own. Usually, the transform stage uses a Custom Data Processor.
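
The concurrency model can be approximated outside Go as well. The sketch below is a Python analogue, not Ratchet's API: threads stand in for goroutines, queues stand in for channels, and every message passed between stages is a JSON string. The `stage` helper, the `DONE` sentinel, and both processors are hypothetical.

```python
import json
import queue
import threading

DONE = object()  # sentinel marking the end of the stream

def stage(process, inbox, outbox):
    """Run one processor in its own thread, reading inbox and writing outbox."""
    def worker():
        while True:
            item = inbox.get()
            if item is DONE:
                outbox.put(DONE)  # propagate shutdown to the next stage
                return
            for out in process(item):
                outbox.put(out)
    thread = threading.Thread(target=worker)
    thread.start()
    return thread

# Hypothetical processors; every message between stages is a JSON string.
def extract(_):
    for n in (1, 2, 3):
        yield json.dumps({"n": n})

def transform(message):
    record = json.loads(message)
    yield json.dumps({"n": record["n"], "square": record["n"] ** 2})

q_in, q_mid, q_out = queue.Queue(), queue.Queue(), queue.Queue()
threads = [stage(extract, q_in, q_mid), stage(transform, q_mid, q_out)]
q_in.put("start")
q_in.put(DONE)
for t in threads:
    t.join()

results = []
while True:
    item = q_out.get()
    if item is DONE:
        break
    results.append(json.loads(item))
```

Because each stage starts working as soon as its first message arrives, the transform stage can process early records while extraction is still producing later ones.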

Data Processors in Ratchet (Source: GitHub)

Let’s take a look at one of these tasks to understand how we utilise Ratchet for processing an ETL task.

Whitelisting Merchants Through ETL Pipelines

Whitelisting essentially means making the product available to the user by mapping an offer to the user ID. If a merchant in Thailand receives an option to opt for Cash Loan, it is done by whitelisting that merchant. In order to whitelist our merchants, our Operations team uses an internal portal to upload a CSV file with the user IDs of the merchants and other required information. This CSV file is generated by our internal Data and Risk team and handed over to the Operations team. Once the CSV file is uploaded, the user IDs present in the file are whitelisted within minutes. However, a lot of work goes on in the background to make this possible.

Data Extraction

Once the Operations team uploads the CSV containing a list of merchant users to be whitelisted, the file is stored in S3 and an entry is created on the Azkaban service with the document ID of the uploaded file.

File upload by Operations team

The data extraction step makes use of a Custom CSV Data Processor that uses the document ID to first create a PreSignedUrl and then uses it to fetch the data from S3. The data extracted is in bytes and we use commas as the delimiter to format the CSV data.

Data Transformation

In order to transform the data, we define a Custom Data Processor that we call a Transformer for each ETL pipeline. Transformers are responsible for applying all necessary transformations to the data before it is ready for loading. The transformations applied in the merchant whitelisting transformers are:

  1. Convert data from bytes to struct.
  2. Check for presence of all mandatory fields in the received data.
  3. Perform validation on the data received.
  4. Make API calls to external microservices for whitelisting the merchant.

As mentioned earlier, the CSV file is uploaded manually by the Operations team. Since this is a manual process, it is prone to human error. Validating the data in the transformation step catches these errors early and keeps them from propagating further down the pipeline. Since CSV data consists of multiple rows, each row passes through all the steps mentioned above.
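
The first three transformations (bytes to struct, mandatory field checks, and validation) could look roughly like the Python sketch below. The column names and the numeric `user_id` rule are assumptions, since the post does not describe the real file layout, and the API call to external microservices is left out.

```python
import csv
import io
from dataclasses import dataclass

# Hypothetical column names; the real file layout is not described in the post.
MANDATORY_FIELDS = ("user_id", "offer_id", "country")

@dataclass
class WhitelistRow:
    user_id: int
    offer_id: str
    country: str

def parse_rows(raw: bytes):
    """Convert CSV bytes into typed rows, collecting per-row errors."""
    valid, errors = [], []
    reader = csv.DictReader(io.StringIO(raw.decode("utf-8")))
    for line_no, row in enumerate(reader, start=2):  # line 1 is the header
        # Mandatory field check: every required column must be non-empty.
        missing = [f for f in MANDATORY_FIELDS if not (row.get(f) or "").strip()]
        if missing:
            errors.append((line_no, f"missing fields: {', '.join(missing)}"))
            continue
        # Validation (assumed rule): user IDs must be numeric.
        if not row["user_id"].strip().isdigit():
            errors.append((line_no, "user_id must be numeric"))
            continue
        valid.append(
            WhitelistRow(int(row["user_id"]), row["offer_id"].strip(), row["country"].strip())
        )
    return valid, errors

data = b"user_id,offer_id,country\n101,CASH_LOAN,TH\nabc,CASH_LOAN,TH\n102,,TH\n"
valid_rows, row_errors = parse_rows(data)
```

Collecting errors per row, instead of aborting on the first bad row, lets the valid rows proceed while the rejected ones are reported back with their line numbers.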

Data Loading

Once the merchants are whitelisted, we don’t need to store the transformed data. As a result, we don’t have a load step for this ETL task, so we just use an Empty Data Processor. However, this is just one of many use cases that we have. In cases where the transformed data needs to be stored for further use, the load step will have a Custom Data Processor, which will be responsible for storing the data.

Connecting All Stages

After defining our Data Processors for each of the steps in the ETL pipeline, the final piece is to connect all the stages together. As stated earlier, the ETL tasks have different ETL pipelines and each ETL pipeline consists of 3 stages defined by their Data Processors.

In order to connect these 3 stages, we define a Job Processor for each ETL pipeline. A Job Processor represents the entire ETL pipeline and encompasses Data Processors for each of the 3 stages. Each Job Processor implements the following methods:

  1. SetSource: Assigns the Data Processor for the Extraction stage.
  2. SetTransformer: Assigns the Data Processor for the Transformation stage.
  3. SetDestination: Assigns the Data Processor for the Load stage.
  4. Execute: Runs the ETL pipeline.

Job processors containing Data Processor for each stage in ETL

When the Azkaban service is initialised, we run the SetSource(), SetTransformer() and SetDestination() methods for each of the Job Processors defined. When an ETL task is triggered, the Execute() method of the corresponding Job Processor is run. This triggers the ETL pipeline, which runs through its 3 stages. For each stage, the Data Processor assigned during initialisation is executed.
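
A Job Processor of this shape can be sketched as follows. This is a Python illustration under assumptions, not the Go code used in the service: the method names mirror the four listed above in snake_case, each Data Processor is modelled as a plain callable, and the `JOBS` registry and task name are hypothetical.

```python
class JobProcessor:
    """Holds one callable per ETL stage and runs them in order."""

    def __init__(self):
        self.source = None
        self.transformer = None
        self.destination = None

    def set_source(self, processor):
        self.source = processor

    def set_transformer(self, processor):
        self.transformer = processor

    def set_destination(self, processor):
        self.destination = processor

    def execute(self, task):
        extracted = self.source(task)
        transformed = self.transformer(extracted)
        return self.destination(transformed)

def empty_processor(data):
    """No-op load stage for tasks that do not store their output."""
    return data

# Wired once at start-up; the registry and task name are hypothetical.
job = JobProcessor()
job.set_source(lambda task: task["rows"])
job.set_transformer(lambda rows: [r for r in rows if r > 0])
job.set_destination(empty_processor)
JOBS = {"whitelist_merchants": job}

# Triggering a task looks up its Job Processor and runs it.
output = JOBS["whitelist_merchants"].execute({"rows": [3, -1, 7]})
```

Separating wiring (the setters) from running (`execute`) means each pipeline is assembled once and can then be triggered repeatedly with different task inputs.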


ETL pipelines help us in streamlining various tasks in our team. As showcased through the example in the above section, an ETL pipeline breaks a task into multiple stages and divides the responsibilities across these stages.

In cases where a task fails in the middle of the process, ETL pipelines help us determine the cause of the failure quickly and accurately. With ETL pipelines, we have reduced the manual effort required for validating data at each step and avoided propagating errors towards the end of the pipeline.

Through the use of ETL pipelines and schedulers, we at Lending have been able to automate the entire pipeline for many tasks to run at scheduled intervals without any manual effort involved at all. This has helped us tremendously in reducing human errors, increasing the throughput of the system and making the backend flow more reliable. As we continue to automate more and more of our tasks that have tightly defined stages, we foresee a growth in our ETL pipelines usage.






Bulldozer: Batch Data Moving from Data Warehouse to Online Key-Value Stores

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/bulldozer-batch-data-moving-from-data-warehouse-to-online-key-value-stores-41bac13863f8

By Tianlong Chen and Ioannis Papapanagiotou

Netflix has more than 195 million subscribers that generate petabytes of data every day. Data scientists and engineers collect this data from our subscribers and videos, and implement data analytics models to discover customer behaviour with the goal of maximizing user joy. Usually, data scientists and engineers write Extract-Transform-Load (ETL) jobs and pipelines using big data compute technologies, like Spark or Presto, to process this data and periodically compute key information for a member or a video. The processed data is typically stored as data warehouse tables in AWS S3. Iceberg is widely adopted in Netflix as a data warehouse table format that addresses many of the usability and performance problems with Hive tables.

At Netflix, we also heavily embrace a microservice architecture that emphasizes separation of concerns. Many of these services need to do fast lookups of this fine-grained data, which is generated periodically. For example, in order to enhance our user experience, one online application fetches subscribers’ preferences data to recommend movies and TV shows. The data warehouse is not designed to serve point requests from microservices with low latency. Therefore, we must efficiently move data from the data warehouse to a global, low-latency and highly-reliable key-value store. For details on how our machine learning recommendation systems leverage our key-value stores, please see this presentation.

What is Bulldozer

Bulldozer is a self-serve data platform that moves data efficiently from data warehouse tables to key-value stores in batches. It leverages Netflix Scheduler for scheduling the Bulldozer Jobs. Netflix Scheduler is built on top of Meson which is a general purpose workflow orchestration and scheduling framework to execute and manage the lifecycle of the data workflow. Bulldozer makes data warehouse tables more accessible to different microservices and reduces each individual team’s burden to build their own solutions. Figure 1 shows how we use Bulldozer to move data at Netflix.

Figure 1. Moving data with Bulldozer at Netflix.

As the paved path for moving data to key-value stores, Bulldozer provides a scalable and efficient no-code solution. Users only need to specify the data source and the destination cluster information in a YAML file. Bulldozer provides the functionality to auto-generate the data schema which is defined in a protobuf file. The protobuf schema is used for serializing and deserializing the data by Bulldozer and data consumers. Bulldozer uses Spark to read the data from the data warehouse into DataFrames, converts each data entry to a key-value pair using the schema defined in the protobuf and then delivers key-value pairs into a key-value store in batches.

Instead of directly moving data into a specific key-value store like Cassandra or Memcached, Bulldozer moves data to a Netflix implemented Key-Value Data Abstraction Layer (KV DAL). The KV DAL allows applications to use a well-defined and storage engine agnostic HTTP/gRPC key-value data interface that in turn decouples applications from hard to maintain and backwards-incompatible datastore APIs. By leveraging multiple shards of the KV DAL, Bulldozer only needs to provide one single solution for writing data to the highly abstracted key-value data interface, instead of developing different plugins and connectors for different data stores. Then the KV DAL handles writing to the appropriate underlying storage engines depending on latency, availability, cost, and durability requirements.

Figure 2. How Bulldozer leverages Spark, Protobuf and KV DAL for moving the data.

Configuration-Based Bulldozer Job

For batch data movement in Netflix, we provide job templates in our Scheduler that make it easy to move data from all data sources into and out of the data warehouse. Templates are backed by notebooks. Our data platform provides the clients with a configuration-based interface to run a templated job with input validation.

We provide the job template MoveDataToKvDal for moving the data from the warehouse to one Key-Value DAL. Users only need to put the configurations together in a YAML file to define the movement job. The job is then scheduled and executed in Netflix Big Data Platform. This configuration defines what and where the data should be moved. Bulldozer abstracts the underlying infrastructure on how the data moves.

Let’s look at an example of a Bulldozer YAML configuration (Figure 3). Basically the configuration consists of three major domains: 1) data_movement includes the properties that specify what data to move. 2) key_value_dal defines the properties of where the data should be moved. 3) bulldozer_protobuf has the required information for protobuf file auto generation.

Figure 3. An Exemplar Bulldozer Job YAML.

In the data_movement domain, the source of the data can be a warehouse table or a SQL query. Users also need to define the key and value columns to tell Bulldozer which column is used as the key and which columns are included in the value message. We will discuss more details about the schema mapping in the next Data Model section. The key_value_dal domain defines the destination of the data, which is a namespace in the Key-Value DAL. One namespace in a Key-Value DAL contains as many key-value pairs as required; it is the equivalent of a table in a database.

Data Model

Bulldozer uses protobuf for 1) mapping the warehouse table schema to a key-value schema; 2) serializing and deserializing the key-value data when performing write and read operations to KV DAL. In this way, it allows us to provide a more traditional typed record store while keeping the key-value storage engine abstracted.

Figure 4 shows a simple example of how we represent a warehouse table schema into a key-value schema. The left part of the figure shows the schema of the warehouse table while the right part is the protobuf message that Bulldozer auto generates based on the configurations in the YAML file. The field names should exactly match for Bulldozer to convert the structured data entries into the key-value pairs. In this case, profile_id field is the key while email and age fields are included in the value schema. Users can use the protobuf schema KeyMessage and ValueMessage to deserialize data from Key-Value DAL as well.

Figure 4. An Example of Schema Mapping.
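
The column-to-message mapping can be sketched in Python as below. This is an illustration under assumptions, not Bulldozer's implementation: JSON stands in for the protobuf KeyMessage and ValueMessage serialization, and the sample row (including its extra `country` column) is hypothetical.

```python
import json

def to_key_value(row, key_column, value_columns):
    """Split one warehouse row into a (key, value) pair of serialized bytes."""
    # Field names must match the row's columns exactly, as with the schema.
    missing = [c for c in (key_column, *value_columns) if c not in row]
    if missing:
        raise KeyError(f"columns not found in row: {missing}")
    # JSON is a stand-in here; Bulldozer serializes with protobuf messages.
    key = json.dumps({key_column: row[key_column]}, sort_keys=True).encode()
    value = json.dumps({c: row[c] for c in value_columns}, sort_keys=True).encode()
    return key, value

# Hypothetical warehouse row; only the configured columns are carried over.
row = {"profile_id": 42, "email": "user@example.com", "age": 30, "country": "US"}
key, value = to_key_value(row, "profile_id", ["email", "age"])
```

Columns that are not listed as key or value columns, such as `country` here, are simply left out of the key-value pair.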

In this example, the schema of the warehouse table is flat, but sometimes the table can have nested structures. Bulldozer supports complicated schemas, like struct of struct type, array of struct, map of struct and map of map type.

Data Version Control

Bulldozer jobs can be configured to execute at a desired frequency, like once or many times per day. Each execution moves the latest view of the data warehouse into a Key-Value DAL namespace. Each view of the data warehouse is a new version of the entire dataset. For example, if the data warehouse has two versions of the full dataset, as of January 1st and 2nd, a Bulldozer job scheduled to execute daily moves each version of the data.

Figure 5. Dataset of January 1st 2020.
Figure 6. Dataset of January 2nd 2020.

When Bulldozer moves these versioned data, it usually has the following requirements:

  • Data Integrity. For one Bulldozer job moving one version of data, it should write the full dataset or none. Partial writes are not acceptable. In the example above, if the consumer reads the values for movie_id: 1 and movie_id: 2 after the Bulldozer jobs, the returned values shouldn’t come from two versions, like: (movie_id: 1, cost 40), (movie_id: 2, cost 101).
  • Seamless to Data Consumer. Once a Bulldozer job finishes moving a new version of data, the data consumer should be able to start reading the new data automatically and seamlessly.
  • Data Fallback. Normally, data consumers read only the latest version of the data, but if there’s some data corruption in that version, we should have a mechanism to fall back to the previous version.

Bulldozer leverages the KV DAL data namespace and namespace alias functionality to manage these versioned datasets. For each Bulldozer job execution, it creates a new namespace suffixed with the date and moves the data to that namespace. The data consumer reads data from an alias namespace which points to one of these version namespaces. Once the job moves the full data successfully, the Bulldozer job updates the alias namespace to point to the new namespace which contains the new version of data. The old namespaces are closed to reads and writes and deleted in the background once it’s safe to do so. As most key-value storage engines support efficiently deleting a namespace (e.g. truncate or drop a table) this allows us to cheaply recycle old versions of the data. There are also other systems in Netflix like Gutenberg which adopt a similar namespace alias approach for data versioning which is applied to terabyte datasets.

For example, in Figure 7 data consumers read the data through namespace: alias_namespace which points to one of the underlying namespaces. On January 1st 2020, the Bulldozer job creates namespace_2020_01_01, moves the dataset, and points alias_namespace to namespace_2020_01_01. On January 2nd 2020, there’s a new version of data, so Bulldozer creates namespace_2020_01_02, moves the new dataset and updates alias_namespace to point to namespace_2020_01_02. Both namespace_2020_01_01 and namespace_2020_01_02 are transparent to the data consumers.

Figure 7. An Example of How the Namespace Aliasing Works.

The namespace aliasing mechanism ensures that the data consumer only reads data from one single version. If there’s a bad version of data, we can always swap the underlying namespaces to fall back to the old version.
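
The alias swap can be modelled with a small in-memory Python sketch. The `KeyValueStore` class and its methods are hypothetical and are not the KV DAL interface; the namespace names follow the example above, and the stored values are made up.

```python
class KeyValueStore:
    """In-memory stand-in for versioned namespaces behind a single alias."""

    def __init__(self):
        self.namespaces = {}  # namespace name -> {key: value}
        self.alias = {}       # alias name -> namespace name

    def load_version(self, alias, namespace, data):
        # Write the full dataset first; readers still see the old version.
        self.namespaces[namespace] = dict(data)
        # Only after the write has completed is the alias repointed.
        self.alias[alias] = namespace

    def fall_back(self, alias, namespace):
        # Repoint the alias to an older namespace that still exists.
        if namespace not in self.namespaces:
            raise KeyError(namespace)
        self.alias[alias] = namespace

    def get(self, alias, key):
        return self.namespaces[self.alias[alias]][key]

store = KeyValueStore()
# Hypothetical values for the two daily versions.
store.load_version("alias_namespace", "namespace_2020_01_01", {"movie_id:1": 40})
store.load_version("alias_namespace", "namespace_2020_01_02", {"movie_id:1": 41})
latest = store.get("alias_namespace", "movie_id:1")
store.fall_back("alias_namespace", "namespace_2020_01_01")
previous = store.get("alias_namespace", "movie_id:1")
```

Since readers only ever resolve keys through the alias, a single pointer update switches every consumer to the new version at once, and a fallback is the same operation in reverse. The sketch keeps old namespaces around indefinitely, whereas a real system would delete them once they are no longer needed for fallback.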

Production Usage

We released Bulldozer in production in early 2020. Currently, Bulldozer transfers billions of records from the data warehouse to key-value stores in Netflix every day. The use cases include our members’ predicted scores data to help improve personalized experience (one example is shown in Figure 8), the metadata of Airtable and Google Sheets for data lifecycle management, the messaging modeling data for messaging personalization and more.

Figure 8. Personalized articles in Netflix Help Center powered by Bulldozer.

Stay Tuned

The ideas discussed here include only a small set of problems with many more challenges still left to be identified and addressed. Please share your thoughts and experience by posting your comments below and stay tuned for more on data movement work at Netflix.

Bulldozer: Batch Data Moving from Data Warehouse to Online Key-Value Stores was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.