3. Psyberg: Automated end to end catch up

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/3-psyberg-automated-end-to-end-catch-up-260fbe366fe2

By Abhinaya Shetty, Bharath Mummadisetty

This blog post will cover how Psyberg helps automate the end-to-end catchup of different pipelines, including dimension tables.

In the previous installments of this series, we introduced Psyberg and delved into its core operational modes: Stateless and Stateful Data Processing. Now, let’s explore the state of our pipelines after incorporating Psyberg.

Pipelines After Psyberg

Let’s explore how different modes of Psyberg could help with a multistep data pipeline. We’ll return to the sample customer lifecycle:

Processing Requirement:
Keep track of the end-of-hour state of accounts, e.g., Active/Upgraded/Downgraded/Canceled.

Solution:
One potential approach is as follows:

  1. Create two stateless fact tables:
    a. Signups
    b. Account Plans
  2. Create one stateful fact table:
    a. Cancels
  3. Create a stateful dimension that reads the above fact tables every hour and derives the latest account state.

Let’s look at how this can be integrated with Psyberg to auto-handle late-arriving data and corresponding end-to-end data catchup.

Navigating the Workflow: How Psyberg Handles Late-Arriving Data

We follow a generic workflow structure for both stateful and stateless processing with Psyberg; this helps maintain consistency and makes debugging and understanding these pipelines easier. The following is a concise overview of the various stages involved; for a more detailed exploration of the workflow specifics, please turn to the second installment of this series.

1. Psyberg Initialization

The workflow starts with the Psyberg initialization (init) step.

  • Input: List of source tables and required processing mode
  • Output: Psyberg identifies new events that have occurred since the last high watermark (HWM) and records them in the session metadata table.

The session metadata table can then be read to determine the pipeline input.

2. Write-Audit-Publish (WAP) Process

This is the general pattern we use in our ETL pipelines.

a. Write
Apply the ETL business logic to the input data identified in Step 1 and write to an unpublished Iceberg snapshot based on the Psyberg mode.

b. Audit
Run various quality checks on the staged data. Psyberg’s metadata session table is used to identify the partitions included in a batch run. Several audits, such as verifying source and target counts, are performed on this batch of data.

c. Publish
If the audits are successful, cherry-pick the staging snapshot to publish the data to production.

3. Psyberg Commit

Now that the data pipeline has been executed successfully, the new high watermark identified in the initialization step is committed to Psyberg’s high watermark metadata table. This ensures that the next instance of the workflow will pick up newer updates.
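The stages above can be sketched as a single control flow. This is a minimal illustration in plain Python, not Psyberg's implementation: `Session`, `run_workflow`, the in-memory `hwm_table` dict, and the `etl`/`audits` callables are all hypothetical stand-ins for the Spark jobs and Iceberg tables. The point it shows is the ordering: the high watermark only advances after the audits pass.

```python
from dataclasses import dataclass, field


@dataclass
class Session:
    """Hypothetical stand-in for one row of the session metadata table."""
    process_name: str
    new_hwm: int                                      # candidate high watermark found by init
    partitions: list = field(default_factory=list)    # input partitions for this run


def run_workflow(process_name, source_snapshots, hwm_table, etl, audits):
    """Run init -> write -> audit -> publish -> commit for one workflow instance.

    source_snapshots: list of (commit_ts, partition) tuples for the source table.
    hwm_table: dict mapping process name to its last committed high watermark.
    """
    # 1. Init: find what changed since the last committed high watermark.
    last_hwm = hwm_table.get(process_name, 0)
    changed = [(ts, p) for ts, p in source_snapshots if ts > last_hwm]
    if not changed:
        return None  # nothing new, so nothing to commit
    session = Session(process_name,
                      max(ts for ts, _ in changed),
                      sorted({p for _, p in changed}))

    # 2a. Write: apply business logic to a staged (unpublished) result.
    staged = etl(session.partitions)

    # 2b. Audit: a failing check blocks publish and leaves the HWM untouched.
    if not all(audit(session, staged) for audit in audits):
        raise RuntimeError(f"audit failed for {process_name}")

    # 2c. Publish would cherry-pick the staged snapshot here (omitted in this sketch).
    # 3. Commit: only now advance the high watermark.
    hwm_table[process_name] = session.new_hwm
    return staged
```

Because a failed audit raises before the commit, the next instance of the workflow would see the same changes again rather than skipping them.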

Callouts

  • Having the Psyberg step isolated from the core data pipeline allows us to maintain a consistent pattern that can be applied across stateless and stateful processing pipelines with varying requirements.
  • This also enables us to update the Psyberg layer without touching the workflows.
  • This is compatible with both Python and Scala Spark.
  • Debugging/figuring out what was loaded in every run is made easy with the help of workflow parameters and Psyberg Metadata.

The Setup: Automated end-to-end catchup

Let’s go back to our customer lifecycle example. Once we integrate all four components with Psyberg, here’s how we would set it up for automated catchup.

The three fact tables, comprising the signup and plan facts encapsulated in Psyberg’s stateless mode, along with the cancel fact in stateful mode, serve as inputs for the stateful sequential load ETL pipeline. This data pipeline monitors the various stages in the customer lifecycle.

In the sequential load ETL, we have the following features:

  • Catchup Threshold: This defines the lookback period for the data being read. For instance, only consider the last 12 hours of data.
  • Data Load Type: The ETL can either load the missed/new data specifically or reload the entire specified range.
  • Metadata Recording: Metadata is persisted for traceability.
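One way to picture how the first two features could interact is the sketch below. It is an assumption-laden illustration: `plan_hours`, its parameters, the `"incremental"`/`"full"` labels, and the use of plain integers for hours are all hypothetical, and the real ETL works on date-hour partitions.

```python
def plan_hours(changed_hours, current_hour, catchup_threshold, load_type="incremental"):
    """Return the hours a sequential load run would read.

    changed_hours: hours with new or late data reported upstream.
    catchup_threshold: lookback in hours; older changes are ignored.
    load_type: "incremental" loads only the changed hours,
               "full" reloads the whole range between the first and last change.
    """
    earliest_allowed = current_hour - catchup_threshold
    in_window = sorted(h for h in set(changed_hours) if h >= earliest_allowed)
    if not in_window:
        return []
    if load_type == "full":
        return list(range(in_window[0], in_window[-1] + 1))
    return in_window


# With a 12-hour threshold at hour 6, changes in hours 2, 3, and 6 are all in scope.
print(plan_hours([2, 3, 6], current_hour=6, catchup_threshold=12))          # [2, 3, 6]
print(plan_hours([2, 3, 6], current_hour=6, catchup_threshold=12,
                 load_type="full"))                                          # [2, 3, 4, 5, 6]
```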

Here is a walkthrough on how this system would automatically catch up in the event of late-arriving data:

Premise: All the tables were last loaded up to hour 5, meaning that any data from hour 6 onwards is considered new, and anything before that is classified as late data.

Fact level catchup:

  1. During the Psyberg initialization phase, the signup and plan facts identify the late data from hours 2 and 3, as well as the most recent data from hour 6. The ETL then appends this data to the corresponding partitions within the fact tables.
  2. The Psyberg initialization for the cancel fact identifies late data from hour 5 and additional data from hours 6 and 7. Since this ETL operates in stateful mode, the data in the target table from hours 5 to 7 will be overwritten with the new data.
  3. By focusing solely on updates and avoiding reprocessing of data based on a fixed lookback window, both Stateless and Stateful Data Processing maintain a minimal change footprint. This approach ensures data processing is both efficient and accurate.
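The difference between the two fact-level behaviors can be illustrated with a toy model in which a table is a dict from hour to a list of rows. The function names and row values are hypothetical; the real pipelines write Iceberg partitions through Spark.

```python
def apply_stateless(target, late_and_new):
    """Append rows to the partitions they belong to; existing rows are kept."""
    for hour, rows in late_and_new.items():
        target.setdefault(hour, []).extend(rows)
    return target


def apply_stateful(target, recomputed):
    """Overwrite every partition in the recomputed range with fresh output."""
    for hour, rows in recomputed.items():
        target[hour] = list(rows)
    return target


# Stateless: late rows for hour 2 and new rows for hour 6 are appended.
signup_fact = {2: ["s1"], 3: ["s2"]}
apply_stateless(signup_fact, {2: ["s9"], 6: ["s10"]})

# Stateful: hours 5 to 7 are replaced wholesale, so stale output disappears.
cancel_fact = {5: ["c1"], 6: ["stale"]}
apply_stateful(cancel_fact, {5: ["c1", "c2"], 6: ["c3"], 7: ["c4"]})
```

In both cases only the partitions named in the input are touched, which is what keeps the change footprint small.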

Dimension level catchup:

  1. The Psyberg wrapper for this stateful ETL looks at the updates to the upstream Psyberg powered fact tables to determine the date-hour range to reprocess. Here’s how it would calculate the above range:
    MinHr = least(min processing hour from each source table)
    This ensures that we don’t miss out on any data, including late-arriving data. In this case, the minimum hour to process the data is hour 2.
    MaxHr = least(max processing hour from each source table)
    This ensures we do not process partial data, i.e., hours for which data has not been loaded into all source tables. In this case, the maximum hour to process the data is hour 6.
  2. The ETL process uses this time range to compute the state in the changed partitions and overwrite them in the target table. This helps overwrite data only when required and minimizes unnecessary reprocessing.
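The range calculation from step 1 can be checked with the numbers in this walkthrough. The sketch below is illustrative only; `reprocess_range` and the table names are hypothetical, and hours are plain integers for brevity.

```python
def reprocess_range(source_ranges):
    """source_ranges maps table name -> (min processing hour, max processing hour)."""
    # Earliest change in any source: nothing late is left behind.
    min_hr = min(lo for lo, _ in source_ranges.values())
    # Smallest of the maxima: the last hour for which every source has data.
    max_hr = min(hi for _, hi in source_ranges.values())
    return min_hr, max_hr


ranges = {
    "signup_fact": (2, 6),   # late data from hour 2, new data through hour 6
    "plan_fact": (2, 6),
    "cancel_fact": (5, 7),   # late data from hour 5, new data through hour 7
}
print(reprocess_range(ranges))  # (2, 6)
```

Hour 7 is left for a later run because only the cancel fact has data for it so far.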

As seen above, by chaining these Psyberg workflows, we could automate the catchup for late-arriving data across hours 2 through 6. The Data Engineer does not need to perform any manual intervention in this case and can thus focus on more important things!

The Impact: How Psyberg Transformed Our Workflows

The introduction of Psyberg into our workflows has served as a valuable tool in enhancing accuracy and performance. The following are key areas that have seen improvements from using Psyberg:

  • Computational Resources Used:
    In certain instances, we’ve noticed a significant reduction in resource utilization, with the number of Spark cores used dropping by 90% following the implementation of Psyberg, compared to using fixed lookback windows.
  • Workflow and Table Onboarding:
    We have onboarded 30 tables and 13 workflows into incremental processing since implementing Psyberg.
  • Reliability and Accuracy:
    Since onboarding workflows to Psyberg, we have experienced zero manual catchups or missing data incidents.
  • Bootstrap Template:
    The process of integrating new tables into incremental processing has been made more accessible and now requires minimal effort using Psyberg.

These performance metrics suggest that adopting Psyberg has been beneficial to the efficiency of our data processing workflows.

Next Steps and Conclusion

Integrating Psyberg into our operations has improved our data workflows and opened up exciting possibilities for the future. As we continue to innovate, Netflix’s data platform team is focused on creating a comprehensive solution for incremental processing use cases. This platform-level solution is intended to enhance our data processing capabilities across the organization. Stay tuned for a new post on this!

In conclusion, Psyberg has proven to be a reliable and effective solution for our data processing needs. As we look to the future, we’re excited about the potential for further advancements in our data platform capabilities.


3. Psyberg: Automated end to end catch up was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

2. Diving Deeper into Psyberg: Stateless vs Stateful Data Processing

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/2-diving-deeper-into-psyberg-stateless-vs-stateful-data-processing-1d273b3aaefb

By Abhinaya Shetty, Bharath Mummadisetty

In the inaugural blog post of this series, we introduced you to the state of our pipelines before Psyberg and the challenges with incremental processing that led us to create the Psyberg framework within Netflix’s Membership and Finance data engineering team. In this post, we will delve into a more detailed exploration of Psyberg’s two primary operational modes: stateless and stateful.

Modes of Operation of Psyberg

Psyberg has two main modes of operation or patterns, as we call them. Understanding the nature of the late-arriving data and processing requirements will help decide which pattern is most appropriate for a use case.

  1. Stateless Data Processing: As the name suggests, one should use this pattern in scenarios where the columns in the target table solely depend on the content of the incoming events, irrespective of their order of occurrence. For instance, consider a scenario where we need to keep track of all the customer signups over time. In this case, the order of signups wouldn’t matter, and individual signup records are independent of each other. This information has only one source, and we can append new/late records to the fact table as and when the events are received.
  2. Stateful Data Processing: This pattern is useful when the output depends on a sequence of events across one or more input streams. For example, the customer account lifecycle in a business might involve multiple stages, such as account creation, plan upgrades, downgrades, and cancellation. To derive attributes like the lifetime of an account or the latest plan the account is on, we need to track the sequence of these events across different input streams. A missed event in such a scenario would result in incorrect analysis due to a wrong derived state. Late-arriving data in such cases requires overwriting data that was previously processed to ensure all events are accounted for.
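To see why a missed event matters in the stateful case, consider this small sketch. It is a simplified, hypothetical model (the event names, state labels, and `latest_state` function are assumptions): it replays events in time order and ignores events for accounts it has never seen sign up.

```python
def latest_state(events):
    """Replay (hour, account_id, event_type) tuples in time order per account."""
    transitions = {"upgrade": "Upgraded", "downgrade": "Downgraded", "cancel": "Canceled"}
    state = {}
    for hour, account, event in sorted(events):
        if event == "signup":
            state[account] = "Active"
        elif account in state:
            # Events for accounts without a known signup have no effect.
            state[account] = transitions.get(event, state[account])
    return state


on_time = [(5, "a1", "cancel")]            # the hour-3 signup has not arrived yet
with_late = on_time + [(3, "a1", "signup")]

print(latest_state(on_time))    # {}
print(latest_state(with_late))  # {'a1': 'Canceled'}
```

Appending the late signup alone would not be enough here: the hour-5 output has to be recomputed, which is why this pattern overwrites previously processed data.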

Let’s visualize how these two modes work within our data processing pipeline using a general workflow for loading a fact table. If you would like to learn more about how the workflows are orchestrated in Netflix Maestro scheduler, please check out this blog post from our data platform team.

With this illustration as our guide, let’s explore each mode in more detail.

The Psyberg Initialization Phase

This step invokes Psyberg with the required parameters. Based on these parameters, Psyberg then computes the correct data range for the pipeline processing needs.

Input parameters in this step include the following:

Initialization for Stateless Data Processing

Let’s use the signup fact table as an example here. This table’s workflow runs hourly, with the main input source being an Iceberg table storing all raw signup events partitioned by landing date, hour, and batch id.

Here’s a YAML snippet outlining the configuration for this during the Psyberg initialization step:

- job:
    id: psyberg_session_init
    type: Spark
    spark:
      app_args:
        - --process_name=signup_fact_load
        - --src_tables=raw_signups
        - --psyberg_session_id=20230914061001
        - --psyberg_hwm_table=high_water_mark_table
        - --psyberg_session_table=psyberg_session_metadata
        - --etl_pattern_id=1

Behind the scenes, Psyberg identifies that this pipeline is configured for a stateless pattern since etl_pattern_id=1.

Psyberg also uses the provided inputs to detect the Iceberg snapshots that were committed after the latest high watermark available in the watermark table. Using the summary column in the snapshot metadata (see the Iceberg Metadata section in post 1 for more details), we parse out the partition information for each Iceberg snapshot of the source table.

Psyberg then retains these processing URIs (an array of JSON strings containing combinations of landing date, hour, and batch IDs) as determined by the snapshot changes. This information and other calculated metadata are stored in the psyberg_session_f table. This stored data is then available for the subsequent LOAD.FACT_TABLE job in the workflow to utilize and for analysis and debugging purposes.
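A rough sketch of this detection step is shown below. It runs on plain Python dicts rather than on Iceberg itself: the `committed_at` and `summary` field names follow the columns of Iceberg's snapshots metadata table, but the `partitions.<path>` key layout, the `processing_uris` function, and the sample values are assumptions made for illustration.

```python
import json


def processing_uris(snapshots, high_watermark):
    """Collect partitions touched by snapshots committed after the high watermark.

    snapshots: list of dicts with "committed_at" (epoch seconds) and "summary"
    (a dict whose "partitions.<path>" keys are assumed to name changed partitions).
    Returns (sorted list of JSON strings, candidate new high watermark).
    """
    uris, new_hwm = set(), high_watermark
    for snap in snapshots:
        if snap["committed_at"] <= high_watermark:
            continue  # already covered by a previous run
        new_hwm = max(new_hwm, snap["committed_at"])
        for key in snap["summary"]:
            if not key.startswith("partitions."):
                continue
            # "landing_date=20230914/hour=6/batch_id=1" -> {"landing_date": "20230914", ...}
            path = key[len("partitions."):]
            parts = dict(kv.split("=", 1) for kv in path.split("/"))
            uris.add(json.dumps(parts, sort_keys=True))
    return sorted(uris), new_hwm


snapshots = [
    {"committed_at": 100, "summary": {"partitions.landing_date=20230914/hour=2/batch_id=1": ""}},
    {"committed_at": 200, "summary": {"partitions.landing_date=20230914/hour=6/batch_id=1": ""}},
    {"committed_at": 300, "summary": {"partitions.landing_date=20230914/hour=3/batch_id=2": ""}},
]
uris, new_hwm = processing_uris(snapshots, high_watermark=150)
```

Here the snapshot committed at 100 is skipped, the hour 3 and hour 6 partitions are returned, and 300 becomes the candidate high watermark for the commit step.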

Initialization for Stateful Data Processing

Stateful Data Processing is used when the output depends on a sequence of events across one or more input streams.

Let’s consider the example of creating a cancel fact table, which takes the following as input:

  1. Raw cancellation events indicating when the customer account was canceled
  2. A fact table that stores incoming customer requests to cancel their subscription at the end of the billing period

These inputs help derive additional stateful analytical attributes, such as the type of churn (voluntary or involuntary).

The initialization step for Stateful Data Processing differs slightly from Stateless. Psyberg offers additional configurations according to the pipeline needs. Here’s a YAML snippet outlining the configuration for the cancel fact table during the Psyberg initialization step:

- job:
    id: psyberg_session_init
    type: Spark
    spark:
      app_args:
        - --process_name=cancel_fact_load
        - --src_tables=raw_cancels|processing_ts,cancel_request_fact
        - --psyberg_session_id=20230914061501
        - --psyberg_hwm_table=high_water_mark_table
        - --psyberg_session_table=psyberg_session_metadata
        - --etl_pattern_id=2

Behind the scenes, Psyberg identifies that this pipeline is configured for a stateful pattern since etl_pattern_id is 2.

Notice the additional detail in the src_tables list corresponding to raw_cancels above. The processing_ts here represents the event processing timestamp, which is different from the regular Iceberg snapshot commit timestamp (i.e., event_landing_ts) described in part 1 of this series.

It is important to capture the range of a consolidated batch of events from all the sources (i.e., both raw_cancels and cancel_request_fact) while factoring in late-arriving events. Changes to the source table snapshots can be tracked using different timestamp fields. Knowing which timestamp field to use (event_landing_ts or something like processing_ts) helps avoid missing events.

Similar to the approach in stateless data processing, Psyberg uses the provided inputs to parse out the partition information for each Iceberg snapshot of the source table.

Sample parsed input for target snapshot_date 20230914 and snapshot_hour 9

This is then used to query the partitions metadata table, which has the min and max range for each column in the source table. In this case, we look at the min and max range of the processing_ts column to determine the actual partitions for any late-arriving events. The minimum value here helps determine the lower limit of the data to be processed, i.e., the derived minimum date and hour based on the input epoch timestamp.

Lower Limit to be processed = least ( “min” event_processing_ts)

It also tracks the VTTS (Valid To TimeStamp) of all the input streams and determines the minimum VTTS of all the streams together. This helps determine the upper limit of data to be processed, thus restricting the data load based on data completeness of all the streams combined.

Upper Limit to be processed = least (vtts date-hour)

Using this metadata from the different streams, Psyberg calculates several parameters, such as the minimum and maximum processing date and hour and the event landing date and hour. These parameters, along with the other metadata discussed in the previous post, are persisted in the psyberg_session_f table for analysis and debugging purposes.
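The two limits can be sketched as follows. This is an illustration under stated assumptions, not Psyberg's code: `date_hour` and `processing_bounds` are hypothetical helpers, timestamps are treated as UTC epoch seconds, and each stream's VTTS is represented as a (yyyymmdd, hour) tuple.

```python
from datetime import datetime, timezone


def date_hour(epoch_seconds):
    """Truncate an epoch timestamp to a (yyyymmdd, hour) pair in UTC."""
    ts = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return int(ts.strftime("%Y%m%d")), ts.hour


def processing_bounds(min_processing_ts, vtts):
    """Derive the date-hour range a stateful load should cover.

    min_processing_ts: smallest processing_ts of each changed partition (epoch seconds).
    vtts: table name -> (yyyymmdd, hour) up to which that stream is complete.
    """
    lower = date_hour(min(min_processing_ts))   # earliest late-arriving event
    upper = min(vtts.values())                  # tuples compare date first, then hour
    return lower, upper


t_0530 = datetime(2023, 9, 14, 5, 30, tzinfo=timezone.utc).timestamp()
t_0810 = datetime(2023, 9, 14, 8, 10, tzinfo=timezone.utc).timestamp()
lower, upper = processing_bounds(
    [t_0810, t_0530],
    {"raw_cancels": (20230914, 9), "cancel_request_fact": (20230914, 7)},
)
print(lower, upper)  # (20230914, 5) (20230914, 7)
```

The upper limit stops at hour 7 because cancel_request_fact is only complete through that hour, even though raw_cancels has data through hour 9.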

Write Audit Publish (WAP) process

The Write Audit Publish (WAP) process is a general pattern we use in our ETLs to validate writes to the uncommitted Iceberg snapshot before publishing to the target table. The LOAD.FACT_TABLE step takes psyberg_session_id and process_name as input arguments.

For the stateless pattern, the processing URIs to be processed as part of the load step are identified by reading the psyberg_session_f table. This information is then used to filter the source table and apply the business logic to create the signup fact table. Any late-arriving signup event data is appended to the target table partitions as part of this. All these writes go into the uncommitted Iceberg snapshot managed by the WAP pattern.

Similarly, in the stateful pattern, the ETL step reads the psyberg_session_f table to identify the derived minimum and maximum date-hour range to be processed, which acts as a filter for the different input tables involved in the ETL. After applying the corresponding business logic for cancellation events, we create the cancel fact table along with columns like cancellation type (i.e., voluntary vs involuntary churn) representing the state of the canceled account. If there are any late-arriving events, Psyberg handles them automatically by providing the correct range to the data process to derive the state changes correctly.

Audits

We run different audits on the uncommitted Iceberg snapshot created as part of the job run. Leveraging Psyberg metadata, we can identify the cohort of data involved as part of the job run. This helps in pinpointing changes and applying blocking audits efficiently. Audits like source-to-target count comparison and checking for no missing events in the target Iceberg snapshot ensure data integrity and completeness. Once the audits pass successfully, the data is published to the target table.
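Two of the audits mentioned above might look roughly like this. The sketch assumes a one-to-one mapping between source and target rows and a hypothetical `event_id` key; real audits would run as queries against the staged Iceberg snapshot, scoped to the partitions recorded for the session.

```python
def audit_counts(source_rows, staged_rows):
    """Blocking audit: the staged batch must have as many rows as its source batch."""
    return len(source_rows) == len(staged_rows)


def missing_events(source_rows, staged_rows, key="event_id"):
    """Return source event ids that are absent from the staged batch (empty means pass)."""
    staged_ids = {row[key] for row in staged_rows}
    return [row[key] for row in source_rows if row[key] not in staged_ids]


source = [{"event_id": 1}, {"event_id": 2}]
staged = [{"event_id": 1}]

print(audit_counts(source, staged))    # False, so publish would be blocked
print(missing_events(source, staged))  # [2]
```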

HWM Commit

Leveraging Psyberg metadata tables, we determine the latest timestamp associated with the Iceberg snapshot seen as part of the job run. This timestamp is used to update the high watermark table with the new high watermark so that the subsequent pipeline instance can pick up the next set of changes.
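A minimal sketch of the commit step follows, assuming the high watermark table can be modeled as a dict keyed by process name. The `commit_hwm` function and the `latest`/`previous` field names are hypothetical, and the guard against moving the watermark backwards is a design choice of this sketch rather than something the post describes.

```python
def commit_hwm(hwm_table, process_name, snapshot_commit_times):
    """Advance the high watermark to the latest snapshot time seen in this run.

    hwm_table maps process name -> {"latest": ts, "previous": ts}.
    """
    if not snapshot_commit_times:
        return hwm_table.get(process_name)   # nothing processed, nothing to move
    current = hwm_table.get(process_name, {"latest": None, "previous": None})
    new_latest = max(snapshot_commit_times)
    if current["latest"] is not None and new_latest <= current["latest"]:
        return current                       # never move the watermark backwards
    hwm_table[process_name] = {"latest": new_latest, "previous": current["latest"]}
    return hwm_table[process_name]


table = {}
first = commit_hwm(table, "cancel_fact_load", [100, 300, 200])
second = commit_hwm(table, "cancel_fact_load", [400])
```

After the second call the table holds 400 as the latest watermark and 300 as the previous one, so the next instance starts from snapshots committed after 400.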

Conclusion

This exploration shows how Psyberg brings efficiency, accuracy, and timeliness to Stateless and Stateful Data Processing within the Membership and Finance data engineering team. Join us in the next part of our blog series, where we’ll discuss how it also helps automate the end-to-end catchup of different pipelines.


2. Diving Deeper into Psyberg: Stateless vs Stateful Data Processing was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

1. Streamlining Membership Data Engineering at Netflix with Psyberg

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/1-streamlining-membership-data-engineering-at-netflix-with-psyberg-f68830617dd1

By Abhinaya Shetty, Bharath Mummadisetty

At Netflix, our Membership and Finance Data Engineering team harnesses diverse data related to plans, pricing, membership life cycle, and revenue to fuel analytics, power various dashboards, and make data-informed decisions. Many metrics in Netflix’s financial reports are powered and reconciled with efforts from our team! Given our role on this critical path, accuracy is paramount. In this context, managing the data, especially when it arrives late, can present a substantial challenge!

In this three-part blog post series, we introduce you to Psyberg, our incremental data processing framework designed to tackle such challenges! We’ll discuss batch data processing, the limitations we faced, and how Psyberg emerged as a solution. Furthermore, we’ll delve into the inner workings of Psyberg, its unique features, and how it integrates into our data pipelining workflows. By the end of this series, we hope you will gain an understanding of how Psyberg transformed our data processing, making our pipelines more efficient, accurate, and timely. Let’s dive in!

The Challenge: Incremental Data Processing with Late Arriving Data

Our team’s data processing model mainly comprises batch pipelines, which run at different intervals ranging from hourly to multiple times a day (also known as intraday) and even daily. We expect complete and accurate data at the end of each run. To meet such expectations, we generally run our pipelines with a lag of a few hours to leave room for late-arriving data.

What is late-arriving data?

Late-arriving data is essentially delayed data due to system retries, network delays, batch processing schedules, system outages, delayed upstream workflows, or reconciliation in source systems.

How does late-arriving data impact us?

You could think of our data as a puzzle. With each new piece of data, we must fit it into the larger picture and ensure it’s accurate and complete. Thus, we must reprocess the missed data to ensure data completeness and accuracy.

Types of late-arriving data

Based on the structure of our upstream systems, we’ve classified late-arriving data into two categories, each named after the timestamps of the updated partition:

Ways to process such data

Our team previously employed some strategies to manage these scenarios, which often led to unnecessarily reprocessing unchanged data. Some techniques we used were:

1. Using fixed lookback windows to always reprocess data, assuming that most late-arriving events will occur within that window. However, this approach usually leads to redundant data reprocessing, thereby increasing ETL processing time and compute costs. It also becomes inefficient as the data scale increases. Imagine reprocessing the past 6 hours of data every hour!

2. Adding alerts to flag when late-arriving data appears, blocking the pipelines, and performing a manual intervention in which we triggered backfill pipelines to handle the missed events. This approach was a simple solution with minimal extra processing for the most part and, hence, was our preferred solution. However, when late events did occur, the pain of reprocessing data and catching up on all the dependent pipelines was not worth it! We will talk about this shortly.

At a high level, both these approaches were inefficient for intraday pipelines and impacted cost, performance, accuracy, and time. We developed Psyberg, an incremental processing framework using Iceberg to handle these challenges more effectively.

The state of our pipelines before Psyberg

Before diving into the world of Psyberg, it’s crucial to take a step back and reflect on the state of the data pipelines in our team before its implementation. The complexities involved in these processes and the difficulties they posed led to the development of Psyberg.

At Netflix, our backend microservices continuously generate real-time event data that gets streamed into Kafka. These raw events are the source of various data processing workflows within our team. We ingest this diverse event data and transform it into standardized fact tables. The fact tables then feed downstream intraday pipelines that process the data hourly. The sequential load ETL shown in the diagram below depicts one such pipeline that calculates an account's state every hour.

Raw data for hours 3 and 6 arrive. Hour 6 data flows through the various workflows, while hour 3 triggers a late data audit alert.

Let’s walk through an example to understand the complexity of this pre-Psyberg world.

Consider a simplified version of our pipelines where we process three events: signups, plan changes, and cancels. Now imagine that some signup events from hour 3 were delayed and sent in at hour 6 instead. Our audits would detect this and alert the on-call data engineer (DE). The on-call DE would then face the daunting task of making things right!

Step 1: Dive into the audit logs to identify the late-arriving data and the impacted workflows. In this case, they would discover that the late-arriving data for hour 3 must be included in the signup facts.

Step 2: Stop all impacted workflows and downstream jobs (such as the sequential load ETL) and patch the missed data in the fact tables. Now, the data in the signup fact is patched.

Step 3: Identify the number of partitions to be rerun for the sequential stateful load jobs to account for the delayed data and rerun them from the impacted date-hour. The DE would note that the data for hours 3–6 needs to be reprocessed and will retrigger four instances to be run sequentially. This step is crucial because missing signup events from hour 3 would result in us missing subsequent events for those affected accounts (e.g., a cancel event for a missed signup would have had no effect). As we capture the state of an account based on the sequence of different types of events, rerunning the sequential load ETL from hours 3 to 6 ensures the accurate representation of account states.

Step 4: Now that we’ve spent significant time triaging and resolving the alert, the sequential ETL workflow likely experienced a delay. As a result, we need to catch up to schedule. To compensate for the lost time, the DE must trigger a few additional instances until the latest hour that would have run if the data hadn’t arrived late.

This entire process was challenging and required significant manual intervention from the on-call DE perspective. Note that these are hourly jobs, so the alert could be triggered at any time of the day (or night!). Yes, they were infrequent, but a big pain point when they occurred! Also, the on-call DE was usually not the SME for these pipelines, as the late data could have arrived in any of our upstream pipelines. To solve these problems, we came up with Psyberg!

Psyberg: The Game Changer!

Psyberg automates our data loads, making it suitable for various data processing needs, including intraday pipeline use cases. It leverages Iceberg metadata to facilitate processing incremental and batch-based data pipelines.

One of the critical features of Psyberg is its ability to detect and manage late-arriving data, no matter the partition it lands in. This feature allows data pipelines to handle late-arriving data effectively without manual intervention, ensuring higher data accuracy in our systems. Iceberg metadata and Psyberg’s own metadata form the backbone of its efficient data processing capabilities.

ETL Process High Watermark

This is the last recorded update timestamp for any data pipeline process. This is mainly used to identify new changes since the last update.

Iceberg Metadata

Psyberg primarily harnesses two key Iceberg metadata tables, snapshots and partitions, to manage the workload. All Iceberg tables have associated metadata that provides insight into changes or updates within the data tables.

The snapshots metadata table records essential metadata such as:

  • The creation time of a snapshot
  • The type of operation performed (append, overwrite, etc.)
  • A summary of partitions created/updated during the generation of the Iceberg snapshot

These details enable Psyberg to track different operations and identify changes made to a source table since the previous high watermark. For example:

The partitions metadata table is particularly interesting as it stores:

  • Information about partition keys used in the data table
  • Column names and the range of values for each column within a specific partition

One unique aspect of Netflix’s internal implementation is that it provides the range of values for each column within a partition in a deserialized format. This information helps Psyberg comprehend the timestamp ranges for both types of late-arriving data (event and processing time) without querying the actual data.

Psyberg Metadata

In addition to Iceberg metadata, Psyberg maintains its own metadata tables — the session table and the high watermark table. Both these tables are partitioned by the pipeline process name to maintain information related to each data pipeline independently.

The session table captures metadata specific to each pipeline run, including:

  • Process name partition to track all the runs associated with the data pipeline process
  • Session ID to track unique runs within the process
  • Processing URIs to identify the input partitions involved in the load
  • “from date”, “from hour”, “to date” and “to hour” for both event and processing times

The high watermark table stores relevant values from the session table at the end of each pipeline run:

  • Latest and previous high watermark timestamps
  • Metadata related to the latest run

This information is vital for each pipeline run instance as it helps determine the data to be loaded, update the high watermark after processing, and finally generate output signals to inform downstream workflows about the date-hour up to which data is complete and available. It also serves as an essential resource for debugging and creating audits on the pipeline jobs.
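The shape of these two tables can be sketched as simple records. The field names below are hypothetical, chosen to mirror the bullet lists above, and the `output_signal` helper and its `complete_through` format are assumptions about what a downstream signal might carry.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class SessionRecord:
    process_name: str                  # partition key: one pipeline process
    session_id: str                    # unique run within the process
    processing_uris: List[str]         # input partitions involved in the load
    event_from: Tuple[int, int]        # (date, hour) bounds by event time
    event_to: Tuple[int, int]
    processing_from: Tuple[int, int]   # (date, hour) bounds by processing time
    processing_to: Tuple[int, int]


@dataclass
class HighWatermarkRecord:
    process_name: str
    latest_hwm: int
    previous_hwm: Optional[int]
    last_session_id: str               # metadata related to the latest run


def output_signal(session: SessionRecord) -> dict:
    """Tell downstream workflows the date-hour up to which data is complete."""
    date, hour = session.processing_to
    return {"process_name": session.process_name,
            "complete_through": f"{date}{hour:02d}"}


session = SessionRecord("signup_fact_load", "20230914061001", [],
                        (20230914, 2), (20230914, 6),
                        (20230914, 2), (20230914, 6))
print(output_signal(session))
# {'process_name': 'signup_fact_load', 'complete_through': '2023091406'}
```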

Conclusion

In this post, we described our data architecture at a high level, along with the pain points that led to the development of Psyberg. We also went into details related to the metadata that powers Psyberg. If you understand the challenges faced by the on-call DE and would like to learn more about our solution, please check out the next iteration of this three-part series, where we delve deeper into different modes of Psyberg.


1. Streamlining Membership Data Engineering at Netflix with Psyberg was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.