Maestro: Netflix’s Workflow Orchestrator

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/maestro-netflixs-workflow-orchestrator-ee13a06f9c78

By Jun He, Natallia Dzenisenka, Praneeth Yenugutala, Yingyi Zhang, and Anjali Norwood

TL;DR

We are thrilled to announce that the Maestro source code is now open to the public! Please visit the Maestro GitHub repository to get started. If you find it useful, please give us a star.

What is Maestro

Maestro is a general-purpose, horizontally scalable workflow orchestrator designed to manage large-scale workflows such as data pipelines and machine learning model training pipelines. It oversees the entire lifecycle of a workflow, from start to finish, including retries, queuing, and task distribution to compute engines. Users can package their business logic in various formats such as Docker images, notebooks, bash scripts, SQL, Python, and more. Unlike traditional workflow orchestrators that only support Directed Acyclic Graphs (DAGs), Maestro supports both acyclic and cyclic workflows and also includes multiple reusable patterns, such as foreach loops, subworkflows, and conditional branches.

Our Journey with Maestro

Since we first introduced Maestro in this blog post, we have successfully migrated hundreds of thousands of workflows to it on behalf of users with minimal interruption. The transition was seamless, and Maestro has met our design goals by handling our ever-growing workloads. Over the past year, we’ve seen a remarkable 87.5% increase in executed jobs. Maestro now launches thousands of workflow instances and runs half a million jobs daily on average, and has completed around 2 million jobs on particularly busy days.

Scalability and Versatility

Maestro is a fully managed workflow orchestrator that provides Workflow-as-a-Service to thousands of end users, applications, and services at Netflix. It supports a wide range of workflow use cases, including ETL pipelines, ML workflows, AB test pipelines, pipelines to move data between different storages, etc. Maestro’s horizontal scalability ensures it can manage both a large number of workflows and a large number of jobs within a single workflow.

At Netflix, workflows are intricately connected. Splitting them into smaller groups and managing them across different clusters adds unnecessary complexity and degrades the user experience. This approach also requires additional mechanisms to coordinate these fragmented workflows. Since Netflix’s data tables are housed in a single data warehouse, we believe a single orchestrator should handle all workflows accessing it.

Join us on this exciting journey by exploring the Maestro GitHub repository and contributing to its ongoing development. Your support and feedback are invaluable as we continue to improve the Maestro project.

Introducing Maestro

Netflix Maestro offers a comprehensive set of features designed to meet the diverse needs of both engineers and non-engineers. It includes the common functions and reusable patterns applicable to various use cases in a loosely coupled way.

A workflow definition is defined in a JSON format. Maestro combines user-supplied fields with those managed by Maestro to form a flexible and powerful orchestration definition. An example can be found in the Maestro repository wiki.

A Maestro workflow definition comprises two main sections: properties and the versioned workflow, including its metadata. Properties include author and owner information, and execution settings. Maestro preserves key properties across workflow versions, such as author and owner information, run strategy, and concurrency settings. This consistency simplifies management and aids in troubleshooting. If the ownership of a workflow changes, the new owner can claim ownership of the workflow without creating a new workflow version. Users can also enable the triggering or alerting features for a given workflow through the properties.

The versioned workflow includes attributes like a unique identifier, name, description, tags, timeout settings, and criticality levels (low, medium, high) for prioritization. Each workflow change creates a new version, enabling tracking and easy reversion, with the active or the latest version used by default. A workflow consists of steps, which are the nodes in the workflow graph defined by users. Steps can represent jobs, another workflow (using a subworkflow step), or a loop (using a foreach step). Steps consist of unique identifiers, step types, tags, input and output step parameters, step dependencies, retry policies, failure modes, step outputs, etc. Maestro supports configurable retry policies based on error types to enhance step resilience.
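To make the two-section layout concrete, here is a minimal sketch of what such a definition could look like, written as a Python dictionary that serializes to JSON. The key names and values are illustrative assumptions, not the actual Maestro schema; the example in the Maestro repository wiki is the authoritative reference.

```python
import json

# Illustrative only: key names and values are assumptions, not the real Maestro schema.
definition = {
    "properties": {  # preserved across workflow versions
        "owner": "data-team",
        "run_strategy": "sequential",
    },
    "workflow": {  # versioned part; each change creates a new version
        "id": "demo.daily_etl",
        "name": "Daily ETL",
        "criticality": "medium",
        "steps": [
            {"id": "extract", "type": "notebook", "dependencies": []},
            {"id": "load", "type": "sql", "dependencies": ["extract"]},
        ],
    },
}


def step_ids(defn):
    """Return the step identifiers in definition order."""
    return [step["id"] for step in defn["workflow"]["steps"]]


def has_unique_step_ids(defn):
    """Check that no two steps share an identifier."""
    ids = step_ids(defn)
    return len(ids) == len(set(ids))


print(json.dumps(definition, indent=2))
```

Keeping ownership and run strategy outside the versioned part is what lets them survive a workflow change without a new version.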

This high-level overview of Netflix Maestro’s workflow definition and properties highlights its flexibility to define complex workflows. Next, we dive into some of the useful features in the following sections.

Workflow Run Strategy

Users want to automate data pipelines while retaining control over the execution order. This is crucial when workflows cannot run in parallel or must halt current executions when new ones occur. Maestro uses predefined run strategies to decide whether a workflow instance should run or not. Here is the list of predefined run strategies Maestro offers.

Sequential Run Strategy
This is the default strategy used by Maestro, which runs workflows one at a time based on a First-In-First-Out (FIFO) order. With this run strategy, Maestro runs workflows in the order they are triggered. Note that an execution does not depend on the state of previous executions. Once a workflow instance reaches one of the terminal states, whether succeeded or not, Maestro will start the next one in the queue.

Strict Sequential Run Strategy
With this run strategy, Maestro will run workflows in the order they are triggered but block execution if there’s a blocking error in the workflow instance history. Newly triggered workflow instances are queued until the error is resolved by manually restarting the failed instances or marking the failed ones unblocked.

An example of strict sequential run strategy

In the above example, run5 fails at 5AM, so later runs are queued but do not run. When someone manually marks run5 unblocked or restarts it, the workflow execution will resume. This run strategy is useful for time-insensitive but business-critical workflows. It gives workflow owners the option to review the failures at a later time and unblock the executions after verifying correctness.
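The difference between the sequential and strict sequential strategies can be sketched with a small FIFO simulation. This is a toy model under the assumption that each run simply succeeds or fails; the function name and data shapes are hypothetical and not part of Maestro.

```python
from collections import deque


def run_queue(outcomes, strict):
    """Simulate FIFO execution of triggered runs.

    outcomes: list of (run_id, succeeded) in trigger order.
    strict:   if True, a failed run blocks everything queued after it.
    Returns (executed_ids, still_queued_ids).
    """
    queue = deque(outcomes)
    executed = []
    while queue:
        run_id, succeeded = queue.popleft()
        executed.append(run_id)
        if strict and not succeeded:
            # Later runs stay queued until the failure is unblocked or restarted.
            break
    return executed, [run_id for run_id, _ in queue]


runs = [("run4", True), ("run5", False), ("run6", True), ("run7", True)]
print(run_queue(runs, strict=False))
print(run_queue(runs, strict=True))
```

With `strict=False` every run executes regardless of earlier failures; with `strict=True` the runs after `run5` remain queued.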

First-only Run Strategy
With this run strategy, Maestro ensures that the running workflow is complete before queueing a new workflow instance. If a new workflow instance is queued while the current one is still running, Maestro will remove the queued instance. Maestro will execute a new workflow instance only if there is no workflow instance currently running, effectively turning off queuing with this run strategy. This approach helps to avoid idempotency issues by not queuing new workflow instances.

Last-only Run Strategy
With this run strategy, Maestro ensures the running workflow is the latest triggered one and keeps only the last instance. If a new workflow instance is queued while there is an existing workflow instance already running, Maestro will stop the running instance and execute the newly triggered one. This is useful if a workflow is designed to always process the latest data, such as processing the latest snapshot of an entire table each time.
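The first-only and last-only strategies differ only in which instance survives when a trigger arrives during an active run. A minimal sketch of that decision, with hypothetical strategy names and return shape:

```python
def on_trigger(strategy, running, new_run):
    """Decide what happens when new_run is triggered.

    running: id of the currently running instance, or None.
    Returns (instance_that_runs, instance_that_is_removed_or_stopped).
    """
    if running is None:
        return new_run, None
    if strategy == "first_only":
        return running, new_run  # keep the running one, remove the new one
    if strategy == "last_only":
        return new_run, running  # stop the running one, start the newest
    raise ValueError(f"unsupported strategy: {strategy}")


print(on_trigger("first_only", "run1", "run2"))
print(on_trigger("last_only", "run1", "run2"))
```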

Parallel with Concurrency Limit Run Strategy
With this run strategy, Maestro will run multiple triggered workflow instances in parallel, constrained by a predefined concurrency limit. This helps to fan out and distribute the execution, enabling the processing of large amounts of data within the time limit. A common use case for this strategy is for backfilling the old data.
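As a rough illustration of a concurrency-limited backfill, the sketch below uses a thread pool whose size acts as the limit. It is a stand-in for the idea only: Maestro enforces the limit in its own engine, and the `backfill` helper and its peak-tracking counter are assumptions made for this example.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def backfill(dates, concurrency_limit, process):
    """Process every date with at most concurrency_limit running at once.

    Returns (results_in_input_order, peak_number_of_concurrent_runs).
    """
    lock = threading.Lock()
    active = 0
    peak = 0

    def tracked(date):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        try:
            return process(date)
        finally:
            with lock:
                active -= 1

    # The pool size is what bounds the number of simultaneous runs.
    with ThreadPoolExecutor(max_workers=concurrency_limit) as pool:
        results = list(pool.map(tracked, dates))
    return results, peak


dates = ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"]
results, peak = backfill(dates, 2, lambda d: f"backfilled {d}")
print(results, peak)
```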

Parameters and Expression Language Support

In Maestro, parameters play an important role. Maestro supports dynamic parameters with code injection, which is super useful and powerful. This feature significantly enhances the flexibility and dynamism of workflows, allowing parameters to control execution logic and enable state sharing between workflows and their steps, as well as between upstream and downstream steps. Together with other Maestro features, it makes workflow definitions dynamic and enables users to define parameterized workflows for complex use cases.

However, code injection introduces significant security and safety concerns. For example, users might unintentionally write an infinite loop that creates an array and appends items to it, eventually crashing the server with out-of-memory (OOM) issues. While one approach could be to ask users to embed the injected code within their business logic instead of the workflow definition, this would impose additional work on users and tightly couple their business logic with the workflow. In certain cases, this approach prevents users from designing some complex parameterized workflows.

To mitigate these risks and help users build parameterized workflows, we developed our own customized expression language parser, a simple, secure, and safe expression language (SEL). SEL supports code injection while incorporating validations during syntax tree parsing to protect the system. It leverages the Java Security Manager to restrict access, ensuring a secure and controlled environment for code execution.

Simple, Secure, and Safe Expression Language (SEL)
SEL is a homegrown expression language built to address the risks associated with code injection within Maestro parameterized workflows. It is a simple expression language whose grammar and syntax follow the Java Language Specification (JLS). SEL supports a subset of JLS, focusing on Maestro use cases. For example, it supports data types for all Maestro parameter types, raising errors, datetime handling, and many predefined utility methods. SEL also includes additional runtime checks, such as loop iteration limits, array size checks, and object memory size limits, to enhance security and reliability. For more details about SEL, please refer to the Maestro GitHub documentation.
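To illustrate the kind of runtime checks mentioned above, here is a toy guard written in Python. It is not SEL and does not reflect how SEL is implemented; the class, limits, and method names are assumptions chosen to show how an iteration limit and an array size limit stop the runaway loop described earlier.

```python
class LimitExceeded(Exception):
    """Raised when evaluated code goes past a configured runtime limit."""


class GuardedEvaluator:
    """Toy guard mimicking the style of runtime limits described for SEL."""

    def __init__(self, max_loop_iterations=1000, max_array_size=100):
        self.max_loop_iterations = max_loop_iterations
        self.max_array_size = max_array_size

    def run_loop(self, body, condition):
        """Run body() while condition() holds, aborting past the iteration limit."""
        iterations = 0
        while condition():
            iterations += 1
            if iterations > self.max_loop_iterations:
                raise LimitExceeded("loop iteration limit exceeded")
            body()
        return iterations

    def append(self, array, item):
        """Append to an array only while it is below the size limit."""
        if len(array) >= self.max_array_size:
            raise LimitExceeded("array size limit exceeded")
        array.append(item)


guard = GuardedEvaluator(max_loop_iterations=50, max_array_size=10)
items = []
try:
    # An unintended infinite loop that keeps growing an array.
    guard.run_loop(lambda: guard.append(items, 0), lambda: True)
except LimitExceeded as error:
    print("stopped:", error)
```

Either limit is enough to end the loop; whichever is reached first raises the error instead of exhausting server memory.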

Output Parameters
To further enhance parameter support, Maestro allows for callable step execution, which returns output parameters from user execution back to the system. The output data is transmitted to Maestro via its REST API, ensuring that the step runtime does not have direct access to the Maestro database. This approach significantly reduces security concerns.

Parameterized Workflows
Thanks to the powerful parameter support, users can easily create parameterized workflows in addition to static ones. Users enjoy defining parameterized workflows because they are easy to manage and troubleshoot while being powerful enough to solve complex use cases.

  • Static workflows are simple and easy to use but come with limitations. Often, users have to duplicate the same workflow multiple times to accommodate minor changes. Additionally, workflows and jobs cannot share state without using parameters.
  • On the other hand, completely dynamic workflows can be challenging to manage and support. They are difficult to debug or troubleshoot and hard for others to reuse.
  • Parameterized workflows strike a balance by being initialized step by step at runtime based on user-defined parameters. This approach provides great flexibility for users to control the execution at runtime while remaining easy to manage and understand.

As we described in the previous Maestro blog post, parameter support enables the creation of complex parameterized workflows, such as backfill data pipelines.

Workflow Execution Patterns

Maestro provides multiple useful building blocks that allow users to easily define dataflow patterns or other workflow patterns. It provides support for common patterns directly within the Maestro engine. Direct engine support not only enables us to optimize these patterns but also ensures a consistent approach to implementing them. Next, we will talk about the three major building blocks that Maestro provides.

Foreach Support
In Maestro, the foreach pattern is modeled as a dedicated step within the original workflow definition. Each iteration of the foreach loop is internally treated as a separate workflow instance, which scales similarly to any other Maestro workflow based on the step executions (i.e. a sub-graph) defined within the foreach definition block. The execution of the sub-graph within a foreach step is delegated to a separate workflow instance. The foreach step then monitors and collects the status of these foreach workflow instances, each managing the execution of a single iteration. For more details, please refer to our previous Maestro blog post.

The foreach pattern is frequently used to repeatedly run the same jobs with different parameters, such as data backfilling or machine learning model tuning. It would be tedious and time consuming to request users to explicitly define each iteration in the workflow definition (potentially hundreds of thousands of iterations). Additionally, users would need to create new workflows if the foreach range changes, further complicating the process.
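The foreach behavior described in this section, one isolated execution per iteration with the foreach step collecting statuses, can be sketched as follows. This is a simplified sequential model; the `run_foreach` helper, the status strings, and the 1-based `loop_index` are assumptions for illustration.

```python
def run_foreach(loop_values, run_iteration):
    """Run one isolated iteration per value and collect a status summary.

    run_iteration receives the per-iteration parameters and raises on failure.
    Returns (overall_status, {loop_index: status}).
    """
    statuses = {}
    for index, value in enumerate(loop_values, start=1):
        try:
            run_iteration({"loop_index": index, "value": value})
            statuses[index] = "SUCCEEDED"
        except Exception:
            # A failed iteration does not stop the remaining ones in this sketch.
            statuses[index] = "FAILED"
    all_ok = all(status == "SUCCEEDED" for status in statuses.values())
    return ("SUCCEEDED" if all_ok else "FAILED"), statuses


def backfill_one_day(params):
    """Hypothetical job body: fails for one specific date."""
    if params["value"] == "2024-01-02":
        raise RuntimeError("source data missing")


print(run_foreach(["2024-01-01", "2024-01-02", "2024-01-03"], backfill_one_day))
```

Changing the date range only changes the input list, which is why users do not need a new workflow when the foreach range changes.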

Conditional Branch Support
The conditional branch feature allows subsequent steps to run only if specific conditions in the upstream step are met. These conditions are defined using SEL and evaluated at runtime. Combined with other building blocks, users can build powerful workflows, e.g. running a remediation step if the audit check step fails and then running the job again.

Subworkflow Support
The subworkflow feature allows a workflow step to run another workflow, enabling the sharing of common functions across multiple workflows. This effectively enables “workflow as a function” and allows users to build a graph of workflows. For example, we have observed complex workflows consisting of hundreds of subworkflows to process data across hundreds of tables, where subworkflows are provided by multiple teams.

These patterns can be combined together to build composite patterns for complex workflow use cases. For instance, we can loop over a set of subworkflows or run nested foreach loops. One example that Maestro users developed is an auto-recovery workflow that utilizes both conditional branch and subworkflow features to handle errors and retry jobs automatically.

An example of auto-recovery ETL workflows

In this example, subworkflow `job1` runs another workflow consisting of extract-transform-load (ETL) and audit jobs. Next, a status check job leverages the Maestro parameter and SEL support to retrieve the status of the previous job. Based on this status, it can decide whether to complete the workflow or to run a recovery job to address any data issues. After resolving the issue, it then executes subworkflow `job2`, which runs the same workflow as subworkflow `job1`.
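The control flow of this auto-recovery example can be sketched in a few lines. The functions below are hypothetical stand-ins: a plain Python `if` plays the role of the SEL condition, and the recovery job is assumed to always fix the data.

```python
def run_subworkflow(name, data_ok):
    """Stand-in for a subworkflow of ETL and audit jobs; returns its status."""
    return "SUCCEEDED" if data_ok else "FAILED"


def auto_recovery(data_ok_initially):
    """Return the list of (step, status) pairs that were executed."""
    trace = []
    status = run_subworkflow("job1", data_ok_initially)
    trace.append(("job1", status))
    # Status check: this condition plays the role of a SEL expression on job1's status.
    if status == "SUCCEEDED":
        trace.append(("complete", "SUCCEEDED"))
        return trace
    trace.append(("recovery", "SUCCEEDED"))  # assumption: recovery fixes the data
    trace.append(("job2", run_subworkflow("job2", True)))
    return trace


print(auto_recovery(True))
print(auto_recovery(False))
```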

Step Runtime and Step Parameter

Step Runtime Interface
In Maestro, we use step runtime to describe a job at execution time. The step runtime interface defines two pieces of information:

  1. A set of basic APIs to control the behavior of a step instance at execution runtime.
  2. Some simple data structures to track step runtime state and execution result.

Maestro offers a few step runtime implementations, such as the foreach step runtime and the subworkflow step runtime (mentioned in the previous section). Each implementation defines its own logic for start, execute, and terminate operations. At runtime, these operations control how to initialize a step instance, perform the business logic, and terminate the execution under certain conditions (e.g. manual intervention by users).

Also, Maestro step runtime internally keeps track of runtime state as well as the execution result of the step. The runtime state is used to determine the next state transition of the step and tell if it has failed or terminated. The execution result hosts both step artifacts and the timeline of step execution history, which are accessible by subsequent steps.
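A rough Python rendering of this interface is shown below: three operations plus a small result structure carrying state, artifacts, and timeline entries. Maestro itself is a Java project, so the class names, state strings, and the `EchoStepRuntime` example here are assumptions made purely for illustration.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class RuntimeResult:
    """Hypothetical result: runtime state plus artifacts and timeline entries."""
    state: str  # e.g. "DONE" or "FAILED" (illustrative values)
    artifacts: dict = field(default_factory=dict)
    timeline: list = field(default_factory=list)


class StepRuntime(ABC):
    """The three operations each step runtime implementation defines."""

    @abstractmethod
    def start(self, params): ...

    @abstractmethod
    def execute(self, params): ...

    @abstractmethod
    def terminate(self, params): ...


class EchoStepRuntime(StepRuntime):
    """Toy implementation that echoes one parameter as an artifact."""

    def start(self, params):
        return RuntimeResult("DONE", timeline=["started"])

    def execute(self, params):
        message = params.get("message")
        return RuntimeResult("DONE", artifacts={"echo": message}, timeline=["executed"])

    def terminate(self, params):
        return RuntimeResult("DONE", timeline=["terminated by user"])


runtime = EchoStepRuntime()
print(runtime.execute({"message": "hello"}))
```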

Step Parameter Merging
To control step behavior in a dynamic way, Maestro supports both runtime parameters and tags injection in step runtime. This makes a Maestro step more flexible to absorb runtime changes (e.g. overridden parameters) before actually being started. Maestro internally maintains a step parameter map that is initially empty and is updated by merging step parameters in the order below:

  • Default General Parameters: Parameter merging starts from default parameters that in general every step should have. For example, workflow_instance_id, step_instance_uuid, step_attempt_id and step_id are required parameters for each Maestro step. They are internally reserved by Maestro and cannot be passed by users.
  • Injected Parameters: Maestro then merges injected parameters (if present) into the parameter map. The injected parameters come from step runtime and are dynamically generated based on the step schema. Each type of step can have its own schema with specific parameters associated with this step. The step schema can evolve independently with no need to update Maestro code.
  • Default Typed Parameters: After injecting runtime parameters, Maestro tries to merge default parameters that are related to a specific type of step. For example, the foreach step has loop_params and loop_index default parameters, which are internally set by Maestro and used for the foreach step only.
  • Workflow and Step Info Parameters: These parameters contain information about the step and the workflow it belongs to. This can be identity information, e.g. workflow_id, and will be merged into the step parameter map if present.
  • Undefined New Parameters: When starting or restarting a Maestro workflow instance, users can specify new step parameters that are not present in the initial step definition. ParamsManager merges these parameters to ensure they are available at execution time.
  • Step Definition Parameters: These step parameters are defined by users at definition time and get merged if they are not empty.
  • Run and Restart Parameters: When starting or restarting a Maestro workflow instance, users can override defined parameters by providing run or restart parameters. These two types of parameters are merged at the end so that step runtime can see the most recent and accurate parameter space.

The parameters merging logic can be visualized in the diagram below.

Diagram of the parameters merging logic
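The merge order listed above can be sketched as a sequence of dictionary merges in which later layers override earlier ones. This is a simplified model, not Maestro's ParamsManager: the layer names, the tuple format, and the rule that user-supplied layers may not set reserved names are assumptions drawn from the description.

```python
RESERVED = {"workflow_instance_id", "step_instance_uuid", "step_attempt_id", "step_id"}


def merge_step_params(layers):
    """Merge (name, params, user_supplied) layers in order; later layers win."""
    merged = {}
    for name, params, user_supplied in layers:
        for key, value in (params or {}).items():
            if user_supplied and key in RESERVED:
                raise ValueError(f"{key} is reserved and cannot be set in {name}")
            merged[key] = value
    return merged


layers = [
    ("default_general", {"workflow_instance_id": 1, "step_id": "job1"}, False),
    ("injected", {"cluster": "default"}, False),
    ("default_typed", {"loop_index": 0}, False),
    ("workflow_and_step_info", {"workflow_id": "demo"}, False),
    ("undefined_new", {"debug": True}, True),
    ("step_definition", {"cluster": "small", "date": "2024-01-01"}, True),
    ("run_or_restart", {"date": "2024-01-02"}, True),
]
print(merge_step_params(layers))
```

Here the run parameter overrides the `date` from the step definition, while the reserved identifiers set in the first layer pass through untouched.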

Step Dependencies and Signals

Steps in the Maestro execution workflow graph can express execution dependencies using step dependencies. A step dependency specifies the data-related conditions required by a step to start execution. These conditions are usually defined based on signals, which are pieces of messages carrying information such as parameter values and can be published through step outputs or external systems like SNS or Kafka messages.

Signals in Maestro serve both the signal trigger pattern and the signal dependency (publisher-subscriber) pattern. One step can publish an output signal that can unblock the execution of multiple other steps that depend on it. A signal definition includes a list of mapped parameters, allowing Maestro to perform “signal matching” on a subset of fields. Additionally, Maestro supports signal operators like <, >, etc., on signal parameter values.
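Signal matching on a subset of fields with comparison operators can be sketched as below. The dictionary shapes and the set of supported operators are assumptions for illustration (the text above only names < and > explicitly); this is not Maestro's signal service.

```python
import operator

# Assumed operator set; only "<" and ">" are named in the text above.
OPS = {
    "=": operator.eq,
    "<": operator.lt,
    ">": operator.gt,
    "<=": operator.le,
    ">=": operator.ge,
}


def matches(signal, dependency):
    """Return True if the signal satisfies every condition of the dependency.

    signal:     {"name": str, "params": {field: value}}
    dependency: {"name": str, "match": [(field, op, value), ...]}
    """
    if signal["name"] != dependency["name"]:
        return False
    params = signal["params"]
    return all(
        field in params and OPS[op](params[field], value)
        for field, op, value in dependency["match"]
    )


signal = {"name": "table_updated", "params": {"table": "db.events", "hour": 14}}
dependency = {"name": "table_updated", "match": [("table", "=", "db.events"), ("hour", ">", 12)]}
print(matches(signal, dependency))
```

Only the fields listed in the dependency are compared, so a signal may carry extra parameters without affecting the match.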

Netflix has built various abstractions on top of the concept of signals. For instance, an ETL workflow can update a table with data and send signals that unblock steps in downstream workflows dependent on that data. Maestro supports “signal lineage,” which allows users to navigate all historical instances of signals and the workflow steps that match (i.e. publishing or consuming) those signals. Signal triggering guarantees exactly-once execution for the workflow subscribing to a signal or a set of joined signals. This approach is efficient, as it conserves resources by only executing the workflow or step when the specified conditions in the signals are met. A signal service is implemented for those advanced abstractions. Please refer to the Maestro blog for further details on it.

Breakpoint

Maestro allows users to set breakpoints on workflow steps, functioning similarly to code-level breakpoints in an IDE. When a workflow instance executes and reaches a step with a breakpoint, that step enters a “paused” state. This halts the workflow graph’s progression until a user manually resumes from the breakpoint. If multiple instances of a workflow step are paused at a breakpoint, resuming one instance will only affect that specific instance, leaving the others in a paused state. Deleting the breakpoint will cause all paused step instances to resume.

This feature is particularly useful during the initial development of a workflow, allowing users to inspect step executions and output data. It is also beneficial when running a step multiple times in a “foreach” pattern with various input parameters. Setting a single breakpoint on a step will cause all iterations of the foreach loop to pause at that step for debugging purposes. Additionally, the breakpoint feature allows human intervention during the workflow execution and can also be used for other purposes, e.g. supporting mutating step states while the workflow is running.
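The pause, resume, and delete semantics described above can be modeled with a small class. This is a toy model with hypothetical names, intended only to show that resuming affects a single instance while deleting the breakpoint releases all of them.

```python
class Breakpoints:
    """Toy model of step breakpoints; not Maestro's implementation."""

    def __init__(self):
        self.enabled = set()  # step ids that have a breakpoint
        self.paused = {}      # step id -> set of paused instance ids

    def add(self, step_id):
        self.enabled.add(step_id)

    def reach(self, step_id, instance_id):
        """Called when an instance arrives at a step; returns its new state."""
        if step_id in self.enabled:
            self.paused.setdefault(step_id, set()).add(instance_id)
            return "PAUSED"
        return "RUNNING"

    def resume(self, step_id, instance_id):
        """Resume one paused instance; the others stay paused."""
        self.paused.get(step_id, set()).discard(instance_id)

    def remove(self, step_id):
        """Delete the breakpoint and return every instance it releases."""
        self.enabled.discard(step_id)
        return self.paused.pop(step_id, set())


bp = Breakpoints()
bp.add("audit")
for iteration in ("iter1", "iter2", "iter3"):
    print(iteration, bp.reach("audit", iteration))
bp.resume("audit", "iter1")
print(sorted(bp.remove("audit")))
```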

Timeline

Maestro includes a step execution timeline, capturing all significant events such as execution state machine changes and the reasoning behind them. This feature is useful for debugging, providing insights into the status of a step. For example, it logs transitions such as “Created” and “Evaluating params”, etc. An example of a timeline is included here for reference. The implemented step runtimes can add the timeline events into the timeline to surface the execution information to the end users.

Retry Policies

Maestro supports retry policies for steps that reach a terminal state due to failure. Users can specify the number of retries and configure retry policies, including delays between retries and exponential backoff strategies, in addition to fixed interval retries. Maestro distinguishes between two types of retries: “platform” and “user.” Platform retries address platform-level errors unrelated to user logic, while user retries are for user-defined conditions. Each type can have its own set of retry policies.

Automatic retries are beneficial for handling transient errors that can be resolved without user intervention. Maestro provides the flexibility to set retries to zero for non-idempotent steps to avoid retry. This feature ensures that users have control over how retries are managed based on their specific requirements.
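The difference between fixed-interval and exponential backoff retries comes down to how the delay before each retry is computed. A minimal sketch, assuming a doubling factor and an optional cap; the parameter names are hypothetical and not Maestro's retry policy fields:

```python
def retry_delays(retries, base_delay, backoff="fixed", exponent=2, max_delay=None):
    """Return the delay in seconds before each retry attempt.

    backoff="fixed" repeats base_delay; "exponential" multiplies it by
    exponent for every further attempt, optionally capped at max_delay.
    """
    delays = []
    for attempt in range(retries):
        if backoff == "exponential":
            delay = base_delay * (exponent ** attempt)
        else:
            delay = base_delay
        if max_delay is not None:
            delay = min(delay, max_delay)
        delays.append(delay)
    return delays


print(retry_delays(3, 5))
print(retry_delays(4, 10, backoff="exponential", max_delay=60))
print(retry_delays(0, 5))  # zero retries, e.g. for a non-idempotent step
```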

Aggregated View

Because a workflow instance can have multiple runs, it is important for users to see an aggregated state of all steps in the workflow instance. The aggregated view is computed by merging the base aggregated view with the step instance statuses of the current run. For example, in the simple case shown in the figure below, there is a first run in which step1 and step2 succeeded, step3 failed, and step4 and step5 did not start. When the user restarts the run, run 2 starts from step3 and skips step1 and step2, which succeeded in the previous run. After all steps succeed, the aggregated view shows the run states for all steps.

An example of aggregated views
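The merge behind this example can be sketched as overlaying the current run's step statuses on the base view. The status strings and the `aggregate` helper are assumptions for illustration.

```python
def aggregate(base_view, current_run):
    """Overlay the current run's step statuses on the base aggregated view."""
    view = dict(base_view)
    view.update(current_run)
    return view


run1 = {
    "step1": "SUCCEEDED",
    "step2": "SUCCEEDED",
    "step3": "FAILED",
    "step4": "NOT_CREATED",
    "step5": "NOT_CREATED",
}
# Run 2 restarts from step3; step1 and step2 are not re-executed.
run2 = {"step3": "SUCCEEDED", "step4": "SUCCEEDED", "step5": "SUCCEEDED"}

print(aggregate(run1, run2))
```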

Rollup

Rollup provides a high-level summary of a workflow instance, detailing the status of each step and the count of steps in each status. It flattens steps across the current instance and any nested non-inline workflows like subworkflows or foreach steps. For instance, if a successful workflow has three steps, one of which is a subworkflow corresponding to a five-step workflow, the rollup will indicate that seven steps succeeded. Only leaf steps are counted in the rollup, as other steps serve merely as pointers to concrete workflows.
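The leaf-counting rule can be sketched with a short recursive function. The nested-dictionary representation is an assumption; the example reproduces the seven-step case above, where the subworkflow step itself is not counted.

```python
from collections import Counter


def rollup(steps):
    """Count leaf steps per status, recursing into nested workflows.

    steps: list of dicts with a 'status' and, for pointer steps such as
    subworkflow or foreach, a 'children' list holding the nested steps.
    """
    counts = Counter()
    for step in steps:
        children = step.get("children")
        if children is not None:
            counts += rollup(children)  # pointer step: count only its leaves
        else:
            counts[step["status"]] += 1
    return counts


five_step_workflow = [{"status": "SUCCEEDED"} for _ in range(5)]
workflow = [
    {"status": "SUCCEEDED"},
    {"status": "SUCCEEDED"},
    {"status": "SUCCEEDED", "children": five_step_workflow},  # subworkflow step
]
print(rollup(workflow))
```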

Rollup also retains references to any non-successful steps, offering a clear overview of step statuses and facilitating easy navigation to problematic steps, even within nested workflows. The aggregated rollup for a workflow instance is calculated by combining the current run’s runtime data with a base rollup. The current state is derived from the statuses of active steps, including aggregated rollups for foreach and subworkflow steps. The base rollup is established when the workflow instance begins and includes statuses of inline steps (excluding foreach and subworkflows) from the previous run that are not part of the current run.

For subworkflow steps, the rollup simply reflects the rollup of the subworkflow instance. For foreach steps, the rollup combines the base rollup of the foreach step with the current state rollup. The base is derived from the previous run’s aggregated rollup, excluding the iterations to be restarted in the new run. The current state is periodically updated by aggregating rollups of running iterations until all iterations reach a terminal state.

Due to these processes, the rollup model is eventually consistent. While the figure below illustrates a straightforward example of rollup, the calculations can become complex and recursive, especially with multiple levels of nested foreaches and subworkflows.

An example of the rollup model

Maestro Event Publishing

When a workflow definition, workflow instance, or step instance changes, Maestro generates an event, processes it internally, and publishes the processed event to external systems. Maestro has both internal and external events. An internal event tracks changes within the life cycle of a workflow, workflow instance, or step instance. It is published to an internal queue and processed within Maestro. After internal events are processed, some of them are transformed into external events and sent to an external queue (e.g. SNS or Kafka). An external event carries Maestro status change information for downstream services. The event publishing flow is illustrated in the diagram below:

A diagram of the event publishing flow

As shown in the diagram, the Maestro event processor bridges the two aforementioned Maestro events. It listens on the internal queue to get the published internal events. Within the processor, the internal job event is processed based on its type and gets converted to an external event if needed. The notification publisher at the end emits the external event so that downstream services can consume it.

The downstream services are mostly event-driven. The Maestro event carries the most useful message for downstream services to capture different changes in Maestro. In general, these changes can be classified into two categories: workflow change and instance status change. The workflow change event is associated with actions at the workflow level, i.e. the definition or properties of a workflow have changed. Meanwhile, the instance status change event tracks status transitions of a workflow instance or step instance.
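The flow from internal queue to external publisher can be sketched with the standard library queue. The event type names, payload fields, and the rule for which internal events are dropped are assumptions; only the two external categories come from the text above.

```python
import queue


def to_external(internal_event):
    """Convert an internal event to an external one, or None if it stays internal."""
    kind = internal_event["type"]
    if kind == "workflow_definition_changed":
        return {"category": "workflow_change", "workflow_id": internal_event["workflow_id"]}
    if kind in ("workflow_instance_status_changed", "step_instance_status_changed"):
        return {
            "category": "instance_status_change",
            "workflow_id": internal_event["workflow_id"],
            "status": internal_event["status"],
        }
    return None  # not every internal event is published externally


def process(internal_queue, publish):
    """Drain the internal queue and publish the converted external events."""
    while True:
        try:
            event = internal_queue.get_nowait()
        except queue.Empty:
            break
        external = to_external(event)
        if external is not None:
            publish(external)


internal = queue.Queue()
internal.put({"type": "workflow_definition_changed", "workflow_id": "demo"})
internal.put({"type": "step_instance_status_changed", "workflow_id": "demo", "status": "SUCCEEDED"})
internal.put({"type": "internal_bookkeeping", "workflow_id": "demo"})

published = []
process(internal, published.append)  # a list stands in for SNS or Kafka
print(published)
```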

Get Started with Maestro

Maestro has been extensively used within Netflix, and today, we are excited to make the Maestro source code publicly available. We hope that the scalability and usability that Maestro offers can expedite workflow development outside Netflix. We invite you to try Maestro, use it within your organization, and contribute to its development.

You can find the Maestro code repository at github.com/Netflix/maestro. If you have any questions, thoughts, or comments about Maestro, please feel free to create a GitHub issue in the Maestro repository. We are eager to hear from you.

We are taking workflow orchestration to the next level and constantly solving new problems and challenges, so please stay tuned for updates. If you are passionate about solving large-scale orchestration problems, please join us.

Acknowledgements

Thanks to other Maestro team members, Binbing Hou, Zhuoran Dong, Brittany Truong, Deepak Ramalingam, Moctar Ba, for their contributions to the Maestro project. Thanks to our Product Manager Ashim Pokharel for driving the strategy and requirements. We’d also like to thank Andrew Seier, Romain Cledat, Olek Gorajek, and other stunning colleagues at Netflix for their contributions to the Maestro project. We also thank Prashanth Ramdas, Eva Tse, David Noor, Charles Smith and other leaders of Netflix engineering organizations for their constructive feedback and suggestions on the Maestro project.


Maestro: Netflix’s Workflow Orchestrator was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

A Recap of the Data Engineering Open Forum at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/a-recap-of-the-data-engineering-open-forum-at-netflix-6b4d4410b88f

A summary of sessions at the first Data Engineering Open Forum at Netflix on April 18th, 2024

The Data Engineering Open Forum at Netflix on April 18th, 2024.

At Netflix, we aspire to entertain the world, and our data engineering teams play a crucial role in this mission by enabling data-driven decision-making at scale. Netflix is not the only place where data engineers are solving challenging problems with creative solutions. On April 18th, 2024, we hosted the inaugural Data Engineering Open Forum at our Los Gatos office, bringing together data engineers from various industries to share, learn, and connect.

At the conference, our speakers shared their unique perspectives on modern developments, immediate challenges, and future prospects of data engineering. We are excited to share the recordings of talks from the conference with the rest of the world.

Opening Remarks

Recording

Speaker: Max Schmeiser (Vice President of Studio and Content Data Science & Engineering)

Summary: Max Schmeiser extends a warm welcome to all attendees, marking the beginning of our inaugural Data Engineering Open Forum.

Evolving from Rule-based Classifier: Machine Learning Powered Auto Remediation in Netflix Data Platform

Recording

Speakers:

Summary: At Netflix, hundreds of thousands of workflows and millions of jobs are running every day on our big data platform, but diagnosing and remediating job failures can impose considerable operational burdens. To handle errors efficiently, Netflix developed a rule-based classifier for error classification called “Pensive.” However, as the system has increased in scale and complexity, Pensive has been facing challenges due to its limited support for operational automation, especially for handling memory configuration errors and unclassified errors. To address these challenges, we have developed a new feature called “Auto Remediation,” which integrates the rules-based classifier with an ML service.

Automating the Data Architect: Generative AI for Enterprise Data Modeling

Recording

Speaker: Jide Ogunjobi (Founder & CTO at Context Data)

Summary: As organizations accumulate ever-larger stores of data across disparate systems, efficiently querying and gaining insights from enterprise data remain ongoing challenges. To address this, we propose developing an intelligent agent that can automatically discover, map, and query all data within an enterprise. This “Enterprise Data Model/Architect Agent” employs generative AI techniques for autonomous enterprise data modeling and architecture.

Tulika Bhatt, Senior Data Engineer at Netflix, shared how her team manages impression data at scale.

Real-Time Delivery of Impressions at Scale

Recording

Speaker: Tulika Bhatt (Senior Data Engineer at Netflix)

Summary: Netflix generates approximately 18 billion impressions daily. These impressions significantly influence a viewer’s browsing experience, as they are essential for powering video ranker algorithms and computing adaptive pages. With the evolution of user interfaces to be more responsive to in-session interactions, coupled with the growing demand for real-time adaptive recommendations, it has become imperative that these impressions are provided on a near real-time basis. This talk will delve into the creative solutions Netflix deploys to manage this high-volume, real-time data requirement while balancing scalability and cost.

Reflections on Building a Data Platform From the Ground Up in a Post-GDPR World

Recording

Speaker: Jessica Larson (Data Engineer & Author of “Snowflake Access Control”)

Summary: The requirements for creating a new data warehouse in the post-GDPR world are significantly different from those of the pre-GDPR world, such as the need to prioritize sensitive data protection and regulatory compliance over performance and cost. In this talk, Jessica Larson shares her takeaways from building a new data platform post-GDPR.

Unbundling the Data Warehouse: The Case for Independent Storage

Recording

Speaker: Jason Reid (Co-founder & Head of Product at Tabular)

Summary: Unbundling a data warehouse means splitting it into constituent and modular components that interact via open standard interfaces. In this talk, Jason Reid discusses the pros and cons of both data warehouse bundling and unbundling in terms of performance, governance, and flexibility, and he examines how the trend of data warehouse unbundling will impact the data engineering landscape in the next 5 years.

Clark Wright, Staff Analytics Engineer at Airbnb, talked about the concept of Data Quality Score at Airbnb.

Data Quality Score: How We Evolved the Data Quality Strategy at Airbnb

Recording

Speaker: Clark Wright (Staff Analytics Engineer at Airbnb)

Summary: Recently, Airbnb published a post to their Tech Blog called Data Quality Score: The next chapter of data quality at Airbnb. In this talk, Clark Wright shares the narrative of how data practitioners at Airbnb recognized the need for higher-quality data and then proposed, conceptualized, and launched Airbnb’s first Data Quality Score.

Data Productivity at Scale

Recording

Speaker: Iaroslav Zeigerman (Co-Founder and Chief Architect at Tobiko Data)

Summary: The development and evolution of data pipelines are hindered by outdated tooling compared to software development. Creating new development environments is cumbersome: Populating them with data is compute-intensive, and the deployment process is error-prone, leading to higher costs, slower iteration, and unreliable data. SQLMesh, an open-source project born from our collective experience at companies like Airbnb, Apple, Google, and Netflix, is designed to handle the complexities of evolving data pipelines at an internet scale. In this talk, Iaroslav Zeigerman discusses challenges faced by data practitioners today and how core SQLMesh concepts solve them.

Last but not least, thank you to the organizers of the Data Engineering Open Forum: Chris Colburn, Xinran Waibel, Jai Balani, Rashmi Shamprasad, and Patricia Ho.

Until next time!

If you are interested in attending a future Data Engineering Open Forum, we highly recommend you join our Google Group to stay tuned to event announcements.


A Recap of the Data Engineering Open Forum at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Our First Netflix Data Engineering Summit

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/our-first-netflix-data-engineering-summit-f326b0589102

Holden Karau, Elizabeth Stone, Pedro Duarte, Chris Stephens, Pallavi Phadnis, Lee Woodridge, Mark Cho, Guil Pires, Sujay Jain, Tristan Reid, Senthilnathan Athinarayanan, Bharath Mummadisetty, Abhinaya Shetty, Judit Lantos, Amanuel Kahsay, Dao Mi, Mick Dreeling, Chris Colburn, and Agata Gryzbek

Introduction

Earlier this summer Netflix held our first-ever Data Engineering Forum. Engineers from across the company came together to share best practices on everything from Data Processing Patterns to Building Reliable Data Pipelines. The result was a series of talks which we are now sharing with the rest of the Data Engineering community!

You can find each of the talks below with a short description of each, or you can go straight to the playlist on YouTube here.

The Talks

The Netflix Data Engineering Stack

Chris Stephens, Data Engineer, Content & Studio and Pedro Duarte, Software Engineer, Consolidated Logging walk engineers new to Netflix through the building blocks of the Netflix Data Engineering stack. Learn more about how batch and streaming data pipelines are built at Netflix.

Data Processing Patterns

Lee Woodridge and Pallavi Phadnis, Data Engineers at Netflix, talk about how you can apply different processing strategies for your batch pipelines by implementing generic abstractions to help scale, be more efficient, handle late-arriving data, and be more fault tolerant.

Streaming SQL on Data Mesh using Apache Flink

Mark Cho, Guil Pires, and Sujay Jain, engineers from the Netflix Data Platform, talk about how managed Streaming SQL using Apache Flink can help unlock new stream processing use cases at Netflix. You can read more about Data Mesh, Netflix’s next-generation stream processing platform, here.

Building Reliable Data Pipelines

Holden Karau, OSS Engineer, Data Platform Engineering, talks about the importance of reliable data pipelines and how to build them covering tools from testing to validation and auditing. The talk uses Apache Spark as an example, but the concepts generalize regardless of your specific tools.

Knowledge Management — Leveraging Institutional Data

Tristan Reid, software engineer, shares experiences about the Knowledge Management project at Netflix, which seeks to leverage language modeling techniques and metadata from internal systems to improve the impact of the >100K memos that circulate within the company.

Psyberg, An Incremental ETL Framework Using Iceberg

Abhinaya Shetty and Bharath Mummadisetty, Data Engineers from Netflix’s Membership Data Engineering team, introduce Psyberg, an incremental ETL framework. Learn about how Psyberg leverages Iceberg metadata to handle late-arriving data, and improves data pipelines while simplifying on-call life!

Start/Stop/Continue for optimizing complex ETL jobs

Judit Lantos, Data Engineer, Member Experience Data Engineering, shares a case study to demonstrate an effective approach for optimizing complex ETL jobs.

Media Data for ML Studio Creative Production

Over the last two decades, Netflix has revolutionized the way video content is consumed. However, there is significant work to be done in revolutionizing how movies and TV shows are made. In this video, Sr. Data Engineers Amanuel Kahsay and Dao Mi showcase how data and insights are being used to realize that vision.

We hope that our fellow members of the Data Engineering Community find these videos useful and engaging. Please follow our Netflix Data Twitter account for updates and notifications of future Data Engineering Summits!

Mick Dreeling, Chris Colburn


Our First Netflix Data Engineering Summit was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Sliding window rate limits in distributed systems

Post Syndicated from Grab Tech original https://engineering.grab.com/frequency-capping

Like many other companies, Grab uses marketing communications to notify users of promotions or other news. If a user receives these notifications from multiple companies, it would be a form of information overload and they might even start considering these communications as spam. Over time, this could lead to some users revoking their consent to receive marketing communications altogether. Hence, it is important to find a rate-limited solution that sends the right amount of communications to our users.

Background

In Grab, marketing emails and push notifications are part of carefully designed campaigns to ensure that users get the right notifications (i.e. based on past orders or usage patterns). Trident is Grab’s in-house tool to compose these campaigns so that they run efficiently at scale. An example of a campaign is scheduling a marketing email blast to 10 million users at 4 pm. Read more about Trident’s architecture here.

Trident relies on Hedwig, another in-house service, to deliver the messages to users. Hedwig does the heavy lifting of delivering large amounts of emails and push notifications to users while maintaining a high query per second (QPS) rate and minimal delay. The following high-level architectural illustration demonstrates the interaction between Trident and Hedwig.

Diagram of data interaction between Trident and Hedwig

The aim is to regulate the number of marketing comms sent to users daily and weekly, tailored based on their interaction patterns with the Grab superapp.

Solution

Based on their interaction patterns with our superapp, we have clustered users into a few segments.

For example:

New: Users recently signed up to the Grab app but haven’t taken any rides yet.
Active: Users who took rides in the past month.

With these metrics, we came up with optimal daily and weekly frequency limit values for each clustered user segment. The solution discussed in this article ensures that the comms sent to a user do not exceed the daily and weekly thresholds for the segment. This is also called frequency capping.

However, frequency capping can be split into two sub-problems:

Efficient storage of clustered user data

With a huge customer base of over 270 million users, storing the user segment membership information has to be cost-efficient and memory-sleek. Querying the segment to which a user belongs should also have minimal latency.

Persistent tracking of comms sent per user

To stay within the daily and weekly thresholds, we need to actively track the number of comms sent to each user and refer to that count when making rate limiting decisions. The rate limiting logic should also have minimal latency, be cost-efficient, and not take up too much memory.

Optimising storage of user segment data

The problem here is figuring out which segment a particular user belongs to and ensuring that the user doesn’t appear in more than one segment. There are two options that suit our needs and we’ll explain more about each option, as well as what was the best option for us.

Bloom filter 

A Bloom filter is a space-efficient probabilistic data structure that addresses this problem well. Simply put, Bloom filters internally use arrays to track memberships of the elements.

For our scenario, each user segment would need its own Bloom filter. We used this Bloom filter calculator to estimate the memory required for each Bloom filter. We found that we needed approximately 1 GB of memory and 23 hash functions to represent the membership information of 270 million users in an array. With this configuration, the expected false positive rate is 1.0E-7, which means roughly 1 in 10 million lookups may return a wrong membership result because of hash collisions.
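The figures above can be approximated with the standard Bloom filter sizing formulas. The following is a minimal sketch, not the calculator referenced in the article; the function name bloom_filter_size is an assumption made for illustration.

```python
import math


def bloom_filter_size(n_items: int, false_positive_rate: float) -> tuple[int, int]:
    """Return (bits, hash_functions) for an optimally sized Bloom filter.

    Uses the textbook formulas:
      bits   = -n * ln(p) / (ln 2)^2
      hashes = (bits / n) * ln 2
    """
    bits = math.ceil(-n_items * math.log(false_positive_rate) / (math.log(2) ** 2))
    hashes = round(bits / n_items * math.log(2))
    return bits, hashes


# 270 million users at a false positive rate of 1.0E-7, as in the text.
bits, hashes = bloom_filter_size(270_000_000, 1.0e-7)
gigabytes = bits / 8 / 1e9
print(f"{gigabytes:.2f} GB, {hashes} hash functions")
```

Under these formulas, one filter comes out at a little over 1 GB with 23 hash functions, which is in line with the estimate quoted above.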

With Grab’s existing segments, this approach needs 4 GB of memory, which may increase as we add more segments in the future. Moreover, reducing the chance of hash collisions requires increasing the memory size and using even more hash functions. Another thing to note is that Bloom filters do not support deletion, so every time a change needs to be made, you need to create a new version of the Bloom filter. Although Bloom filters have many advantages, these shortcomings led us to explore another approach.

Roaring bitmaps

Roaring bitmaps are sets of unsigned integers consisting of containers of disjoint subsets, which can store large amounts of data in a compressed form. Essentially, roaring bitmaps could reduce memory usage significantly and overcome the hash collision problem. To understand the intuition behind this, we first need to know how plain bitmaps work and what their drawbacks are.

To represent a list of numbers as a bitmap, we first need to create an array with a size equivalent to the largest element in the list. For every element in the list, we then mark the bit value as 1 in the corresponding index in the array. While bitmaps work very well for storing integers in closer intervals, they occupy more space and become sparse when storing integer ranges with uneven distribution, as shown in the image below.

Diagram of bitmaps with uneven distribution
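To make the sparsity problem concrete, here is a minimal sketch of a plain, uncompressed bitmap. The helper names to_bitmap and contains are assumptions made for illustration and are not part of any Grab service.

```python
def to_bitmap(values: list[int]) -> bytearray:
    """Build a plain bitmap large enough to hold the largest value."""
    bitmap = bytearray(max(values) // 8 + 1)
    for v in values:
        bitmap[v // 8] |= 1 << (v % 8)  # set the bit at index v
    return bitmap


def contains(bitmap: bytearray, v: int) -> bool:
    """Check whether the bit at index v is set."""
    byte_index = v // 8
    return byte_index < len(bitmap) and bool(bitmap[byte_index] & (1 << (v % 8)))


dense = to_bitmap([1, 2, 3, 4, 5, 6, 7])
sparse = to_bitmap([1, 2, 1_000_000])

# The dense set fits in a single byte, while the sparse set needs
# 125001 bytes even though it holds only three elements.
print(len(dense), len(sparse))
```

The size of the bitmap depends on the largest element rather than on the number of elements, which is why unevenly distributed integers waste space.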

To reduce the memory footprint and improve the performance of bitmaps, there are compression techniques such as Run-Length Encoding (RLE) and Word Aligned Hybrid (WAH). However, implementing these ourselves would require additional effort, whereas roaring bitmaps already solve these issues.

Roaring bitmaps’ hybrid data storage approach offers the following advantages:

  • Faster set operations (union, intersection, differencing).
  • Better compression ratio when handling mixed datasets (both dense and sparse data distribution).
  • Ability to scale to large datasets without significant performance loss.

To summarise, roaring bitmaps can store unsigned integers from 0 to (2^32)-1. Each integer is treated as a 32-bit binary value, where the 16 Most Significant Bits (MSB) are used as the key and the remaining 16 Least Significant Bits (LSB) are the value. The values that share a key are then stored in one of three container types: an array, a bitmap, or a run container that uses RLE.
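The key/value split described above can be sketched in a few lines. This is an illustration only; split_key_value and group_by_key are hypothetical helper names, and a plain dictionary of sorted lists stands in for the real containers.

```python
def split_key_value(x: int) -> tuple[int, int]:
    """Split a 32-bit unsigned integer into (16 MSB key, 16 LSB value)."""
    if not 0 <= x < 2**32:
        raise ValueError("roaring bitmaps store 32-bit unsigned integers")
    return x >> 16, x & 0xFFFF


def group_by_key(values: list[int]) -> dict[int, list[int]]:
    """Group the low 16 bits of each integer under its high-16-bit key."""
    containers: dict[int, set[int]] = {}
    for x in values:
        key, low = split_key_value(x)
        containers.setdefault(key, set()).add(low)
    return {key: sorted(lows) for key, lows in containers.items()}


grouped = group_by_key([5, 65_536, 65_537, 131_073])
print(grouped)  # {0: [5], 1: [0, 1], 2: [1]}
```

Integers that are close together share a key, so only their 16-bit remainders need to be stored in that key's container.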

If the number of integers mapped to a key is less than 4096, all the integers are stored in a sorted array. Once the size exceeds this threshold at runtime, the array is converted into a bitmap container. Roaring bitmaps also analyse the distribution of set bits in the bitmap container: if the continuous intervals of set bits are longer than a given threshold, the container can be represented more efficiently as an RLE container. Internally, the RLE container uses an array where the even indices store the beginning of the runs and the odd indices store the length of the runs. This enables the roaring bitmap to dynamically switch between containers to optimise storage and performance.
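The run layout and the switch between containers can be sketched as follows. This is a simplified illustration under stated assumptions: the byte estimates (2 bytes per array or run entry, 8192 bytes for a full bitmap container) and the "pick the smallest" rule are assumptions for this sketch, and real roaring bitmap libraries differ in their exact thresholds and run encoding.

```python
ARRAY_LIMIT = 4096          # threshold taken from the text
BITMAP_BYTES = 2**16 // 8   # a bitmap container covers all 65536 low values


def encode_runs(sorted_values: list[int]) -> list[int]:
    """Encode sorted values as [start0, length0, start1, length1, ...]."""
    runs: list[int] = []
    for v in sorted_values:
        if runs and v == runs[-2] + runs[-1]:
            runs[-1] += 1          # v extends the current run
        else:
            runs.extend([v, 1])    # v starts a new run of length 1
    return runs


def choose_container(sorted_values: list[int]) -> str:
    """Pick a container type by comparing rough storage sizes (assumed rule)."""
    if len(sorted_values) < ARRAY_LIMIT:
        base, base_bytes = "array", 2 * len(sorted_values)
    else:
        base, base_bytes = "bitmap", BITMAP_BYTES
    run_bytes = 2 * len(encode_runs(sorted_values))
    return "run" if run_bytes < base_bytes else base


print(encode_runs([1, 2, 3, 10, 11]))              # [1, 3, 10, 2]
print(choose_container([1, 5, 9]))                 # array
print(choose_container(list(range(0, 20000, 2))))  # bitmap
print(choose_container(list(range(10000))))        # run
```

A few scattered values stay in an array, many scattered values become a bitmap, and long consecutive stretches collapse into a handful of runs.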

The following diagram shows how a set of elements with different distributions are stored in roaring bitmaps.

Diagram of how roaring bitmaps store elements with different distributions

In Grab, we developed a microservice that abstracts roaring bitmaps implementations and provides an API to check set membership and enumeration of elements in the sets. Check out this blog to learn more about it.

Distributed rate limiting

The second part of the problem involves rate limiting the number of communication messages sent to users on a daily or weekly basis and each segment has specific daily and weekly limits. By utilising roaring bitmaps, we can determine the segment to which a user belongs. After identifying the appropriate segment, we will apply the personalised limits to the user using a distributed rate limiter, which will be discussed in further detail in the following sections.

Choosing the right datastore

Based on our use case, Amazon ElastiCache for Redis and DynamoDB were two viable options for storing the count of communication messages sent per user. However, we decided to choose Redis due to a number of factors:

  • Higher throughput at lower latency – Redis shards data across nodes in the cluster.
  • Cost-effective – Usage of Lua script reduces unnecessary data transfer overheads.
  • Better at handling spiky rate limiting workloads at scale.

Distributed rate limiter

To appropriately limit the comms our users receive, we needed a rate limiting algorithm, which could execute directly in the datastore cluster, then return the results in the application logic for further processing. The two rate limiting algorithms we considered were the sliding window rate limiter and sliding log rate limiter.

The sliding window rate limiter algorithm divides time into fixed-size windows (we defined this as 1 minute) and counts the number of requests within each window. On the other hand, the sliding log maintains a log of each request timestamp and counts the number of requests between two timestamps, providing a more fine-grained method of rate limiting. Although the sliding log consumes more memory to store the request timestamps, we opted for the sliding log approach as the accuracy of the rate limiting was more important than memory consumption.

The sliding log rate limiter utilises a Redis sorted set data structure to efficiently track and organise request logs. Each timestamp in milliseconds is stored as a unique member in the set. The score assigned to each member represents the corresponding timestamp, allowing for easy sorting in ascending order. This design choice optimises the speed of search operations when querying for the total request count within specific time ranges.

Sliding Log Rate limiter Algorithm:

Input:
  # user specific redis key where the request timestamp logs are stored as sorted set
  keys => user_redis_key

  # limit_value is the limit that needs to be applied for the user
  # start_time_in_millis is the starting point of the time window
  # end_time_in_millis is the ending point of the time window
  # current_time_in_millis is the current time the request is sent
  # eviction_time_in_millis, members in the set whose value is less than this will be evicted from the set

  args => limit_value, start_time_in_millis, end_time_in_millis, current_time_in_millis, eviction_time_in_millis

Output:
  # 0 means not_allowed and 1 means allowed
  response => 0 / 1

Logic:
  # zcount fetches the count of the request timestamp logs falling between the start and the end timestamp
  request_count = zcount user_redis_key start_time_in_millis end_time_in_millis

  response = 0
  # if the count of request logs is less than allowed limits then record the usage by adding current timestamp in sorted set

  if request_count < limit_value then
    zadd user_redis_key current_time_in_millis current_time_in_millis
    response = 1
  end

  # zremrangebyscore removes the members in the sorted set whose score is less than eviction_time_in_millis
  # the "(" prefix makes the upper bound exclusive; without it, Redis treats the bound as inclusive

  zremrangebyscore user_redis_key -inf (eviction_time_in_millis
  return response

Counting the requests and recording a new one each take O(log n) time, where n is the number of request logs stored in the sorted set. Redis does not support a per-member time-to-live (TTL) in a sorted set the way it does for whole keys, so entries cannot expire on their own. To prevent the sorted set from growing indefinitely, we pass an eviction_time_in_millis value to the script. The zremrangebyscore command then deletes the members whose score is less than eviction_time_in_millis in O(log n + m) time, where m is the number of members removed.
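The logic of the algorithm can be tried out without a Redis cluster. The following is a minimal in-memory sketch that mirrors the steps above with a sorted Python list standing in for the Redis sorted set; the class name SlidingLogLimiter, the key "user:42", and the limit of two messages per day are assumptions made for illustration.

```python
import bisect

DAY_MS = 24 * 60 * 60 * 1000


class SlidingLogLimiter:
    """In-memory stand-in for the sorted-set based sliding log."""

    def __init__(self) -> None:
        self._logs: dict[str, list[int]] = {}

    def allow(self, user_key: str, limit: int, start_ms: int, end_ms: int,
              now_ms: int, eviction_ms: int) -> int:
        log = self._logs.setdefault(user_key, [])
        # Equivalent of zcount: timestamps in [start_ms, end_ms], inclusive.
        count = bisect.bisect_right(log, end_ms) - bisect.bisect_left(log, start_ms)
        response = 0
        if count < limit:
            # Equivalent of zadd: members are unique, so skip duplicates.
            i = bisect.bisect_left(log, now_ms)
            if i == len(log) or log[i] != now_ms:
                log.insert(i, now_ms)
            response = 1
        # Equivalent of zremrangebyscore: drop entries older than eviction_ms.
        del log[:bisect.bisect_left(log, eviction_ms)]
        return response


limiter = SlidingLogLimiter()


def send(now_ms: int) -> int:
    # Hypothetical limit: at most 2 messages in any 24-hour window.
    return limiter.allow("user:42", 2, now_ms - DAY_MS, now_ms, now_ms, now_ms - DAY_MS)


results = [send(0), send(1_000), send(2_000), send(DAY_MS + 1_500)]
print(results)  # [1, 1, 0, 1]
```

The third request is rejected because two messages already fall inside the window, while the fourth is allowed because both earlier timestamps have slid out of the window by then.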

Lua script optimisations

In Redis Cluster mode, all Redis keys accessed by a Lua script must be present on the same node, and they should be passed as part of the KEYS input array of the script. If the script attempts to access keys located on different nodes within the cluster, a CROSSSLOT error will be thrown. Redis keys, or userIDs, are distributed across multiple nodes in the cluster so it is not feasible to send a batch of userIDs within the same Lua script for rate limiting, as this might result in a CROSSSLOT error.

Invoking a separate Lua script call for each user is a possible approach, but it incurs a significant number of network calls, which can be optimised further with the following approach:

  1. Upload the Lua script into the Redis server during the server startup with the SCRIPT LOAD command and we get the SHA1 hash of the script if the upload is successful.
  2. The SHA1 hash can then be used to invoke the Lua script with the EVALSHA command passing the keys and arguments as script input.
  3. Redis pipelining takes in multiple EVALSHA commands that call the Lua script and each invocation corresponds to a userID for getting the rate limiting result.
  4. Redis pipelining groups the EVALSHA Redis commands with Redis keys located on the same nodes internally. It then sends the grouped commands in a single network call to the relevant nodes within the Redis cluster and provides the rate limiting outcome to the client.
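The grouping in the last step can be illustrated with the key-to-slot mapping that Redis Cluster uses: each key is hashed with CRC16 and mapped to one of 16384 hash slots, and each node owns a range of slots. The sketch below computes the slot per key and groups keys by node. The three-node SLOT_RANGES layout, the "user:<id>" key format, and the function names are assumptions made for illustration; in practice the Redis client library performs this grouping internally.

```python
import binascii
from collections import defaultdict

TOTAL_SLOTS = 16384


def key_hash_slot(key: str) -> int:
    """Compute the Redis Cluster hash slot for a key, honouring {hash tags}."""
    data = key.encode("utf-8")
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end > start + 1:  # only a non-empty tag replaces the full key
            data = data[start + 1:end]
    # crc_hqx with an initial value of 0 is the CRC16 variant Redis Cluster uses.
    return binascii.crc_hqx(data, 0) % TOTAL_SLOTS


# Hypothetical slot ownership for a three-node cluster.
SLOT_RANGES = {
    "node-a": (0, 5460),
    "node-b": (5461, 10922),
    "node-c": (10923, 16383),
}


def group_keys_by_node(keys, slot_ranges=SLOT_RANGES):
    """Group keys by the node that owns their hash slot."""
    groups = defaultdict(list)
    for key in keys:
        slot = key_hash_slot(key)
        for node, (low, high) in slot_ranges.items():
            if low <= slot <= high:
                groups[node].append(key)
                break
    return dict(groups)


user_keys = [f"user:{user_id}" for user_id in range(1, 9)]
batches = group_keys_by_node(user_keys)
for node, keys in sorted(batches.items()):
    print(node, keys)
```

Each resulting batch can be sent to its node in a single network call, with one EVALSHA invocation per key inside the batch, so no single script invocation touches keys in different slots.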

Since Redis operates on a single thread, any long-running Lua script can cause other Redis commands to be blocked until the script completes execution. Thus, it’s optimal for the Lua script to execute in under 5 milliseconds. Additionally, the current time is passed as an argument to the script to account for potential variations in time when the script is executed on a node’s replica, which could be caused by clock drift.

By bringing together roaring bitmaps and the distributed rate limiter, this is what our final solution looks like:

Our final solution using roaring bitmaps and distributed rate limiter

The roaring bitmaps structure is serialised and stored in an AWS S3 bucket, then downloaded to the instance during server startup. After that, a user segment membership check is simply a local method call. The configuration service manages the mapping between each segment and its allowed rate limiting values.

Whenever a marketing message needs to be sent to a user, we first find the segment to which the user belongs, retrieve the defined rate limiting values from the configuration service, then execute the Lua script to get the rate limiting decision. If there is enough quota available for the user, we send the comms.
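The decision flow described above can be sketched end to end. Everything concrete here is an assumption made for illustration: plain Python sets stand in for the roaring bitmaps, the LIMITS dictionary stands in for the configuration service, a list of timestamps stands in for the Redis sorted set, and the limit values and the choice to skip users without a segment are hypothetical.

```python
DAY_MS = 24 * 60 * 60 * 1000
WEEK_MS = 7 * DAY_MS

# Stand-ins for the roaring bitmaps (one set of user IDs per segment).
SEGMENTS = {"new": {101, 102}, "active": {201}}
# Stand-in for the configuration service; the values are hypothetical.
LIMITS = {"new": {"daily": 1, "weekly": 3}, "active": {"daily": 2, "weekly": 5}}


def find_segment(user_id: int):
    """Return the segment a user belongs to, or None if there is none."""
    for name, members in SEGMENTS.items():
        if user_id in members:
            return name
    return None


def should_send(user_id: int, sent_log: list[int], now_ms: int) -> bool:
    """Apply the segment's daily and weekly caps to one send attempt."""
    segment = find_segment(user_id)
    if segment is None:
        return False  # assumption: users without a segment are skipped
    limits = LIMITS[segment]
    daily = sum(1 for t in sent_log if now_ms - DAY_MS <= t <= now_ms)
    weekly = sum(1 for t in sent_log if now_ms - WEEK_MS <= t <= now_ms)
    if daily >= limits["daily"] or weekly >= limits["weekly"]:
        return False
    sent_log.append(now_ms)  # record the send so later checks see it
    return True


log: list[int] = []
decisions = [should_send(101, log, 0), should_send(101, log, 1_000),
             should_send(101, log, DAY_MS + 1)]
print(decisions)  # [True, False, True]
```

The second attempt is blocked by the daily cap of the "new" segment, and the third succeeds once the first send has left the 24-hour window.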

The architecture of the messaging service looks something like this:

Architecture of the messaging service

Impact

In addition to decreasing the unsubscription rate, there was a significant enhancement in the latency of sending communications. Eliminating redundant communications also alleviated the system load, resulting in a reduction of the delay between the scheduled time and the actual send time of comms.

Conclusion

Applying rate limiters to safeguard our services is not only a standard practice but also a necessary process. Many times, this can be achieved by configuring the rate limiters at the instance level. The need for rate limiters for business logic may not be as common, but when you need it, the solution must be lightning-fast, and capable of seamlessly operating within a distributed environment.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

An elegant platform

Post Syndicated from Grab Tech original https://engineering.grab.com/an-elegant-platform

Coban is Grab’s real-time data streaming platform team. As a platform team, we thrive on providing our internal users from all verticals with self-served data-streaming resources, such as Kafka topics, Flink and Change Data Capture (CDC) pipelines, various kinds of Kafka-Connect connectors, as well as Apache Zeppelin notebooks, so that they can effortlessly leverage real-time data to build intelligent applications and services.

In this article, we present our journey from pure Infrastructure-as-Code (IaC) towards a more sophisticated control plane that has revolutionised the way data streaming resources are self-served at Grab. This change also leads to improved scalability, stability, security, and user adoption of our data streaming platform.

Problem statement

In the early ages of public cloud, it was a common practice to create virtual resources by clicking through the web console of a cloud provider, which is sometimes referred to as ClickOps.

ClickOps has many downsides, such as:

  • Inability to review, track, and audit changes to the infrastructure.
  • Inability to massively scale the infrastructure operations.
  • Inconsistencies between environments, e.g. staging and production.
  • Inability to quickly recover from a disaster by re-creating the infrastructure at a different location.

That said, ClickOps has one tremendous advantage: it makes creating resources through a graphical User Interface (UI) fairly easy for anyone, including Infrastructure Engineers, Software Engineers, and Data Engineers. This also leads to a high iteration speed towards innovation in general.

IaC resolved many of the limitations of ClickOps, such as:

  • Changes are committed to a Version Control System (VCS) like Git: They can be reviewed by peers before being merged. The full history of all changes is available for investigating issues and for audit.
  • The infrastructure operations scale better: Code for similar pieces of infrastructure can be modularised. Changes can be rolled out automatically by Continuous Integration (CI) pipelines in the VCS system, when a change is merged to the main branch.
  • The same code can be used to deploy the staging and production environments consistently.
  • The infrastructure can be re-created anytime from its source code, in case of a disaster.

However, IaC unwittingly posed a new entry barrier too, requiring the learning of new tools like Ansible, Puppet, Chef, Terraform, etc.

Some organisations set up dedicated Site Reliability Engineer (SRE) teams to centrally manage, operate, and support those tools and the infrastructure as a whole, but that soon created the potential of new bottlenecks in the path to innovation.

On the other hand, others let engineering teams manage their own infrastructure, and Grab adopted that same approach. We use Terraform to manage infrastructure, and all teams are expected to have select engineers who have received Terraform training and have a clear understanding of it.

In this context, Coban’s platform initially started as a handful of Git repositories where users had to submit their Merge Requests (MR) of Terraform code to create their data streaming resources. Once reviewed by a Coban engineer, those Terraform changes would be applied by a CI pipeline running Atlantis.

While this was a meaningful first step towards self-service and platformisation of Coban’s offering within Grab, it had several significant downsides:

  • Stability: Due to the lack of control on the Terraform changes, the CI pipeline was prone to human errors and frequent failures. For example, users would initiate a new Terraform project by duplicating an existing one, but then would forget to change the location of the remote Terraform state, leading to the in-place replacement of an existing resource.
  • Scalability: The Coban team needed to review all MRs and provide ad hoc support whenever the pipeline failed.
  • Security: In the absence of Identity and Access Management (IAM), MRs could potentially contain changes pertaining to other teams’ resources, or even changes to Coban’s core infrastructure, with code review as the only guardrail.
  • Limited user growth: We could only acquire users who were well-versed in Terraform.

It soon became clear that we needed to build a layer of abstraction between our users and the Terraform code, to increase the level of control and lower the entry barrier to our platform, while still retaining all of the benefits of IaC under the hood.

Solution

We designed and built an in-house three-tier control plane made of:

  • Coban UI, a front-end web interface, providing our users with a seamless ClickOps experience.
  • Heimdall, the Go back-end of the web interface, transforming ClickOps into IaC.
  • Khone, the storage and provisioner layer, a Git repository storing Terraform code and metadata of all resources as well as the CI pipelines to plan and apply the changes.

In the next sections, we will dive deep into those three components.

Fig. 1 Simplified architecture of a request flowing from the user to the Coban infrastructure, via the three components of the control plane: the Coban UI, Heimdall, and Khone.

Although we designed the user journey to start from the Coban UI, our users can still opt to communicate with Heimdall and with Khone directly, e.g. for batch changes, or just because many engineers love Git and we want to encourage broad adoption. To make sure that data is eventually consistent across the three systems, we made Khone the only persistent storage layer. Heimdall regularly fetches data from Khone, caches it, and presents it to the Coban UI upon each query.

We also continued using Terraform for all resources, instead of mixing various declarative infrastructure approaches (e.g. Kubernetes Custom Resource Definition, Helm charts), for the sake of consistency of the logic in Khone’s CI pipelines.

Coban UI

The Coban UI is a React Single Page Application (React SPA) designed by our partner team Chroma, a dedicated team of front-end engineers who thrive on building legendary UIs and reusable components for platform teams at Grab.

It serves as a comprehensive self-service portal, enabling users to effortlessly create data streaming resources by filling out web forms with just a few clicks.

Fig. 2 Screen capture of a new Kafka topic creation in the Coban UI.

In addition to facilitating resource creation and configuration, the Coban UI is seamlessly integrated with multiple monitoring systems. This integration allows for real-time monitoring of critical metrics and health status for Coban infrastructure components, including Kafka clusters, Kafka topic bytes in/out rates, and more. Under the hood, all this information is exposed by Heimdall APIs.

Fig. 3 Screen capture of the metrics of a Kafka cluster in the Coban UI.

In terms of infrastructure, the Coban UI is hosted in AWS S3 website hosting. All dynamic content is generated by querying the APIs of the back-end: Heimdall.

Heimdall

Heimdall is the Go back-end of the Coban UI. It serves a collection of APIs for:

  • Managing the data streaming resources of the Coban platform with Create, Read, Update and Delete (CRUD) operations, treating the Coban UI as a first-class citizen.
  • Exposing the metadata of all Coban resources, so that they can be used by other platforms or searched in the Coban UI.

All operations are authenticated and authorised. Read more about Heimdall’s access control in Migrating from Role to Attribute-based Access Control.

In the next sections, we are going to dive deeper into these two features.

Managing the data streaming resources

First and foremost, Heimdall enables our users to self-manage their data streaming resources. It primarily relies on Khone as its storage and provisioner layer for actual resource management via Git CI pipelines. Therefore, we designed Heimdall’s resource management workflow to leverage the underlying Git flow.

Fig. 4 Diagram flow of a request in Heimdall.

Fig. 4 shows the diagram flow of a typical request in Heimdall to create, update, or delete a resource.

  1. An authenticated user initiates a request, either by navigating in the Coban UI or by calling the Heimdall API directly. At this stage, the request state is Initiated on Heimdall.
  2. Heimdall validates the request against multiple validation rules. For example, if an ongoing change request exists for the same resource, the request fails. If all tests succeed, the request state moves to Ongoing.
  3. Heimdall then creates an MR in Khone, which contains the Terraform files describing the desired state of the resource, as well as an in-house metadata file describing the key attributes of both resource and requester.
  4. After the MR has been created successfully, Heimdall notifies the requester via Slack and shares the MR URL.
  5. After that, Heimdall starts polling the status of the MR in a loop.
  6. For changes pertaining to production resources, an approver who is code owner in the repository of the resource has to approve the MR. Typically, the approver is an immediate teammate of the requester. Indeed, as a platform team, we empower our users to manage their own resources in a self-service fashion. Ultimately, the requester would merge the MR to trigger the CI pipeline applying the actual Terraform changes. Note that for staging resources, this entire step 6 is automatically performed by Heimdall.
  7. Depending on the MR status and the status of its CI pipeline in Khone, the final state of the request can be:
    • Failed if the CI pipeline has failed in Khone.
    • Completed if the CI pipeline has succeeded in Khone.
    • Cancelled if the MR was closed in Khone.
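The request lifecycle above can be read as a small state machine. The following is a minimal sketch of that reading, not Heimdall’s actual code: the function names and the status strings ("closed", "failed", "success") are assumptions made for illustration, and the sketch assumes a request that fails validation ends in the Failed state.

```python
from enum import Enum
from typing import Optional


class RequestState(Enum):
    INITIATED = "Initiated"
    ONGOING = "Ongoing"
    FAILED = "Failed"
    COMPLETED = "Completed"
    CANCELLED = "Cancelled"


def validate(has_ongoing_change: bool) -> RequestState:
    """Step 2: reject the request if another change is already in flight."""
    return RequestState.FAILED if has_ongoing_change else RequestState.ONGOING


def final_state(mr_status: str, pipeline_status: Optional[str]) -> RequestState:
    """Step 7: map the polled MR and CI pipeline status to a request state.

    The status strings are hypothetical placeholders, not real API values.
    """
    if mr_status == "closed":
        return RequestState.CANCELLED
    if pipeline_status == "failed":
        return RequestState.FAILED
    if pipeline_status == "success":
        return RequestState.COMPLETED
    return RequestState.ONGOING  # nothing conclusive yet, keep polling
```

In this sketch, the polling loop of step 5 would call final_state repeatedly until the result is no longer Ongoing.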

Heimdall exposes APIs to let users track the status of their requests. In the Coban UI, a page queries those APIs to elegantly display the requests.

Fig. 5 Screen capture of the Coban UI showing all requests.

Exposing the metadata

Apart from managing the data streaming resources, Heimdall also centralises and exposes the metadata pertaining to those resources so other Grab systems can fetch and use it. They can make various queries, for example, listing the producers and consumers of a given Kafka topic, or determining if a database (DB) is the data source for any CDC pipeline.

To make this happen, Heimdall not only retains the metadata of all of the resources that it creates, but also regularly ingests additional information from a variety of upstream systems and platforms, to enrich and make this metadata comprehensive.

Fig. 6 Diagram showing some of Heimdall’s upstreams (on the left) and downstreams (on the right) for metadata collection, enrichment, and serving. The arrows show the data flow. The network connection (client -> server) is actually the other way around.

On the left side of Fig. 6, we illustrate Heimdall’s ingestion mechanism with several examples (step 1):

  • The metadata of all Coban resources is ingested from Khone. This means the metadata of the resources that were created directly in Khone is also available in Heimdall.
  • The list of Kafka producers is retrieved from our monitoring platform, where most of them emit metrics.
  • The list of Kafka consumers is retrieved directly from the respective Kafka clusters, by listing the consumer groups and respective Client IDs of each partition.
  • The metadata of all DBs that are used as a data source for CDC pipelines is fetched from Grab’s internal DB management platform.
  • The Kafka stream schemas are retrieved from the Coban schema repository.
  • The configuration of each Kafka stream is retrieved from Grab’s Universal Configuration Management platform.

With all of this ingested data, Heimdall can provide comprehensive and accurate information about all data streaming resources to any other Grab platforms via a set of dedicated APIs.

The right side of Fig. 6 shows some examples (step 2) of Heimdall’s serving mechanism:

  • As a downstream of Heimdall, the Coban UI enables our direct users to conveniently browse their data streaming resources and access their attributes.
  • The entire resource inventory is ingested into the broader Grab inventory platform, based on backstage.io.
  • The Kafka streams are ingested into Grab’s internal data discovery platform, based on DataHub, where users can discover and trace the lineage of any piece of data.
  • The CDC connectors pertaining to DBs are ingested by Grab’s internal DB management platform, so that they are visible in that platform when users browse their DBs.

Note that the downstream platforms that ingest data from Heimdall each expose a particular view of the Coban inventory that serves their purpose, but the Coban platform remains the only source of truth for any data streaming resource at Grab.

Lastly, Heimdall leverages an internal MySQL DB to support quick data query and exploration. The corresponding API is called by the Coban UI to let our users conveniently search globally among all resources’ attributes.

Fig. 7 Screen capture of the global search feature in the Coban UI.

Khone

Khone is the persistent storage layer of our platform, as well as the executor for actual resource creation, changes, and deletion. Under the hood, it is actually a GitLab repository of Terraform code in typical GitOps fashion, with CI pipelines to plan and apply the Terraform changes automatically. In addition, it also stores a metadata file for each resource.

Compared to letting the platform create the infrastructure directly and keep track of the desired state in its own way, relying on a standard IaC tool like Terraform for the actual changes to the infrastructure presents two major advantages:

  • The Terraform code can directly be used for disaster recovery. In case of a disaster, any entitled Cobaner with a local copy of the main branch of the Khone repository is able to recreate all our platform resources directly from their machine. There is no need to rebuild the entire platform’s control plane, thus reducing our Recovery Time Objective (RTO).
  • Minimal effort is required to keep up with the API changes of our infrastructure ecosystem (AWS, Kubernetes, Kafka, etc.). When such a change happens, all we need to do is update the corresponding Terraform provider.

If you’d like to read more about Khone, check out Securing GitOps pipelines. In this section, we will only focus on Khone’s features that are relevant from the platform perspective.

Lightweight Terraform

In Khone, each resource is stored as a Terraform definition. There are two major differences from a normal Terraform project:

  • Projects do not define a Terraform environment, i.e. the required Terraform providers and the location of the remote Terraform state file. These are automatically generated by the CI pipeline via a simple wrapper.
  • Only vetted Khone Terraform modules can be used. This is controlled and enforced by the CI pipeline via code inspection. There is one such Terraform module for each kind of resource supported by our platform (e.g. Kafka topic, Flink pipeline, Kafka Connect mirror source connector, etc.). Furthermore, those in-house Terraform modules are designed to automatically derive their key variables (e.g. resource name, cluster name, environment) from the relative path of the parent Terraform project in the Khone repository.

Those characteristics are designed to limit the risk and blast radius of human errors. They also make sure that all resources created in Khone are supported by our platform, so that they can also be discovered and managed in Heimdall and the Coban UI. Lastly, by generating the Terraform environment on the fly, we can destroy resources simply by deleting the directory of the project in the code base – this would not be possible otherwise.
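To illustrate how key variables could be derived from a project’s relative path, here is a minimal sketch. The directory layout `<environment>/<cluster name>/<resource name>` is purely an assumption for illustration; the article does not describe Khone’s actual layout, and the real derivation happens inside the Terraform modules rather than in Python.

```python
from pathlib import PurePosixPath
from typing import NamedTuple


class ResourceVars(NamedTuple):
    environment: str
    cluster_name: str
    resource_name: str


def derive_vars(project_path: str) -> ResourceVars:
    """Derive key variables from a project path relative to the repo root.

    Assumes a hypothetical three-level layout:
    <environment>/<cluster name>/<resource name>
    """
    parts = PurePosixPath(project_path).parts
    if len(parts) != 3:
        raise ValueError(f"unexpected project path: {project_path!r}")
    return ResourceVars(*parts)
```

Because the variables come from the path rather than from user input, a misplaced or misnamed project is rejected up front instead of silently creating a resource in the wrong cluster or environment.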

Resource metadata

All resource metadata is stored in a YAML file that is present in the Terraform directory of each resource in the Khone repository. This is mainly used for ownership and cost attribution.

With this metadata, we can:

  • Better communicate with our users whenever their resources are impacted by an incident or an upcoming maintenance operation.
  • Help teams understand the costs of their usage of our platform, a significant step towards cost efficiency.

There are two different ways resource metadata can be created:

  • Automatically through Heimdall: The YAML metadata file is automatically generated by Heimdall.
  • Through Khone by a human user: The user needs to prepare the YAML metadata file and include it in the MR. This file is then verified by the CI pipeline.

Outcome

The initial version of the three-tier Coban platform, as described in this article, was internally released in March 2022, supporting only Kafka topic management at the time. Since then, we have added support for Flink pipelines, four kinds of Kafka Connect connectors, CDC pipelines, and more recently, Apache Zeppelin notebooks. At the time of writing, the Coban platform manages about 5000 data streaming resources, all described as IaC under the hood.

Our platform also exposes enriched metadata that includes the full data lineage from Kafka producers to Kafka consumers, as well as ownership information, and cost attribution.

With that, our monthly active users have almost quadrupled, truly moving the needle towards democratising the usage of real-time data within all Grab verticals.

In spite of that user growth, the end-to-end workflow success rate for self-served resource creation, change, or deletion remained well above 90% in the first half of 2023, while the Heimdall API uptime was above 99.95%.

Challenges faced

A common challenge for platform teams resides in the misalignment between the Service Level Objective (SLO) of the platform, and the various environments (e.g. staging, production) of the managed resources and upstream/downstream systems and platforms.

Indeed, the platform aims to guarantee the same level of service, regardless of whether it is used to create resources in the staging or the production environment. From the platform team’s perspective, the platform as a whole is considered production-grade, as soon as it serves actual users.

A naive approach to address this challenge is to let the production version of the platform manage all resources regardless of their respective environments. However, doing so does not permit a hermetic segregation of the staging and production environments across the organisation, which is a good security practice, and often a requirement for compliance. For example, the production version of the platform would have to connect to upstream systems in the staging environment, e.g. staging Kafka clusters to collect their consumer groups, in the case of Heimdall. Conversely, the staging version of certain downstreams would have to connect to the production version of Heimdall, to fetch the metadata of relevant staging resources.

The alternative approach, generally adopted across Grab, is to instantiate all platforms in each environment (staging and production), while still considering both instances as production-grade and guaranteeing tight SLOs in both environments.

Fig. 8 Architecture of the Coban platform, broken down by environment.

In Fig. 8, both instances of Heimdall have equivalent SLOs. The caveat is that all upstream systems and platforms must also guarantee a strict SLO in both environments. This obviously comes with a cost, for example, tighter maintenance windows for the operations pertaining to the Kafka clusters in the staging environment.

A strong “platform” culture is required for platform teams to fully understand that their instance residing in the staging environment is not their own staging environment and should not be used for testing new features.

What’s next?

Currently, users creating, updating, or deleting production resources in the Coban UI (or directly by calling the Heimdall API) receive the URL of the generated GitLab MR in a Slack message. From there, they must get the MR approved by a code owner, typically another team member, and finally merge the MR so that the CI pipeline actually implements the requested change.

Although this was a fairly easy way to implement a maker/checker process that was immediately compliant with our regulatory requirements for any changes in production, the user experience is not optimal. In the near future, we plan to bring the approval mechanism into Heimdall and the Coban UI, while still providing our more advanced users with the option to directly create, approve, and merge MRs in GitLab. In the longer run, we would also like to enhance the Coban UI with the output of the Khone CI jobs that include the Terraform plan and apply results.

There is another aspect of the platform that we want to improve. As Heimdall regularly polls the upstream platforms to collect their metadata, this introduces a latency between a change in one of those platforms and its reflection in the Coban platform, which can hinder the user experience. To refresh resource metadata in Heimdall in near real time, we plan to leverage an existing Grab-wide event stream, where most of the configuration and code changes at Grab are produced as events. Heimdall will soon be able to consume those events and update the metadata of the affected resources immediately, without waiting for the next periodic refresh.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Incremental Processing using Netflix Maestro and Apache Iceberg

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/incremental-processing-using-netflix-maestro-and-apache-iceberg-b8ba072ddeeb

by Jun He, Yingyi Zhang, and Pawan Dixit

Incremental processing is an approach to process new or changed data in workflows. The key advantage is that it only processes data that has been newly added to or updated in a dataset, instead of re-processing the complete dataset. This not only reduces the cost of compute resources but also significantly reduces the execution time. When workflow execution has a shorter duration, the chances of failure and manual intervention decrease. It also improves engineering productivity by simplifying existing pipelines and unlocking new patterns.

In this blog post, we talk about the landscape and the challenges in workflows at Netflix. We will show how we are building a clean and efficient incremental processing solution (IPS) by using Netflix Maestro and Apache Iceberg. IPS provides the incremental processing support with data accuracy, data freshness, and backfill for users and addresses many of the challenges in workflows. IPS enables users to continue to use the data processing patterns with minimal changes.

Introduction

Netflix relies on data to power its business in all phases. Whether in analyzing A/B tests, optimizing studio production, training algorithms, investing in content acquisition, detecting security breaches, or optimizing payments, well-structured and accurate data is foundational. As our business scales globally, the demand for data is growing and the need for scalable, low-latency incremental processing is emerging. There are three common issues that dataset owners usually face.

  • Data Freshness: Large datasets from Iceberg tables need to be processed quickly and accurately to generate insights that enable faster product decisions. The hourly processing semantics, along with the valid-through-timestamp watermark or data signals provided by the Data Platform toolset today, satisfy many use cases but are not the best fit for low-latency batch processing. Before IPS, the Data Platform did not have a solution for tracking the state and progression of datasets as a single easy-to-use offering. This has led to a few internal solutions such as Psyberg. These internal libraries process data by capturing the changed partitions, which works only for specific use cases. Additionally, the libraries are tightly coupled to the user business logic, which often incurs higher migration and maintenance costs and requires heavy coordination with the Data Platform team.
  • Data Accuracy: Late arriving data causes datasets processed in the past to become incomplete and as a result inaccurate. To compensate for that, ETL workflows often use a lookback window, based on which they reprocess the data in that certain time window. For example, a job would reprocess aggregates for the past 3 days because it assumes that there would be late arriving data, but data prior to 3 days isn’t worth the cost of reprocessing.
  • Backfill: Backfilling datasets is a common operation in big data processing. This requires repopulating data for a historical time period that precedes the scheduled processing. The need for backfilling could be due to a variety of factors, e.g. (1) upstream datasets got repopulated due to changes in the business logic of their data pipeline, (2) business logic was changed in a data pipeline, (3) a new metric was created that needs to be populated for historical time ranges, (4) historical data was found missing, etc.

These challenges are currently addressed by individual teams in suboptimal and less cost-efficient ways, such as:

  • Lookback: This is a generic and simple approach that data engineers use to solve the data accuracy problem. Users configure the workflow to read the data in a window (e.g. past 3 hours or 10 days). The window is set based on users’ domain knowledge so that users have high confidence that the late arriving data will be included or will not matter (i.e. data arrives too late to be useful). It ensures correctness, but at a high cost in terms of time and compute resources.
  • Foreach pattern: Users build backfill workflows using Maestro foreach support. It works well to backfill data produced by a single workflow. If the pipeline has multiple stages or many downstream workflows, users have to manually create backfill workflows for each of them and that requires significant manual work.

The incremental processing solution (IPS) described here has been designed to address the above problems. The design goal is to provide a clean and easy-to-adopt solution for incremental processing that ensures data freshness and data accuracy and provides easy backfill support.

  • Data Freshness: provide the support for scheduling workflows in a micro batch fashion (e.g. 15 min interval) with state tracking functionality
  • Data Accuracy: provide the support to process all late arriving data to achieve the data accuracy needed by the business, with multifold improvements in time and cost efficiency
  • Backfill: provide managed backfill support to build, monitor, and validate the backfill, including automatically propagating changes from upstream to downstream workflows, to greatly improve engineering productivity (i.e. a few days or weeks of engineering work to build backfill workflows vs one click for managed backfill)

Approach Overview

General Concept

Incremental processing is an approach to process data in batches, but only new or changed data. To support incremental processing, we need an approach for not only capturing incremental data changes but also tracking their states (i.e. whether a change has been processed by a workflow or not). The solution must be aware of changes, capture them from the source table(s), and then keep tracking them. Here, changes mean more than just the new data itself. For example, a row in an aggregation target table needs all the rows from the source table associated with the aggregation row. Also, if there are multiple source tables, usually the union of the changed data ranges from all input tables gives the full change data set. Thus, the captured change information must include all related data, including the unchanged rows in the source table. Due to the complexities mentioned above, change tracking cannot be achieved simply by using a single watermark. IPS has to track the captured changes at a finer granularity.

The changes from the source tables might affect the transformed result in the target table in various ways.

  • If one row in the target table is derived from one row in the source table, newly captured data change will be the complete input dataset for the workflow pipeline.
  • If one row in the target table is derived from multiple rows in the source table, capturing new data will only tell us the rows have to be re-processed. But the dataset needed for ETL is beyond the change data itself. For example, an aggregation based on account id requires all rows from the source table about an account id. The change dataset will tell us which account ids are changed and then the user business logic needs to load all data associated with those account ids found in the change data.
  • If one row in the target table is derived based on the data beyond the changed data set, e.g. joining source table with other tables, newly captured data is still useful and can indicate a range of data to be affected. Then the workflow will re-process the data based on the range. For example, assuming we have a table that keeps the accumulated view time for a given account partitioned by the day. If the view time 3-days ago is updated right now due to late arriving data, then the view time for the following two days has to be re-calculated for this account. In this case, the captured late arriving data will tell us the start of the re-calculation, which is much more accurate than recomputing everything for the past X days by guesstimate, where X is a cutoff lookback window decided by business domain knowledge.
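The accumulated view time example in the last case can be sketched as follows. This is an illustrative sketch only: the function name and the list-based representation (one entry per day for a single account) are assumptions, not part of IPS. The point is that the captured change tells the workflow where recalculation has to start, so earlier days are left untouched.

```python
def recompute_accumulated(daily_minutes: list[int],
                          accumulated: list[int],
                          first_changed_day: int) -> list[int]:
    """Recompute accumulated view time from the first changed day onward.

    daily_minutes     -- per-day view time, already including late data
    accumulated       -- previously computed running totals (now stale
                         from first_changed_day onward)
    first_changed_day -- index of the earliest day touched by late data
    """
    # Days before the first change are still correct and are reused as-is.
    result = accumulated[:first_changed_day]
    running = result[-1] if result else 0
    for minutes in daily_minutes[first_changed_day:]:
        running += minutes
        result.append(running)
    return result


# Late data raised day index 2 from 30 to 35 minutes.
daily = [10, 20, 35, 40, 50]
stale = [10, 30, 60, 100, 150]
print(recompute_accumulated(daily, stale, first_changed_day=2))
```

With a fixed lookback window, all of the past X days would be recomputed on every run, whether or not any of them changed.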

Once the change information (data or range) is captured, a workflow has to write the data to the target table in a slightly more complicated way because the simple INSERT OVERWRITE mechanism won’t work well. There are two alternatives:

  • Merge pattern: Some compute frameworks, e.g. Spark 3, support MERGE INTO, which allows new data to be merged into the existing data set. That solves the write problem for incremental processing. Note that the workflow/step can be safely restarted without worrying about duplicate data being inserted when using MERGE INTO.
  • Append pattern: Users can also use append only write (e.g. INSERT INTO) to add the new data to the existing data set. Once the processing is completed, the append data is committed to the table. If users want to re-run or re-build the data set, they will run a backfill workflow to completely overwrite the target data set (e.g. INSERT OVERWRITE).
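The difference between the two write patterns on a restart can be shown with a small in-memory sketch. This is not Spark code: the functions below only mimic the semantics of a keyed merge and of an append, and the row layout and key name are assumptions for illustration.

```python
def append(target: list[dict], batch: list[dict]) -> list[dict]:
    """Append-only write: rows are added as-is, like INSERT INTO."""
    return target + batch


def merge(target: list[dict], batch: list[dict], key: str) -> list[dict]:
    """Keyed merge: update matching rows, insert the rest.

    Mimics the effect of a MERGE INTO with an update-when-matched and
    an insert-when-not-matched clause.
    """
    merged = {row[key]: row for row in target}
    for row in batch:
        merged[row[key]] = row
    return list(merged.values())


target = [{"id": 1, "minutes": 10}]
batch = [{"id": 1, "minutes": 12}, {"id": 2, "minutes": 7}]

# Re-running the same batch (e.g. after a step restart) is harmless
# with merge, but duplicates rows with append.
once = merge(target, batch, "id")
twice = merge(once, batch, "id")
print(once == twice)                              # merge is idempotent
print(len(append(append(target, batch), batch)))  # append keeps growing
```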

Additionally, IPS naturally supports backfill in many cases. Downstream workflows (if there is no business logic change) will be triggered by the data change due to backfill. This enables automatic propagation of backfill data in multi-stage pipelines. Note that backfill support is not covered in this blog post. We will talk about IPS backfill support in a follow-up blog post.

Netflix Maestro

Maestro is the Netflix data workflow orchestration platform built to meet the current and future needs of Netflix. It is a general-purpose workflow orchestrator that provides a fully managed workflow-as-a-service (WAAS) to the data platform users at Netflix. It serves thousands of users, including data scientists, data engineers, machine learning engineers, software engineers, content producers, and business analysts, in various use cases. Maestro is highly scalable and extensible to support existing and new use cases and offers enhanced usability to end users.

Since the last blog on Maestro, we have migrated all the workflows to it on behalf of users with minimal interruption. Maestro has been fully deployed in production with 100% workload running on it.

IPS is built upon Maestro as an extension by adding two building blocks, i.e. a new trigger mechanism and step job type, to enable incremental processing for all workflows. It is seamlessly integrated into the whole Maestro ecosystem with minimal onboarding cost.

Apache Iceberg

Iceberg is a high-performance format for huge analytic tables. Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, Hive and Impala to safely work with the same tables, at the same time. It supports expressive SQL, full schema evolution, hidden partitioning, data compaction, and time travel & rollback. In the IPS, we leverage the rich features provided by Apache Iceberg to develop a lightweight approach to capture the table changes.

Incremental Change Capture Design

Using Netflix Maestro and Apache Iceberg, we created a novel solution for incremental processing, which provides incremental change (data and range) capture in a very lightweight way without copying any data. During our exploration, we saw a huge opportunity to improve cost efficiency and engineering productivity using incremental processing.

Here is our solution to achieve incremental change capture built upon Apache Iceberg features. As we know, an Iceberg table contains a list of snapshots with a set of metadata. Snapshots include references to the actual immutable data files. A snapshot can contain data files from different partitions.

Design to achieve incremental change capture built upon Apache Iceberg features

The graph above shows that s0 contains data for Partition P0 and P1 at T1. Then at T2, a new snapshot s1 is committed to the table with a list of new data files, which includes late arriving data for partition P0 and P1 and data for P2.

We implemented a lightweight approach to create an Iceberg table (called an ICDC table), which has its own snapshot but only includes the new data file references from the original table, without copying the data files. It is highly efficient and low cost. Workflow pipelines can then just load the ICDC table to process only the change data from partitions P0, P1, and P2, without reprocessing the unchanged data in P0 and P1. Meanwhile, the change range is also captured for the specified data field, as the Iceberg table metadata contains the upper and lower bound information of each data field for each data file. Moreover, IPS tracks the changes at data file granularity for each workflow.
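Conceptually, the change capture boils down to comparing the data file references of two snapshots and reading the per-file bounds. The sketch below models that idea with plain Python objects. It does not use the Iceberg API; the DataFile class and its fields are hypothetical, and the sketch assumes an append-only table (no deleted or rewritten files).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    path: str
    partition: str
    lower: int  # lower bound of the tracked field in this file
    upper: int  # upper bound of the tracked field in this file


def new_files(previous: set[DataFile], current: set[DataFile]) -> set[DataFile]:
    """File references present in the current snapshot but not the previous one.

    Only references are compared; no data file is read or copied.
    """
    return current - previous


def change_range(files: set[DataFile]) -> tuple[int, int]:
    """Overall (min, max) of the tracked field across the given files."""
    return (min(f.lower for f in files), max(f.upper for f in files))


# s0 holds P0 and P1; s1 adds late data for P0 and P1 plus new data for P2.
s0 = {DataFile("f1", "P0", 0, 10), DataFile("f2", "P1", 11, 20)}
s1 = s0 | {
    DataFile("f3", "P0", 3, 7),
    DataFile("f4", "P1", 15, 18),
    DataFile("f5", "P2", 21, 30),
}
added = new_files(s0, s1)
print(sorted(f.path for f in added), change_range(added))
```

An ICDC table in this picture would reference only the files in `added`, and the change range comes from metadata alone.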

This lightweight approach is seamlessly integrated with Maestro to allow all (thousands of) scheduler users to use this new building block (i.e. incremental processing) in their tens of thousands of workflows. Each workflow using IPS will be injected with a table parameter, which is the table name of the lightweight ICDC table. The ICDC table contains only the change data. Additionally, if the workflow needs the change range, a list of parameters will be injected into the user workflow to include the change range information. Incremental processing can be enabled by a new step job type (ICDC) and/or a new incremental trigger mechanism. Users can use them together with all existing Maestro features, e.g. foreach patterns, step dependencies based on the valid-through-timestamp watermark, the write-audit-publish templatized pattern, etc.

Main Advantages

With this design, user workflows can adopt incremental processing with very little effort. The user business logic is also decoupled from the IPS implementation. Multi-stage pipelines can also mix incremental processing workflows with existing normal workflows. We also found that user workflows can be simplified after adopting IPS by removing the additional steps that handle the complexity of the lookback window or call internal libraries.

Adding incremental processing features into Netflix Maestro as new features/building blocks for users will enable users to build their workflows in a much more efficient way and bridge the gaps to solve many challenging problems (e.g. dealing with late arriving data) in a much simpler way.

Emerging Incremental Processing Patterns

While onboarding user pipelines to IPS, we have discovered a few incremental processing patterns:

Incrementally process the captured incremental change data and directly append them to the target table

This is the straightforward incremental processing use case, where the change data carries all the information needed for the data processing. Upstream changes (usually from a single source table) are propagated downstream (usually to another target table), and the workflow pipeline only needs to process the change data (possibly joining it with other dimension tables) and then merge it into (usually append it to) the target table. This pattern replaces lookback window patterns for handling late arriving data. Instead of completely overwriting the past X days of data by using a lookback window pattern, user workflows just need to MERGE the change data (including late arriving data) into the target table by processing the ICDC table.

Use captured incremental change data as the row level filter list to remove unnecessary transformation

ETL jobs usually need to aggregate data based on certain group-by keys. Change data will disclose all the group-by keys that require a re-aggregation due to the new landing data from the source table(s). Then ETL jobs can join the original source table with the ICDC table on those group-by keys by using ICDC as a filter to speed up the processing to enable calculations of a much smaller set of data. There is no change to business transform logic and no re-design of ETL workflow. ETL pipelines keep all the benefits of batch workflows.
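The filter pattern can be sketched with in-memory rows. This is an illustration rather than the actual ETL code: the function, column names, and the sum aggregation are assumptions. The change data only supplies the group-by keys to re-aggregate; the aggregation itself still reads the full history of those keys from the source table.

```python
from collections import defaultdict


def reaggregate(source_rows: list[dict], changed_rows: list[dict],
                key: str, value: str) -> dict:
    """Re-aggregate only the group-by keys that appear in the change data."""
    changed_keys = {row[key] for row in changed_rows}
    totals: dict = defaultdict(int)
    for row in source_rows:
        # Acts like a join against the change data used as a filter.
        if row[key] in changed_keys:
            totals[row[key]] += row[value]
    return dict(totals)


source = [
    {"account": 1, "minutes": 10},
    {"account": 1, "minutes": 5},   # newly landed row
    {"account": 2, "minutes": 7},
    {"account": 3, "minutes": 1},
]
change_data = [{"account": 1, "minutes": 5}]
print(reaggregate(source, change_data, "account", "minutes"))
```

Accounts 2 and 3 are skipped entirely, which is where the savings come from when late arriving data is sparse.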

Use the captured range parameters in the business logic

This pattern is usually used in complicated use cases, such as joining multiple tables and doing complex processing. In this case, the change data does not give the full picture of the input needed by the ETL workflow. Instead, the change data indicates a range of changed data sets for a specific set of fields (which might be partition keys) in a given input table or, more commonly, multiple input tables. The union of the change ranges from all input tables then gives the full change data set needed by the workflow. Additionally, the whole range of data usually has to be overwritten because the transformation is not stateless and depends on the result from the previous ranges. Another example is that the aggregated record in the target table, or a window function in the query, has to be updated based on the whole data set in the partition (e.g. calculating a median across the whole partition). Basically, the range derived from the change data indicates the dataset to be re-processed.
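As a rough illustration of taking the union of change ranges from several input tables, the sketch below merges overlapping closed ranges into a minimal list of ranges to re-process. The function name and the use of integer day numbers are assumptions; how IPS actually represents and combines ranges is not described here.

```python
def union_ranges(ranges: list[tuple[int, int]]) -> list[tuple[int, int]]:
    """Merge overlapping closed ranges into a sorted, non-overlapping list."""
    merged: list[tuple[int, int]] = []
    for start, end in sorted(ranges):
        if merged and start <= merged[-1][1]:
            # Overlaps the previous range: extend it if needed.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


# Change ranges captured from three hypothetical input tables.
table_a = [(5, 8)]
table_b = [(1, 3), (10, 12)]
table_c = [(2, 4)]
print(union_ranges(table_a + table_b + table_c))
```

The workflow would then overwrite the target data for each resulting range instead of a fixed lookback window.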

Use cases

Data workflows at Netflix usually have to deal with late arriving data, which is commonly solved by using the lookback window pattern due to its simplicity and ease of implementation. In the lookback pattern, the ETL pipeline always consumes the past X partitions of data from the source table and then overwrites the target table in every run. Here, X is a number decided by the pipeline owners based on their domain expertise. The drawback is the cost of computation and execution time. It usually costs almost X times more than a pipeline that does not consider late arriving data. Given that late arriving data is sparse, the majority of the processing is done on data that has already been processed, which is unnecessary. Also, note that this approach is based on domain knowledge and is sometimes subject to changes in the business environment or the domain expertise of data engineers. In certain cases, it is challenging to come up with a good constant number.

Below, we will use a two-stage data pipeline to illustrate how to rebuild it using IPS to improve cost efficiency. We will observe a significant cost reduction (> 80%) with few changes in the business logic. In this use case, we set the lookback window size X to 14 days; this value varies across real pipelines.

Original Data Pipeline with Lookback Window

Original data pipeline with lookback window
  • playback_table: an Iceberg table holding playback events from user devices, ingested by streaming pipelines. Late arriving data is sparse: only a few percent of the data arrives late.
  • playback_daily_workflow: a daily scheduled workflow that processes the past X days of playback_table data and writes the transformed data to the target table for the past X days.
  • playback_daily_table: the target table of the playback_daily_workflow, which gets overwritten every day for the past X days.
  • playback_daily_agg_workflow: a daily scheduled workflow that processes the past X days of playback_daily_table data and writes the aggregated data to the target table for the past X days.
  • playback_daily_agg_table: the target table of the playback_daily_agg_workflow, which gets overwritten every day for the past 14 days.

We ran this pipeline on a sample dataset using the real business logic. Here are the average execution results of the sample runs:

  • The first stage workflow takes about 7 hours to process playback_table data
  • The second stage workflow takes about 3.5 hours to process playback_daily_table data

New Data Pipeline with Incremental Processing

Using IPS, we rewrite the pipeline to avoid re-processing data as much as possible. The new pipeline is shown below.

New data pipeline with incremental processing

Stage 1:

  • ips_playback_daily_workflow: the updated version of playback_daily_workflow.
  • The workflow’s Spark SQL job then reads an incremental change data capture (ICDC) Iceberg table (i.e., playback_icdc_table), which includes only the new data added to playback_table. It includes the late-arriving data but no unchanged data from playback_table.
  • The business logic replaces the INSERT OVERWRITE query with a MERGE INTO SQL query, so the new data is merged into playback_daily_table.

Stage 2:

  • IPS captures the changed data of playback_daily_table and keeps the change data in an ICDC source table (playback_daily_icdc_table), so we don’t need to hard-code the lookback window in the business logic. If only Y days have changed data in playback_daily_table, the workflow only needs to load data for those Y days.
  • In ips_playback_daily_agg_workflow, the business logic stays the same for the current day’s partition. We then need to update the business logic to take care of late-arriving data as follows:
    • JOIN playback_daily_table with playback_daily_icdc_table on the aggregation group-by keys for the past 2 to X days, excluding the current day (i.e., day 1).
    • Because late-arriving data is sparse, the JOIN narrows down the playback_daily_table data set so that only a very small portion of it is processed.
    • The business logic uses a MERGE INTO SQL query, so the change is propagated to the downstream target table.
  • For the current day, the business logic stays the same: it consumes the data from playback_daily_table and writes the outcome to the target table playback_daily_agg_table using INSERT OVERWRITE, because there is no need to join with the ICDC table.
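The Stage 2 idea (use the change data to narrow the join, re-aggregate only the affected groups, then upsert) can be sketched in plain Python. This is an in-memory illustration only, not Spark SQL or the IPS API; the column names (day, title_id, minutes_played) and the helper functions are assumptions made up for the example.

```python
from collections import defaultdict

# Hypothetical rows of playback_daily_table: (day, title_id, minutes_played).
playback_daily = [
    ("2024-01-01", "t1", 30),
    ("2024-01-01", "t2", 10),
    ("2024-01-02", "t1", 20),
    ("2024-01-02", "t1", 5),   # late-arriving row merged in by stage 1
    ("2024-01-03", "t2", 40),
]

# Hypothetical ICDC rows: only what changed since the last run.
playback_daily_icdc = [("2024-01-02", "t1", 5)]

# Aggregated target table before this run, keyed by the group-by keys.
playback_daily_agg = {
    ("2024-01-01", "t1"): 30,
    ("2024-01-01", "t2"): 10,
    ("2024-01-02", "t1"): 20,  # stale: does not include the late row yet
    ("2024-01-03", "t2"): 40,
}


def changed_keys(icdc_rows):
    """Collect the group-by keys touched by the change data."""
    return {(day, title) for day, title, _ in icdc_rows}


def reaggregate(rows, keys):
    """Re-aggregate only the groups whose keys appear in the change data."""
    totals = defaultdict(int)
    for day, title, minutes in rows:
        if (day, title) in keys:
            totals[(day, title)] += minutes
    return dict(totals)


def merge_into(target, updates):
    """Upsert in the spirit of MERGE INTO: update matches, insert the rest."""
    target.update(updates)
    return target


keys = changed_keys(playback_daily_icdc)
updates = reaggregate(playback_daily, keys)
merge_into(playback_daily_agg, updates)
print(updates)
```

Only one group is recomputed here, while the other three aggregated rows are left untouched, which is where the savings over a fixed lookback window come from.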

With these small changes, the data pipeline efficiency is greatly improved. In our sample run,

  • The first stage workflow takes just about 30 minutes to process X days of change data from playback_table.
  • The second stage workflow takes about 15 minutes to process change data from day 2 to day X of playback_daily_table by joining with playback_daily_icdc_table data, and another 15 minutes to process the current day’s (i.e., day 1) playback_daily_table change data.

The Spark job settings are the same in the original and new pipelines. In total, the new IPS-based pipeline needs around 10% of the resources (measured by execution time) to finish.
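As a quick check of that figure, the arithmetic can be reproduced from the execution times reported for the two sample runs. The variable names are only for illustration.

```python
# Execution times reported for the sample runs, in hours.
original_hours = 7.0 + 3.5              # stage 1 + stage 2 with the lookback window
incremental_hours = 0.5 + 0.25 + 0.25   # stage 1 + stage 2 (days 2..X) + stage 2 (day 1)

ratio = incremental_hours / original_hours
print(f"original: {original_hours} h, incremental: {incremental_hours} h")
print(f"incremental / original = {ratio:.1%}")
print(f"reduction = {1 - ratio:.1%}")
```

The ratio comes out at roughly 9.5%, which is consistent with both the "around 10%" statement and the "> 80%" cost reduction quoted earlier.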

Looking Forward

We will improve IPS to support more complicated cases beyond append-only cases. IPS will be able to keep track of the progress of the table changes and support multiple Iceberg table change types (e.g. append, overwrite, etc.). We will also add managed backfill support into IPS to help users to build, monitor, and validate the backfill.

We are taking Big Data Orchestration to the next level and constantly solving new problems and challenges, so please stay tuned. If you are motivated to solve large-scale orchestration problems, please join us.

Acknowledgements

Thanks to our Product Manager Ashim Pokharel for driving the strategy and requirements. We’d also like to thank Andy Chu, Kyoko Shimada, Abhinaya Shetty, Bharath Mummadisetty, John Zhuge, Rakesh Veeramacheneni, and other stunning colleagues at Netflix for their suggestions and feedback while developing IPS. We’d also like to thank Prashanth Ramdas, Eva Tse, Charles Smith, and other leaders of Netflix engineering organizations for their constructive feedback and suggestions on the IPS architecture and design.


Incremental Processing using Netflix Maestro and Apache Iceberg was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Road localisation in GrabMaps

Post Syndicated from Grab Tech original https://engineering.grab.com/road-localisation-grabmaps

Introduction

In 2022, Grab achieved self-sufficiency in its Geo services. As part of this transition, one crucial step was moving towards using an internally-developed map tailored specifically to the market in which Grab operates. Now that we have full control over the map layer, we can add more data to it or improve it according to the needs of the services running on top. One key aspect that this transition unlocked for us was the possibility of creating hyperlocal data at map level.

For instance, by determining the country to which a road belongs, we can now automatically infer the official language of that country and display the street name in that language. In another example, knowing the country for a specific road, we can automatically infer the driving side (left-handed or right-handed) leading to an improved navigation experience. Furthermore, this capability also enables us to efficiently handle various scenarios. For example, if we know that a road is part of a gated community, an area where our driver partners face restricted access, we can prevent the transit through that area.

These are just some examples of the possibilities from having full control over the map layer. By having an internal map, we can align our maps with specific markets and provide better experiences for our driver-partners and customers.

Background

For all of this to be possible, we first needed to localise the roads inside the map. Our goal was to include hyperlocal data in the map, that is, data specific to a certain area, such as a country, a city, or even a smaller part of a city like a gated community. At the same time, we aimed to deliver our map at a high cadence, so we needed to find the right way to process this large amount of data while continuing to create maps in a cost-effective manner.

Solution

In the following sections of this article, we will use an extract from the Southeast Asia map to provide visual representations of the concepts discussed.

In Figure 1, Image 1 shows a visualisation of the road network, the roads belonging to this area. The coloured lines in Image 2 represent the borders identifying the countries in the same area. Overlapping the information from Image 1 and Image 2, we can extrapolate and say that the entire surface included in a certain border could have the same set of common properties as shown in Image 3. In Image 4, we then proceed with adding localised roads for each area.

Figure 1 – Map of Southeast Asia

For this to be possible, we have to find a way to localise each road and identify its associated country. Once this localisation process is complete, we can replicate all this information specific to a given border onto each individual road. This information includes details such as the country name, driving side, and official language. We can go even further and infer more information, and add hyperlocal data. For example, in Vietnam, we can automatically prevent motorcycle access on the motorways.

Assigning each road on the map to a specific area, such as a country, service area, or subdivision, presents a complex task. So, how can we efficiently accomplish this?

Implementation

The most straightforward approach would be to test the inclusion of each road in each area boundary, but that is easier said than done. With close to 30 million road segments in the Southeast Asia map and over 10 thousand areas, determining inclusion or intersection between every polyline and every polygon is computationally expensive.

Our solution to this challenge involves replacing the expensive yet precise operation with a decent approximation. We introduce a proxy entity, the geohash, and we use it to approximate the areas and also to localise the roads.

We replace the geometrical inclusion with a series of simpler and less expensive operations. First, we conduct an inexpensive precomputation where we identify all the geohashes that belong to a certain area or lie within a defined border. We then identify the geohashes to which the roads belong. Finally, we use these precomputed values to assign roads to their respective areas. This process is also computationally inexpensive.

Given the large area we process, we leverage big data techniques to distribute the execution across multiple nodes and thus speed up the operation. We want to deliver the map daily and this is one of the many operations that are part of the map-making process.

What is a geohash?

To further understand our implementation we will first explain the geohash concept. A geohash is a unique identifier of a specific region on the Earth. The basic idea is that the Earth is divided into regions of user-defined size and each region is assigned a unique id, which is known as its geohash. For a given location on earth, the geohash algorithm converts its latitude and longitude into a string.

Geohashes use a Base-32 alphabet encoding system comprising characters ranging from 0 to 9 and A to Z, excluding “A”, “I”, “L” and “O”. Imagine dividing the world into a grid with 32 cells. The first character in a geohash identifies one of these 32 cells. Each of these cells is then further subdivided into 32 smaller cells. This subdivision process continues and refines to specific areas in the world. Adding characters to the geohash subdivides a cell, effectively zooming in to a more detailed area.

The precision factor of the geohash determines the size of the cell. For instance, a precision factor of one creates a cell 5,000 km high and 5,000 km wide. A precision factor of six creates a cell 0.61 km high and 1.22 km wide. Furthermore, a precision factor of nine creates a cell 4.77 m high and 4.77 m wide. It is important to note that cells are not always square and can have varying dimensions.
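To make the subdivision concrete, here is a minimal sketch of the standard geohash encoding in Python: the latitude and longitude ranges are halved alternately, and every five bits become one Base-32 character. The function name and the sample coordinates are assumptions for illustration; this is not the code used in the map-making pipeline.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # digits and letters without a, i, l, o


def geohash_encode(lat: float, lon: float, precision: int = 6) -> str:
    """Encode a latitude/longitude pair into a geohash of the given length."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    bits, bit_count = 0, 0
    use_lon = True  # bits alternate between longitude and latitude, longitude first
    while len(chars) < precision:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits = (bits << 1) | 1
                lon_lo = mid
            else:
                bits <<= 1
                lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits = (bits << 1) | 1
                lat_lo = mid
            else:
                bits <<= 1
                lat_hi = mid
        use_lon = not use_lon
        bit_count += 1
        if bit_count == 5:  # five bits select one of the 32 sub-cells
            chars.append(BASE32[bits])
            bits, bit_count = 0, 0
    return "".join(chars)


# Hypothetical point; a longer geohash of the same point extends the shorter one.
coarse = geohash_encode(1.3, 103.8, precision=3)
fine = geohash_encode(1.3, 103.8, precision=6)
print(coarse, fine)
```

Because each extra character only refines the previous cell, a geohash is always a prefix of any higher-precision geohash for the same point, which is what makes geohashes convenient as join keys.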

Figure 2 shows an example of a precision-6 geohash cell, whose code is wsdt33.

Figure 2 – An example of geohash code wsdt33

Using less expensive operations

Calculating the inclusion of the roads inside a certain border is an expensive operation. However, quantifying the exact expense is challenging as it depends on several factors. One factor is the complexity of the border. Borders are usually irregular and very detailed, as they need to correctly reflect the actual border. The complexity of the road geometry is another factor that plays an important role as roads are not always straight lines.

Figure 3 – Roads to localise

Since this operation is expensive both in terms of cloud cost and time to run, we need to identify a cheaper and faster way that would yield similar results. Knowing that the complexity of the border lines is the cause of the problem, we tried using a different alternative, a rectangle. Calculating the inclusion of a polyline inside a rectangle is a cheaper operation.

Figure 4 – Roads inside a rectangle

So we transformed this large, one-step operation, where we test each road segment for inclusion in a border, into a series of smaller operations where we perform the following steps:

  1. Identify all the geohashes that are part of a certain area or belong to a certain border. In this process we include additional areas to make sure that we cover the entire surface inside the border.
  2. For each road segment, we identify the list of geohashes that it belongs to. A road, depending on its length or depending on its shape, might belong to multiple geohashes.

In Figure 5, we identify that the road belongs to two geohashes and that the two geohashes are part of the border we use.

Figure 5 – Geohashes as proxy

Now, all we need to do is join the two data sets together. This kind of operation is a great candidate for a big data approach, as it allows us to run it in parallel and speed up the processing time.
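A minimal sketch of that join, using plain Python dictionaries in place of distributed data sets. The geohash codes, road identifiers, and area codes are made up for illustration, and the sketch assumes each geohash maps to a single area, which is a simplification.

```python
from collections import defaultdict

# Step 1 (precomputed, rarely re-run): geohash -> area, for every geohash
# that fills a border. All identifiers below are hypothetical.
geohash_to_area = {
    "w21z7": "SG",
    "w21z6": "SG",
    "w21xr": "MY",
}

# Step 2: road segment -> geohashes it passes through.
road_to_geohashes = {
    "road_1": ["w21z7", "w21z6"],
    "road_2": ["w21xr"],
    "road_3": ["w21zz"],  # falls in no known area geohash
}


def localise(roads, cells):
    """Join roads to areas through their shared geohash keys."""
    result = defaultdict(set)
    for road_id, hashes in roads.items():
        for gh in hashes:
            area = cells.get(gh)
            if area is not None:
                result[road_id].add(area)
    return {road_id: sorted(areas) for road_id, areas in result.items()}


road_areas = localise(road_to_geohashes, geohash_to_area)
print(road_areas)
```

Since the join key is a plain string, the same logic maps directly onto a distributed equi-join, where the work can be partitioned by geohash.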

Precision tradeoff

We mentioned earlier that we replace a precise operation with a decent approximation. Let’s now delve into the real tradeoff of adopting this approach.

The first thing that stands out with this approach is that we traded precision for cost. We are able to reduce the cost because this approach uses fewer hardware resources and less computation time. However, precision suffers, particularly for roads located near the borders, as they might be wrongly classified.

Going back to the initial example, let’s take the case of the external road, on the left side of the area. As you can see in Figure 6, it is clear that the road does not belong to our border. But when we apply the geohash approach it gets included into the middle geohash.

Figure 6 – Wrong road localisation

Given that just a small part of the geohash falls inside the border, the entire geohash will be classified as belonging to that area, and, as a consequence, the road that belongs to that geohash will be wrongly localised and we’ll end up adding the wrong localisation information to that road. This is clearly a consequence of the precision tradeoff. So, how can we solve this?

Geohash precision

One option is to increase the geohash precision. By using smaller and smaller geohashes, we can better reflect the actual area. As we go deeper and we further split the geohash, we can accurately follow the border. However, a high geohash precision also equates to a computationally intensive operation bringing us back to our initial situation. Therefore, it is crucial to find the right balance between the geohash size and the complexity of operations.

Figure 7 – Geohash precision

Geohash coverage percentage

To find a balance between precision and data loss, we looked into calculating the geohash coverage percentage. For example, in Figure 8, the blue geohash is entirely within the border. Here we can say that it has a 100% geohash coverage.

Figure 8 – Geohash inside the border

However, take for example the geohash in Figure 9. It touches the border and has only around 80% of its surface inside the area. Given that most of its surface is within the border, we still can say that it belongs to the area.

Figure 9 – Geohash partially inside the border

Let’s look at another example. In Figure 10, only a small part of the geohash is within the border. We can say that the geohash coverage percentage here is around 5%. For these cases, it becomes difficult for us to determine whether the geohash does belong to the area. What would be a good tradeoff in this case?

Figure 10 – Geohash barely inside the border
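One way to estimate the coverage percentage without a geometry library is to sample points on a regular grid inside the geohash cell and count how many fall inside the border polygon. The sketch below does this with a ray-casting point-in-polygon test. The function names, the sample count, and the 50% threshold are assumptions chosen for illustration; the article does not prescribe a specific threshold.

```python
def point_in_polygon(x, y, polygon):
    """Ray-casting test; polygon is a list of (x, y) vertices."""
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        if (y1 > y) != (y2 > y):  # edge straddles the horizontal ray
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def coverage(cell, polygon, samples=10):
    """Approximate fraction of cell (min_x, min_y, max_x, max_y) inside polygon."""
    min_x, min_y, max_x, max_y = cell
    step_x = (max_x - min_x) / samples
    step_y = (max_y - min_y) / samples
    hits = 0
    for i in range(samples):
        for j in range(samples):
            px = min_x + (i + 0.5) * step_x  # centre of each sub-cell
            py = min_y + (j + 0.5) * step_y
            if point_in_polygon(px, py, polygon):
                hits += 1
    return hits / (samples * samples)


def belongs(cell, polygon, threshold=0.5):
    """Hypothetical rule: assign the cell to the area above a coverage threshold."""
    return coverage(cell, polygon) >= threshold


cell = (0.0, 0.0, 1.0, 1.0)
half_border = [(0.5, -1.0), (2.0, -1.0), (2.0, 2.0), (0.5, 2.0)]
print(coverage(cell, half_border), belongs(cell, half_border))
```

A higher sample count gives a better estimate at a higher cost, which mirrors the precision versus cost tradeoff discussed above.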

Border shape

To go one step further, we can consider a mixed solution, where we use the border shape but only for the geohashes touching the border. This would still be an intensive computational operation but the number of roads located in these geohashes will be much smaller, so it is still a gain.

For the geohashes with full coverage inside the area, we’ll use the geohash for the localisation, the simpler operation. For the geohashes that are near the border, we’ll use a different approach. To increase the precision around the borders, we can cut the geohash following the border’s shape. Instead of having a rectangle, we’ll use a more complex shape which is still simpler than the initial border shape.

Figure 11 – Geohash following a border’s shape

Result

We began with a simple approach and we enhanced it to improve precision. This also increased the complexity of the operation. We then asked, what are the actual gains? Was it worthwhile to go through all this process? In this section, we put this to the test.

We first created a benchmark by taking a small sample of the data and ran the localisation process on a laptop. The sample comprised approximately 2% of the borders and 0.0014% of the roads. We ran the localisation process using two approaches.

  • With the first approach, we calculated the intersection between all the roads and borders. The entire operation took around 38 minutes.
  • For the second approach, we optimised the operation using geohashes. In this approach, the runtime was only 78 seconds (1.3 minutes).

However, it is important to note that this is not an apples-to-apples comparison. The operation that we measured was the localisation of the roads but we did not include the border filling operation where we fill the borders with geohashes. This is because this operation does not need to be run every time. It can be run once and reused multiple times.

Though not often required, it is still crucial to understand and consider the operation of precomputing areas and filling borders with geohashes. The precomputation process depends on several factors:

  • Number and shape of the borders – The more borders and the more complex the borders are, the longer the operation will take.
  • Geohash precision – How accurate do we need our localisation to be? The more accurate it needs to be, the longer it will take.
  • Hardware availability

Going back to our hypothesis, although this precomputation might be expensive, it is rarely run, as the borders don’t change often, and it can be triggered only when needed. However, the regular computation, where we find the area to which each road belongs, is run often, as the roads change constantly. In our system, we run this localisation for each map processing.

We can also further optimise this process by applying the opposite approach. Geohashes that have full coverage inside a border can be merged together into larger geohashes thus simplifying the computation inside the border. In the end, we can have a solution that is fully optimised for our needs with the best cost-to-performance ratio.

Figure 12 – Optimised geohashes
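The merging step can be sketched as follows: whenever all 32 children of a parent geohash are present in the set of fully covered cells, they are replaced by the parent, and the process repeats until nothing more can be merged. The function name is hypothetical, and the sketch assumes the input contains only geohashes with full coverage.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def merge_full_cells(cells: set[str]) -> set[str]:
    """Repeatedly replace 32 sibling geohashes with their common parent."""
    merged = set(cells)
    changed = True
    while changed:
        changed = False
        parents = {c[:-1] for c in merged if len(c) > 1}
        for parent in parents:
            children = {parent + ch for ch in BASE32}
            if children <= merged:  # every child is fully covered
                merged -= children
                merged.add(parent)
                changed = True
    return merged


# All 32 children of "wsdt3" plus one unrelated cell.
cells = {"wsdt3" + ch for ch in BASE32} | {"wsdt4x"}
compact = merge_full_cells(cells)
print(sorted(compact))
```

With mixed precisions in the area set, the road side of the join would need to match on geohash prefixes rather than on exact codes.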

Conclusion

Although geohashes seem to be the right solution for this kind of problem, we also need to monitor their content. One consideration is the road density inside a geohash. For example, a geohash inside a city centre usually has a lot of roads, while one in the countryside may have far fewer. We need to consider this aspect to have a balanced computation operation and take full advantage of the big data approach. In our case, we achieve this balance by considering the number of road kilometres within a geohash.

Figure 13 – Unbalanced data

Additionally, the resources that we choose also matter. To optimise time and cost, we need to find the right balance between the running time and resource cost. As shown in Figure 14, based on a sample run, we sometimes get the best result when using smaller machines.

Figure 14 – Cost vs runtime

The achievements and insights showcased in this article are indebted to the contributions made by Mihai Chintoanu. His expertise and collaborative efforts have profoundly enriched the content and findings presented herein.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

3. Psyberg: Automated end to end catch up

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/3-psyberg-automated-end-to-end-catch-up-260fbe366fe2

By Abhinaya Shetty, Bharath Mummadisetty

This blog post will cover how Psyberg helps automate the end-to-end catchup of different pipelines, including dimension tables.

In the previous installments of this series, we introduced Psyberg and delved into its core operational modes: Stateless and Stateful Data Processing. Now, let’s explore the state of our pipelines after incorporating Psyberg.

Pipelines After Psyberg

Let’s explore how different modes of Psyberg could help with a multistep data pipeline. We’ll return to the sample customer lifecycle:

Processing Requirement:
Keep track of the end-of-hour state of accounts, e.g., Active/Upgraded/Downgraded/Canceled.

Solution:
One potential approach here would be as follows:

  1. Create two stateless fact tables:
    a. Signups
    b. Account Plans
  2. Create one stateful fact table:
    a. Cancels
  3. Create a stateful dimension that reads the above fact tables every hour and derives the latest account state.

Let’s look at how this can be integrated with Psyberg to auto-handle late-arriving data and corresponding end-to-end data catchup.

Navigating the Workflow: How Psyberg Handles Late-Arriving Data

We follow a generic workflow structure for both stateful and stateless processing with Psyberg; this helps maintain consistency and makes debugging and understanding these pipelines easier. The following is a concise overview of the various stages involved; for a more detailed exploration of the workflow specifics, please turn to the second installment of this series.

1. Psyberg Initialization

The workflow starts with the Psyberg initialization (init) step.

  • Input: List of source tables and required processing mode
  • Output: Psyberg identifies new events that have occurred since the last high watermark (HWM) and records them in the session metadata table.

The session metadata table can then be read to determine the pipeline input.

2. Write-Audit-Publish (WAP) Process

This is the general pattern we use in our ETL pipelines.

a. Write
Apply the ETL business logic to the input data identified in Step 1 and write to an unpublished Iceberg snapshot based on the Psyberg mode.

b. Audit
Run various quality checks on the staged data. Psyberg’s metadata session table is used to identify the partitions included in a batch run. Several audits, such as verifying source and target counts, are performed on this batch of data.

c. Publish
If the audits are successful, cherry-pick the staging snapshot to publish the data to production.

3. Psyberg Commit

Now that the data pipeline has been executed successfully, the new high watermark identified in the initialization step is committed to Psyberg’s high watermark metadata table. This ensures that the next instance of the workflow will pick up newer updates.
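The three stages (initialization, Write-Audit-Publish, commit) can be illustrated with a small in-memory sketch. This is not Psyberg’s API: the Snapshot and Pipeline classes, the integer timestamps, and the upper-casing "business logic" are all hypothetical stand-ins, and real pipelines operate on Iceberg snapshots and metadata tables rather than Python lists.

```python
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    commit_ts: int  # hypothetical commit timestamp of a source snapshot
    rows: list


@dataclass
class Pipeline:
    hwm: int = 0  # last committed high watermark
    published: list = field(default_factory=list)

    def init(self, snapshots):
        """Step 1: pick up snapshots committed after the last high watermark."""
        new = [s for s in snapshots if s.commit_ts > self.hwm]
        new_hwm = max((s.commit_ts for s in new), default=self.hwm)
        return new, new_hwm

    def write(self, new):
        """Step 2a: apply business logic into a staging area (not yet visible)."""
        return [row.upper() for s in new for row in s.rows]

    def audit(self, new, staged):
        """Step 2b: a simple source-versus-target count check."""
        return len(staged) == sum(len(s.rows) for s in new)

    def run(self, snapshots):
        new, new_hwm = self.init(snapshots)
        staged = self.write(new)
        if not self.audit(new, staged):
            return False  # nothing is published and the HWM stays unchanged
        self.published.extend(staged)  # Step 2c: publish the staged data
        self.hwm = new_hwm             # Step 3: commit the new high watermark
        return True


pipeline = Pipeline(hwm=5)
snapshots = [Snapshot(3, ["a"]), Snapshot(6, ["b", "c"]), Snapshot(8, ["d"])]
first_run = pipeline.run(snapshots)
second_run = pipeline.run(snapshots)  # no new snapshots, so nothing changes
print(pipeline.published, pipeline.hwm)
```

Committing the high watermark only after a successful publish means a failed audit leaves the pipeline in a state where the next run retries the same input.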

Callouts

  • Having the Psyberg step isolated from the core data pipeline allows us to maintain a consistent pattern that can be applied across stateless and stateful processing pipelines with varying requirements.
  • This also enables us to update the Psyberg layer without touching the workflows.
  • This is compatible with both Python and Scala Spark.
  • Debugging/figuring out what was loaded in every run is made easy with the help of workflow parameters and Psyberg Metadata.

The Setup: Automated end-to-end catchup

Let’s go back to our customer lifecycle example. Once we integrate all four components with Psyberg, here’s how we would set it up for automated catchup.

The three fact tables, comprising the signup and plan facts encapsulated in Psyberg’s stateless mode, along with the cancel fact in stateful mode, serve as inputs for the stateful sequential load ETL pipeline. This data pipeline monitors the various stages in the customer lifecycle.

In the sequential load ETL, we have the following features:

  • Catchup Threshold: This defines the lookback period for the data being read. For instance, only consider the last 12 hours of data.
  • Data Load Type: The ETL can either load the missed/new data specifically or reload the entire specified range.
  • Metadata Recording: Metadata is persisted for traceability.

Here is a walkthrough on how this system would automatically catch up in the event of late-arriving data:

Premise: All the tables were last loaded up to hour 5, meaning that any data from hour 6 onwards is considered new, and anything before that is classified as late data (as indicated in red above)

Fact level catchup:

  1. During the Psyberg initialization phase, the signup and plan facts identify the late data from hours 2 and 3, as well as the most recent data from hour 6. The ETL then appends this data to the corresponding partitions within the fact tables.
  2. The Psyberg initialization for the cancel fact identifies late data from hour 5 and additional data from hours 6 and 7. Since this ETL operates in stateful mode, the data in the target table from hours 5 to 7 will be overwritten with the new data.
  3. By focusing solely on updates and avoiding reprocessing of data based on a fixed lookback window, both Stateless and Stateful Data Processing maintain a minimal change footprint. This approach ensures data processing is both efficient and accurate.

Dimension level catchup:

  1. The Psyberg wrapper for this stateful ETL looks at the updates to the upstream Psyberg powered fact tables to determine the date-hour range to reprocess. Here’s how it would calculate the above range:
    MinHr = least(min processing hour from each source table)
    This ensures that we don’t miss out on any data, including late-arriving data. In this case, the minimum hour to process the data is hour 2.
    MaxHr = least(max processing hour from each source table)
    This ensures we do not process partial data, i.e., hours for which data has not been loaded into all source tables. In this case, the maximum hour to process the data is hour 6.
  2. The ETL process uses this time range to compute the state in the changed partitions and overwrite them in the target table. This helps overwrite data only when required and minimizes unnecessary reprocessing.
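The range calculation can be sketched in a few lines of Python, using the hours from the walkthrough. The table names and the function are hypothetical; only the least-of-minimums and least-of-maximums rule comes from the description.

```python
# Hours with new or late data per source table, taken from the walkthrough.
changed_hours = {
    "signup_fact": [2, 3, 6],
    "plan_fact": [2, 3, 6],
    "cancel_fact": [5, 6, 7],
}


def reprocess_range(hours_by_table):
    """Return (MinHr, MaxHr) for the stateful dimension load."""
    # Earliest changed hour across all sources, so no late data is missed.
    min_hr = min(min(hours) for hours in hours_by_table.values())
    # Smallest of the per-table maximums, so every source has data up to MaxHr.
    max_hr = min(max(hours) for hours in hours_by_table.values())
    return min_hr, max_hr


min_hr, max_hr = reprocess_range(changed_hours)
print(min_hr, max_hr)
```

Hour 7 is left out because only the cancel fact has data for it so far; it will be picked up in a later run once the other sources catch up.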

As seen above, by chaining these Psyberg workflows, we could automate the catchup for late-arriving data from hours 2 through 6. The Data Engineer does not need to perform any manual intervention in this case and can thus focus on more important things!

The Impact: How Psyberg Transformed Our Workflows

The introduction of Psyberg into our workflows has served as a valuable tool in enhancing accuracy and performance. The following are key areas that have seen improvements from using Psyberg:

  • Computational Resources Used:
    In certain instances, we’ve noticed a significant reduction in resource utilization, with the number of Spark cores used dropping by 90% following the implementation of Psyberg, compared to using fixed lookback windows
  • Workflow and Table Onboarding:
    We have onboarded 30 tables and 13 workflows into incremental processing since implementing Psyberg
  • Reliability and Accuracy:
    Since onboarding workflows to Psyberg, we have experienced zero manual catchups or missing data incidents
  • Bootstrap template:
    The process of integrating new tables into incremental processing has been made more accessible and now requires minimal effort using Psyberg

These performance metrics suggest that adopting Psyberg has been beneficial to the efficiency of our data processing workflows.

Next Steps and Conclusion

Integrating Psyberg into our operations has improved our data workflows and opened up exciting possibilities for the future. As we continue to innovate, Netflix’s data platform team is focused on creating a comprehensive solution for incremental processing use cases. This platform-level solution is intended to enhance our data processing capabilities across the organization. Stay tuned for a new post on this!

In conclusion, Psyberg has proven to be a reliable and effective solution for our data processing needs. As we look to the future, we’re excited about the potential for further advancements in our data platform capabilities.


3. Psyberg: Automated end to end catch up was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

2. Diving Deeper into Psyberg: Stateless vs Stateful Data Processing

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/2-diving-deeper-into-psyberg-stateless-vs-stateful-data-processing-1d273b3aaefb

By Abhinaya Shetty, Bharath Mummadisetty

In the inaugural blog post of this series, we introduced you to the state of our pipelines before Psyberg and the challenges with incremental processing that led us to create the Psyberg framework within Netflix’s Membership and Finance data engineering team. In this post, we will delve into a more detailed exploration of Psyberg’s two primary operational modes: stateless and stateful.

Modes of Operation of Psyberg

Psyberg has two main modes of operation or patterns, as we call them. Understanding the nature of the late-arriving data and processing requirements will help decide which pattern is most appropriate for a use case.

  1. Stateless Data Processing: As the name suggests, one should use this pattern in scenarios where the columns in the target table solely depend on the content of the incoming events, irrespective of their order of occurrence. For instance, consider a scenario where we need to keep track of all the customer signups over time. In this case, the order of signups wouldn’t matter, and individual signup records are independent of each other. This information has only one source, and we can append new/late records to the fact table as and when the events are received.
  2. Stateful Data Processing: This pattern is useful when the output depends on a sequence of events across one or more input streams. For example, the customer account lifecycle in a business might involve multiple stages, such as account creation, plan upgrades, downgrades, and cancellation. To derive attributes like the lifetime of an account or the latest plan the account is on, we need to track the sequence of these events across different input streams. A missed event in such a scenario would result in incorrect analysis due to a wrong derived state. Late-arriving data in such cases requires overwriting data that was previously processed to ensure all events are accounted for.
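The difference between the two patterns can be shown with a small, self-contained sketch. The data and the helper function are hypothetical and are not part of Psyberg: late signups are simply appended, while a late lifecycle event changes the derived end-of-hour state for earlier hours, which must then be overwritten.

```python
# Stateless: each signup record is independent, so late records are appended.
signup_fact = [("acct_1", 1), ("acct_2", 2)]  # (account_id, signup_hour)
late_signups = [("acct_3", 1)]
signup_fact.extend(late_signups)


# Stateful: the derived state depends on the ordered sequence of events.
def end_of_hour_states(events, last_hour):
    """Return the account state at the end of each hour 1..last_hour."""
    by_hour = {}
    for hour, event in sorted(events):
        by_hour[hour] = event  # the last event within an hour wins
    states, current = [], None
    for hour in range(1, last_hour + 1):
        current = by_hour.get(hour, current)  # carry the state forward
        states.append(current)
    return states


events = [(1, "created"), (4, "canceled")]
before = end_of_hour_states(events, last_hour=4)

late_event = (2, "upgraded")  # arrives after hours 2 to 4 were already processed
after = end_of_hour_states(events + [late_event], last_hour=4)

changed = [h for h, (b, a) in enumerate(zip(before, after), start=1) if b != a]
print(before, after, changed)
```

In the stateful case, hours 2 and 3 had already been written with the state "created" and must be overwritten once the late upgrade event is known.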

Let’s visualize how these two modes work within our data processing pipeline using a general workflow for loading a fact table. If you would like to learn more about how the workflows are orchestrated in Netflix Maestro scheduler, please check out this blog post from our data platform team.

With this illustration as our guide, let’s explore each mode in more detail.

The Psyberg Initialization Phase

This step invokes Psyberg with the required parameters. Based on these parameters, Psyberg then computes the correct data range for the pipeline processing needs.

Input parameters in this step include the following:

Initialization for Stateless Data Processing

Let’s use the signup fact table as an example here. This table’s workflow runs hourly, with the main input source being an Iceberg table storing all raw signup events partitioned by landing date, hour, and batch id.

Here’s a YAML snippet outlining the configuration for this during the Psyberg initialization step:

- job:
    id: psyberg_session_init
    type: Spark
    spark:
      app_args:
        - --process_name=signup_fact_load
        - --src_tables=raw_signups
        - --psyberg_session_id=20230914061001
        - --psyberg_hwm_table=high_water_mark_table
        - --psyberg_session_table=psyberg_session_metadata
        - --etl_pattern_id=1

Behind the scenes, Psyberg identifies that this pipeline is configured for a stateless pattern since etl_pattern_id=1.

Psyberg also uses the provided inputs to detect the Iceberg snapshots that persisted after the latest high watermark available in the watermark table. Using the summary column in snapshot metadata [see the Iceberg Metadata section in post 1 for more details], we parse out the partition information for each Iceberg snapshot of the source table.

Psyberg then retains these processing URIs (an array of JSON strings containing combinations of landing date, hour, and batch IDs) as determined by the snapshot changes. This information and other calculated metadata are stored in the psyberg_session_f table. This stored data is then available for the subsequent LOAD.FACT_TABLE job in the workflow to utilize and for analysis and debugging purposes.
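The stateless initialization described above can be sketched roughly as follows. This is a minimal illustration, not Psyberg's actual code: the Snapshot class, the processing_uris function, and the sample values are all hypothetical, and the partition lists stand in for what would be parsed from the Iceberg snapshot summary.

```python
import json
from dataclasses import dataclass


@dataclass
class Snapshot:
    committed_at: int   # snapshot commit time (epoch seconds); hypothetical field
    partitions: list    # partitions touched, as parsed from the snapshot summary


def processing_uris(snapshots, high_watermark):
    """Collect de-duplicated partition descriptors from snapshots newer than the watermark."""
    seen = set()
    uris = []
    for snap in sorted(snapshots, key=lambda s: s.committed_at):
        if snap.committed_at <= high_watermark:
            continue  # already covered by an earlier pipeline run
        for part in snap.partitions:
            uri = json.dumps(part, sort_keys=True)
            if uri not in seen:
                seen.add(uri)
                uris.append(uri)
    return uris


snapshots = [
    Snapshot(100, [{"landing_date": 20230914, "hour": 5, "batch_id": 1}]),
    Snapshot(200, [{"landing_date": 20230914, "hour": 6, "batch_id": 1}]),
    # A late-arriving batch for hour 3, committed after the watermark.
    Snapshot(250, [{"landing_date": 20230914, "hour": 3, "batch_id": 2}]),
]
uris = processing_uris(snapshots, high_watermark=100)
```

Because the selection is driven by snapshot commit time rather than by partition value, the late hour-3 batch is picked up alongside the regular hour-6 data without any lookback window.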

Initialization for Stateful Data Processing

Stateful Data Processing is used when the output depends on a sequence of events across one or more input streams.

Let’s consider the example of creating a cancel fact table, which takes the following as input:

  1. Raw cancellation events indicating when the customer account was canceled
  2. A fact table that stores incoming customer requests to cancel their subscription at the end of the billing period

These inputs help derive additional stateful analytical attributes, such as the type of churn (voluntary or involuntary).

The initialization step for Stateful Data Processing differs slightly from Stateless. Psyberg offers additional configurations according to the pipeline needs. Here’s a YAML snippet outlining the configuration for the cancel fact table during the Psyberg initialization step:

- job:
    id: psyberg_session_init
    type: Spark
    spark:
      app_args:
        - --process_name=cancel_fact_load
        - --src_tables=raw_cancels|processing_ts,cancel_request_fact
        - --psyberg_session_id=20230914061501
        - --psyberg_hwm_table=high_water_mark_table
        - --psyberg_session_table=psyberg_session_metadata
        - --etl_pattern_id=2

Behind the scenes, Psyberg identifies that this pipeline is configured for a stateful pattern since etl_pattern_id is 2.

Notice the additional detail in the src_tables list corresponding to raw_cancels above. The processing_ts here represents the event processing timestamp, which is different from the regular Iceberg snapshot commit timestamp (event_landing_ts) described in part 1 of this series.

It is important to capture the range of a consolidated batch of events from all the sources (both raw_cancels and cancel_request_fact) while factoring in late-arriving events. Changes to the source table snapshots can be tracked using different timestamp fields. Knowing which timestamp field to use (event_landing_ts or something like processing_ts) helps avoid missing events.

Similar to the approach in stateless data processing, Psyberg uses the provided inputs to parse out the partition information for each Iceberg snapshot of the source table.

Sample parsed input for target snapshot_date 20230914 and snapshot_hour 9

This is then used to query the partitions metadata table which has the min and max range for each column in the source table. In this case, we look at the min and max range of the processing_ts column to determine actual partitions for any late-arriving events. The minimum value here helps determine the lower limit of the data to be processed i.e. the derived minimum date and hour based on the input epoch timestamp.

Lower Limit to be processed = least ( “min” event_processing_ts)

It also tracks the VTTS (Valid To TimeStamp) of all the input streams and determines the minimum VTTS of all the streams together. This helps determine the upper limit of data to be processed, thus restricting the data load based on data completeness of all the streams combined.

Upper Limit to be processed = least (vtts date-hour)

Using this metadata from different streams, Psyberg calculates several parameters like minimum/maximum processing date and hour and event landing date hour. These parameters, along with other metadata, discussed in the previous post, are persisted in the psyberg_session_f table for analysis and debugging purposes.
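The lower and upper limits above can be illustrated with a short sketch. It assumes that the per-snapshot minimum processing_ts values and the per-stream VTTS values are already available as epoch seconds in UTC; the function names and sample timestamps are hypothetical.

```python
from datetime import datetime, timezone


def date_hour(epoch_seconds):
    """Convert an epoch timestamp to a (yyyymmdd, hour) pair in UTC."""
    dt = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return int(dt.strftime("%Y%m%d")), dt.hour


def processing_range(min_processing_ts, vtts_per_stream):
    """Lower limit: least of the minimum processing timestamps.
    Upper limit: least VTTS across all input streams."""
    lower = date_hour(min(min_processing_ts))
    upper = date_hour(min(vtts_per_stream))
    return lower, upper


def ts(hour):
    """Helper for the example: epoch seconds for a given hour on 2023-09-14 (UTC)."""
    return int(datetime(2023, 9, 14, hour, tzinfo=timezone.utc).timestamp())


# New raw_cancels snapshots contain events for hour 7 and a late event for hour 3.
min_processing_ts = [ts(7), ts(3)]
# raw_cancels is complete up to hour 9, cancel_request_fact only up to hour 8.
vtts_per_stream = [ts(9), ts(8)]

lower, upper = processing_range(min_processing_ts, vtts_per_stream)
```

Taking the least VTTS keeps the load from running ahead of the slowest stream, while the least minimum timestamp pulls the window back far enough to cover the late event.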

Write Audit Publish (WAP) process

The Write Audit Publish (WAP) process is a general pattern we use in our ETLs to validate writes to the uncommitted Iceberg snapshot before publishing to the target table. The LOAD.FACT_TABLE step takes psyberg_session_id and process_name as input arguments.

For the stateless pattern, the processing URIs to be processed as part of the load step are identified by reading the psyberg_session_f table. This information is then used to filter the source table and apply the business logic to create the signup fact table. Any late-arriving signup event data is appended to the target table partitions as part of this. All these writes go into the uncommitted Iceberg snapshot managed by the WAP pattern.

Similarly, in the stateful pattern, the ETL step reads the psyberg_session_f table to identify the derived minimum and maximum date hour range to be processed, which acts as a filter for different input tables involved in the ETL. After applying the corresponding business logic for cancellation events, we create the cancel fact table along with columns like cancellation type (i.e., voluntary vs involuntary churn) representing the state of the canceled account. If there are any late-arriving events, Psyberg handles them automatically by providing the correct range to the data process to derive the state changes correctly.

Audits

We run different audits on the uncommitted Iceberg snapshot created as part of the job run. Leveraging Psyberg metadata, we can identify the cohort of data involved as part of the job run. This helps in pinpointing changes and applying blocking audits efficiently. Audits like source-to-target count comparison and checking for no missing events in the target Iceberg snapshot ensure data integrity and completeness. Once the audits pass successfully, the data is published to the target table.
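As a rough illustration of the write, audit, publish sequence, here is a toy in-memory sketch. It does not use Iceberg at all: WapTable, count_audit, and load_with_wap are hypothetical names, and the staged list merely stands in for an uncommitted snapshot.

```python
class WapTable:
    """Toy stand-in for a table with one staged (uncommitted) snapshot."""

    def __init__(self):
        self.published = []   # rows visible to downstream readers
        self.staged = None    # rows written but not yet published

    def write(self, rows):
        self.staged = list(rows)

    def publish(self):
        self.published.extend(self.staged)
        self.staged = None


def count_audit(source_rows, staged_rows):
    """Blocking audit: source-to-target count comparison."""
    return len(source_rows) == len(staged_rows)


def load_with_wap(table, source_rows, transform):
    table.write(transform(source_rows))             # write
    if not count_audit(source_rows, table.staged):  # audit
        table.staged = None                         # discard; readers see nothing
        raise ValueError("blocking audit failed: source/target count mismatch")
    table.publish()                                 # publish


table = WapTable()
load_with_wap(table, [1, 2, 3], lambda rows: [r * 2 for r in rows])
```

The point of the pattern is that a failed audit leaves the published data untouched, so downstream consumers never read a partially validated load.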

HWM Commit

Leveraging Psyberg metadata tables, we determine the latest timestamp associated with the Iceberg snapshot seen as part of the job run. This timestamp is used to update the high watermark table with the new high watermark so that the subsequent pipeline instance can pick up the next set of changes.
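A minimal sketch of the commit step, assuming the high watermark table can be modelled as a dictionary keyed by process name. The function name and record layout are hypothetical.

```python
def commit_high_watermark(hwm_table, process_name, snapshot_timestamps):
    """Advance the watermark to the latest snapshot timestamp seen in this run."""
    if not snapshot_timestamps:
        return  # nothing new was processed; leave the watermark unchanged
    previous = hwm_table.get(process_name, {}).get("latest")
    hwm_table[process_name] = {
        "previous": previous,
        "latest": max(snapshot_timestamps),
    }


hwm_table = {"signup_fact_load": {"previous": None, "latest": 100}}
commit_high_watermark(hwm_table, "signup_fact_load", [200, 250])
```

The next pipeline instance would then only consider snapshots committed after the new latest value.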

Conclusion

This exploration shows how Psyberg brings efficiency, accuracy, and timeliness to Stateless and Stateful Data Processing within the Membership and Finance data engineering team. Join us in the next part of our blog series, where we’ll discuss how it also helps automate the end-to-end catchup of different pipelines.


2. Diving Deeper into Psyberg: Stateless vs Stateful Data Processing was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

1. Streamlining Membership Data Engineering at Netflix with Psyberg

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/1-streamlining-membership-data-engineering-at-netflix-with-psyberg-f68830617dd1

By Abhinaya Shetty, Bharath Mummadisetty

At Netflix, our Membership and Finance Data Engineering team harnesses diverse data related to plans, pricing, membership life cycle, and revenue to fuel analytics, power various dashboards, and make data-informed decisions. Many metrics in Netflix’s financial reports are powered and reconciled with efforts from our team! Given our role on this critical path, accuracy is paramount. In this context, managing the data, especially when it arrives late, can present a substantial challenge!

In this three-part blog post series, we introduce you to Psyberg, our incremental data processing framework designed to tackle such challenges! We’ll discuss batch data processing, the limitations we faced, and how Psyberg emerged as a solution. Furthermore, we’ll delve into the inner workings of Psyberg, its unique features, and how it integrates into our data pipelining workflows. By the end of this series, we hope you will gain an understanding of how Psyberg transformed our data processing, making our pipelines more efficient, accurate, and timely. Let’s dive in!

The Challenge: Incremental Data Processing with Late Arriving Data

Our team’s data processing model mainly comprises batch pipelines, which run at different intervals ranging from hourly to multiple times a day (also known as intraday) and even daily. We expect complete and accurate data at the end of each run. To meet such expectations, we generally run our pipelines with a lag of a few hours to leave room for late-arriving data.

What is late-arriving data?

Late-arriving data is essentially delayed data due to system retries, network delays, batch processing schedules, system outages, delayed upstream workflows, or reconciliation in source systems.

How does late-arriving data impact us?

You could think of our data as a puzzle. With each new piece of data, we must fit it into the larger picture and ensure it’s accurate and complete. Thus, we must reprocess the missed data to ensure data completeness and accuracy.

Types of late-arriving data

Based on the structure of our upstream systems, we’ve classified late-arriving data into two categories, each named after the timestamps of the updated partition:

Ways to process such data

Our team previously employed some strategies to manage these scenarios, which often led to unnecessarily reprocessing unchanged data. Some techniques we used were:

1. Using fixed lookback windows to always reprocess data, assuming that most late-arriving events will occur within that window. However, this approach usually leads to redundant data reprocessing, thereby increasing ETL processing time and compute costs. It also becomes inefficient as the data scale increases. Imagine reprocessing the past 6 hours of data every hour!

2. Adding alerts to flag late-arriving data, blocking the pipelines, and manually intervening by triggering backfill pipelines to handle the missed events. This approach was a simple solution with minimal extra processing for the most part and, hence, was our preferred solution. However, when late events did occur, the pain of reprocessing data and catching up on all the dependent pipelines was not worth it! We will talk about this shortly.
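To make the cost of the first technique concrete, here is a back-of-the-envelope sketch. It counts hour-partitions processed per day under a fixed lookback versus an idealised incremental load that touches only new and late partitions. The function names and the figure of two late partitions are illustrative assumptions.

```python
def fixed_lookback_partitions(runs, lookback_hours):
    """Every run reprocesses the full lookback window."""
    return runs * lookback_hours


def incremental_partitions(runs, late_partitions):
    """Every run processes one new hour, plus any partitions that received late data."""
    return runs + late_partitions


daily_runs = 24
fixed = fixed_lookback_partitions(daily_runs, lookback_hours=6)
incremental = incremental_partitions(daily_runs, late_partitions=2)
```

With a 6-hour lookback, 24 hourly runs touch 144 hour-partitions, against 26 for the incremental case under these assumptions.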

At a high level, both these approaches were inefficient for intraday pipelines and impacted cost, performance, accuracy, and time. We developed Psyberg, an incremental processing framework using Iceberg to handle these challenges more effectively.

The state of our pipelines before Psyberg

Before diving into the world of Psyberg, it’s crucial to take a step back and reflect on the state of the data pipelines in our team before its implementation. The complexities involved in these processes and the difficulties they posed led to the development of Psyberg.

At Netflix, our backend microservices continuously generate real-time event data that gets streamed into Kafka. These raw events are the source of various data processing workflows within our team. We ingest this diverse event data and transform it into standardized fact tables. The fact tables then feed downstream intraday pipelines that process the data hourly. The sequential load ETL shown in the diagram below depicts one such pipeline that calculates an account's state every hour.

Raw data for hours 3 and 6 arrive. Hour 6 data flows through the various workflows, while hour 3 triggers a late data audit alert.

Let’s walk through an example to understand the complexity of this pre-Psyberg world.

Consider a simplified version of our pipelines where we process three events: signups, plan changes, and cancels. Now imagine that some signup events from hour 3 were delayed and sent in at hour 6 instead. Our audits would detect this and alert the on-call data engineer (DE). The on-call DE would then face the daunting task of making things right!

Step 1: Dive into the audit logs to identify the late-arriving data and the impacted workflows. In this case, they would discover that the late-arriving data for hour 3 must be included in the signup facts.

Step 2: Stop all impacted workflows and downstream jobs (such as the sequential load ETL) and patch the missed data in the fact tables. Now, the data in the signup fact is patched.

Step 3: Identify the number of partitions to be rerun for the sequential stateful load jobs to account for the delayed data and rerun them from the impacted date-hour. The DE would note that the data for hours 3–6 needs to be reprocessed and will retrigger four instances to be run sequentially. This step is crucial because missing signup events from hour 3 would result in us missing subsequent events for those affected accounts (e.g., a cancel event for a missed signup would have had no effect). As we capture the state of an account based on the sequence of different types of events, rerunning the sequential load ETL from hours 3 to 6 ensures the accurate representation of account states.

Step 4: Now that we’ve spent significant time triaging and resolving the alert, the sequential ETL workflow likely experienced a delay. As a result, we need to catch up to schedule. To compensate for the lost time, the DE must trigger a few additional instances until the latest hour that would have run if the data hadn’t arrived late.
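Steps 3 and 4 amount to simple range arithmetic, sketched below. The helper names are hypothetical, and the sketch assumes all hours fall on the same day.

```python
def hours_to_rerun(late_hour, detected_hour):
    """Sequential instances to rerun, from the impacted hour through the hour of detection."""
    return list(range(late_hour, detected_hour + 1))


def hours_to_catch_up(detected_hour, current_hour):
    """Instances that fell behind schedule while the alert was being resolved."""
    return list(range(detected_hour + 1, current_hour + 1))


rerun = hours_to_rerun(late_hour=3, detected_hour=6)
catch_up = hours_to_catch_up(detected_hour=6, current_hour=9)
```

For the example in the text, the rerun list covers hours 3 to 6, which is the four instances the on-call DE retriggers.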

This entire process was challenging and required significant manual intervention from the on-call DE perspective. Note that these are hourly jobs, so the alert could be triggered at any time of the day (or night!). Yes, they were infrequent, but a big pain point when they occurred! Also, the on-call DE was usually not the SME for these pipelines, as the late data could have arrived in any of our upstream pipelines. To solve these problems, we came up with Psyberg!

Psyberg: The Game Changer!

Psyberg automates our data loads, making it suitable for various data processing needs, including intraday pipeline use cases. It leverages Iceberg metadata to facilitate processing incremental and batch-based data pipelines.

One of the critical features of Psyberg is its ability to detect and manage late-arriving data, no matter the partition it lands in. This feature allows data pipelines to handle late-arriving data effectively without manual intervention, ensuring higher data accuracy in our systems. Iceberg metadata and Psyberg’s own metadata form the backbone of its efficient data processing capabilities.

ETL Process High Watermark

This is the last recorded update timestamp for any data pipeline process. This is mainly used to identify new changes since the last update.

Iceberg Metadata

Psyberg primarily harnesses two key Iceberg metadata tables, snapshots and partitions, to manage the workload. All Iceberg tables have associated metadata that provides insight into changes or updates within the data tables.

The snapshots metadata table records essential metadata such as:

  • The creation time of a snapshot
  • The type of operation performed (append, overwrite, etc.)
  • A summary of partitions created/updated during the generation of the Iceberg snapshot

These details enable Psyberg to track different operations and identify changes made to a source table since the previous high watermark. For example:

The partitions metadata table is particularly interesting as it stores:

  • Information about partition keys used in the data table
  • Column names and the range of values for each column within a specific partition

One unique aspect of Netflix’s internal implementation is that it provides the range of values for each column within a partition in a deserialized format. This information helps Psyberg comprehend the timestamp ranges for both types of late-arriving data (event and processing time) without querying the actual data.

Psyberg Metadata

In addition to Iceberg metadata, Psyberg maintains its own metadata tables — the session table and the high watermark table. Both these tables are partitioned by the pipeline process name to maintain information related to each data pipeline independently.

The session table captures metadata specific to each pipeline run, including:

  • Process name partition to track all the runs associated with the data pipeline process
  • Session ID to track unique runs within the process
  • Processing URIs to identify the input partitions involved in the load
  • “from date”, “from hour”, “to date” and “to hour” for both event and processing times

The high watermark table stores relevant values from the session table at the end of each pipeline run:

  • Latest and previous high watermark timestamp
  • Metadata related to the latest run

This information is vital for each pipeline run instance as it helps determine the data to be loaded, updates the high watermark after processing, and finally generates output signals to inform downstream workflows about the date-hour up to which data is complete and available. It also serves as an essential resource for debugging and creating audits on the pipeline jobs.

Conclusion

In this post, we described our data architecture at a high level, along with the pain points that led to the development of Psyberg. We also went into details related to the metadata that powers Psyberg. If you understand the challenges faced by the on-call DE and would like to learn more about our solution, please check out the next iteration of this three-part series, where we delve deeper into different modes of Psyberg.


1. Streamlining Membership Data Engineering at Netflix with Psyberg was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Graph modelling guidelines

Post Syndicated from Grab Tech original https://engineering.grab.com/graph-modelling-guidelines

Introduction

Graph modelling is a highly effective technique for representing and analysing complex and interconnected data across various domains. By deciphering relationships between entities, graph modelling can reveal insights that might be otherwise difficult to identify using traditional data modelling approaches. In this article, we will explore what graph modelling is and guide you through a step-by-step process of implementing graph modelling to create a social network graph.

What is graph modelling?

Graph modelling is a method for representing real-world entities and their relationships using nodes, edges, and properties. It employs graph theory, a branch of mathematics that studies graphs, to visualise and analyse the structure and patterns within complex datasets. Common applications of graph modelling include social network analysis, recommendation systems, and biological networks.

Graph modelling process

Step 1: Define your domain

Before diving into graph modelling, it’s crucial to have a clear understanding of the domain you’re working with. This involves getting acquainted with the relevant terms, concepts, and relationships that exist in your specific field. To create a social network graph, familiarise yourself with terms like users, friendships, posts, likes, and comments.

Step 2: Identify entities and relationships

After defining your domain, you need to determine the entities (nodes) and relationships (edges) that exist within it. Entities are the primary objects in your domain, while relationships represent how these entities interact with each other. In a social network graph, users are entities, and friendships are relationships.

Step 3: Establish properties

Each entity and relationship may have a set of properties that provide additional information. In this step, identify relevant properties based on their significance to the domain. A user entity might have properties like name, age, and location. A friendship relationship could have a ‘since’ property to denote the establishment of the friendship.

Step 4: Choose a graph model

Once you’ve identified the entities, relationships, and properties, it’s time to choose a suitable graph model. Two common models are:

  • Property graph: A versatile model that easily accommodates properties on both nodes and edges. It’s well-suited for most applications.
  • Resource Description Framework (RDF): A World Wide Web Consortium (W3C) standard model, using triples of subject-predicate-object to represent data. It is commonly used in semantic web applications.

For a social network graph, a property graph model is typically suitable. This is because user entities have many attributes and features. Property graphs provide a clear representation of the relationships between people and their attribute profiles.
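To make the difference between the two models concrete, the sketch below represents the same small social graph first as a property graph and then as subject-predicate-object triples. The identifiers, property values, and the to_triples helper are illustrative assumptions, not part of any particular database's API.

```python
# Property graph: properties live directly on nodes and on edges.
nodes = {
    "u1": {"label": "User", "name": "Alice", "age": 30, "location": "Singapore"},
    "u2": {"label": "User", "name": "Bob", "age": 28, "location": "Jakarta"},
}
edges = [
    {"src": "u1", "dst": "u2", "type": "FRIENDS_WITH", "since": 2019},
]


def to_triples(nodes, edges):
    """Flatten the graph into (subject, predicate, object) triples.

    Edge properties such as 'since' are dropped here: a plain triple has no
    slot for them, so they would need extra modelling (for example, an
    intermediate node representing the friendship itself).
    """
    triples = []
    for node_id, props in nodes.items():
        for key, value in props.items():
            triples.append((node_id, key, value))
    for edge in edges:
        triples.append((edge["src"], edge["type"], edge["dst"]))
    return triples


triples = to_triples(nodes, edges)
```

The property graph keeps each user's attribute profile in one place, which is part of why it tends to feel natural for social network data.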

Figure 1 – Social network graph

Step 5: Develop a schema

Although not required, developing a schema can be helpful for large-scale projects and team collaborations. A schema defines the structure of your graph, including entity types, relationships, and properties. In a social network graph, you might have a schema that specifies the types of nodes (users, posts) and the relationships between them (friendships, likes, comments).
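One lightweight way to express such a schema is as plain data plus a validation helper, as in this sketch. The SCHEMA layout, the edge type names, and validate_edge are hypothetical and not tied to any graph database.

```python
SCHEMA = {
    # Node types and the properties each is allowed to carry.
    "nodes": {"User": {"name", "age", "location"}, "Post": {"text"}},
    # Edge types and the (source, target) node types they may connect.
    "edges": {
        "FRIENDS_WITH": ("User", "User"),
        "LIKES": ("User", "Post"),
        "COMMENTED_ON": ("User", "Post"),
    },
}


def validate_edge(schema, node_labels, src, dst, edge_type):
    """Return True if the edge type exists and connects the expected node types."""
    expected = schema["edges"].get(edge_type)
    if expected is None:
        return False
    return (node_labels[src], node_labels[dst]) == expected


node_labels = {"u1": "User", "u2": "User", "p1": "Post"}
ok = validate_edge(SCHEMA, node_labels, "u1", "p1", "LIKES")
bad = validate_edge(SCHEMA, node_labels, "p1", "u1", "FRIENDS_WITH")
```

Checks like this can run at import time, so malformed relationships are caught before they reach the graph store.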

Step 6: Import or generate data

Next, acquire the data needed to populate your graph. This can come in the form of existing datasets or generated data from your application. For a social network graph, you can import user information from a CSV file and generate simulated friendships, posts, likes, and comments.

Step 7: Implement the graph using a graph database or other storage options

Finally, you need to store your graph data using a suitable graph database. Neo4j, Amazon Neptune, and Microsoft Azure Cosmos DB are examples of graph databases. Alternatively, depending on your specific requirements, you can use a non-graph database or an in-memory data structure to store the graph.

Step 8: Analyse and visualise the graph

After implementing the graph, you can perform various analyses using graph algorithms, such as shortest path, centrality, or community detection. In addition, visualising your graph can help you gain insights and facilitate communication with others.
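As a small example of such analysis, the sketch below runs a breadth-first search for the shortest path and computes a simple degree count on an in-memory friendship graph. The graph data is made up, and in practice a graph database or library would usually provide these algorithms.

```python
from collections import deque


def shortest_path(adjacency, start, goal):
    """Breadth-first search; returns the list of nodes on a shortest path, or None."""
    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node == goal:
            return path
        for neighbour in adjacency.get(node, ()):
            if neighbour not in visited:
                visited.add(neighbour)
                queue.append(path + [neighbour])
    return None


# Undirected friendships stored as an adjacency list.
friends = {
    "alice": ["bob", "carol"],
    "bob": ["alice", "dave"],
    "carol": ["alice"],
    "dave": ["bob"],
}

path = shortest_path(friends, "carol", "dave")
# Degree: the number of friends each user has (a basic centrality measure).
degree = {user: len(neighbours) for user, neighbours in friends.items()}
```

Breadth-first search is sufficient here because every friendship has the same weight. Weighted graphs would call for an algorithm such as Dijkstra's.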

Conclusion

By following these steps, you can effectively create and analyse graph models for your specific domain. Remember to adjust the steps according to your unique domain and requirements, and always ensure that confidential and sensitive data is properly protected.

References

[1] What is a Graph Database?

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

LLM-powered data classification for data entities at scale

Post Syndicated from Grab Tech original https://engineering.grab.com/llm-powered-data-classification

Editor’s note: This post was originally published in October 2023 and has been updated to reflect Grab’s partnership with the Infocomm Media Development Authority as part of its Privacy Enhancing Technology Sandbox that concluded in March 2024.

Introduction

At Grab, we deal with petabyte-level data and manage countless data entities ranging from database tables to Kafka message schemas. Understanding the data inside is crucial for us, as it not only streamlines data access management to safeguard the data of our users, drivers and merchant-partners, but also improves the data discovery process so that data analysts and scientists can easily find what they need.

The Caspian team (Data Engineering team) collaborated closely with the Data Governance team on automating governance-related metadata generation. We started with Personally Identifiable Information (PII) detection and built an orchestration service using a third-party classification service. With the advent of Large Language Models (LLMs), new possibilities dawned for metadata generation and sensitive data identification at Grab. This prompted the inception of the project, which aimed to integrate LLM classification into our existing service. In this blog, we share insights into the transformation from what used to be a tedious and painstaking process to a highly efficient system, and how it has empowered teams across the organisation.

For ease of reference, here’s a list of terms we’ve used and their definitions:

  • Data Entity: An entity representing a schema that contains rows/streams of data, for example, database tables, stream messages, data lake tables.
  • Prediction: Refers to the model’s output given a data entity, unverified manually.
  • Data Classification: The process of classifying a given data entity, which in the context of this blog, involves generating tags that represent sensitive data or Grab-specific types of data.
  • Metadata Generation: The process of generating the metadata for a given data entity. In this blog, since we limit the metadata to the form of tags, we often use this term and data classification interchangeably.
  • Sensitivity: Refers to the level of confidentiality of data. High sensitivity means that the data is highly confidential. The lowest level of sensitivity often refers to public-facing or publicly-available data.

Background

When we first approached the data classification problem, we aimed to solve something more specific – Personally Identifiable Information (PII) detection. Initially, to protect sensitive data from accidental leaks or misuse, Grab implemented manual processes and campaigns targeting data producers to tag schemas with sensitivity tiers. These tiers ranged from Tier 1, representing schemas with highly sensitive information, to Tier 4, indicating no sensitive information at all. As a result, half of all schemas were marked as Tier 1, enforcing the strictest access control measures.

The presence of a single Tier 1 table in a schema with hundreds of tables justifies classifying the entire schema as Tier 1. However, since Tier 1 data is rare, this implies that a large volume of non-Tier 1 tables, which ideally should be more accessible, have strict access controls.
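The effect of schema-level tiering can be shown with a tiny sketch: the schema inherits the strictest tier (lowest number) of any table it contains, so every other table ends up over-restricted. The table names and tiers below are invented for illustration.

```python
def schema_tier(table_tiers):
    """A schema takes the strictest tier of its tables (Tier 1 is the most sensitive)."""
    return min(table_tiers.values())


tables = {"payments": 1, "ride_status": 4, "promo_codes": 3, "city_lookup": 4}

tier = schema_tier(tables)
# Tables whose own tier is less strict than the tier enforced on the whole schema.
over_restricted = [name for name, t in tables.items() if t > tier]
```

Table-level classification would let each of the over-restricted tables be governed by its own tier instead.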

Shifting access controls from the schema-level to the table-level could not be done safely due to the lack of table classification in the data lake. We could have conducted more manual classification campaigns for tables; however, this was not feasible for two reasons:

  1. The volume, velocity, and variety of data had skyrocketed within the organisation, so it took significantly more time to classify at table level compared to schema level. Hence, a programmatic solution was needed to streamline the classification process, reducing the need for manual effort.
  2. App developers, despite being familiar with the business scope of their data, interpreted internal data classification policies and external data regulations differently, leading to inconsistencies in understanding.

A service called Gemini (named before Google announced the Gemini model!) was built internally to automate the tag generation process using a third-party data classification service. Its purpose was to scan the data entities in batches and generate column/field level tags. These tags would then go through a review process by the data producers. The data governance team provided classification rules and used regex classifiers, alongside the third-party tool’s own machine learning classifiers, to discover sensitive information.

After the implementation of the initial version of Gemini, a few challenges remained.

  1. The third-party tool did not allow customisations of its machine learning classifiers, and the regex patterns produced too many false positives during our evaluation.
  2. Building in-house classifiers would require a dedicated data science team to train a customised model. They would need to invest a significant amount of time to understand data governance rules thoroughly and prepare datasets with manually labelled training data.

LLMs came onto our radar following their recent “iPhone moment”, when ChatGPT exploded onto the scene. An LLM is trained using an extremely large corpus of text and contains trillions of parameters. It is capable of conducting natural language understanding tasks, writing code, and even analysing data based on requirements. The LLM naturally solves the mentioned pain points as it provides a natural language interface for data governance personnel. They can express governance requirements through text prompts, and the LLM can be customised effortlessly without code or model training.

Methodology

In this section, we dive into the implementation details of the data classification workflow. Please refer to the diagram below for a high-level overview:

Figure 1 – Overview of data classification workflow

This diagram illustrates how data platforms, the metadata generation service (Gemini), and data owners work together to classify and verify metadata. Data platforms trigger scan requests to the Gemini service to initiate the tag classification process. After the tags are predicted, data platforms consume the predictions, and the data owners are notified to verify these tags.

Orchestration

Figure 2 – Architecture diagram of the orchestration service Gemini

Our orchestration service, Gemini, manages the data classification requests from data platforms. From the diagram, the architecture contains the following components:

  1. Data platforms: These platforms are responsible for managing data entities and initiating data classification requests.
  2. Gemini: This orchestration service communicates with data platforms, schedules and groups data classification requests.
  3. Classification engines: There are two available engines (a third-party classification service and GPT3.5) for executing the classification jobs and returning results. Since we are still evaluating the two engines, both are running concurrently.

When the orchestration service receives requests, it helps aggregate the requests into reasonable mini-batches. Aggregation is achievable through the message queue at fixed intervals. In addition, a rate limiter is attached at the workflow level. It allows the service to call the Cloud Provider APIs with respective rates to prevent the potential throttling from the service providers.
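The batching and rate limiting described above might look something like the following sketch. It is not Gemini's implementation: mini_batches and RateLimiter are hypothetical names, and the limiter uses a simple fixed-window counter with an injected clock value so that it can be exercised without sleeping.

```python
def mini_batches(requests, batch_size):
    """Group queued requests into fixed-size mini-batches."""
    for i in range(0, len(requests), batch_size):
        yield requests[i:i + batch_size]


class RateLimiter:
    """Allow at most `limit` calls per `window` seconds (fixed-window counter)."""

    def __init__(self, limit, window=60.0):
        self.limit = limit
        self.window = window
        self.window_start = None
        self.count = 0

    def allow(self, now):
        # Start a new window when none exists or the current one has elapsed.
        if self.window_start is None or now - self.window_start >= self.window:
            self.window_start = now
            self.count = 0
        if self.count < self.limit:
            self.count += 1
            return True
        return False


batches = list(mini_batches(list(range(7)), batch_size=3))
limiter = RateLimiter(limit=2, window=60.0)
decisions = [limiter.allow(0.0), limiter.allow(1.0), limiter.allow(2.0), limiter.allow(61.0)]
```

A caller would typically retry or requeue a batch when allow returns False, which keeps the service under the provider's throttling threshold.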

Specific to LLM orchestration, there are two limits to be mindful of. The first one is the context length. The input length cannot surpass the context length, which was 4000 tokens for GPT3.5 at the time of development (or around 3000 words). The second one is the overall token limit (since both the input and output share the same token limit for a single request). Currently, all Azure OpenAI model deployments share the same quota under one account, which is set at 240K tokens per minute.
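A rough sketch of how columns could be split across requests to stay within the context length. The token estimate is only a heuristic derived from the figures above (4000 tokens for roughly 3000 words, so about 4/3 tokens per word). A real tokenizer would give different counts, especially for identifiers, and all function and parameter names here are hypothetical.

```python
import math


def estimate_tokens(text):
    """Very rough estimate: about 4/3 tokens per whitespace-separated word."""
    return math.ceil(len(text.split()) * 4 / 3)


def chunk_columns(columns, prompt_tokens, output_tokens_per_column, context_limit=4000):
    """Split columns into chunks whose estimated input plus output fits the limit."""
    chunks, current, used = [], [], prompt_tokens
    for col in columns:
        cost = estimate_tokens(col) + output_tokens_per_column
        if current and used + cost > context_limit:
            chunks.append(current)
            current, used = [], prompt_tokens  # each request repeats the prompt
        current.append(col)
        used += cost
    if current:
        chunks.append(current)
    return chunks


columns = [f"col_{i}" for i in range(10)]
chunks = chunk_columns(columns, prompt_tokens=100, output_tokens_per_column=20, context_limit=150)

# If every request used the full 4000-token context, a 240K tokens-per-minute
# quota would allow at most this many requests per minute.
max_full_requests_per_minute = 240_000 // 4_000
```

The output budget is counted per column because the input and the output share the same token limit for a single request.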

Classification

In this section, we focus on LLM-powered column-level tag classification. The tag classification process is defined as follows:

Given a data entity with a defined schema, we want to tag each field of the schema with metadata classifications that follow an internal classification scheme from the data governance team. For example, the field can be tagged as a <particular kind of business metric> or a <particular type of personally identifiable information (PII)>. These tags indicate that the field contains a business metric or PII.

We ask the language model to be a column tag generator and to assign the most appropriate tag to each column. Here we showcase an excerpt of the prompt we use:

You are a database column tag classifier, your job is to assign the most appropriate tag based on table name and column name. The database columns are from a company that provides ride-hailing, delivery, and financial services. Assign one tag per column. However not all columns can be tagged and these columns should be assigned <None>. You are precise, careful and do your best to make sure the tag assigned is the most appropriate.

The following is the list of tags to be assigned to a column. For each line, left hand side of the : is the tag and right hand side is the tag definition

…
<Personal.ID> : refers to government-provided identification numbers that can be used to uniquely identify a person and should be assigned to columns containing "NRIC", "Passport", "FIN", "License Plate", "Social Security" or similar. This tag should absolutely not be assigned to columns named "id", "merchant id", "passenger id", "driver id" or similar since these are not government-provided identification numbers. This tag should be very rarely assigned.

<None> : should be used when none of the above can be assigned to a column.
…

Output Format is a valid json string, for example:

[{
        "column_name": "",
        "assigned_tag": ""
}]

Example question

`These columns belong to the "deliveries" table

        1. merchant_id
        2. status
        3. delivery_time`

Example response

[{
        "column_name": "merchant_id",
        "assigned_tag": "<Personal.ID>"
},{
        "column_name": "status",
        "assigned_tag": "<None>"
},{
        "column_name": "delivery_time",
        "assigned_tag": "<None>"
}]

We also curated a tag library for the LLM to classify against. Here is an example:

Column-level Tag | Definition
Personal.ID | Refers to external identification numbers that can be used to uniquely identify a person and should be assigned to columns containing “NRIC”, “Passport”, “FIN”, “License Plate”, “Social Security” or similar.
Personal.Name | Refers to the name or username of a person and should be assigned to columns containing “name”, “username” or similar.
Personal.Contact_Info | Refers to the contact information of a person and should be assigned to columns containing “email”, “phone”, “address”, “social media” or similar.
Geo.Geohash | Refers to a geohash and should be assigned to columns containing “geohash” or similar.
None | Should be used when none of the above can be assigned to a column.

The output of the language model is typically free text, but we want the output in a fixed format for downstream processing. For this reason, prompt engineering is crucial to making sure downstream workflows can process the LLM’s output.

Here are some of the techniques we found useful during our development:

  1. Articulate the requirements: The requirements of the task should be as clear as possible, because the LLM only does what you instruct it to do.
  2. Few-shot learning: By showing examples of the interaction, we help the model understand how it should respond.
  3. Schema Enforcement: Leveraging the model's ability to understand code, we explicitly provide the DTO (Data Transfer Object) schema so that the model understands that its output must conform to it.
  4. Allow for confusion: In our prompt we specifically added a default tag – the LLM is instructed to output the default <None> tag when it cannot make a decision or is confused.
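The schema enforcement and default-tag techniques can be mirrored on the consuming side. The following is a minimal sketch, assuming the output format shown in the prompt excerpt above; the function name `parse_tags`, the `ALLOWED_TAGS` set, and the fallback behaviour are hypothetical and only illustrate how a downstream workflow might guard against malformed responses.

```python
import json

# Tags taken from the example tag library; the angle-bracket form follows the prompt excerpt.
ALLOWED_TAGS = {
    "<Personal.ID>",
    "<Personal.Name>",
    "<Personal.Contact_Info>",
    "<Geo.Geohash>",
    "<None>",
}
DEFAULT_TAG = "<None>"


def parse_tags(raw_output, expected_columns):
    """Parse the LLM response and fall back to the default tag on any problem."""
    try:
        items = json.loads(raw_output)
    except json.JSONDecodeError:
        items = []
    if not isinstance(items, list):
        items = []

    assigned = {}
    for item in items:
        if not isinstance(item, dict):
            continue
        name = item.get("column_name")
        tag = item.get("assigned_tag")
        # Accept only known columns and known tags; everything else is ignored.
        if name in expected_columns and isinstance(tag, str) and tag in ALLOWED_TAGS:
            assigned[name] = tag

    # Every expected column gets a tag, defaulting to <None>.
    return {col: assigned.get(col, DEFAULT_TAG) for col in expected_columns}
```

With this kind of guard, an unparseable response or an unknown tag degrades to <None> instead of breaking the downstream workflow.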

Regarding classification accuracy, we found the LLM to be surprisingly accurate thanks to its strong semantic understanding. For acknowledged tables, users on average change fewer than one tag. Also, in an internal survey of data owners at Grab in September 2023, 80% reported that this new tagging process helped them in tagging their data entities.

Publish and verification

The predictions are published to the Kafka queue to downstream data platforms. The platforms inform respective users weekly to verify the classified tags to improve the model’s correctness and to enable iterative prompt improvement. Meanwhile, we plan to remove the verification mandate for users once the accuracy reaches a certain level.

Figure 3 – Verification message shown in the data platform for user to verify the tags

Impact

Since the new system was rolled out, we have successfully integrated it with Grab’s metadata management platform and production database management platform. Within a month of its rollout, we scanned more than 20,000 data entities, averaging around 300-400 entities per day.

Using a quick back-of-the-envelope calculation, we can see the significant time savings achieved through automated tagging. Assuming it takes a data owner approximately 2 minutes to classify each entity, we are saving approximately 360 man-days per year for the company. This allows our engineers and analysts to focus more on their core tasks of engineering and analysis rather than spending excessive time on data governance.
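The back-of-the-envelope figure can be reproduced as follows. The 2 minutes per entity and the 300-400 entities per day come from the text; the midpoint of 350 entities, the 250 working days per year, and the 8-hour man-day are assumptions made here to show one way the estimate of roughly 360 man-days can be reached.

```python
ENTITIES_PER_DAY = 350        # midpoint of the 300-400 range in the text (assumption)
MINUTES_PER_ENTITY = 2        # from the text
WORKING_DAYS_PER_YEAR = 250   # assumption
HOURS_PER_MAN_DAY = 8         # assumption


def man_days_saved_per_year(
    entities_per_day=ENTITIES_PER_DAY,
    minutes_per_entity=MINUTES_PER_ENTITY,
    working_days=WORKING_DAYS_PER_YEAR,
    hours_per_man_day=HOURS_PER_MAN_DAY,
):
    """Convert the manual tagging time avoided per year into man-days."""
    minutes_per_year = entities_per_day * minutes_per_entity * working_days
    return minutes_per_year / 60 / hours_per_man_day


print(round(man_days_saved_per_year(), 1))
```

Under these assumptions the result is about 365 man-days, in line with the approximate figure quoted above.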

The classified tags pave the way for more use cases downstream. These tags, in combination with rules provided by data privacy office in Grab, enable us to determine the sensitivity tier of data entities, which in turn will be leveraged for enforcing the Attribute-based Access Control (ABAC) policies and enforcing Dynamic Data Masking for downstream queries. To learn more about the benefits of ABAC, readers can refer to another engineering blog posted earlier.

Cost-wise, for the current load, the solution is extremely affordable, contrary to common intuition. This affordability enables us to scale the solution to cover more data entities in the company.

What’s next?

Prompt improvement

We are currently exploring feeding in sample data and user feedback to increase accuracy. Meanwhile, we are experimenting with having the LLM output a confidence level for its own classifications. With a confidence level output, we would only trouble users when the LLM is uncertain of its answers. Hopefully this can remove even more manual processes from the current workflow.

Prompt evaluation

To track the performance of the prompt given, we are building analytical pipelines to calculate the metrics of each version of the prompt. This will help the team better quantify the effectiveness of prompts and iterate better and faster.

Scaling out

We are also planning to scale out this solution to more data platforms to streamline governance-related metadata generation to more teams. The development of downstream applications using our metadata is also on the way. These exciting applications are from various domains such as security, data discovery, etc.

Acknowledgements

Grab recently participated in the Singapore government’s regulatory sandbox, where we successfully demonstrated how LLMs can efficiently and effectively perform data classification, allowing Grab to compound the value of its data for innovative use cases while safeguarding sensitive information such as PII.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Scaling marketing for merchants with targeted and intelligent promos

Post Syndicated from Grab Tech original https://engineering.grab.com/scaling-marketing-for-merchants

Introduction

A promotional campaign is a marketing effort that aims to increase sales, customer engagement, or brand awareness for a product, service, or company. The target is to have more orders and sales by assigning promos to consumers within a given budget during the campaign period.

Figure 1 – Merchant feedback on marketing

From our research, we found that merchants have specific goals for the promos they are willing to offer. They want a simple and cost-effective way to achieve their specific business goals by providing well-designed offers to target the correct customers. From Grab’s perspective, we want to help merchants set up and run campaigns efficiently, and help them achieve their specific business goals.

Problem statement

One of Grab’s platform offerings for merchants is the ability to create promotional campaigns. With the emergence of AI technologies, we found that there are opportunities for us to further optimise the platform. The following are the gaps and opportunities we identified:

  • Globally assigned promos without smart targeting: The earlier method targeted every customer, so everyone could redeem until the promo reached the redemption limits. However, this method did not accurately meet business goals or optimise promo spending. The promotional campaign should intelligently target the best promo for each customer to increase sales and better utilise promo spending.
  • No customised promos for every merchant: To better optimise sales for each merchant, merchants should offer customised promos based on their historical consumer trends, not just a general offer set. For example, for a specific merchant, a 27% discount may be the appropriate offer to uplift revenue and sales based on user bookings. However, merchants do not always have the expertise to decide which offer to select to increase profit.
  • No AI-driven optimisation: Without AI models, it was harder for merchants to assign the right promos at scale to each consumer and optimise their business goals.

As shown in the following figure, AI-driven promotional campaigns are expected to bring higher sales with more promo spend than heuristic ones. Hence, at Grab we looked to introduce an automated, AI-driven tool that helps merchants intelligently target consumers with appropriate promos, while optimising sales and promo spending. That’s where Bullseye comes in.

Figure 2 – Graph showing the sales expectations for AI-driven promotional campaigns

Solution

Bullseye is an automated, AI-driven promo assignment system that leverages the following capabilities:

  • Automated user segmentation: Enables merchants to target new, churned, and active users or all users.
  • Automatic promo design: Enables a merchant-level promo design framework to customise promos for each merchant or merchant group according to their business goals.
  • Assign each user the optimal promo: Users will receive promos selected from an array of available promos based on the merchant’s business objective.
  • Achieve different Grab and merchant objectives: Examples of objectives are to increase merchant sales and decrease Grab promo spend.
  • Flexibility to optimise for an individual merchant brand or group of merchant brands: For promotional campaigns, targeting and optimisation can be performed for a single merchant or a group of merchants (e.g. enabling GrabFood to run cuisine-oriented promo campaigns).

Architecture

Figure 3 – Bullseye architecture

The Bullseye architecture consists of a user interface (UI) and a backend service to handle requests. To use Bullseye, our operations team inputs merchant information into the Bullseye UI. The backend service will then interact with APIs to process the information using the AI model. As we work with a large customer population, data is stored in S3, and the API service triggers a Chimera Spark job to run the prediction model and generate promo assignments. During the assignment, the Spark job parses the input parameters, pre-validates the input, makes predictions, and then returns the promo assignment results to the backend service.

Implementation

The key components in Bullseye are shown in the following figure:

Figure 4 – Key components of Bullseye
  • Eater Segments Identifier: Identifies each user as active, churned, or new based on their historical orders from target merchants.
  • Promo Designer: We constructed a promo variation design framework to adaptively design promo variations for each campaign request as shown in the diagram below.
    • Offer Content Candidate Generation: Generates variant settings of promos based on the promo usage history.
    • Campaign Impact Simulator: Predicts business metrics such as revenue, sales, and cost based on the user and merchant profiles and offer features.
    • Optimal Promo Selection: Selects the optimal offer based on the predicted impact and the given campaign objective. What counts as optimal depends on the objective. For example, if the goal is to maximise merchant sales, the model selects the top candidate that can bring the highest revenue. Finally, with the promo selection, the service returns the promo set to be used in the target campaign.

      Figure 5 – Optimal Promo Selection
  • Customer Response Model: Predicts customer responses such as order value, redemption, and take-up rate when a specific promo is assigned. Bullseye captures various user attributes and compares them with an offer’s attributes. Examples of attributes are cuisine type, food spiciness, and discount amount. When there is a high similarity in the attributes, there is a higher probability that the user will take up the offer.

    Figure 6 – Customer Response Model

  • Hyper-parameter Selection: Optimises toward multiple business goals. Tuning of hyper-parameters allows the AI assignment model to learn how to meet success criteria such as cost per merchant sales (cpSales) uplift and sales uplift. The success criteria express the business goals. For example, the merchant wants a sales uplift after assigning promos, but the cpSales uplift cannot be higher than 10%. With tuning, the optimiser can find optimal points that meet the business goals, and using AI models to search for better settings is far more efficient than manual specification. We need to constantly tune and iterate on models and hyper-parameters to adapt to ever-evolving business goals and the local landscape.

    As shown in the image below, AI assignments without hyper-parameter tuning (HPT) lead to a high cpSales uplift but low sales uplift (red dot). So the hyper-parameters would help to fine-tune the assignment result to be in the optimal space such as the blue dot, which may have lower sales than the red dot but meet the success criteria.

    Figure 7 – Graph showing the impact of using AI assignments with HPT
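The interplay between promo selection and the success criteria can be sketched as a constrained choice: keep only the candidates whose predicted cpSales uplift is within the limit, then pick the one with the highest predicted sales uplift. This is a simplified illustration, not Bullseye's actual optimiser; the `PromoCandidate` class, its fields, and the sample numbers are hypothetical, and only the 10% cpSales limit is taken from the example in the text.

```python
from dataclasses import dataclass


@dataclass
class PromoCandidate:
    name: str
    sales_uplift: float      # predicted sales uplift (hypothetical field)
    cp_sales_uplift: float   # predicted cpSales uplift (hypothetical field)


def select_optimal(candidates, max_cp_sales_uplift=0.10):
    """Return the feasible candidate with the highest predicted sales uplift."""
    feasible = [c for c in candidates if c.cp_sales_uplift <= max_cp_sales_uplift]
    if not feasible:
        return None  # no candidate meets the success criteria
    return max(feasible, key=lambda c: c.sales_uplift)


candidates = [
    PromoCandidate("20% off", sales_uplift=0.30, cp_sales_uplift=0.18),
    PromoCandidate("15% off", sales_uplift=0.22, cp_sales_uplift=0.09),
    PromoCandidate("10% off", sales_uplift=0.12, cp_sales_uplift=0.04),
]
best = select_optimal(candidates)
```

Here the "20% off" candidate has the largest predicted sales uplift but violates the cpSales limit, so "15% off" is selected.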

Impact

We started using Bullseye in 2021. From its use we found that:

  • Hyper-parameter tuning and auto promo design can increase sales and reduce promo spend for food campaigns.
  • Promo Designer optimises budget utilisation and increases the number of promo redemptions for food campaigns.
  • The Customer Response Model reduced promo spending for Mart promotional campaigns.

Conclusion

We have seen positive results with the implementation of Bullseye, such as reduced promo spending and maximised budget spending returns. In our efforts to serve our merchants better and help them achieve their business goals, we will continue to improve Bullseye. In the next phase, we plan to implement a more intelligent service that enables reinforcement learning and online assignment. We also aim to scale AI adoption by onboarding as many regional promotional campaigns as possible.

Special thanks to William Wu, Rui Tan, Rahadyan Pramudita, Krishna Murthy, and Jiesin Chia for making this project a success.

Stepping up marketing for advertisers: Scalable lookalike audience

Post Syndicated from Grab Tech original https://engineering.grab.com/scalable-lookalike-audiences

The advertising industry is constantly evolving, driven by advancements in technology and changes in consumer behaviour. One of the key challenges in this industry is reaching the right audience: the people who are most likely to be interested in your product or service. This is where the concept of a lookalike audience comes into play. By identifying and targeting individuals who share similar characteristics with an existing customer base, businesses can significantly improve the effectiveness of their advertising campaigns.

However, as the scale of Grab advertisements grows, several issues need to be addressed to maintain the efficacy of creating lookalike audiences, such as a long service level agreement (SLA) for audience creation, the high cost of audience creation, and unstable data ingestion.

The need for an even more efficient and scalable solution for creating lookalike audiences was the motivation behind the development of the scalable lookalike audience platform. By developing a high-performance in-memory lookalike audience retrieval service and embedding-based lookalike audience creation and updating pipelines, this improved platform builds on the existing system and provides an even more effective tool for advertisers to reach their target audience.

Constant optimisation for greater precision

In the dynamic world of digital advertising, the ability to quickly and efficiently reach the right audience is paramount and a key strategy is targeted advertising. As such, we have to constantly find ways to improve our current approach to creating lookalike audiences that impacts both advertisers and users. Some of the gaps we identified included:

  • Long SLA for audience creation. Earlier, the platform stored results on Segmentation Platform (SegP) and it took two working days to generate a lookalike audience list. This is because inserting a single audience into SegP took three times longer than generating the audience. Extended creation times impacted the effectiveness of advertising campaigns, as it limited the ability of advertisers to respond quickly to changing market dynamics.

  • Low scalability. As the number of onboarded merchant-partners increased, the time and cost of generating lookalike audiences also increased proportionally. This limited the availability of lookalike audience generation for all advertisers, particularly those with large customer bases or rapidly changing audience profiles.

  • Low updating frequency of lookalike audiences. With automated updates only occurring on a weekly basis, this increased the likelihood that audiences may become outdated and ineffective. This meant there was scope to further improve to help advertisers more effectively reach their campaign goals, by targeting individuals who fit the desired audience profile.

  • High cost of creation. The cost of producing one segment can add up quickly for advertisers who need to generate multiple audiences. This could impact scalability for advertisers as they could hesitate to effectively use multiple lookalike audiences in their campaigns.

Solution

To efficiently identify the top N lookalike audiences for each Grab user from our pool of millions of users, we developed a solution that leverages user and audience representations in the form of embeddings. Embeddings are vector representations of data that utilise linear distances to capture structure from the original datasets. With embeddings, large sets of data are compressed and easily processed without affecting data integrity. This approach ensures high accuracy, low latency, and low cost in retrieving the most relevant audiences.

Our solution takes into account the fact that representation drift varies among entities as data is added. For instance, merchant-partner embeddings are more stable than passenger embeddings. By acknowledging this reality, we optimised our process to minimise cost while maintaining a desirable level of accuracy. Furthermore, we believe that having a strong representation learning strategy in the early stages reduced the need for complex models in the following stages.

Our solution comprises two main components:

  1. Real-time lookalike audience retrieving: We developed an in-memory high-performance retrieving service that stores passenger embeddings, audience embeddings, and audience score thresholds. To further reduce cost, we designed a passenger embedding compression algorithm that reduces the memory needs of passenger embeddings by around 90%.

  2. Embedding-based audience creation and updating: The output of this part of the project is an online retrieving model that includes passenger embeddings, audience embeddings, and thresholds. To minimise costs, we leverage the passenger embeddings that are also utilised by other projects within Grab, beyond advertising, thus sharing the cost. The audience embeddings and thresholds are produced with a low-cost small neural network.

In summary, our approach to creating scalable lookalike audiences is designed to be cost-effective, accurate, and efficient, leveraging the power of embeddings and smart computational strategies to deliver the best possible audiences for our advertisers.

Solution architecture

  • The advertiser creates a campaign with a custom audience, which triggers the audience creation process. During this process, the audience service stores the audience metadata provided by advertisers in a message queue.
  • A scheduled Data Science (DS) job then retrieves the pending audience metadata, creates the audience, and updates the TensorFlow Serving (TFS) model.
  • During the serving period, the Backend (BE) service calls the DS service to retrieve all audiences that include the target user. Ads that are targeting these audiences are then selected by the Click-Through Rate (CTR) model to be displayed to the user.

Implementation

To ensure the efficiency of the lookalike audience retrieval model and minimise the costs associated with audience creation and serving, we’ve trained the user embedding model using billions of user actions. This extensive training allows us to employ straightforward methods for audience creation and serving, while still maintaining high levels of accuracy.

Creating lookalike audiences

The Audience Creation Job retrieves the audience metadata from the online audience service, pulls the passenger embeddings, and then averages these embeddings to generate the audience embedding.

We use the cosine score of a user and the audience embedding to identify the audiences the user belongs to. Hence, it’s sufficient to store only the audience embedding and score threshold. Additionally, a global target-all-pax Audience list is stored to return these audiences for each online request.

Serving lookalike audiences

The online audience service is also tasked with returning all the audiences to which the current user belongs. This is achieved by computing the cosine score of the user embedding against each audience embedding, and keeping all audiences for which the score surpasses the audience threshold.
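The creation and serving steps described above can be sketched in a few lines: the audience embedding is the average of its seed passenger embeddings, and a user belongs to an audience when the cosine score exceeds that audience's threshold. This is a pure-Python illustration with hypothetical function names and toy two-dimensional vectors; the production service runs as a TensorFlow Serving model.

```python
import math


def mean_embedding(embeddings):
    """Average a list of equal-length vectors to form the audience embedding."""
    n = len(embeddings)
    return [sum(dim) / n for dim in zip(*embeddings)]


def cosine(a, b):
    """Cosine score between two vectors (0.0 if either vector is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def audiences_for_user(user_embedding, audiences):
    """audiences maps audience name -> (audience embedding, score threshold)."""
    return [
        name
        for name, (embedding, threshold) in audiences.items()
        if cosine(user_embedding, embedding) > threshold
    ]


# Toy example: one audience built from two seed passengers, one unrelated audience.
seed_passengers = [[1.0, 0.0], [0.8, 0.2]]
audiences = {
    "coffee_lovers": (mean_embedding(seed_passengers), 0.9),
    "night_riders": ([0.0, 1.0], 0.5),
}
matched = audiences_for_user([1.0, 0.0], audiences)
```

Because only the audience embedding and its threshold are needed at serving time, the per-audience storage stays small regardless of how many passengers were used to build it.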

To adhere to latency requirements, we avoid querying any external feature stores like Redis and instead, store all the embeddings in memory. However, the embeddings of all users are approximately 20 GB, which could affect model loading. Therefore, we devised an embedding compression method based on hash tricks inspired by Bloom Filter.

  • We utilise hash functions to obtain the hash64 value of the paxID, which is then segmented into four 16-bit values. Each 16-bit value corresponds to a 16-dimensional embedding block, and the compressed embedding is the concatenation of these four 16-dimensional embeddings.
  • For each paxID, we have both the original user embedding and the compressed user embedding. The compressed user embeddings are learned by minimising the Mean Square Error loss.
  • We can balance the storage cost and the accuracy by altering the number of hash functions used.
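The lookup side of this compression scheme can be sketched as follows. The four 16-bit indices and the 16-dimensional blocks follow the description above; the use of `hashlib.blake2b` as the 64-bit hash, the dictionary-backed tables, and the zero-vector default are assumptions for illustration. The block values themselves would be learned by minimising the Mean Square Error against the original embeddings, which this sketch does not cover.

```python
import hashlib

BLOCK_DIM = 16    # dimensions per embedding block (from the text)
NUM_BLOCKS = 4    # a 64-bit hash split into four 16-bit indices (from the text)


def block_indices(pax_id):
    """Hash a paxID to 64 bits and split the value into four 16-bit indices."""
    # blake2b with an 8-byte digest stands in for the unspecified hash64 function.
    digest = hashlib.blake2b(pax_id.encode("utf-8"), digest_size=8).digest()
    value = int.from_bytes(digest, "big")
    return [(value >> (16 * i)) & 0xFFFF for i in range(NUM_BLOCKS)]


def compressed_embedding(pax_id, tables):
    """Concatenate one 16-dimensional block from each of the four tables."""
    vector = []
    for table, index in zip(tables, block_indices(pax_id)):
        # Unseen indices fall back to a zero block in this sketch.
        vector.extend(table.get(index, [0.0] * BLOCK_DIM))
    return vector


# Each table maps a 16-bit index to a learned 16-dimensional block.
tables = [dict() for _ in range(NUM_BLOCKS)]
embedding = compressed_embedding("pax-123", tables)
```

Since many passengers share blocks, the tables hold at most 4 x 65,536 blocks in total instead of one full embedding per passenger, which is where the memory saving comes from.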

Impact

  • Users can see advertisements targeting a new audience within 15 minutes after the advertiser creates a campaign.
  • This new system doubled the impressions and clicks, while also improving the CTR, conversion rate, and return on investment.
  • Costs for generating lookalike audiences decreased by 98%.

Learnings/Conclusion

Beyond addressing these issues, we conducted an A/B test to compare the new scalable system with the earlier one. The results revealed that the new system effectively doubled the number of impressions and clicks while also enhancing the CTR, conversion rate, and return on investment.

Over the years, we have amassed billions of user actions, which have been instrumental in training the model and creating a comprehensive representation of user interests in the form of embeddings.

What’s next?

While this scalable system has proved its effectiveness and demonstrated impressive results in CTR, conversion rate, and return on investment, there is always room for improvement.  

In the next phase, we plan to explore more advanced algorithms, refine our feature engineering process, and conduct more extensive hyperparameter tuning. Additionally, we will continue to monitor the system’s performance and make necessary adjustments to ensure it remains robust and effective in serving our advertisers’ needs.

Building hyperlocal GrabMaps

Post Syndicated from Grab Tech original https://engineering.grab.com/building-hyperlocal-grabmaps

Introduction

Southeast Asia (SEA) is a dynamic market, very different from other parts of the world. When travelling on the road, you may experience fast-changing road restrictions, new roads appearing overnight, and high traffic congestion. To address these challenges, GrabMaps has adapted to the SEA market by leveraging big data solutions. One of the solutions is the integration of hyperlocal data in GrabMaps.

Hyperlocal information is oriented around very small geographical communities and obtained from the local knowledge that our map team gathers. The map team is spread across SEA, enabling us to define clear specifications (e.g. legal speed limits), and validate that our solutions are viable.

Figure 1 – Map showing detections from images and probe data, and hyperlocal data.

Hyperlocal inputs make our mapping data even more robust, adding to the details collected from our image and probe detection pipelines. Figure 1 shows how data from our detection pipeline is overlaid with hyperlocal data, and then mapped across the SEA region. If you are curious and would like to check out the data yourself, you can download it here.

Processing hyperlocal data

Now let’s go through the process of detecting hyperlocal data.

Download data

GrabMaps is based on OpenStreetMap (OSM). The first step in the process is to download the .pbf file for Asia from geofabrik.de. This .pbf file contains all the data that is available on OSM, such as details of places, trees, and roads. For a park, for example, the .pbf file would contain data on the park name, wheelchair accessibility, and more.

For this article, we will focus on hyperlocal data related to the road network. For each road, you can obtain data such as the type of road (residential or motorway), direction of traffic (one-way or more), and road name.

Convert data

To take advantage of big data computing, the next step in the process is to convert the .pbf file into Parquet format using a Parquetizer. This will convert the binary data in the .pbf file into a table format. Each road in SEA is now displayed as a row in a table as shown in Figure 2.

Figure 2 – Road data in Parquet format.

Identify hyperlocal data

After the data is prepared, GrabMaps then identifies and inputs all of our hyperlocal data, and delivers a consolidated view to our downstream services. Our hyperlocal data is obtained from various sources, either by looking at geometry, or other attributes in OSM such as the direction of travel and speed limit. We also apply customised rules defined by our local map team, all in a fully automated manner. This enhances the map together with data obtained from our rides and deliveries GPS pings and from KartaView, Grab’s product for imagery collection.

Figure 3 – Architecture diagram showing how hyperlocal data is integrated into GrabMaps.

Benefit of our hyperlocal GrabMaps

GrabNav, a turn-by-turn navigation tool available on the Grab driver app, is one of our products that benefits from having hyperlocal data. Here are some hyperlocal data that are made available through our approach:

  • Localisation of roads: The country, state/county, or city the road is in
  • Language spoken, driving side, and speed limit
  • Region-specific default speed regulations
  • Consistent name usage using language inference
  • Complex attributes like intersection links

To further explain the benefits of this hyperlocal feature, we will use intersection links as an example. In the next section, we will explain how intersection links data is used and how it impacts our driver-partners and passengers.

An intersection link is a road segment where two or more roads meet. Figures 4 and 5 illustrate what an intersection link looks like in a GrabMaps mock and in OSM.

Figure 4 – Mock of an intersection link.
Figure 5 – Intersection link illustration from a real road network in OSM.

To locate intersection links in a road network, there are computations involved. We would first combine big data processing (which we do using Spark) with graphs. We use geohash as the unit of processing, and for each geohash, a bi-directional graph is created.

From such resulting graphs, we can determine intersection links if:

  • Road segments are parallel
  • The roads have the same name
  • The roads are one way roads
  • Angles and the shape of the road are in the intervals or requirements we seek

Each intersection link we identify is tagged in the map as intersection_links. Our downstream service teams can then identify them by searching for the tag.
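The criteria above can be read as a predicate over a candidate link and the two roads it connects. The sketch below is one possible interpretation, not the GrabMaps implementation: the `Road` class, the bearing-based parallel check, and both angle thresholds are hypothetical, since the text does not state the actual intervals or how geometry is represented in the per-geohash graph.

```python
from dataclasses import dataclass


@dataclass
class Road:
    name: str
    one_way: bool
    bearing_deg: float  # compass bearing of the segment, 0-360 (hypothetical field)


def angle_between(bearing_a, bearing_b):
    """Smallest angle between two bearings, ignoring direction of travel (0-90)."""
    diff = abs(bearing_a - bearing_b) % 180.0
    return min(diff, 180.0 - diff)


def is_intersection_link_candidate(road_a, road_b, link,
                                   parallel_tolerance=10.0, min_cross_angle=45.0):
    """Check a candidate link against the four criteria (thresholds are assumptions)."""
    parallel = angle_between(road_a.bearing_deg, road_b.bearing_deg) <= parallel_tolerance
    same_name = road_a.name == road_b.name
    both_one_way = road_a.one_way and road_b.one_way
    crosses = angle_between(link.bearing_deg, road_a.bearing_deg) >= min_cross_angle
    return parallel and same_name and both_one_way and crosses


# Two opposite carriageways of the same road, joined by a short perpendicular segment.
eastbound = Road("Jalan Example", one_way=True, bearing_deg=90.0)
westbound = Road("Jalan Example", one_way=True, bearing_deg=270.0)
connector = Road("", one_way=False, bearing_deg=0.0)
is_link = is_intersection_link_candidate(eastbound, westbound, connector)
```

A segment that passes such a check would then receive the intersection_links tag so that downstream services can find it.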

Impact

The impact we create with our intersection link can be explained through the following example.

Figure 6 – Longer route, without GrabMaps intersection link feature. The arrow indicates where the route should have suggested a U-turn.
Figure 7 – Shorter route using GrabMaps by taking a closer link between two main roads.

Figure 6 and Figure 7 show two different routes for the same origin and destination. However, Figure 7 shows a shorter route, which is made possible by taking an intersection link early on in the route. The highlighted road segment in Figure 7 is an intersection link, tagged by the process we described earlier. The route is now much shorter, making GrabNav more efficient in its route suggestions.

There are numerous factors that can impact a driver-partner’s trip, and intersection links are just one example. There are many more features that GrabMaps offers across Grab’s services that allow us to “outserve” our partners.

Conclusion

GrabMaps and GrabNav deliver enriched experiences to our driver-partners. By integrating certain hyperlocal data features, we are also able to provide more accurate pricing for both our driver-partners and passengers. In our mission towards sustainable growth, this is an area that we will keep on improving by leveraging scalable tech solutions.

Workers KV is faster than ever with a new architecture

Post Syndicated from Charles Burnett original http://blog.cloudflare.com/faster-workers-kv-architecture/

We’re excited to announce a significant performance improvement coming to Workers KV, focused on dramatically improving cold read performance and reducing latency, even for long tail access patterns.

Developers using KV have seen great performance on hot reads, but ask why their 95th percentile latency — often on a key (or set of keys) that hadn’t been accessed recently or in that region — was higher than expected. We took this feedback to heart: we’ve been working feverishly on a new caching layer for KV behind the scenes, which enables customers to achieve much more frequent hot reads, reduced worst case latency times, better flexibility and control over cache TTLs, and much faster consistency over our previous iterations, and it’s now live for all KV users.

The best part? Developers using KV don’t need to change anything to benefit from this increased performance.

What is Workers KV?

Workers KV is a key-value store designed for read-heavy use cases and applications powered by Cloudflare’s network. KV’s focus on read-heavy use cases allows it to serve hot (cached) reads in milliseconds, which makes it ideal for storing per-application or customer configuration data, routing configuration, multivariate (A/B testing) configurations, and even small asset data that you need to serve quickly. Anything that you can serialize and need quickly you can store in KV, up to 25 MiB of data per key, with no cap on total data stored.

The problem

KV might be optimized for read-heavy workloads, but it’s critical that writes are globally available quickly enough that they’re useful for your application. Under typical conditions, the convergence delay for an eventually consistent system like KV is approximately one minute, globally: a write from one location should be able to be observed by all readers. Typical conditions are great, but typical unfortunately didn’t mean “always”. It could take significant time to restore global consistency where regions like North America and Europe are reading the same value. We needed to improve not just the average convergence, but the worst case as well.

Speaking of consistency, setting a long cache Time to Live (cacheTTL) for reads would result in a situation where you won’t notice a write for the entire cacheTTL duration, as the existing cached data had not timed out yet. This means you have to trade off read latency for infrequently accessed keys against noticing writes. Developers using KV have been consistent in their feedback: a higher cache TTL should improve performance, but not multiply the time it takes for KV to converge on a write to that key.

Lastly, our cold read times also left room for improvement. While cache hits are fast in KV, a cache miss would result in a request being routed all the way to our storage backends. While this is slow for everyone, it was particularly slow for folks in regions not immediately in the US or EU. This is poor performance that doesn’t represent what we can achieve with our global presence.

Our solution

A new horizontally scaled tiered cache

We’ve revamped Workers KV to be powered by a new tiered cache implementation. This implementation is written as a Worker service. We reuse the Dynamic Dispatch infrastructure developed for Workers for Platforms which lets us jump from our old KV worker into our new caching service within hundreds of microseconds. Importantly, this means we don’t impact cache hit performance to implement this new transparent caching layer. We leverage the same infrastructure powering Smart Placement to implement the tiering.

Before we re-designed KV, our topology looked like this:

All data centers in Cloudflare’s network can reach out to the origin in the event of a cache miss or to do a background refresh.

Cache TTL and efficiency

Our design goal was clear and ambitious: “can we relax honoring the cacheTTL constraint without violating it”? While this seems contradictory, the motivation is clear: we want to minimize the need to communicate with our storage backends while honoring the user-facing semantics of the cacheTTL setting, as it can have security implications if violated (e.g. if you use it to store and validate security tokens). Answering this design question also manages to simultaneously solve many of the problems outlined earlier.

Comparing existing solutions

First, let’s look at the design constraints for two eventually consistent storage systems at Cloudflare: Quicksilver and Tiered CDN.

Quicksilver gives us global consistency within seconds using a push architecture to replicate the data across all machines at Cloudflare. That architecture however doesn’t scale for Workers KV’s needs, which can have terabytes of data just within one namespace. This would be too much to replicate to every single data center.

By comparison, the tiered CDN cache is a pull mechanism where each hop pulls a more recent version of the asset into the local cache on access. That scales better because we only use storage for assets that are accessed, which works well with most use-cases where the vast majority of data is never retrieved. However, a pull based architecture is insufficient because it can only let us aggregate traffic across broader regions but we still can’t decouple how long we serve from the cache from the cacheTTL.

Push based architectures let us know when an asset is updated and enable scalable storage. By blending the properties of both systems, we can decouple how long we store the assets in cache from the cacheTTL. And that’s exactly what we did: KV now uses a hybrid push/pull caching layer where data centers closest to customers will pull from the regional data centers that are a little bit farther away. Writes will broadcast to all regional data centers that a key has been updated, so that the regional data center will remove that key from the local cache.

Traditional regional tiered cache topology

We can solve this problem by taking advantage of the fact that we semantically understand the write operations that are happening within Workers KV:

  1. Workers KV doesn’t have one data center per region as might be typical for your zone in a Cloudflare CDN regional tiered cache topology. Instead, each key in a KV namespace is deterministically assigned a data center by performing a weighted rendezvous hash. The rendezvous hash ensures that load is distributed equally across the region and outages result in optimal shifts of traffic.
  2. When the data center closest to a customer has a miss, it computes the regional data center affinity and provides that information to our Smart Placement infrastructure. When a regional tier misses, we repeat this process except using data centers in the KV origin region.
  3. Finally, a miss at the upper tier exits to our storage nodes located in that origin region.

When we do a write, we only purge (invalidate) the key from the regional and upper tier data centers. This is a fixed number of data centers in our network regardless of how many data centers we add, which ensures that we aren’t reducing cache hit rates as our network continues to grow. Compared with a global purge that delivers the event to every data center in our network, we only need to deliver this purge to a random fixed set of data centers, so our aggregate write capacity for Workers KV automatically scales horizontally as we add more data centers.

All lower-tier data centers will reach out to a regional tier responsible for a given key in the event of a cache miss. If the regional tier doesn’t have the content, the regional tier will then ask an upper-tier out of region for the content. On a write for a given key, the responsible regional and upper tiers have that key deleted from cache.

Why do we call this a hybrid topology? The data centers closest to customers pull from the regional data centers as normal, but we automatically push invalidation events to the regional tier data centers on every write. That way, those customer data center pulls know to get an updated value when there is one. This means that while the cacheTTL parameter controls the caching behavior closest to the customer, it’s treated as a suggestion at best at the regional and upper tiers.
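To make the push/pull split concrete, here is a simplified sketch with a single regional tier and no upper tier. All class and function names are hypothetical, and time is passed in explicitly so the behavior is easy to follow. The customer-facing cache honors cacheTTL and pulls on a miss, while the regional tier keeps its entry until a write pushes a purge.

```python
class Storage:
    """Stand-in for the durable storage nodes; counts reads for illustration."""

    def __init__(self):
        self.data = {}
        self.reads = 0

    def get(self, key):
        self.reads += 1
        return self.data.get(key)


class RegionalTier:
    """Keeps entries until a write purges them (no TTL in this sketch)."""

    def __init__(self, storage):
        self.storage = storage
        self.cache = {}

    def get(self, key):
        if key not in self.cache:
            self.cache[key] = self.storage.get(key)
        return self.cache[key]

    def purge(self, key):
        self.cache.pop(key, None)


class EdgeCache:
    """Customer-facing cache: honors cacheTTL, pulls from the regional tier."""

    def __init__(self, regional, cache_ttl):
        self.regional = regional
        self.cache_ttl = cache_ttl
        self.cache = {}  # key -> (value, stored_at)

    def get(self, key, now):
        entry = self.cache.get(key)
        if entry is not None and now - entry[1] < self.cache_ttl:
            return entry[0]
        value = self.regional.get(key)
        self.cache[key] = (value, now)
        return value


def write(storage, regional_tiers, key, value):
    """Persist the value, then push an invalidation to the regional tiers."""
    storage.data[key] = value
    for tier in regional_tiers:
        tier.purge(key)
```

In this sketch, a read that arrives after the customer-facing entry expires only reaches storage if a write has purged the regional entry in the meantime; otherwise the regional tier answers it.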

This way we’ve combined the push design principles behind Quicksilver, which delivers global consistency within seconds, with the pull-based design of our CDN tiered caching which can scale to handle “infinite” size workloads and prioritizes the assets that are most frequently accessed.

Visualizing it

It can be a bit hard to follow what’s happening in the new caching layer since there are so many moving parts.

Here’s a video of a simplified version of how it works:

Small yellow balls represent KV read requests, larger green balls represent read responses. A larger purple ball represents a KV write request, while a read response ball represents a KV write response. Teal balls represent purge requests being broadcast. The “E” is a data center that doesn’t participate as a regional tier. The R represents the regional tier for key N while O is the upper tier for key N.

Decoupled cache TTL and consistency parameters

As a refresher, the objects written to KV can specify a cacheTTL: by default this is set to 1 minute, which is also the minimum acceptable value. This means that if an asset has been in the cache for longer than a minute, we bypass the cache and read instead from our durable storage nodes. In order to prevent eyeballs noticing origin fetches every minute, we implement stale while revalidate logic in our caching layer that automatically refreshes from the storage nodes in the background as requests come in.
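The stale-while-revalidate behavior can be sketched roughly as follows. This is an illustration under assumptions, not the production logic: `SwrCache`, `refresh_window`, and the counters are hypothetical names, and the "background" refresh runs inline here, whereas a real implementation would run it off the request path.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class Entry:
    value: str
    expires_at: float


class SwrCache:
    def __init__(self, fetch: Callable[[str], str], ttl: float, refresh_window: float):
        self._fetch = fetch                    # reads from the storage nodes
        self._ttl = ttl                        # e.g. a 60 second cacheTTL
        self._refresh_window = refresh_window  # how close to expiry we refresh
        self._entries: Dict[str, Entry] = {}
        self.blocking_fetches = 0
        self.background_refreshes = 0

    def _store(self, key: str, now: float) -> Entry:
        self._entries[key] = Entry(self._fetch(key), now + self._ttl)
        return self._entries[key]

    def get(self, key: str, now: float) -> str:
        entry = self._entries.get(key)
        if entry is None or now >= entry.expires_at:
            # Cold or expired: the caller waits for the storage fetch.
            self.blocking_fetches += 1
            return self._store(key, now).value
        value = entry.value
        if entry.expires_at - now <= self._refresh_window:
            # Close to expiry: serve the cached value and refresh the entry.
            self.background_refreshes += 1
            self._store(key, now)
        return value
```

With a steady stream of reads, only the first request pays for a storage fetch; later requests are served from cache while the entry's expiry keeps being pushed forward.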

Here’s an example from a Worker that’s constantly reading the same key

Notice the absence of any spikes indicating a cache miss? You’d expect to see them regularly every minute or so in the tens or even hundreds of milliseconds when the cacheTTL should expire. The reason this doesn’t happen is because as the expiry time is approaching, a background request to the storage nodes occurs and the cache is updated with an expiry time one more minute into the future; thus the asset in cache is never too stale and eyeball requests are always served from cache. Let’s take a look at requests to our storage layer before and after adding tiering:

Yellow is the estimated number of requests that would have occurred to origin without the new caching layer. Blue is the number of requests we’re making now.

The above chart is for a system with conservative parameters set. The upper tier doesn’t store the data for much longer than the cacheTTL currently and the upper tier will itself still do a background refresh probabilistically even though it doesn’t actually need to since we see all writes.

The new caching layer we’ve built inherits the old background refresh mechanism and expands on it. The first thing we did is decouple the background refresh period from the cacheTTL as a separate parameter (also defaulting to 1 minute). This means that even if you set a cacheTTL for 1 hour, KV will still check every minute from the regional tier to see if the value has been updated. If the data you’re storing within KV doesn’t have strict requirements on stale reads (think a key that’s accessed once every 10 minutes but needs to honor a write within 1 minute like security tokens), then you can increase the cacheTTL so that infrequently accessed keys stick around in the cache without changing the observed consistency.

Consistency improvements

Speaking of consistency, we’ve improved the worst case performance of that as well. Historically, we’ve had a background system that crawls all data in the storage nodes to figure out which region has the most up to date value and update accordingly. This gives us complete consistency coverage, but could take a significant amount of time to confirm. We would also periodically check both backends to see if network conditions had changed to pick the primary storage region to use for a given customer-close data center. Of course inconsistencies would be resolved then, but in practice this happens randomly, and at a low probability that won’t typically catch any meaningful values served inconsistently.

With the new caching layer all this changes. Since we’re now only reading keys on first access or after a write, we have enough storage capacity that we can check both backends on every read. When a customer requests data, we make a call to each origin data center, with the fastest response being returned immediately to reduce read latency. If the other data center has a newer value than what was returned first, we synchronize both data centers and notify our caching layer to purge that key from all regional data centers. If the other data center instead has an older value, we just synchronize the data centers without purging since we served the latest value. This means that even if our data centers are inconsistent, readers will notice new values much more quickly.
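The read path described above can be sketched as follows, assuming each stored value carries a comparable version (how "newer" is actually determined is not specified here, so the `version` field is an assumption). The two backends are modeled as plain dictionaries, and `read_with_repair` and `purge` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class Versioned:
    value: str
    version: int  # assumption: a higher version means a newer write


def read_with_repair(
    key: str,
    fastest: Dict[str, Versioned],
    slower: Dict[str, Versioned],
    purge: Callable[[str], None],
) -> Versioned:
    """Serve the fastest backend's answer, then reconcile the two backends."""
    served = fastest[key]  # returned to the reader immediately
    other = slower[key]    # arrives later from the other origin data center
    if other.version > served.version:
        # We served a stale value: synchronize and purge the regional caches
        # so that subsequent reads pick up the newer value.
        fastest[key] = other
        purge(key)
    elif other.version < served.version:
        # We served the latest value: synchronize without purging.
        slower[key] = served
    return served
```

Note that the reader who triggers the repair may still see the older value; the purge is what lets later readers notice the new one quickly.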

Latency improvements

Here’s the latency improvement at 10% rollout on a logarithmic x-axis:

Architecture that just gets better

This is just the start of what we can do. We now have a solid foundation for making further improvements, including making our best case reads even faster. We’ll be working on cutting out parts of our traditional stack that add unnecessary latency, and adding new high performance features that were too difficult to integrate otherwise. We can also explore features like setting the consistency TTL parameter for sub one minute consistency for additional cost. Similarly, we could create a best effort global purge feature if you want to choose to signal writes that way. Finally, we’re looking at exposing this new caching layer as a general Worker binding anyone can use within a Worker in front of their own service or to put in front of their Worker. If these sound like the killer features you need, please reach out to us to try them out.

What next?

Developers don’t have to do anything to benefit from KV’s new performance improvements. We are currently in the process of rolling out our new architecture, and you don’t have to redeploy your Worker or change the way you use KV to benefit.

Workers KV is a natural fit for any application built on top of our Workers platform. We provide a native API that enables any Worker script to read, write, list, and manage your Workers KV storage. You can also interact with Workers KV directly via our REST API from any client that can make an HTTP request, and the Cloudflare Dashboard provides an easy way to create, list, and delete keys to be used with the rest of your Workers setup.

Regardless of how you use Workers KV, it will be faster than ever before. We’re excited to see what you build with us, and you can dive into our documentation to start building with it.

How Cloudflare instruments services using Workers Analytics Engine

Post Syndicated from Jen Sells original https://blog.cloudflare.com/using-analytics-engine-to-improve-analytics-engine/

Workers Analytics Engine is a new tool, announced earlier this year, that enables developers and product teams to build time series analytics about anything, with high dimensionality, high cardinality, and effortless scaling. We built Analytics Engine for teams to gain insights into their code running in Workers, provide analytics to end customers, or even build usage based billing.

In this blog post we’re going to tell you about how we use Analytics Engine to build Analytics Engine. We’ve instrumented our own Analytics Engine SQL API using Analytics Engine itself and use this data to find bugs and prioritize new product features. We hope this serves as inspiration for other teams who are looking for ways to instrument their own products and gather feedback.

Why do we need Analytics Engine?

Analytics Engine enables you to generate events (or “data points”) from Workers with just a few lines of code. Using the GraphQL or SQL API, you can query these events and create useful insights about the business or technology stack. For more about how to get started using Analytics Engine, check out our developer docs.

Since we released the Analytics Engine open beta in September, we’ve been adding new features at a rapid clip based on feedback from developers. However, we’ve had two big gaps in our visibility into the product.

First, our engineering team needs to answer classic observability questions, such as: how many requests are we getting, how many of those requests result in errors, what is the nature of these errors, etc. They need to be able to view both aggregated data (like average error rate, or p99 response time) and drill into individual events.

Second, because this is a newly launched product, we are looking for product insights. By instrumenting the SQL API, we can understand the queries our customers write, and the errors they see, which helps us prioritize missing features.

We realized that Analytics Engine would be an amazing tool for both answering our technical observability questions, and also gathering product insight. That’s because we can log an event for every query to our SQL API, and then query for both aggregated performance issues as well as individual errors and queries that our customers run.

In the next section, we’re going to walk you through how we use Analytics Engine to monitor that API.

Adding instrumentation to our SQL API

The Analytics Engine SQL API lets you query events data in the same way you would an ordinary database. For decades, SQL has been the most common language for querying data. We wanted to provide an interface that allows you to immediately start asking questions about your data without having to learn a new query language.

Our SQL API parses user SQL queries, transforms and validates them, and then executes them against backend database servers. We then write information about the query back into Analytics Engine so that we can run our own analytics.
Writing data into Analytics Engine from a Cloudflare Worker is very simple and explained in our documentation. We instrument our SQL API in the same way our users do, and this code excerpt shows the data we write into Analytics Engine:

With that data now being stored in Analytics Engine, we can then pull out insights about every field we’re reporting.

Querying for insights

Having our analytics in an SQL database gives you the freedom to write any query you might want. Compared to using something like metrics which are often predefined and purpose specific, you can define any custom dataset desired, and interrogate your data to ask new questions with ease.

We need to support datasets comprising trillions of data points. In order to accomplish this, we have implemented a sampling method called Adaptive Bit Rate (ABR). With ABR, if you have large amounts of data, your queries may be returned sampled events in order to respond in reasonable time. If you have more typical amounts of data, Analytics Engine will query all your data. This allows you to run any query you like and still get responses in a short length of time. Right now, you have to account for sampling in how you make your queries, but we are exploring making it automatic.
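As a rough illustration of what accounting for sampling means, the sketch below assumes each returned row carries a sample interval (the number of original events the row stands for), named `_sample_interval` here, and weights every aggregate by it. The `query_time_ms` field and the function names are hypothetical.

```python
def estimate_count(rows):
    """Estimate the number of original events represented by sampled rows."""
    return sum(row["_sample_interval"] for row in rows)


def estimate_mean(rows, field):
    """Estimate the mean of `field`, weighting each row by its sample interval."""
    total_weight = estimate_count(rows)
    if total_weight == 0:
        raise ValueError("no rows to aggregate")
    weighted_sum = sum(row[field] * row["_sample_interval"] for row in rows)
    return weighted_sum / total_weight
```

For example, a row with a sample interval of 10 counts as ten events, so a plain row count or unweighted average over sampled rows would under-represent it.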

Any data visualization tool can be used to visualize your analytics. At Cloudflare, we heavily use Grafana (and you can too!). This is particularly useful for observability use cases.

Observing query response times

One query we pay attention to gives us information about the performance of our backend database clusters:

As you can see, the 99th percentile (corresponding to the 1% most complex queries to execute) sometimes spikes up to about 300ms. But on average our backend responds to queries within 100ms.

This visualization is itself generated from an SQL query:

Customer insights from high-cardinality data

Another use of Analytics Engine is to draw insights out of customer behavior. Our SQL API is particularly well-suited for this, as you can take full advantage of the power of SQL. Thanks to our ABR technology, even expensive queries can be carried out against huge datasets.

We use this ability to help prioritize improvements to Analytics Engine. Our SQL API supports a fairly standard dialect of SQL but isn’t feature-complete yet. If a user tries to do something unsupported in an SQL query, they get back a structured error message. Those error messages are reported into Analytics Engine. We’re able to aggregate the kinds of errors that our customers encounter, which helps inform which features to prioritize next.

The SQL API returns errors in the format of type of error: more details, and so we can take the first portion before the colon to give us the type of error. We group by that, and get a count of how many times that error happened and how many users it affected:
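The same grouping logic, expressed as a small sketch outside of SQL: it takes the portion of each message before the first colon as the error type, then counts occurrences and distinct users per type. The event shape and the `summarize_errors` name are assumptions for illustration, and sampling is ignored here for simplicity.

```python
from collections import defaultdict


def summarize_errors(events):
    """Group (user_id, error_message) pairs by error type.

    Returns {error_type: (occurrences, distinct_users)}.
    """
    counts = defaultdict(int)
    users = defaultdict(set)
    for user_id, message in events:
        # "type of error: more details" -> "type of error"
        error_type = message.split(":", 1)[0].strip()
        counts[error_type] += 1
        users[error_type].add(user_id)
    return {t: (counts[t], len(users[t])) for t in counts}
```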

To perform the above query using an ordinary metrics system, we would need to represent each error type with a different metric. Reporting that many metrics from each microservice creates scalability challenges. That problem doesn’t happen with Analytics Engine, because it’s designed to handle high-cardinality data.

Another big advantage of a high-cardinality store like Analytics Engine is that you can dig into specifics. If there’s a large spike in SQL errors, we may want to find which customers are having a problem in order to help them or identify what function they want to use. That’s easy to do with another SQL query:

Inside Cloudflare, we have historically relied on querying our backend database servers for this type of information. Analytics Engine’s SQL API now enables us to open up our technology to our customers, so they can easily gather insights about their services at any scale!

Conclusion and what’s next

The insights we gathered about usage of the SQL API are a super helpful input to our product prioritization decisions. We already added support for substring and position functions which were used in the visualizations above.

Looking at the top SQL errors, we see numerous errors related to selecting columns. These errors are mostly coming from some usability issues related to the Grafana plugin. Adding support for the DESCRIBE function should alleviate this because without this, the Grafana plugin doesn’t understand the table structure. This, as well as other improvements to our Grafana plugin, is on our roadmap.

We also can see that users are trying to query time ranges for older data that no longer exists. This suggests that our customers would appreciate having extended data retention. We’ve recently extended our retention from 31 to 92 days, and we will keep an eye on this to see if we should offer further extension.

We saw lots of errors related to common mistakes or misunderstandings of proper SQL syntax. This indicates that we could provide better examples or error explanations in our documentation to assist users with troubleshooting their queries.

Stay tuned into our developer docs to be informed as we continue to iterate and add more features!

You can start using Workers Analytics Engine now! Analytics Engine is now in open beta with free 90-day retention. Start using it today or join our Discord community to talk with the team.

The home page for Internet insights: Cloudflare Radar 2.0

Post Syndicated from Joao Sousa Botto original https://blog.cloudflare.com/radar2/

Cloudflare Radar was launched two years ago to give everyone access to the Internet trends, patterns and insights Cloudflare uses to help improve our service and protect our customers.

Until then, these types of insights were only available internally at Cloudflare. However, true to our mission of helping build a better Internet, we felt everyone should be able to look behind the curtain and see the inner workings of the Internet. It’s hard to improve or understand something when you don’t have clear visibility over how it’s working.

On Cloudflare Radar you can find timely graphs and visualizations on Internet traffic, security and attacks, protocol adoption and usage, and outages that might be affecting the Internet. All of these can be narrowed down by timeframe, country, and Autonomous System (AS). You can also find interactive deep dive reports on important subjects such as DDoS and the Meris Botnet. It’s also possible to search for any domain name to see details such as SSL usage and which countries their visitors are coming from.

Since launch, Cloudflare Radar has been used by NGOs to confirm the Internet disruptions their observers see in the field, by journalists looking for Internet trends related to an event in a country of interest or at the volume of cyberattacks launched in retaliation for political sanctions, by analysts looking at the prevalence of new protocols and technologies, and even by brand PR departments using Cloudflare Radar data to analyze the online impact of a major sports event.

Cloudflare Radar has clearly become an important tool for many and, most importantly, we find it has helped shed light on parts of the Internet that deserve more attention and investment.

Introducing Cloudflare Radar 2.0

What has made Cloudflare Radar so valuable is that the data and insights it contains are unique and trustworthy. Cloudflare Radar shows aggregate data from across the massive spectrum of Internet traffic we see every day, presenting you with datasets you won’t find elsewhere.

However, there were still gaps. Today, on the second anniversary of Cloudflare Radar, we are launching Cloudflare Radar 2.0 in beta. It will address three common pieces of feedback from users:

  • Ease of finding insights and data. The way information was structured on Cloudflare Radar made finding information daunting for some people. We are redesigning Cloudflare Radar so that it becomes a breeze.
  • Number of insights. We know many users have wanted to see insights about other important parts of the Internet, such as email. We have also completely redesigned the Cloudflare Radar backend so that we can quickly add new insights over the coming months (including insights into email).
  • Sharing insights. The options for sharing Cloudflare Radar insights were limited. We will now provide you the options you want, including downloadable and embeddable graphs, sharing to social media platforms, and an API.

Finding insights and data

On a first visit to the redesigned Cloudflare Radar homepage one will notice:

  • Prominent and intuitive filtering capabilities on the top bar. A global search bar is also coming soon.
  • Content navigation on the sidebar.
  • Content cards showing glanceable and timely information.

The content cards you find on the homepage are what we call “quick bytes”. Those link you to more in-depth content for that specific topic, which can also be found through the sidebar navigation.

At the top of the page you can search for a country, autonomous system number (ASN), domain, or report to navigate to a home page for that specific content. For example, the domain page for google.com:

The navigation sidebar allows you to find more detailed insights and data related to Traffic, Security & Attacks, Adoption & Usage, and Domains. (We will be adding additional topic areas in the future.) It also gives you quick access to the Cloudflare Radar Outage Center, a tool for tracking Internet disruptions around the world and to which we are dedicating a separate blog post, and to Radar Reports, which are interactive deep dive reports on important subjects such as DDoS and the Meris Botnet.

Within these topic pages (such as the one for Adoption & Usage shown above), you will find the quick bytes for the corresponding topic at the top, and quick bytes for related topics on the right. The quick bytes on the right allow you to quickly glance at and navigate to related sections.

In the middle of the page are the more detailed charts for the topic you’re exploring.

Sharing insights

Cloudflare Radar’s reason to exist is to make Internet insights available to everyone, but historically we haven’t been as flexible as our users would want. People could download a snapshot of the graph, but not much more.

With Cloudflare Radar 2.0 we will be introducing three major new ways of using Radar insights and data:

  • Social share. Cloudflare Radar 2.0 charts have a more modern and clean look and feel, and soon you’ll be able to share them directly on the social media platform of your choice. No more dealing with low quality screenshots.
  • Embeddable charts. The beautiful charts will also be able to be embedded directly into your webpage or blog – it will work just like a widget, always showing up-to-date information.
  • API. If you like the data on Cloudflare Radar but want to manipulate it further for analysis, visualization, or for posting your own charts, you’ll have the Cloudflare Radar API available to you starting today.

For example, the last seven days of HTTP traffic data for Portugal can be obtained from https://api.cloudflare.com/client/v4/radar/http/timeseries/device_type?dateRange=7d&location=PT

Note: The API is available today. To use the Cloudflare API you need an API token or key (more details here). Embedding charts and sharing directly to social are new features to be released later this year.
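A minimal sketch of preparing that request with an API token, using only the Python standard library. The endpoint and query parameters are the ones shown above; passing the token as a Bearer `Authorization` header is an assumption about how your token is configured, and `build_radar_request` is a hypothetical helper name.

```python
import urllib.parse
import urllib.request

RADAR_BASE = "https://api.cloudflare.com/client/v4/radar"


def build_radar_request(api_token, date_range="7d", location="PT"):
    """Build (but do not send) a request for HTTP traffic by device type."""
    query = urllib.parse.urlencode({"dateRange": date_range, "location": location})
    url = f"{RADAR_BASE}/http/timeseries/device_type?{query}"
    return urllib.request.Request(
        url, headers={"Authorization": f"Bearer {api_token}"}
    )


# To actually send it (requires network access and a valid token):
#   import json
#   with urllib.request.urlopen(build_radar_request("YOUR_API_TOKEN")) as resp:
#       data = json.load(resp)
```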

Technology changes

Cloudflare Radar 2.0 was built on a new technology stack; we will write a blog post about why and how we did it soon. A lot changed: we now have proper GraphQL data endpoints and a public API, the website runs on top of Cloudflare Pages and Workers, and we’re finally doing server-side rendering using Remix. We adopted SVG whenever possible, built our reusable data visualization components system, and are using Cosmos for visual TDD. These foundational changes will provide a better UX/UI to our users and give us speed when iterating and improving Cloudflare Radar in the future.

We hope you find this update valuable, and recommend you keep an eye on radar.cloudflare.com to see the new insights and topics we’ll be adding regularly. If you have any feedback, please send it to us through the Cloudflare Community.

Integrating Network Analytics Logs with your SIEM dashboard

Post Syndicated from Omer Yoachimik original https://blog.cloudflare.com/network-analytics-logs/

We’re excited to announce the availability of Network Analytics Logs. Magic Transit, Magic Firewall, Magic WAN, and Spectrum customers on the Enterprise plan can feed packet samples directly into storage services, network monitoring tools such as Kentik, or their Security Information and Event Management (SIEM) systems such as Splunk to gain near real-time visibility into network traffic and DDoS attacks.

What’s included in the logs

When you create a Network Analytics Logs job, Cloudflare continuously pushes logs of packet samples directly to the HTTP endpoint of your choice, including Websockets. The logs arrive in JSON format, which makes them easy to parse, transform, and aggregate. The logs include packet samples of traffic dropped and passed by the following systems:

  1. Network-layer DDoS Protection Ruleset
  2. Advanced TCP Protection
  3. Magic Firewall

Note that not all mitigation systems are applicable to all Cloudflare services. Below is a table describing which mitigation service is applicable to which Cloudflare service:

Mitigation System Cloudflare Service
Magic Transit Magic WAN Spectrum
Network-layer DDoS Protection Ruleset
Advanced TCP Protection
Magic Firewall

Packets are processed by the mitigation systems in the order outlined above. Therefore, a packet that passed all three systems may produce three packet samples, one from each system. This can be very insightful when troubleshooting and wanting to understand where in the stack a packet was dropped. To avoid overcounting the total passed traffic, Magic Transit users should only take into consideration the passed packets from the last mitigation system, Magic Firewall.

An example of a packet sample log:

{"AttackCampaignID":"","AttackID":"","ColoName":"bkk06","Datetime":1652295571783000000,"DestinationASN":13335,"Direction":"ingress","IPDestinationAddress":"(redacted)","IPDestinationSubnet":"/24","IPProtocol":17,"IPSourceAddress":"(redacted)","IPSourceSubnet":"/24","MitigationReason":"","MitigationScope":"","MitigationSystem":"magic-firewall","Outcome":"pass","ProtocolState":"","RuleID":"(redacted)","RulesetID":"(redacted)","RulesetOverrideID":"","SampleInterval":100,"SourceASN":38794,"Verdict":"drop"}

All the available log fields are documented here: https://developers.cloudflare.com/logs/reference/log-fields/account/network_analytics_logs/
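
To make the overcounting point concrete, here is a minimal sketch of how packet samples could be aggregated. It assumes each sample stands for roughly `SampleInterval` packets and that the `Outcome` field distinguishes passed from dropped traffic; both are assumptions to check against the log field reference. The system name `example-ddos-ruleset` is a made-up placeholder; only `magic-firewall` comes from the sample above.

```python
import json
from collections import Counter

# Last mitigation system in the chain for Magic Transit, per the text above.
LAST_SYSTEM = "magic-firewall"


def estimate_packets(log_lines):
    """Return (per-system estimates, estimated total passed packets).

    Each sample is weighted by SampleInterval (assumed 1-in-N sampling).
    Passed traffic is counted only at the last mitigation system so that a
    packet that passed several systems is not counted more than once.
    """
    per_system = Counter()
    passed_total = 0
    for line in log_lines:
        line = line.strip()
        if not line:
            continue
        sample = json.loads(line)
        weight = sample.get("SampleInterval", 1)
        key = (sample["MitigationSystem"], sample["Outcome"])
        per_system[key] += weight
        if key == (LAST_SYSTEM, "pass"):
            passed_total += weight
    return per_system, passed_total


# Trimmed-down, illustrative samples (not real log data).
sample_lines = [
    '{"MitigationSystem": "example-ddos-ruleset", "Outcome": "pass", "SampleInterval": 100}',
    '{"MitigationSystem": "magic-firewall", "Outcome": "pass", "SampleInterval": 100}',
    '{"MitigationSystem": "magic-firewall", "Outcome": "drop", "SampleInterval": 100}',
]
per_system, passed = estimate_packets(sample_lines)
print(passed)
```

The per-system counter is still useful for troubleshooting, since it shows where in the stack traffic was dropped.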

Setting up the logs

In this walkthrough, we will demonstrate how to feed the Network Analytics Logs into Splunk via Postman. At this time, it is only possible to set up Network Analytics Logs via API. Setting up the logs requires three main steps:

  1. Create a Cloudflare API token.
  2. Create a Splunk Cloud HTTP Event Collector (HEC) token.
  3. Create and enable a Cloudflare Logpush job.

Let’s get started!

1) Create a Cloudflare API token

  1. Log in to your Cloudflare account and navigate to My Profile.
  2. On the left-hand side, in the collapsing navigation menu, click API Tokens.
  3. Click Create Token and then, under Custom token, click Get started.
  4. Give your custom token a name, and select an Account-scoped permission to edit Logs. You can also scope it to a specific account, a subset of your accounts, or all of them.
  5. At the bottom, click Continue to summary, and then Create Token.
  6. Copy and save your token. You can also test your token with the provided snippet in Terminal.

When you’re using an API token, you don’t need to provide your email address as part of the API credentials.
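
For readers who prefer a script over the Terminal snippet, the sketch below builds a token check request with a Bearer header only, with no email header. The helper names are hypothetical, and the verify endpoint path is taken from Cloudflare's public API as we understand it; confirm it against the API token documentation before relying on it.

```python
import urllib.request

# Assumed token verification endpoint; check the Cloudflare API docs.
VERIFY_URL = "https://api.cloudflare.com/client/v4/user/tokens/verify"


def auth_headers(api_token):
    """Headers for API-token authentication: a Bearer token, no email."""
    return {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    }


def build_verify_request(api_token):
    """Build (but do not send) a GET request that checks the token."""
    return urllib.request.Request(
        VERIFY_URL, headers=auth_headers(api_token), method="GET"
    )


verify_req = build_verify_request("YOUR_API_TOKEN")
print(verify_req.get_method(), verify_req.full_url)
```

Sending the request with `urllib.request.urlopen(verify_req)` requires network access and a real token.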

Read more about creating an API token on the Cloudflare Developers website: https://developers.cloudflare.com/api/tokens/create/

2) Create a Splunk token for an HTTP Event Collector

In this walkthrough, we’re using a Splunk Cloud free trial, but you can use almost any service that can accept logs over HTTPS. In some cases, if you’re using an on-premise SIEM solution, you may need to allowlist Cloudflare IP addresses in your firewall to be able to receive the logs.

  1. Create a Splunk Cloud account. I created a trial account for the purpose of this blog.
  2. In the Splunk Cloud dashboard, go to Settings > Data Input.
  3. Next to HTTP Event Collector, click Add new.
  4. Follow the steps to create a token.
  5. Copy your token and your allocated Splunk hostname and save both for later.

Read more about using Splunk with Cloudflare Logpush on the Cloudflare Developers website: https://developers.cloudflare.com/logs/get-started/enable-destinations/splunk/

Read more about creating an HTTP Event Collector token on Splunk’s website: https://docs.splunk.com/Documentation/Splunk/8.2.6/Data/UsetheHTTPEventCollector

3) Create a Cloudflare Logpush job

Creating and enabling a job is straightforward: it requires only one API call to Cloudflare.

To send the API calls I used Postman, which is a user-friendly API client that was recommended to me by a colleague. It allows you to save and customize API calls. You can also use Terminal/CMD or any other API client/script of your choice.

Note that Network Analytics Logs are account-scoped. The API endpoint is therefore slightly different from the one you would normally use for zone-scoped datasets such as HTTP request logs and DNS logs.

This is the endpoint for creating an account-scoped Logpush job:

https://api.cloudflare.com/client/v4/accounts/{account-id}/logpush/jobs

Your account ID is a unique identifier of your account: a string of 32 numbers and characters. If you’re not sure what your account ID is, log in to Cloudflare, select the appropriate account, and copy the string at the end of the URL.

https://dash.cloudflare.com/{account-id}

Then, set up a new request in Postman (or any other API client/CLI tool).

To successfully create a Logpush job, you’ll need the HTTP method, URL, Authorization token, and request body (data). The request body must include a destination configuration (destination_conf), the specified dataset (network_analytics_logs, in our case), and the token (your Splunk token).

Method:

POST

URL:

https://api.cloudflare.com/client/v4/accounts/{account-id}/logpush/jobs

Authorization: Define a Bearer authorization in the Authorization tab, or add it to the header, and add your Cloudflare API token.

Body: Select Raw > JSON

{
"destination_conf": "{your-unique-splunk-configuration}",
"dataset": "network_analytics_logs",
"token": "{your-splunk-hec-tag}",
"enabled": true
}
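
If you would rather script the call than use Postman, the request can be sketched in Python as follows. The function name and argument values are hypothetical placeholders; the body mirrors the fields listed in this walkthrough, with `enabled` sent as a JSON boolean (an assumption based on the boolean shown in the API response).

```python
import json
import urllib.request

API_BASE = "https://api.cloudflare.com/client/v4"


def build_logpush_job_request(account_id, api_token, destination_conf, splunk_token):
    """Build (but do not send) the POST that creates an account-scoped job."""
    body = {
        "destination_conf": destination_conf,
        "dataset": "network_analytics_logs",
        "token": splunk_token,
        "enabled": True,  # serialized as JSON true, not the string "true"
    }
    return urllib.request.Request(
        f"{API_BASE}/accounts/{account_id}/logpush/jobs",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


job_req = build_logpush_job_request(
    account_id="YOUR_ACCOUNT_ID",
    api_token="YOUR_API_TOKEN",
    destination_conf="YOUR_SPLUNK_DESTINATION_CONF",
    splunk_token="YOUR_SPLUNK_HEC_TOKEN",
)
print(job_req.get_method(), job_req.full_url)
```

Passing `job_req` to `urllib.request.urlopen` would send it; that step needs network access and real credentials.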

If you’re using Splunk Cloud, then your unique configuration has the following format:

{your-unique-splunk-configuration}=splunk://{your-splunk-hostname}.splunkcloud.com:8088/services/collector/raw?channel={channel-id}&header_Authorization=Splunk%20{your-splunk-hec-token}&insecure-skip-verify=false

Definition of the variables:

{your-splunk-hostname} = Your allocated Splunk Cloud hostname.

{channel-id} = A unique ID that you choose to assign.

{your-splunk-hec-token} = The token that you generated for your Splunk HEC.
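
Assembling this string by hand is error-prone, so here is a small sketch that builds it from its parts. The function name is hypothetical, and generating the channel ID with `uuid.uuid4()` is just one convenient way to obtain a unique value, not a requirement stated in this post.

```python
import uuid
from urllib.parse import quote


def splunk_destination_conf(hostname, channel_id, hec_token, insecure_skip_verify=False):
    """Build the Splunk Cloud destination string in the format shown above."""
    # "Splunk <token>" must be URL-encoded, so the space becomes %20.
    auth_value = quote(f"Splunk {hec_token}", safe="")
    skip = "true" if insecure_skip_verify else "false"
    return (
        f"splunk://{hostname}.splunkcloud.com:8088/services/collector/raw"
        f"?channel={channel_id}"
        f"&header_Authorization={auth_value}"
        f"&insecure-skip-verify={skip}"
    )


conf = splunk_destination_conf(
    hostname="example-host",        # placeholder hostname
    channel_id=str(uuid.uuid4()),   # any unique ID you choose
    hec_token="YOUR-HEC-TOKEN",     # placeholder token
)
print(conf)
```

Keeping `insecure-skip-verify=false` matches the note below about using a valid SSL/TLS certificate on the Splunk side.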

An important note is that customers should have a valid SSL/TLS certificate on their Splunk instance to support an encrypted connection.

After you’ve done that, you can send a GET request to the same URL (no request body needed) to verify that the job was created and is enabled.

The response should be similar to the following:

{
    "errors": [],
    "messages": [],
    "result": {
        "id": {job-id},
        "dataset": "network_analytics_logs",
        "frequency": "high",
        "kind": "",
        "enabled": true,
        "name": null,
        "logpull_options": null,
        "destination_conf": "{your-unique-splunk-configuration}",
        "last_complete": null,
        "last_error": null,
        "error_message": null
    },
    "success": true
}
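
A script can run the same check on the response body. The sketch below is illustrative: the function name is hypothetical, the job `id` in the sample is made up, and the code accepts `result` as either a single object (as shown above) or a list of jobs, since the exact shape may depend on the endpoint you query.

```python
import json


def job_is_enabled(response_text, dataset="network_analytics_logs"):
    """Return True if the response reports an enabled job for the dataset."""
    payload = json.loads(response_text)
    if not payload.get("success"):
        return False
    result = payload.get("result")
    jobs = result if isinstance(result, list) else [result]
    return any(
        isinstance(job, dict)
        and job.get("dataset") == dataset
        and job.get("enabled") is True
        for job in jobs
    )


# Illustrative response with a made-up job id.
sample_response = json.dumps({
    "errors": [],
    "messages": [],
    "result": {"id": 1, "dataset": "network_analytics_logs", "enabled": True},
    "success": True,
})
print(job_is_enabled(sample_response))
```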

Shortly after, you should start receiving logs to your Splunk HEC.

Read more about enabling Logpush on the Cloudflare Developers website: https://developers.cloudflare.com/logs/reference/logpush-api-configuration/examples/example-logpush-curl/

Reduce costs with R2 storage

Depending on the amount of logs that you read and write, the cost of third party cloud storage can skyrocket — forcing you to decide between managing a tight budget and being able to properly investigate networking and security issues. However, we believe that you shouldn’t have to make those trade-offs. With R2’s low costs, we’re making this decision easier for our customers. Instead of feeding logs to a third party, you can reap the cost benefits of storing them in R2.

To learn more about the R2 features and pricing, check out the full blog post. To enable R2, contact your account team.

Cloudflare logs for maximum visibility

Cloudflare Enterprise customers have access to detailed logs of the metadata generated by our products. These logs are helpful for troubleshooting, identifying network and configuration adjustments, and generating reports, especially when combined with logs from other sources, such as your servers, firewalls, routers, and other appliances.

Network Analytics Logs joins Cloudflare’s family of products on Logpush: DNS logs, Firewall events, HTTP requests, NEL reports, Spectrum events, Audit logs, Gateway DNS, Gateway HTTP, and Gateway Network.

Not using Cloudflare yet? Start now with our Free and Pro plans to protect your websites against DDoS attacks, or contact us for comprehensive DDoS protection and firewall-as-a-service for your entire network.

Introducing the Customer Metadata Boundary

Post Syndicated from Jon Levine original https://blog.cloudflare.com/introducing-the-customer-metadata-boundary/

Data localisation has gotten a lot of attention in recent years because a number of countries see it as a way of controlling or protecting their citizens’ data. Countries such as Australia, China, India, Brazil, and South Korea have, or are currently considering, regulations that assert legal sovereignty over their citizens’ personal data in some fashion: health care data must be stored locally, public institutions may only contract with local service providers, and so on.

In the EU, the recent “Schrems II” decision resulted in additional requirements for companies that transfer personal data outside the EU. And a number of highly regulated industries require that specific types of personal data stay within the EU’s borders.

Cloudflare is committed to helping our customers keep personal data in the EU. Last year, we introduced the Data Localisation Suite, which gives customers control over where their data is inspected and stored.

Today, we’re excited to introduce the Customer Metadata Boundary, which expands the Data Localisation Suite to ensure that a customer’s end user traffic metadata stays in the EU.

Metadata: a primer

“Metadata” can be a scary term, but it’s a simple concept — it just means “data about data.” In other words, it’s a description of activity that happened on our network. Every service on the Internet collects metadata in some form, and it’s vital to user safety and network availability.

At Cloudflare, we collect metadata about the usage of our products for several purposes:

  • Serving analytics via our dashboards and APIs
  • Sharing logs with customers
  • Stopping security threats such as bot or DDoS attacks
  • Improving the performance of our network
  • Maintaining the reliability and resiliency of our network

What does that collection look like in practice at Cloudflare? Our network consists of dozens of services: our Firewall, Cache, DNS Resolver, DDoS protection systems, Workers runtime, and more. Each service emits structured log messages, which contain fields like timestamps, URLs, usage of Cloudflare features, and the identifier of the customer’s account and zone.

These messages do not contain the contents of customer traffic, and so they do not contain things like usernames, passwords, personal information, and other private details of customers’ end users. However, these logs may contain end-user IP addresses, which are considered personal data in the EU.

Data Localisation in the EU

The EU’s General Data Protection Regulation, or GDPR, is one of the world’s most comprehensive (and well known) data privacy laws. The GDPR does not, however, insist that personal data must stay in Europe. Instead, it provides a number of legal mechanisms to ensure that GDPR-level protections are available for EU personal data if it is transferred outside the EU to a third country like the United States. Data transfers from the EU to the US were, until recently, permitted under an agreement called the EU-U.S. Privacy Shield Framework.

Shortly after the GDPR went into effect, a privacy activist named Max Schrems filed suit against Facebook for their data collection practices. In July 2020, the Court of Justice of the EU issued the “Schrems II” ruling — which, among other things, invalidated the Privacy Shield framework. However, the court upheld other valid transfer mechanisms that ensure EU personal data won’t be accessed by U.S. government authorities in a way that violates the GDPR.

Since the Schrems II decision, many customers have asked us how we’re protecting EU citizens’ data. Fortunately, Cloudflare has had data protection safeguards in place since well before the Schrems II case, such as our industry-leading commitments on government data requests. In response to Schrems II in particular, we updated our customer Data Processing Addendum (DPA). We incorporated the latest Standard Contractual Clauses, which are legal agreements approved by the EU Commission that enable data transfer. We also added additional safeguards as outlined in the EDPB’s June 2021 Recommendations on Supplementary Measures. Finally, Cloudflare’s services are certified under the ISO 27701 standard, which maps to the GDPR’s requirements.

In light of these measures, we believe that our EU customers can use Cloudflare’s services in a manner consistent with GDPR and the Schrems II decision. Still, we recognize that many of our customers want their EU personal data to stay in the EU. For example, some of our customers in industries like healthcare, law, and finance may have additional requirements. For that reason, we have developed an optional suite of services to address those requirements. We call this our Data Localisation Suite.

How the Data Localisation Suite helps today

Data Localisation is challenging for customers because of the volume and variety of data they handle. When it comes to their Cloudflare traffic, we’ve found that customers are primarily concerned about three areas:

  1. How do I ensure my encryption keys stay in the EU?
  2. How can I ensure that services like caching and WAF only run in the EU?
  3. How can I ensure that metadata is never transferred outside the EU?

To address the first concern, Cloudflare has long offered Keyless SSL and Geo Key Manager, which ensure that private SSL/TLS key material never leaves the EU. Keyless SSL ensures that Cloudflare never has possession of the private key material at all; Geo Key Manager uses Keyless SSL under the hood to ensure the keys never leave the specified region.

Last year we addressed the second concern with Regional Services, which ensures that Cloudflare will only be able to decrypt and inspect the content of HTTP traffic inside the EU. In other words, SSL connections will only be terminated in Europe, and all of our layer 7 security and performance services will only run in our EU data centers.

Today, we’re enabling customers to address the third and final concern, and keep metadata local as well.

How the Metadata Boundary Works

The Customer Metadata Boundary ensures, simply, that end user traffic metadata that can identify a customer stays in the EU. This includes all the logs and analytics that a customer sees.

How are we able to do this? All the metadata that can identify a customer flows through a single service at our edge, before being forwarded to one of our core data centers.

When the Metadata Boundary is enabled for a customer, our edge ensures that any log message that identifies that customer (that is, contains that customer’s Account ID) is not sent outside the EU. It will only be sent to our core data center in the EU, and not our core data center in the US.
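
The routing rule described above can be pictured with a toy sketch. This is purely illustrative and not Cloudflare's implementation: the data center labels, the `LogMessage` type, and the `allowed_destinations` function are all hypothetical.

```python
from dataclasses import dataclass

# Hypothetical labels for the two core data centers mentioned in the text.
EU_CORE = "core-eu"
US_CORE = "core-us"


@dataclass(frozen=True)
class LogMessage:
    account_id: str  # the identifier that ties a message to a customer
    payload: str


def allowed_destinations(message, boundary_accounts):
    """Return the core data centers this message may be forwarded to.

    Messages that identify a customer with the Metadata Boundary enabled
    may only go to the EU core; other messages may go to either core.
    """
    if message.account_id in boundary_accounts:
        return [EU_CORE]
    return [EU_CORE, US_CORE]


boundary_accounts = {"acct-with-boundary"}
print(allowed_destinations(LogMessage("acct-with-boundary", "..."), boundary_accounts))
print(allowed_destinations(LogMessage("acct-without", "..."), boundary_accounts))
```

The key design point is that the decision is made at a single forwarding step keyed on the account identifier, which is what makes a per-customer boundary enforceable.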

What’s next

Today our Data Localisation Suite is focused on helping our customers in the EU localise data for their inbound HTTP traffic. This includes our Cache, Firewall, DDoS protection, and Bot Management products.

We’ve heard from customers that they want data localisation for more products and more regions. This means making all of our Data Localisation Products, including Geo Key Manager and Regional Services, work globally. We’re also working on expanding the Metadata Boundary to include our Zero Trust products like Cloudflare for Teams. Stay tuned!